Skip to main content

JDBC Sink Node

The jdbcsink node writes records to any JDBC-compatible relational database table. It supports INSERT and UPSERT (insert-or-update) write modes and processes records in configurable batches within a single transaction.

Supported databases include PostgreSQL, MySQL, MariaDB, Microsoft SQL Server, and Oracle.

Key Features

  • INSERT mode: appends every record as a new row
  • UPSERT mode: inserts new rows or updates existing ones on conflict (PostgreSQL ON CONFLICT ... DO UPDATE)
  • Transactional batching: all rows in a batch are committed atomically — a failure rolls back the entire batch
  • Schema support: optionally qualify the target table with a schema name

Configuration

FieldTypeRequiredDefaultDescription
jdbcUrlStringYesJDBC connection URL, e.g. jdbc:postgresql://host:5432/db
credentialIdStringNoID of a username/password credential in jobContext.otherProperties
tableNameStringYesTarget table name
schemaNameStringNoSchema name (e.g. public). Omit to use the database default
writeModeStringNoINSERTWrite mode: INSERT or UPSERT
upsertKeyColumnsStringNoComma-separated column names used as the conflict key for UPSERT
batchSizeintNo1000Number of rows per batch / transaction
driverClassNameStringNoExplicit JDBC driver class to load (auto-detected from URL if omitted)

Write Modes

INSERT

Every incoming record is inserted as a new row. Duplicate key violations will cause the batch to fail and roll back.

config:
jdbcUrl: "jdbc:postgresql://localhost:5432/mydb"
credentialId: "pg-cred"
tableName: "events_processed"
writeMode: "INSERT"

UPSERT

On primary key or unique constraint conflict, existing rows are updated with the new values. Columns not present in the incoming record are left unchanged.

config:
jdbcUrl: "jdbc:postgresql://localhost:5432/mydb"
credentialId: "pg-cred"
tableName: "devices"
writeMode: "UPSERT"
upsertKeyColumns: "device_id"

For composite keys, list all key columns comma-separated:

upsertKeyColumns: "tenant_id,device_id"
PostgreSQL note

The UPSERT syntax uses INSERT ... ON CONFLICT (...) DO UPDATE SET .... Ensure the conflict columns have a UNIQUE constraint or are the primary key.

DAG Example

jobContext:
otherProperties:
pg-cred:
username: myuser
password: mypassword
metricTags: {}
dlqConfig:

dag:
- id: "source"
commandName: "jdbcsource"
config:
jdbcUrl: "jdbc:postgresql://localhost:5432/source_db"
credentialId: "pg-cred"
query: "SELECT device_id, tenant_id, status, updated_at FROM devices ORDER BY updated_at"
outputs:
- "sink"

- id: "sink"
commandName: "jdbcsink"
config:
jdbcUrl: "jdbc:postgresql://localhost:5432/dest_db"
credentialId: "pg-cred"
tableName: "devices"
writeMode: "UPSERT"
upsertKeyColumns: "tenant_id,device_id"

Column Mapping

The sink writes columns in the order they appear in each incoming record. Column names in the record must match the target table's column names exactly (case-sensitive in quoted identifiers). All identifier names are automatically double-quoted to handle reserved words and mixed-case names.