Delta Lake Sink Node

The deltalakeSink node in ZephFlow enables you to write processed data directly to a Delta Lake table on cloud storage or HDFS.

warning

When S3 is the cloud storage backend, Delta Lake does not support distributed writes. Running a pipeline job with multiple replicas that write Delta Lake data to the same S3 table will likely corrupt the table, because S3 provides no distributed lock to coordinate commits. For S3 deployments, run the pipeline with a single replica.

Overview

The Delta Lake sink writes pipeline output to an existing Delta Lake table. Data is validated against the table's schema and written in transactional batches. The table must already exist at the specified path with a defined schema.
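
The sink does not create the table for you, so creating it is a one-time setup step outside the pipeline. The sketch below shows one possible way to do that with Apache Spark and the delta-spark package; the table path, column names, and partitioning are placeholders and should match the Avro schema you later pass to the sink. Any other tool that can create a Delta table works just as well.

SparkSession spark = SparkSession.builder()
    .appName("create-events-delta-table")
    // Enable Delta Lake support in Spark SQL (requires the delta-spark dependency)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate();

// Create an empty, partitioned Delta table at the path the sink will write to.
// Column names and types are illustrative; storage credentials (e.g., fs.s3a.* properties)
// must be configured separately.
spark.sql(
    "CREATE TABLE IF NOT EXISTS delta.`s3a://my-bucket/data/events_delta` ("
        + " user_id STRING, event_type STRING, timestamp BIGINT, event_date STRING)"
        + " USING DELTA PARTITIONED BY (event_date)");

spark.stop();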

Key Features

  • Delta Lake Integration: Write data directly to Delta Lake tables on S3, GCS, Azure Blob, or HDFS
  • Transactional Writes: ACID-compliant batch commits
  • Built-in Batching: Automatically accumulates records and flushes at configurable batch size or time interval
  • Partitioning: Support for partition columns for optimized storage and reads
  • Schema Validation: Incoming data is validated against the existing Delta table schema
  • Auto Checkpoint: Optional automatic checkpointing for Delta log management

Basic Usage

The deltalakeSink method has three overloads, from simplest to most configurable:

Simple (2 parameters)

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink(
        "s3a://my-bucket/data/events_delta",  // Delta table path
        avroSchema                            // Avro Schema object
    );

With Partitions (3 parameters)

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink(
        "s3a://my-bucket/data/events_delta",  // Delta table path
        avroSchema,                           // Avro Schema object
        List.of("event_date")                 // Partition columns
    );

Full (5 parameters)

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink(
        "s3a://my-bucket/data/events_delta",  // Delta table path
        avroSchema,                           // Avro Schema object
        List.of("event_date"),                // Partition columns
        Map.of(                               // Hadoop configuration
            "fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE",
            "fs.s3a.secret.key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
        ),
        credential                            // Credential (or null)
    );

Parameters

| Parameter | Type | Description | Required | Overload |
| --- | --- | --- | --- | --- |
| tablePath | String | Full URI to the Delta table (e.g., s3a://bucket/path) | Yes | All |
| avroSchema | Schema | Avro Schema object defining the record structure | Yes | All |
| partitionColumns | List<String> | Columns used to partition the Delta table | No | 3-param, Full |
| hadoopConfiguration | Map<String, String> | Custom Hadoop properties for storage access | No | Full only |
| credential | UsernamePasswordCredential | Credential for storage authentication (null for default) | No | Full only |

Batching Configuration

| Setting | Default | Description |
| --- | --- | --- |
| Batch size | 50,000 records | Number of records accumulated before committing |
| Flush interval | 30 seconds | Maximum time between commits regardless of batch size |
| Enable auto checkpoint | true | Automatically checkpoint the Delta log |
| Cleanup after copy | true | Remove temporary files after a successful commit |

Supported Storage Backends

| Storage Provider | Path Scheme | Credential Type |
| --- | --- | --- |
| AWS S3 | s3:// or s3a:// | Username/Password (Access Key / Secret Key) |
| Google Cloud | gs:// | GCP Credential |
| Azure Blob | abfs:// | API Key (Storage Account Key) |
| HDFS | hdfs:// | None (use Hadoop config) |
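
For HDFS, where no credential object is used, storage settings go through the hadoopConfiguration map of the full overload. Below is a minimal sketch under assumptions: the namenode address and the sample Hadoop client property are placeholders, and any standard Hadoop properties your cluster needs can be passed the same way.

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka:9092", "events", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink(
        "hdfs://namenode:8020/data/events_delta",        // Delta table path (placeholder namenode)
        avroSchema,                                      // Avro Schema object
        List.of("event_date"),                           // Partition columns
        Map.of(                                          // Hadoop configuration (example property)
            "dfs.client.use.datanode.hostname", "true"),
        null                                             // Credential (none for HDFS)
    );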

Examples

Writing to S3

Schema avroSchema = new Schema.Parser().parse("""
    {
      "type": "record",
      "name": "UserEvent",
      "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "event_date", "type": "string"}
      ]
    }
    """);

UsernamePasswordCredential credential = new UsernamePasswordCredential(
    "AKIAIOSFODNN7EXAMPLE",                      // S3 access key
    "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"   // S3 secret key
);

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka:9092", "events", "consumer-group", EncodingType.JSON_OBJECT, null)
    // Derive the event_date partition column from the epoch timestamp
    .eval("dict_merge($, dict(event_date=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd\")))")
    .deltalakeSink(
        "s3a://data-lake/events_delta",  // Delta table path
        avroSchema,                      // Avro Schema object
        List.of("event_date"),           // Partition columns
        null,                            // Hadoop configuration (use defaults)
        credential                       // S3 credential
    );

Writing to S3 (Simple)

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka:9092", "events", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink("s3a://data-lake/events_delta", avroSchema);

Writing to GCS

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka:9092", "events", "consumer-group", EncodingType.JSON_OBJECT, null)
    .deltalakeSink(
        "gs://data-lake-bucket/events_delta",  // Delta table path
        avroSchema,                            // Avro Schema object
        null,                                  // Partition columns (none)
        null,                                  // Hadoop configuration (use defaults)
        gcpCredential                          // GCP credential
    );
Related Sink Nodes

  • s3Sink: Write data to S3 in JSON or CSV formats
  • kafkaSink: Write data to Kafka topics