Delta Lake Sink Node
The deltalakeSink node in ZephFlow enables you to write processed data directly to a Delta Lake table on cloud storage
or HDFS.
When using S3 as the cloud storage backend, Delta Lake does not support distributed writing: S3 provides no distributed lock for coordinating commits, so deploying a pipeline job with multiple replicas writing Delta Lake data to the same S3 table will likely corrupt it. For S3 deployments, ensure your pipeline runs with a single replica.
Overview
The Delta Lake sink writes pipeline output to an existing Delta Lake table. Data is validated against the table's schema and written in transactional batches. The table must already exist at the specified path with a defined schema.
Key Features
- Delta Lake Integration: Write data directly to Delta Lake tables on S3, GCS, Azure Blob, or HDFS
- Transactional Writes: ACID-compliant batch commits
- Built-in Batching: Automatically accumulates records and flushes at configurable batch size or time interval
- Partitioning: Support for partition columns for optimized storage and reads
- Schema Validation: Incoming data is validated against the existing Delta table schema
- Auto Checkpoint: Optional automatic checkpointing for Delta log management
Basic Usage
The deltalakeSink method has three overloads, from simplest to most configurable:
Simple (2 parameters)
```java
ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink(
        "s3a://my-bucket/data/events_delta", // Delta table path
        avroSchema                           // Avro Schema object
    );
```
With Partitions (3 parameters)
```java
ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink(
        "s3a://my-bucket/data/events_delta", // Delta table path
        avroSchema,                          // Avro Schema object
        List.of("event_date")                // Partition columns
    );
```
Full (5 parameters)
```java
ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink(
        "s3a://my-bucket/data/events_delta", // Delta table path
        avroSchema,                          // Avro Schema object
        List.of("event_date"),               // Partition columns
        Map.of(                              // Hadoop configuration
            "fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE",
            "fs.s3a.secret.key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"),
        credential                           // Credential (or null)
    );
```
Parameters
| Parameter | Type | Description | Required | Overload |
|---|---|---|---|---|
| tablePath | String | Full URI to the Delta table (e.g., s3a://bucket/path) | Yes | All |
| avroSchema | Schema | Avro Schema object defining the record structure | Yes | All |
| partitionColumns | List&lt;String&gt; | Columns used to partition the Delta table | No | 3-param, Full |
| hadoopConfiguration | Map&lt;String, String&gt; | Custom Hadoop properties for storage access | No | Full only |
| credential | UsernamePasswordCredential | Credential for storage authentication (null for default) | No | Full only |
Batching Configuration
| Setting | Default | Description |
|---|---|---|
| Batch size | 50,000 records | Number of records accumulated before committing |
| Flush interval | 30 seconds | Maximum time between commits regardless of batch size |
| Enable auto checkpoint | true | Automatically checkpoint the Delta log |
| Cleanup after copy | true | Remove temporary files after successful commit |
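The two batching triggers combine as an either/or condition: a commit happens when the buffer reaches the batch size or when the flush interval elapses, whichever comes first. A minimal sketch of that decision (class and method names here are illustrative, not ZephFlow internals):

```java
public class FlushPolicy {
    static final int DEFAULT_BATCH_SIZE = 50_000;         // records per commit (default)
    static final long DEFAULT_FLUSH_INTERVAL_MS = 30_000; // max time between commits (default)

    // Commit when either the size threshold or the time threshold is reached.
    static boolean shouldFlush(int bufferedRecords, long millisSinceLastCommit) {
        return bufferedRecords >= DEFAULT_BATCH_SIZE
            || millisSinceLastCommit >= DEFAULT_FLUSH_INTERVAL_MS;
    }
}
```

Because of the time trigger, a low-volume topic still produces a commit at least every 30 seconds, so records are never held in the buffer indefinitely.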
Supported Storage Backends
| Storage Provider | Path Scheme | Credential Type |
|---|---|---|
| AWS S3 | s3:// or s3a:// | Username/Password (Access Key / Secret Key) |
| Google Cloud | gs:// | GCP Credential |
| Azure Blob | abfs:// | API Key (Storage Account Key) |
| HDFS | hdfs:// | None (use Hadoop config) |
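The hadoopConfiguration map in the full overload is where backend-specific Hadoop filesystem properties go. The property names below are standard Hadoop keys for each connector; the account, host, and key values are placeholders (the S3 values are AWS's documentation examples):

```java
import java.util.Map;

public class BackendConfigs {
    // S3A connector: static access/secret key pair
    static Map<String, String> s3a() {
        return Map.of(
            "fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE",
            "fs.s3a.secret.key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");
    }

    // Azure ABFS connector: storage account key, keyed by the account's DFS endpoint
    static Map<String, String> abfs() {
        return Map.of(
            "fs.azure.account.key.mystorageaccount.dfs.core.windows.net",
            "<storage-account-key>");
    }

    // HDFS: point the default filesystem at the NameNode
    static Map<String, String> hdfs() {
        return Map.of("fs.defaultFS", "hdfs://namenode:8020");
    }
}
```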
Examples
Writing to S3
```java
Schema avroSchema = new Schema.Parser().parse("""
    {
      "type": "record",
      "name": "UserEvent",
      "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "event_date", "type": "string"}
      ]
    }
    """);

UsernamePasswordCredential credential = new UsernamePasswordCredential(
    "AKIAIOSFODNN7EXAMPLE",
    "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka:9092", "events", "consumer-group", EncodingType.JSON_OBJECT, null)
    .eval("dict_merge($, dict(event_date=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd\")))")
    .deltalakeSink(
        "s3a://data-lake/events_delta",
        avroSchema,
        List.of("event_date"),
        null,
        credential
    );
```
Writing to S3 (Simple)
```java
ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka:9092", "events", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink("s3a://data-lake/events_delta", avroSchema);
```
Writing to GCS
```java
ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka:9092", "events", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink(
        "gs://data-lake-bucket/events_delta",
        avroSchema,
        null, // no partition columns
        null, // no custom Hadoop configuration
        gcpCredential
    );
```
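Writing to HDFS
For HDFS, no credential object is needed; the full overload can pass the NameNode address through the Hadoop configuration instead. A sketch, following the same pattern as the examples above (the NameNode host, port, and table path are placeholders):

```java
ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka:9092", "events", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink(
        "hdfs://namenode:8020/data/events_delta",
        avroSchema,
        List.of("event_date"),
        Map.of("fs.defaultFS", "hdfs://namenode:8020"), // Hadoop configuration
        null // no credential; HDFS authentication comes from Hadoop config
    );
```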