Delta Lake Sink Node

The deltalakeSink node in ZephFlow enables you to write processed data directly to a Delta Lake table on cloud storage or HDFS.

warning

When using S3 as the cloud storage backend, Delta Lake does not support distributed writing. Deploying a data pipeline job with multiple replicas writing data in Delta Lake format to S3 will likely cause data corruption due to the lack of a distributed lock implementation. For S3 deployments, ensure your pipeline runs with a single replica.

Overview

The Delta Lake sink writes pipeline output to an existing Delta Lake table. Data is validated against the table's schema and written in transactional batches. The table must already exist at the specified path with a defined schema.

Key Features

  • Delta Lake Integration: Write data directly to Delta Lake tables on S3, GCS, Azure Blob, or HDFS
  • Transactional Writes: ACID-compliant batch commits
  • Built-in Batching: Automatically accumulates records and flushes at configurable batch size or time interval
  • Partitioning: Support for partition columns for optimized storage and reads
  • Schema Validation: Incoming data is validated against the existing Delta table schema
  • Auto Checkpoint: Optional automatic checkpointing for Delta log management

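The built-in batching behavior can be sketched as a simple commit trigger. This is assumed semantics inferred from the batchSize and flushIntervalSeconds settings documented below, not the actual ZephFlow implementation: a batch commits when either limit is reached, whichever comes first.

```java
// Hypothetical sketch of the sink's commit decision (not ZephFlow source).
public class FlushPolicy {
    public static boolean shouldCommit(
            int bufferedRecords, long millisSinceLastCommit,
            int batchSize, int flushIntervalSeconds) {
        // Commit when the buffer is full OR the flush interval has elapsed.
        return bufferedRecords >= batchSize
                || millisSinceLastCommit >= flushIntervalSeconds * 1000L;
    }
}
```

With the defaults below (batchSize 50000, flushIntervalSeconds 30), a slow topic still produces a commit at least every 30 seconds.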
Parameters

| Parameter | Type | Description | Required |
| --- | --- | --- | --- |
| tablePath | String | Full URI to the Delta table (e.g., s3a://bucket/path) | Yes |
| avroSchema | Schema | Avro Schema object defining the record structure | Yes |
| partitionColumns | List&lt;String&gt; | Columns used to partition the Delta table | No |
| hadoopConfiguration | Map&lt;String, String&gt; | Custom Hadoop properties for storage access | No |
| credential | UsernamePasswordCredential | Credential for storage authentication (null for default) | No |

Config Object

The full config object for the Delta Lake sink node (DeltaLakeSinkDto.Config):

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| tablePath | String | Yes | | Full URI to the Delta table (e.g., s3a://bucket/path) |
| avroSchema | Map&lt;String, Object&gt; | Yes | | Avro schema defining the record structure |
| partitionColumns | List&lt;String&gt; | No | null | Columns used to partition the Delta table |
| batchSize | int | No | 50000 | Number of records accumulated before committing |
| hadoopConfiguration | Map&lt;String, String&gt; | No | {} | Custom Hadoop properties for storage access |
| credentialId | String | No | null | Credential ID for storage authentication |
| enableAutoCheckpoint | boolean | No | true | Automatically checkpoint the Delta log |
| flushIntervalSeconds | int | No | 30 | Maximum time in seconds between commits, regardless of batch size |
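
Assembled as JSON, a config object for this node might look like the following. The field names come from the table above; the values and the credential ID are illustrative, and any surrounding DAG/node wrapper is omitted:

```json
{
  "tablePath": "s3a://data-lake/events_delta",
  "avroSchema": {
    "type": "record",
    "name": "UserEvent",
    "fields": [{"name": "user_id", "type": "string"}]
  },
  "partitionColumns": ["event_date"],
  "batchSize": 50000,
  "hadoopConfiguration": {
    "fs.s3a.access.key": "<ACCESS_KEY>",
    "fs.s3a.secret.key": "<SECRET_KEY>"
  },
  "credentialId": "my-s3-credential",
  "enableAutoCheckpoint": true,
  "flushIntervalSeconds": 30
}
```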

Supported Storage Backends

| Storage Provider | Path Scheme | Credential Type |
| --- | --- | --- |
| AWS S3 | s3:// or s3a:// | Username/Password (Access Key / Secret Key) |
| Google Cloud | gs:// | GCP Credential |
| Azure Blob | abfs:// or abfss:// | API Key (Storage Account Key) |
| HDFS | hdfs:// | None (use Hadoop config) |
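
For backends configured via hadoopConfiguration, the map carries standard Hadoop connector properties. The property names below are the conventional ones for the S3A, GCS, and ABFS connectors; the exact keys depend on the connector versions bundled with your deployment, so treat these as illustrative:

```java
import java.util.Map;

public class HadoopConfigExamples {
    // AWS S3 via the S3A connector (static access keys).
    static Map<String, String> s3Config() {
        return Map.of(
            "fs.s3a.access.key", "<ACCESS_KEY>",
            "fs.s3a.secret.key", "<SECRET_KEY>");
    }

    // Google Cloud Storage via a service-account key file.
    static Map<String, String> gcsConfig() {
        return Map.of(
            "google.cloud.auth.service.account.enable", "true",
            "google.cloud.auth.service.account.json.keyfile", "/path/to/keyfile.json");
    }

    // Azure ADLS Gen2 via a storage account key; the account name
    // ("myaccount" here) is part of the property name itself.
    static Map<String, String> azureConfig() {
        return Map.of(
            "fs.azure.account.key.myaccount.dfs.core.windows.net",
            "<STORAGE_ACCOUNT_KEY>");
    }
}
```

HDFS deployments typically need no credential here and rely on the cluster's own Hadoop configuration.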

Java SDK Usage

The deltalakeSink method has three overloads, from simplest to most configurable.

Simple (2 parameters)

```java
ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink(
        "s3a://my-bucket/data/events_delta", // Delta table path
        avroSchema                           // Avro Schema object
    );
```

With Partitions (3 parameters)

```java
ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink(
        "s3a://my-bucket/data/events_delta", // Delta table path
        avroSchema,                          // Avro Schema object
        List.of("event_date")                // Partition columns
    );
```

Full (5 parameters)

```java
ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink(
        "s3a://my-bucket/data/events_delta", // Delta table path
        avroSchema,                          // Avro Schema object
        List.of("event_date"),               // Partition columns
        Map.of(                              // Hadoop configuration
            "fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE",
            "fs.s3a.secret.key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
        ),
        credential                           // Credential (or null)
    );
```

Writing to S3 with Credentials

```java
Schema avroSchema = new Schema.Parser().parse("""
    {
      "type": "record",
      "name": "UserEvent",
      "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "event_date", "type": "string"}
      ]
    }
    """);

UsernamePasswordCredential credential = new UsernamePasswordCredential(
    "AKIAIOSFODNN7EXAMPLE",
    "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
);

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka:9092", "events", "consumer-group", EncodingType.JSON_OBJECT, null)
    .eval("dict_merge($, dict(event_date=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd\")))")
    .deltalakeSink(
        "s3a://data-lake/events_delta",
        avroSchema,
        List.of("event_date"),
        null,
        credential
    );
```

Writing to GCS

```java
ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka:9092", "events", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink(
        "gs://data-lake-bucket/events_delta",
        avroSchema,    // Avro Schema object, defined as in the S3 example above
        null,          // Partition columns (none)
        null,          // Hadoop configuration (none)
        gcpCredential  // GCP credential
    );
```
Related Nodes

  • s3Sink: Write data to S3 in JSON or CSV formats
  • kafkaSink: Write data to Kafka topics