Kafka Sink Node
The kafkaSink node in ZephFlow publishes processed data to Kafka topics, providing an integration
point for sending results from your pipeline to downstream Kafka consumers.
Overview
The Kafka sink node connects to a Kafka broker and publishes messages to specified topics. This allows you to build data processing pipelines that transform data and then send the results to Kafka for downstream consumption.
Key Features
- Kafka Integration: Connect to Kafka brokers to publish messages to topics
- Flexible Encoding: Support for multiple event encoding formats
- Partition Key Configuration: Optionally specify a field to use as the partition key
- Customizable Configuration: Fine-tune Kafka producer settings with custom properties
Parameters
| Parameter | Type | Description |
|---|---|---|
| broker | String | Kafka broker address with port, comma-separated if multiple (e.g., "kafka-1:9092,kafka-2:9092") |
| topic | String | Name of the Kafka topic to publish to |
| partitionKeyFieldExpressionStr | String | Expression to determine the partition key (null for default partitioning) |
| encodingType | EncodingType | Format for serializing messages. Supported types: CSV, JSON_OBJECT, JSON_ARRAY, JSON_OBJECT_LINE. Each record is published as an individual Kafka message. |
| properties | Map<String, String> | Additional Kafka producer properties (optional) |
More details about encoding type support can be found in the encoding types documentation.
Config Object
The full config object for the Kafka sink node (KafkaSinkDto.Config):
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| broker | String | Yes | — | Kafka broker address with port, comma-separated if multiple |
| topic | String | Yes | — | Name of the Kafka topic to publish to |
| partitionKeyFieldExpressionStr | String | No | null | Expression to determine the partition key (null for default partitioning) |
| encodingType | String | Yes | — | Format for serializing messages. Supported values: CSV, JSON_OBJECT, JSON_ARRAY, JSON_OBJECT_LINE |
| properties | Map<String, String> | No | null | Additional Kafka producer properties |
Default Producer Settings
When no custom properties are provided, the Kafka sink node applies the following defaults optimized for high-throughput production:
| Property | Default Value | Description |
|---|---|---|
| batch.size | 65536 (64 KB) | Maximum batch size in bytes |
| linger.ms | 10 | Time in milliseconds to wait for additional messages before sending a batch |
| buffer.memory | 67108864 (64 MB) | Total memory available to the producer for buffering |
| compression.type | lz4 | Compression algorithm for message batches |
| acks | 1 | Number of acknowledgments required (leader only) |
| retries | 3 | Number of retry attempts on failure |
| max.in.flight.requests.per.connection | 5 | Maximum unacknowledged requests per connection |
Custom properties passed via the properties parameter will override these defaults.
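As a sketch of how overriding works, the following plain-Java snippet (using Kafka's string config names rather than the ProducerConfig constants shown later) builds a property map that replaces two of the defaults above; any key not present keeps its default:

```java
import java.util.HashMap;
import java.util.Map;

public class SinkProps {
    // Build a property map overriding two of the sink's defaults;
    // keys not present here keep the default values from the table above.
    static Map<String, String> customProps() {
        Map<String, String> props = new HashMap<>();
        props.put("compression.type", "zstd"); // replaces the default lz4
        props.put("acks", "all");              // replaces the default acks=1
        return props;
    }

    public static void main(String[] args) {
        System.out.println(customProps());
    }
}
```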
Delivery Model
The Kafka sink uses an asynchronous fire-and-forget delivery model. Messages are sent to the Kafka producer asynchronously, and delivery results (success or failure) are tracked via callbacks. This means:
- Sending a message does not block the pipeline
- Delivery errors are tracked asynchronously and reported through counters
- For stronger delivery guarantees, set acks=all and increase retries via custom properties
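The fire-and-forget model can be illustrated with a toy sketch (plain Java, not ZephFlow's actual implementation): the send call returns immediately, and failures are recorded asynchronously in a counter, mirroring how the sink reports delivery errors through counters.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class FireAndForgetSketch {
    static final AtomicLong errorCounter = new AtomicLong();
    static final ExecutorService pool = Executors.newSingleThreadExecutor();

    // send() returns immediately; the delivery outcome is recorded
    // asynchronously, mirroring the sink's callback-based tracking.
    static Future<?> send(String message) {
        return pool.submit(() -> {
            boolean delivered = !message.isEmpty(); // stand-in for a broker ack
            if (!delivered) errorCounter.incrementAndGet();
        });
    }

    public static void main(String[] args) throws Exception {
        send("event-1");  // does not block the caller
        send("").get();   // simulated failure, awaited only for the demo
        pool.shutdown();
        System.out.println("delivery errors: " + errorCounter.get());
    }
}
```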
Best Practices
Producer Configuration Strategy
- Durability vs. Performance: Balance between durability and throughput based on your use case
  - For high durability: set acks=all, increase retries, and enable idempotence
  - For high throughput: increase batch.size and linger.ms, and use efficient compression
- Partition Key Selection: Choose partition keys that provide even distribution across partitions
Performance Tuning
- Batch Size: Increase batch size for higher throughput at the cost of slightly higher latency (default: 64KB)
- Compression: Compression is enabled by default (lz4). Switch to snappy or zstd if preferred
- Linger Time: Increase linger time to allow more batching for higher throughput (default: 10ms)
Common Issues and Solutions
Message Serialization Errors
- Ensure your events match the specified encoding type
- When using EncodingType.JSON_OBJECT, make sure your events are valid JSON objects
- For other encoding types, ensure your data structure is compatible
Network and Connection Issues
- Verify broker addresses and network connectivity
- Add appropriate timeouts and retry configurations via custom properties
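For example, a property map bounding how long sends wait on a flaky network might look like the following (a sketch; the values shown are illustrative and should be tuned to your environment, and the keys are standard Kafka producer configs):

```java
import java.util.HashMap;
import java.util.Map;

public class ConnectionProps {
    // Producer settings that bound how long sends can stall on network issues.
    static Map<String, String> connectionProps() {
        Map<String, String> props = new HashMap<>();
        props.put("request.timeout.ms", "15000");  // per-request timeout
        props.put("delivery.timeout.ms", "60000"); // total time budget per record
        props.put("reconnect.backoff.ms", "1000"); // pause before reconnect attempts
        return props;
    }

    public static void main(String[] args) {
        System.out.println(connectionProps());
    }
}
```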
Java SDK Usage
Basic Usage
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource("broker-address:9092", "input-topic", "consumer-group-id", EncodingType.JSON_OBJECT, null)
.eval("dict_merge($, dict(processed=true, timestamp=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
.kafkaSink(
"broker-address:9092", // Kafka broker address
"output-topic", // Topic to publish to
null, // Partition key expression (optional)
EncodingType.JSON_OBJECT, // Encoding type for messages
null // Additional Kafka properties (optional)
);
With Custom Partition Key
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource("kafka-broker:9092", "user-events", "consumer-group", EncodingType.JSON_OBJECT, null)
.kafkaSink(
"kafka-broker:9092",
"partitioned-events",
"$.user_id", // Use user_id field as partition key
EncodingType.JSON_OBJECT,
null
);
With Custom Kafka Properties
Map<String, String> kafkaProps = new HashMap<>();
kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all");
kafkaProps.put(ProducerConfig.RETRIES_CONFIG, "3");
kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, "5");
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource("kafka-broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
.kafkaSink(
"kafka-broker:9092",
"output-topic",
null,
EncodingType.JSON_OBJECT,
kafkaProps
);
Do not set key.serializer and value.serializer in the producer properties: the kafkaSink node always uses
byte array serializers and relies on the encodingType configuration for serialization.
Event Transformation and Publishing
ZephFlow eventProcessor = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"raw-events",
"event-processor-group",
EncodingType.JSON_OBJECT,
null
)
.eval("dict_merge($, dict(" +
"event_id=$.id," +
"event_type=$.type," +
"normalized_timestamp=ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")," +
"processed=true" +
"))")
.kafkaSink(
kafkaBroker,
"processed-events",
"$.event_id", // Partition by event_id
EncodingType.JSON_OBJECT,
null
);
Message Routing Based on Content
ZephFlow sourceFlow = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"incoming-messages",
"router-group",
EncodingType.JSON_OBJECT,
null
);
ZephFlow errorFlow = sourceFlow
.filter("$.status == \"error\"")
.kafkaSink(
kafkaBroker,
"error-events",
null,
EncodingType.JSON_OBJECT,
null
);
ZephFlow successFlow = sourceFlow
.filter("$.status == \"success\"")
.kafkaSink(
kafkaBroker,
"success-events",
null,
EncodingType.JSON_OBJECT,
null
);
Enriching and Aggregating Data
ZephFlow enrichmentFlow = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"user-clicks",
"click-enricher-group",
EncodingType.JSON_OBJECT,
null
)
.eval("dict_merge($, dict(" +
"session_length=case($.end_time != null and $.start_time != null => " +
"ts_str_to_epoch($.end_time, \"yyyy-MM-dd'T'HH:mm:ss\") - " +
"ts_str_to_epoch($.start_time, \"yyyy-MM-dd'T'HH:mm:ss\"), " +
"_ => 0)," +
"click_count=size_of($.clicks)," +
"has_purchase=$.conversion == true" +
"))")
.kafkaSink(
kafkaBroker,
"enriched-sessions",
"$.session_id",
EncodingType.JSON_OBJECT,
null
);
High Durability Configuration
Map<String, String> durabilityProps = new HashMap<>();
durabilityProps.put(ProducerConfig.ACKS_CONFIG, "all");
durabilityProps.put(ProducerConfig.RETRIES_CONFIG, "5");
durabilityProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(kafkaBroker, "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
.kafkaSink(kafkaBroker, "output-topic", null, EncodingType.JSON_OBJECT, durabilityProps);
High Throughput Configuration
Map<String, String> performanceProps = new HashMap<>();
performanceProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "131072");
performanceProps.put(ProducerConfig.LINGER_MS_CONFIG, "20");
performanceProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
performanceProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728");
ZephFlow highPerformanceFlow = ZephFlow.startFlow()
.kafkaSource(kafkaBroker, "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
.kafkaSink(
kafkaBroker,
"high-volume-topic",
null,
EncodingType.JSON_OBJECT,
performanceProps
);
Security Configuration
For secure Kafka clusters, provide the necessary security configurations:
Map<String, String> secureProps = new HashMap<>();
secureProps.put("security.protocol", "SASL_SSL");
secureProps.put("sasl.mechanism", "PLAIN");
secureProps.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"your-username\" " +
"password=\"your-password\";");
ZephFlow secureFlow = ZephFlow.startFlow()
.kafkaSource(kafkaBroker, "input-topic", "consumer-group", EncodingType.JSON_OBJECT, secureProps)
.kafkaSink(
kafkaBroker,
"secure-topic",
null,
EncodingType.JSON_OBJECT,
secureProps
);
Retry Configuration
Map<String, String> reliabilityProps = new HashMap<>();
reliabilityProps.put(ProducerConfig.RETRIES_CONFIG, "5");
reliabilityProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "500");
reliabilityProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(kafkaBroker, "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
.kafkaSink(kafkaBroker, "output-topic", null, EncodingType.JSON_OBJECT, reliabilityProps);
Complete Pipeline
ZephFlow pipeline = ZephFlow.startFlow()
.kafkaSource(kafkaBroker, "input-topic", "processor-group", EncodingType.JSON_OBJECT, null)
.filter("$.status != null")
.eval("dict_merge($, dict(processed_at=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
.assertion("$.processed_at != null")
.kafkaSink(kafkaBroker, "output-topic", null, EncodingType.JSON_OBJECT, null);
pipeline.execute("data-processor-job", "production", "event-processor");
Related Nodes
- Kafka Source: Read data from Kafka topics