JDBC Sink Node
The jdbcsink node writes records to any JDBC-compatible relational database table. It supports INSERT and UPSERT (insert-or-update) write modes and processes records in configurable batches within a single transaction.
Supported databases include PostgreSQL, MySQL, MariaDB, Microsoft SQL Server, and Oracle.
Key Features
- INSERT mode: appends every record as a new row
- UPSERT mode: inserts new rows or updates existing ones on conflict (PostgreSQL ON CONFLICT ... DO UPDATE)
- Transactional batching: all rows in a batch are committed atomically; a failure rolls back the entire batch
- Schema support: optionally qualify the target table with a schema name
Configuration
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| jdbcUrl | String | Yes | — | JDBC connection URL, e.g. jdbc:postgresql://host:5432/db |
| credentialId | String | No | — | ID of a username/password credential in jobContext.otherProperties |
| tableName | String | Yes | — | Target table name |
| schemaName | String | No | — | Schema name (e.g. public); omit to use the database default |
| writeMode | String | No | INSERT | Write mode: INSERT or UPSERT |
| upsertKeyColumns | String | No | — | Comma-separated column names used as the conflict key for UPSERT |
| batchSize | int | No | 1000 | Number of rows per batch/transaction |
| driverClassName | String | No | — | Explicit JDBC driver class to load; auto-detected from the URL if omitted |
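To illustrate the batchSize semantics, here is a minimal Python sketch (not the sink's actual JDBC implementation) that chunks records and commits each chunk as one transaction, using sqlite3 as a stand-in database; the helper name and table layout are hypothetical:

```python
import sqlite3

def write_in_batches(conn, table, rows, batch_size=1000):
    """Insert rows in chunks; each chunk is committed as one transaction.
    Hypothetical helper mirroring the sink's batchSize behaviour."""
    cur = conn.cursor()
    for start in range(0, len(rows), batch_size):
        chunk = rows[start:start + batch_size]
        try:
            cur.executemany(f"INSERT INTO {table} (id, status) VALUES (?, ?)", chunk)
            conn.commit()  # all rows of the chunk become visible together
        except sqlite3.DatabaseError:
            conn.rollback()  # a failure undoes the whole chunk, not just one row
            raise

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, status TEXT)")
write_in_batches(conn, "events", [(i, "ok") for i in range(5)], batch_size=2)
print(conn.execute("SELECT COUNT(*) FROM events").fetchone()[0])  # 5
```

With batch_size=2, the five rows above are written in three transactions (2 + 2 + 1 rows).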
Write Modes
INSERT
Every incoming record is inserted as a new row. Duplicate key violations will cause the batch to fail and roll back.
```yaml
config:
  jdbcUrl: "jdbc:postgresql://localhost:5432/mydb"
  credentialId: "pg-cred"
  tableName: "events_processed"
  writeMode: "INSERT"
```
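To make the failure mode concrete, here is an illustrative Python/sqlite3 sketch (the real sink runs over JDBC; table and column names are invented): a duplicate key in the batch aborts it, and the rollback leaves the table exactly as it was before the batch:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events_processed (event_id INTEGER PRIMARY KEY, payload TEXT)")
conn.execute("INSERT INTO events_processed VALUES (1, 'first')")
conn.commit()

batch = [(2, "new"), (1, "duplicate")]  # second row violates the primary key
try:
    conn.executemany("INSERT INTO events_processed VALUES (?, ?)", batch)
    conn.commit()
except sqlite3.IntegrityError:
    conn.rollback()  # row (2, 'new') is rolled back along with the failed row

print(conn.execute("SELECT COUNT(*) FROM events_processed").fetchone()[0])  # 1
```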
UPSERT
On primary key or unique constraint conflict, existing rows are updated with the new values. Columns not present in the incoming record are left unchanged.
```yaml
config:
  jdbcUrl: "jdbc:postgresql://localhost:5432/mydb"
  credentialId: "pg-cred"
  tableName: "devices"
  writeMode: "UPSERT"
  upsertKeyColumns: "device_id"
```
For composite keys, list all key columns comma-separated:
```yaml
upsertKeyColumns: "tenant_id,device_id"
```
The UPSERT syntax uses INSERT ... ON CONFLICT (...) DO UPDATE SET .... Ensure the conflict columns have a UNIQUE constraint or are the primary key.
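The sink's actual SQL generator is internal, but a hedged sketch of the statement shape described above might look like the following Python helper (a hypothetical name, not the sink's code); the demo executes it against sqlite3, which shares PostgreSQL's ON CONFLICT ... DO UPDATE syntax:

```python
import sqlite3

def build_upsert_sql(table, columns, key_columns):
    """Build an INSERT ... ON CONFLICT (...) DO UPDATE SET ... statement.
    Illustrative helper; the sink's real generator may differ."""
    cols = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    # Non-key columns are overwritten from the incoming row via EXCLUDED
    updates = ", ".join(f"{c} = excluded.{c}" for c in columns if c not in key_columns)
    return (f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) "
            f"ON CONFLICT ({', '.join(key_columns)}) DO UPDATE SET {updates}")

sql = build_upsert_sql("devices", ["tenant_id", "device_id", "status"],
                       ["tenant_id", "device_id"])
print(sql)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE devices (tenant_id INT, device_id INT, status TEXT, "
             "PRIMARY KEY (tenant_id, device_id))")
conn.execute(sql, (1, 42, "online"))
conn.execute(sql, (1, 42, "offline"))  # same key: the row is updated, not duplicated
print(conn.execute("SELECT status FROM devices").fetchall())  # [('offline',)]
```

Writing the same (tenant_id, device_id) key twice leaves a single row with the latest status, matching the insert-or-update behaviour described above.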
DAG Example
```yaml
jobContext:
  otherProperties:
    pg-cred:
      username: myuser
      password: mypassword
  metricTags: {}
  dlqConfig:
dag:
  - id: "source"
    commandName: "jdbcsource"
    config:
      jdbcUrl: "jdbc:postgresql://localhost:5432/source_db"
      credentialId: "pg-cred"
      query: "SELECT device_id, tenant_id, status, updated_at FROM devices ORDER BY updated_at"
    outputs:
      - "sink"
  - id: "sink"
    commandName: "jdbcsink"
    config:
      jdbcUrl: "jdbc:postgresql://localhost:5432/dest_db"
      credentialId: "pg-cred"
      tableName: "devices"
      writeMode: "UPSERT"
      upsertKeyColumns: "tenant_id,device_id"
```
Column Mapping
The sink writes columns in the order they appear in each incoming record. Column names in the record must match the target table's column names exactly (case-sensitive in quoted identifiers). All identifier names are automatically double-quoted to handle reserved words and mixed-case names.
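The automatic double-quoting can be sketched as follows (illustrative Python, not the sink's code; embedded double quotes are doubled, per the SQL standard):

```python
def quote_ident(name):
    """Wrap an identifier in double quotes, doubling any embedded quote,
    so reserved words and mixed-case names are safe. Illustrative sketch."""
    return '"' + name.replace('"', '""') + '"'

print(quote_ident("order"))     # "order"    (reserved word becomes usable)
print(quote_ident("DeviceId"))  # "DeviceId" (case is preserved)
```

Because quoting preserves case, a record column named DeviceId will only match a table column created as "DeviceId", not deviceid.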
Related Nodes
- jdbcsource: Read records from a JDBC database
- kafkasink: Stream records to Kafka topics