Databricks Sink Node

The databricksSink node in ZephFlow writes processed pipeline data to Databricks Unity Catalog tables using Databricks SQL Compute.

Overview

The Databricks sink writes pipeline output to a Databricks Unity Catalog table using a "Stage and Load" approach: data is converted to Parquet, uploaded to a Databricks Volume, and then loaded into the target table via a COPY INTO SQL command on a SQL Warehouse. This ensures ACID compliance and governance within the Databricks ecosystem.
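To make the load step concrete, the sketch below shows roughly the shape of the COPY INTO statement the sink would run on the SQL Warehouse after staging a Parquet file, expressed as a Java string. The table name, volume path, and staged file name are illustrative placeholders; the sink generates the actual values at runtime.

// Illustrative only: approximate shape of the COPY INTO statement issued
// after the Parquet staging step. All identifiers are placeholders, not
// values the sink hardcodes.
String copyInto = String.join("\n",
    "COPY INTO prod.finance.revenue_reports",
    "FROM '/Volumes/prod/finance/staging/parquet/batch-0001.parquet'",
    "FILEFORMAT = PARQUET");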

Key Features

  • Unity Catalog Integration: Write data to governed Databricks tables
  • SQL Warehouse Execution: Leverages Databricks SQL Compute for transactional loads
  • Stage and Load: Parquet staging via Databricks Volumes for reliable ingestion
  • Schema Validation: Validates Avro schema against the target table at startup
  • Configurable Batching: Control batch size for throughput tuning

Parameters

  • tableName (String, required): Fully qualified table name (e.g., catalog.schema.table)
  • warehouseId (String, required): Databricks SQL Warehouse ID to execute the COPY INTO command
  • avroSchema (Map<String, Object>, required): Avro schema, as a map, defining the record structure

Config Object

The full config object for the Databricks sink node (DatabricksSinkDto.Config):

  • tableName (String, required): Fully qualified table name (e.g., catalog.schema.table)
  • warehouseId (String, required): Databricks SQL Warehouse ID for executing COPY INTO commands
  • avroSchema (Map<String, Object>, required): Avro schema defining the record structure
  • databricksCredentialId (String, required): Credential ID for Databricks authentication
  • volumePath (String, required): Databricks Volume path for staging Parquet files (e.g., /Volumes/<catalog>/<schema>/<volume>/[path])
  • batchSize (int, optional, default 10000): Number of records accumulated before triggering a Stage and Load
  • flushIntervalMillis (long, optional, default 30000): Maximum time in milliseconds between flushes
  • cleanupAfterCopy (boolean, optional, default true): Remove temporary staged files after a successful COPY INTO
  • copyOptions (Map<String, String>, optional, default {}): Additional options for the COPY INTO SQL command
  • formatOptions (Map<String, String>, optional, default {}): Format options for the COPY INTO SQL command
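As a sketch of how the optional fields compose (the builder setters are assumed to mirror the field names above, revenueReportSchema stands in for an Avro schema map like the one in Basic Usage below, and the mergeSchema keys shown are standard Databricks COPY INTO options rather than sink requirements):

DatabricksSinkDto.Config tunedConfig = DatabricksSinkDto.Config.builder()
    .tableName("prod.finance.revenue_reports")
    .warehouseId("my-sql-warehouse-id")
    .databricksCredentialId("my-databricks-credential")
    .volumePath("/Volumes/prod/finance/staging/parquet")
    .avroSchema(revenueReportSchema)              // placeholder schema map
    .batchSize(50000)                             // larger batches mean fewer, bigger loads
    .flushIntervalMillis(10000)                   // flush at least every 10 seconds
    .cleanupAfterCopy(true)                       // drop staged files after a successful load
    .copyOptions(Map.of("mergeSchema", "true"))   // appended to the COPY INTO statement
    .formatOptions(Map.of("mergeSchema", "true")) // Parquet format options for COPY INTO
    .build();

A larger batchSize amortizes the per-COPY INTO overhead at the cost of higher end-to-end latency, while flushIntervalMillis bounds how long a partial batch can sit unflushed.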

When to Use

  • Choose Databricks Sink if you need Unity Catalog governance, ACID compliance, and immediate query access for analysts.
  • Choose Delta Lake Sink if you need a raw data landing zone, want to minimize compute costs, or write to storage not coupled to a Databricks workspace.

Java SDK Usage

Basic Usage

DatabricksSinkDto.Config databricksConfig = DatabricksSinkDto.Config.builder()
    .tableName("prod.finance.revenue_reports")
    .warehouseId("my-sql-warehouse-id")
    .databricksCredentialId("my-databricks-credential")
    .volumePath("/Volumes/prod/finance/staging/parquet")
    .avroSchema(Map.of(
        "type", "record",
        "name", "RevenueReport",
        "fields", List.of(
            Map.of("name", "report_id", "type", "string"),
            Map.of("name", "amount", "type", "double"),
            Map.of("name", "region", "type", "string")
        )
    ))
    .build();

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .appendNode("databrickssink", databricksConfig);

With Filtering and Transformation

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka:9092", "raw-events", "consumer-group", EncodingType.JSON_OBJECT, null)
    .filter("$.status == \"completed\"")
    .eval("dict(order_id=$.id, total=$.amount * 100, region=upper($.region))")
    .appendNode("databrickssink", databricksConfig);

Related Nodes

  • deltalakeSink: Write data to Delta Lake tables on cloud storage
  • s3Sink: Write data to S3 in JSON or CSV formats
  • kafkaSink: Write data to Kafka topics