Databricks Sink Node
The databricksSink node in ZephFlow enables you to ingest processed data into Databricks Unity Catalog tables using the Databricks SQL Compute engine.
Overview
The Databricks sink writes pipeline output to a Databricks Unity Catalog table using a "Stage and Load" approach: data is converted to Parquet, uploaded to a Databricks Volume, and then loaded into the target table via a COPY INTO SQL command on a SQL Warehouse. This ensures ACID compliance and governance within the Databricks ecosystem.
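For orientation, the load step runs Databricks' COPY INTO command against the staged Parquet files. A minimal sketch of its shape is shown below; the table name and volume path are placeholders, and the exact statement ZephFlow generates (including any copy or format options) may differ:
COPY INTO prod.finance.revenue_reports
FROM '/Volumes/prod/finance/staging/parquet/<staging-dir>'
FILEFORMAT = PARQUET;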
Key Features
- Unity Catalog Integration: Write data to governed Databricks tables
- SQL Warehouse Execution: Leverages Databricks SQL Compute for transactional loads
- Stage and Load: Parquet staging via Databricks Volumes for reliable ingestion
- Schema Validation: Validates Avro schema against the target table at startup
- Configurable Batching: Control batch size for throughput tuning
Parameters
| Parameter | Type | Description | Required |
|---|---|---|---|
| tableName | String | Fully qualified table name (e.g., catalog.schema.table) | Yes |
| warehouseId | String | Databricks SQL Warehouse ID to execute the COPY INTO command | Yes |
| avroSchema | Map<String, Object> | Avro schema as a map defining the record structure (see the example below) | Yes |
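The avroSchema parameter follows standard Avro record-schema structure, expressed as nested Java maps and lists. A minimal sketch is below; the nullable discount field uses an Avro union type, and whether the sink's schema validation accepts such unions is an assumption, not documented behavior:
Map<String, Object> avroSchema = Map.of(
    "type", "record",
    "name", "RevenueReport",
    "fields", List.of(
        Map.of("name", "report_id", "type", "string"),
        // Nullable column expressed as an Avro union (assumed, not confirmed, to be supported by the sink)
        Map.of("name", "discount", "type", List.of("null", "double"))
    )
);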
Config Object
The full config object for the Databricks sink node (DatabricksSinkDto.Config); a builder sketch that uses the optional fields follows the table:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| tableName | String | Yes | — | Fully qualified table name (e.g., catalog.schema.table) |
| warehouseId | String | Yes | — | Databricks SQL Warehouse ID for executing COPY INTO commands |
| avroSchema | Map<String, Object> | Yes | — | Avro schema defining the record structure |
| databricksCredentialId | String | Yes | — | Credential ID for Databricks authentication |
| volumePath | String | Yes | — | Databricks Volume path for staging Parquet files (e.g., /Volumes/<catalog>/<schema>/<volume>/[path]) |
| batchSize | int | No | 10000 | Number of records accumulated before triggering a Stage and Load |
| flushIntervalMillis | long | No | 30000 | Maximum time in milliseconds between flushes |
| cleanupAfterCopy | boolean | No | true | Remove temporary staged files after successful COPY INTO |
| copyOptions | Map<String, String> | No | {} | Additional options for the COPY INTO SQL command |
| formatOptions | Map<String, String> | No | {} | Format options for the COPY INTO SQL command |
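The optional fields above can be set through the same builder used in the Java SDK examples below. This is a sketch under the assumption that the optional-field setters mirror the config field names; only the required-field setters are confirmed by the Basic Usage example on this page:
// Assumption: optional-field setters mirror the Config field names shown in the table above
DatabricksSinkDto.Config tunedConfig = DatabricksSinkDto.Config.builder()
    .tableName("prod.finance.revenue_reports")
    .warehouseId("my-sql-warehouse-id")
    .databricksCredentialId("my-databricks-credential")
    .volumePath("/Volumes/prod/finance/staging/parquet")
    .avroSchema(avroSchema)                        // record-schema map, as in the Avro example above
    .batchSize(50_000)                             // flush after 50k records instead of the 10000 default
    .flushIntervalMillis(60_000L)                  // ...or at least once per minute
    .cleanupAfterCopy(true)                        // drop staged Parquet files once COPY INTO succeeds
    .copyOptions(Map.of("mergeSchema", "false"))   // extra options passed to the COPY INTO command
    .formatOptions(Map.of("mergeSchema", "false")) // format options passed to the COPY INTO command
    .build();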
When to Use
- Choose Databricks Sink if you need Unity Catalog governance, ACID compliance, and immediate query access for analysts.
- Choose Delta Lake Sink if you need a raw data landing zone, want to minimize compute costs, or need to write to storage that is not coupled to a Databricks workspace.
Java SDK Usage
Basic Usage
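// Databricks sink configuration with the required fields only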
DatabricksSinkDto.Config databricksConfig = DatabricksSinkDto.Config.builder()
.tableName("prod.finance.revenue_reports")
.warehouseId("my-sql-warehouse-id")
.databricksCredentialId("my-databricks-credential")
.volumePath("/Volumes/prod/finance/staging/parquet")
.avroSchema(Map.of(
"type", "record",
"name", "RevenueReport",
"fields", List.of(
Map.of("name", "report_id", "type", "string"),
Map.of("name", "amount", "type", "double"),
Map.of("name", "region", "type", "string")
)
))
.build();
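// Define the flow: consume JSON records from Kafka and route them to the Databricks sink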
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource("broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
.appendNode("databrickssink", databricksConfig);
With Filtering and Transformation
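// Keep only completed events, reshape each record with eval, then send to the Databricks sink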
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource("kafka:9092", "raw-events", "consumer-group", EncodingType.JSON_OBJECT, null)
.filter("$.status == \"completed\"")
.eval("dict(order_id=$.id, total=$.amount * 100, region=upper($.region))")
.appendNode("databrickssink", databricksConfig);
Related Nodes
- deltalakeSink: Write data to Delta Lake tables on cloud storage
- s3Sink: Write data to S3 in JSON or CSV formats
- kafkaSink: Write data to Kafka topics