JDBC Source Node

The jdbcsource node reads data from any JDBC-compatible relational database. It supports both batch mode (one-time full table read) and streaming mode (continuous polling with watermark-based incremental fetching).

Supported databases include PostgreSQL, MySQL, MariaDB, Microsoft SQL Server, and Oracle. The driver is loaded automatically from the JDBC URL prefix.
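
As a rough illustration of driver detection from the URL prefix, the sketch below maps each supported prefix to the driver class that vendor publishes. The table and the `detect_driver` function are hypothetical and written in Python for brevity; the node's actual lookup is not shown in this document and may differ.

```python
# Hypothetical prefix-to-driver table. The class names are the ones each
# driver project publishes; the node's real lookup logic is an assumption.
DRIVER_BY_PREFIX = {
    "jdbc:postgresql:": "org.postgresql.Driver",
    "jdbc:mysql:": "com.mysql.cj.jdbc.Driver",
    "jdbc:mariadb:": "org.mariadb.jdbc.Driver",
    "jdbc:sqlserver:": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    "jdbc:oracle:": "oracle.jdbc.OracleDriver",
}


def detect_driver(jdbc_url, driver_class_name=None):
    """Return the driver class for a JDBC URL, preferring an explicit override."""
    if driver_class_name:
        return driver_class_name
    for prefix, driver in DRIVER_BY_PREFIX.items():
        if jdbc_url.startswith(prefix):
            return driver
    raise ValueError(f"No known driver for URL: {jdbc_url}")
```

An explicit driver class always wins in this sketch, which matches the documented role of driverClassName as an override for auto-detection.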

Key Features

  • Batch mode: executes the query once and terminates when all rows are consumed
  • Streaming mode: polls continuously on a configurable interval, tracking progress via a watermark column
  • Multi-database: PostgreSQL, MySQL, MariaDB, SQL Server, Oracle supported out of the box
  • Credential store: username/password looked up by ID, not stored inline in the DAG

Configuration

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| jdbcUrl | String | Yes | - | JDBC connection URL, e.g. jdbc:postgresql://host:5432/db |
| credentialId | String | No | - | ID of a username/password credential in jobContext.otherProperties |
| query | String | Yes | - | SQL query to execute. Use the :watermark placeholder in streaming mode |
| watermarkColumn | String | No | - | Column used to track progress. Setting this enables streaming mode |
| pollIntervalMs | long | No | 60000 | Milliseconds between polls in streaming mode |
| fetchSize | int | No | 1000 | Maximum rows returned per fetch / JDBC fetch size hint |
| driverClassName | String | No | - | Explicit JDBC driver class to load (auto-detected from URL if omitted) |
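
To make the required fields, defaults, and mode selection concrete, here is a minimal Python sketch of the configuration as a data class. The field names and defaults come from the table above; the `JdbcSourceConfig` class and its `mode` property are hypothetical and only illustrate how the settings relate to each other.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class JdbcSourceConfig:
    # Field names and defaults mirror the configuration table;
    # the class itself is a hypothetical illustration.
    jdbcUrl: str
    query: str
    credentialId: Optional[str] = None
    watermarkColumn: Optional[str] = None
    pollIntervalMs: int = 60000
    fetchSize: int = 1000
    driverClassName: Optional[str] = None

    def __post_init__(self):
        # jdbcUrl and query are the only required fields.
        if not self.jdbcUrl or not self.query:
            raise ValueError("jdbcUrl and query are required")

    @property
    def mode(self):
        # Setting watermarkColumn is what switches the source to streaming mode.
        return "streaming" if self.watermarkColumn else "batch"
```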

Modes

Batch Mode

When watermarkColumn is not set, the source runs the query once, emits all rows, then terminates. Use this for one-shot ETL pipelines.

config:
  jdbcUrl: "jdbc:postgresql://localhost:5432/mydb"
  credentialId: "pg-cred"
  query: "SELECT id, name, value FROM events ORDER BY id"
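
The run-once behaviour of batch mode can be sketched as follows. This is an illustration in Python that uses the standard-library sqlite3 module as a stand-in for a JDBC connection; `read_batch` and `demo_batch` are hypothetical names, and the chunked fetch only approximates what fetchSize does in a real driver.

```python
import sqlite3


def read_batch(conn, query, fetch_size=1000):
    """Run the query once and yield rows in chunks of fetch_size until exhausted."""
    cursor = conn.cursor()
    cursor.execute(query)
    while True:
        rows = cursor.fetchmany(fetch_size)
        if not rows:
            break  # all rows consumed: a batch source would terminate here
        yield from rows


def demo_batch():
    # In-memory database standing in for the real events table.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, name TEXT, value REAL)")
    conn.executemany(
        "INSERT INTO events VALUES (?, ?, ?)",
        [(1, "a", 1.0), (2, "b", 2.0), (3, "c", 3.0)],
    )
    query = "SELECT id, name, value FROM events ORDER BY id"
    return list(read_batch(conn, query, fetch_size=2))
```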

Streaming Mode

When watermarkColumn is set, the source polls the database every pollIntervalMs milliseconds. The :watermark placeholder in the query is replaced with the highest value seen in watermarkColumn from the previous fetch. On the first run, all rows are returned.

The query must order results by the watermark column so the highest value advances correctly.

config:
  jdbcUrl: "jdbc:postgresql://localhost:5432/mydb"
  credentialId: "pg-cred"
  query: "SELECT id, name, value FROM events WHERE id > :watermark ORDER BY id"
  watermarkColumn: "id"
  pollIntervalMs: 5000

For timestamp-based watermarks:

config:
  jdbcUrl: "jdbc:postgresql://localhost:5432/mydb"
  credentialId: "pg-cred"
  query: "SELECT * FROM logs WHERE updated_at > :watermark ORDER BY updated_at"
  watermarkColumn: "updated_at"
  pollIntervalMs: 10000
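
The watermark mechanism described in this section can be sketched as a poll step that binds :watermark, fetches the ordered rows, and advances the watermark to the value in the last row. The Python below uses sqlite3 as a stand-in for a JDBC database; `poll_once` and `demo_streaming` are hypothetical, and the initial watermark of 0 is an assumption, since this document does not say how the placeholder is bound on the first run.

```python
import sqlite3


def poll_once(conn, query, watermark_column, watermark):
    """Run one poll and return (rows, new_watermark)."""
    rows = conn.execute(query, {"watermark": watermark}).fetchall()
    if rows:
        # Results are ordered by the watermark column, so the last row holds the max.
        watermark = rows[-1][watermark_column]
    return [dict(row) for row in rows], watermark


def demo_streaming():
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row  # lets rows be indexed by column name
    conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, name TEXT)")
    conn.executemany("INSERT INTO events VALUES (?, ?)", [(1, "a"), (2, "b")])
    query = "SELECT id, name FROM events WHERE id > :watermark ORDER BY id"

    watermark = 0  # assumed starting value, below every stored id
    first, watermark = poll_once(conn, query, "id", watermark)
    # A real source would wait pollIntervalMs here before polling again.
    conn.execute("INSERT INTO events VALUES (3, 'c')")
    second, watermark = poll_once(conn, query, "id", watermark)
    return first, second, watermark
```

Because the comparison is a strict `>`, a non-unique watermark column such as a timestamp can skip rows that share the current maximum value but are committed after the poll that saw it. A unique, monotonically increasing column avoids that case.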

DAG Example

jobContext:
  otherProperties:
    pg-cred:
      username: myuser
      password: mypassword
  metricTags: {}
  dlqConfig:

dag:
  - id: "source"
    commandName: "jdbcsource"
    config:
      jdbcUrl: "jdbc:postgresql://localhost:5432/mydb"
      credentialId: "pg-cred"
      query: "SELECT id, name, value FROM events WHERE id > :watermark ORDER BY id"
      watermarkColumn: "id"
      pollIntervalMs: 5000
    outputs:
      - "sink"

  - id: "sink"
    commandName: "stdout"
    config:
      encodingType: "JSON_OBJECT"

Credentials

Credentials are referenced by credentialId and resolved from jobContext.otherProperties. The credential object must have username and password fields:

jobContext:
  otherProperties:
    my-db-cred:
      username: myuser
      password: mypassword

If credentialId is omitted, the connection is made without credentials (useful for databases configured with OS-level auth).
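
The lookup rules above can be sketched as a small resolver. This is a Python illustration under the assumption that the job context is available as a nested dictionary; `resolve_credentials` is a hypothetical helper, and the error behaviour for a missing ID or missing field is an assumption rather than documented behaviour.

```python
def resolve_credentials(job_context, credential_id=None):
    """Look up username/password by ID; return None when no ID is configured."""
    if credential_id is None:
        return None  # connect without credentials
    props = job_context.get("otherProperties") or {}
    cred = props.get(credential_id)
    if cred is None:
        raise KeyError(f"credential '{credential_id}' not found in jobContext.otherProperties")
    # The credential object must carry both fields.
    missing = [key for key in ("username", "password") if key not in cred]
    if missing:
        raise ValueError(f"credential '{credential_id}' is missing: {', '.join(missing)}")
    return cred["username"], cred["password"]
```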

Related Nodes

  • jdbcsink: Write records back to a JDBC database
  • kafkasource: Stream-based source for Kafka topics