Kafka Sink Node
The kafkaSink node in ZephFlow publishes processed data to Kafka topics, giving your pipeline an integration point for sending results to Kafka streams.
Overview
The Kafka sink node connects to a Kafka broker and publishes messages to specified topics. This allows you to build data processing pipelines that transform data and then send the results to Kafka for downstream consumption.
Key Features
- Kafka Integration: Connect to Kafka brokers to publish messages to topics
- Flexible Encoding: Support for multiple event encoding formats
- Partition Key Configuration: Optionally specify a field to use as the partition key
- Customizable Configuration: Fine-tune Kafka producer settings with custom properties
Basic Usage
The kafkaSink method creates a node that publishes messages to a Kafka topic:
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource("broker-address:9092", "input-topic", "consumer-group-id", EncodingType.JSON_OBJECT, null)
.eval("dict_merge($, dict(processed=true, timestamp=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
.kafkaSink(
"broker-address:9092", // Kafka broker address
"output-topic", // Topic to publish to
null, // Partition key expression (optional)
EncodingType.JSON_OBJECT, // Encoding type for messages
null // Additional Kafka properties (optional)
);
Parameters
Parameter | Type | Description |
---|---|---|
broker | String | Kafka broker address with port (e.g., "localhost:9092") |
topic | String | Name of the Kafka topic to publish to |
partitionKeyFieldExpressionStr | String | Expression to determine partition key (null for default partitioning) |
encodingType | EncodingType | Format for serializing messages (e.g., JSON_OBJECT, STRING, etc.) |
properties | Map<String, String> | Additional Kafka producer properties (optional) |
More details about encoding type support can be found here.
Common Configurations
Basic JSON Message Publishing
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource("kafka-broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
.eval("dict_merge($, dict(processed_timestamp=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
.kafkaSink(
"kafka-broker:9092",
"output-topic",
null,
EncodingType.JSON_OBJECT,
null
);
With Custom Partition Key
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource("kafka-broker:9092", "user-events", "consumer-group", EncodingType.JSON_OBJECT, null)
.kafkaSink(
"kafka-broker:9092",
"partitioned-events",
"$.user_id", // Use user_id field as partition key
EncodingType.JSON_OBJECT,
null
);
With Custom Kafka Properties
Map<String, String> kafkaProps = new HashMap<>();
kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all");
kafkaProps.put(ProducerConfig.RETRIES_CONFIG, "3");
kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, "5");
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource("kafka-broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
.kafkaSink(
"kafka-broker:9092",
"output-topic",
null,
EncodingType.JSON_OBJECT,
kafkaProps
);
Do not set key.serializer or value.serializer in the producer properties: the kafkaSink node always uses byte array serializers and relies on the EncodingType configuration for serialization.
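If producer properties are shared with other Kafka clients, one way to follow this rule is to strip the serializer entries before handing the map to kafkaSink. The sketch below is only an illustration: the class and method names (SinkProps, withoutSerializers) are hypothetical and not part of ZephFlow; the two property keys are the standard Kafka producer config names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class SinkProps {
    // Standard Kafka producer keys that the sink manages itself.
    private static final Set<String> RESERVED = Set.of("key.serializer", "value.serializer");

    // Returns a copy of the input without the reserved serializer keys.
    // The input map is left untouched. (Hypothetical helper, not a ZephFlow API.)
    static Map<String, String> withoutSerializers(Map<String, String> props) {
        Map<String, String> cleaned = new HashMap<>(props);
        cleaned.keySet().removeAll(RESERVED);
        return cleaned;
    }

    public static void main(String[] args) {
        Map<String, String> shared = new HashMap<>();
        shared.put("acks", "all");
        shared.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        System.out.println(withoutSerializers(shared)); // prints {acks=all}
    }
}
```

The cleaned map can then be passed as the last argument of kafkaSink in place of the original.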
Example Use Cases
Event Transformation and Publishing
// Process and transform events before publishing to output topic
ZephFlow eventProcessor = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"raw-events",
"event-processor-group",
EncodingType.JSON_OBJECT,
null
)
.eval("dict_merge($, dict(" +
"event_id=$.id," +
"event_type=$.type," +
"normalized_timestamp=ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")," +
"processed=true" +
"))")
.kafkaSink(
kafkaBroker,
"processed-events",
"$.event_id", // Partition by event_id
EncodingType.JSON_OBJECT,
null
);
Message Routing Based on Content
// Source flow
ZephFlow sourceFlow = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"incoming-messages",
"router-group",
EncodingType.JSON_OBJECT,
null
);
// Route error messages to error topic
ZephFlow errorFlow = sourceFlow
.filter("$.status == \"error\"")
.kafkaSink(
kafkaBroker,
"error-events",
null,
EncodingType.JSON_OBJECT,
null
);
// Route success messages to success topic
ZephFlow successFlow = sourceFlow
.filter("$.status == \"success\"")
.kafkaSink(
kafkaBroker,
"success-events",
null,
EncodingType.JSON_OBJECT,
null
);
Enriching and Aggregating Data
// Enrich events with additional calculated fields
ZephFlow enrichmentFlow = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"user-clicks",
"click-enricher-group",
EncodingType.JSON_OBJECT,
null
)
.eval("dict_merge($, dict(" +
"session_length=case($.end_time != null and $.start_time != null => " +
"ts_str_to_epoch($.end_time, \"yyyy-MM-dd'T'HH:mm:ss\") - " +
"ts_str_to_epoch($.start_time, \"yyyy-MM-dd'T'HH:mm:ss\"), " +
"_ => 0)," +
"click_count=size_of($.clicks)," +
"has_purchase=$.conversion == true" +
"))")
.kafkaSink(
kafkaBroker,
"enriched-sessions",
"$.session_id",
EncodingType.JSON_OBJECT,
null
);
Best Practices
Producer Configuration Strategy
- Durability vs. Performance: Balance durability against throughput based on your use case
// High durability configuration
Map<String, String> durabilityProps = new HashMap<>();
durabilityProps.put(ProducerConfig.ACKS_CONFIG, "all");
durabilityProps.put(ProducerConfig.RETRIES_CONFIG, "5");
durabilityProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// High throughput configuration
Map<String, String> throughputProps = new HashMap<>();
throughputProps.put(ProducerConfig.ACKS_CONFIG, "1");
throughputProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536");
throughputProps.put(ProducerConfig.LINGER_MS_CONFIG, "10");
throughputProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
- Partition Key Selection: Choose partition keys that provide even distribution
// Good partition key for user-related events
flow.kafkaSink(kafkaBroker, "user-events", "$.user_id", EncodingType.JSON_OBJECT, null);
// Hour-of-day key for time-series data: groups records by hour, but yields only 24 distinct keys and sends each hour's traffic to a single partition
flow.kafkaSink(kafkaBroker, "metrics", "epoch_to_ts_str($.timestamp, \"HH\")", EncodingType.JSON_OBJECT, null);
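To see why key cardinality matters for even distribution, the following sketch counts how many partitions a set of keys would reach. It rests on stated assumptions: Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch uses String.hashCode() as a stand-in, so the actual partition numbers will differ. The class name KeySpread and the partition count of 6 are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class KeySpread {
    // Counts how many records land on each partition for the given keys.
    // Assumption: String.hashCode() stands in for Kafka's murmur2 key hashing.
    static int[] partitionCounts(List<String> keys, int partitions) {
        int[] counts = new int[partitions];
        for (String key : keys) {
            counts[Math.floorMod(key.hashCode(), partitions)]++;
        }
        return counts;
    }

    // Number of partitions that received at least one record.
    static int partitionsUsed(int[] counts) {
        int used = 0;
        for (int c : counts) {
            if (c > 0) {
                used++;
            }
        }
        return used;
    }

    public static void main(String[] args) {
        List<String> sameHour = new ArrayList<>();
        List<String> userIds = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            sameHour.add("14");        // every record produced during hour 14
            userIds.add("user-" + i);  // one record per distinct user
        }
        System.out.println("hour key:    " + partitionsUsed(partitionCounts(sameHour, 6)));
        System.out.println("user_id key: " + partitionsUsed(partitionCounts(userIds, 6)));
    }
}
```

Records that share a key always map to the same partition, so a key with few distinct values at any moment concentrates load, while a high-cardinality key such as a user ID spreads it.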
Performance Tuning
- Batch Size: Increase batch size for higher throughput at the cost of slightly higher latency
- Compression: Enable compression for large messages or high-volume topics
- Linger Time: Increase linger time to allow more batching for higher throughput
Map<String, String> performanceProps = new HashMap<>();
performanceProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536");
performanceProps.put(ProducerConfig.LINGER_MS_CONFIG, "20");
performanceProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
performanceProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
ZephFlow highPerformanceFlow = ZephFlow.startFlow()
.kafkaSource(kafkaBroker, "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
.kafkaSink(
kafkaBroker,
"high-volume-topic",
null,
EncodingType.JSON_OBJECT,
performanceProps
);
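A rough back-of-the-envelope check can help decide whether a larger batch size will actually be used. The sketch below estimates how long a batch takes to fill at a steady per-partition rate and compares that with the linger time. It is a simplification under stated assumptions: it ignores per-record overhead and compression, and the class name BatchEstimate, the 512-byte record size, and the rates are illustrative values only.

```java
final class BatchEstimate {
    // Approximate records per batch, ignoring per-record overhead and compression.
    static int recordsPerBatch(int batchSizeBytes, int avgRecordBytes) {
        return batchSizeBytes / avgRecordBytes;
    }

    // Milliseconds needed to fill one batch at a steady per-partition rate.
    static double millisToFill(int batchSizeBytes, int avgRecordBytes, double recordsPerSecond) {
        return recordsPerBatch(batchSizeBytes, avgRecordBytes) / recordsPerSecond * 1000.0;
    }

    // True when a batch is expected to fill before the linger time expires;
    // false means the linger timer, not batch size, triggers most sends.
    static boolean fillsBeforeLinger(int batchSizeBytes, int avgRecordBytes,
                                     double recordsPerSecond, long lingerMs) {
        return millisToFill(batchSizeBytes, avgRecordBytes, recordsPerSecond) <= lingerMs;
    }

    public static void main(String[] args) {
        int batchSize = 65536;  // matches batch.size in the example above
        long lingerMs = 20;     // matches linger.ms in the example above
        System.out.println(recordsPerBatch(batchSize, 512));                    // prints 128
        System.out.println(fillsBeforeLinger(batchSize, 512, 10_000.0, lingerMs));
        System.out.println(fillsBeforeLinger(batchSize, 512, 1_000.0, lingerMs));
    }
}
```

When the estimate says batches rarely fill before the linger time expires, raising the batch size further is unlikely to help; raising the linger time or enabling compression is the more relevant lever.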
Security Configuration
For secure Kafka clusters, provide the necessary security configurations:
Map<String, String> secureProps = new HashMap<>();
secureProps.put("security.protocol", "SASL_SSL");
secureProps.put("sasl.mechanism", "PLAIN");
secureProps.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"your-username\" " +
"password=\"your-password\";");
ZephFlow secureFlow = ZephFlow.startFlow()
.kafkaSource(kafkaBroker, "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
.kafkaSink(
kafkaBroker,
"secure-topic",
null,
EncodingType.JSON_OBJECT,
secureProps
);
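Hardcoding credentials as in the example above is convenient for a demo but risky in production. A minimal sketch of one alternative follows, reading the credentials from environment variables and assembling the same three properties. The class name SaslProps and the variable names KAFKA_USERNAME and KAFKA_PASSWORD are hypothetical; rejecting quotes and backslashes is a conservative choice for this sketch rather than a requirement stated by ZephFlow or Kafka.

```java
import java.util.HashMap;
import java.util.Map;

final class SaslProps {
    // Builds the sasl.jaas.config value for the PLAIN mechanism.
    // Conservative assumption: reject characters that would need escaping.
    static String plainJaasConfig(String username, String password) {
        for (String value : new String[] {username, password}) {
            if (value.contains("\"") || value.contains("\\")) {
                throw new IllegalArgumentException("credentials must not contain quotes or backslashes");
            }
        }
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" "
                + "password=\"" + password + "\";";
    }

    // Assembles the same three security properties used in the example above.
    static Map<String, String> saslSslProps(String username, String password) {
        Map<String, String> props = new HashMap<>();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", plainJaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical environment variable names; adapt to your deployment.
        String user = System.getenv().getOrDefault("KAFKA_USERNAME", "your-username");
        String pass = System.getenv().getOrDefault("KAFKA_PASSWORD", "your-password");
        System.out.println(saslSslProps(user, pass).keySet());
    }
}
```

The resulting map can be passed to kafkaSink in place of secureProps.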
Common Issues and Solutions
Message Serialization Errors
- Ensure your events match the specified encoding type
- When using EncodingType.JSON_OBJECT, make sure your events are valid JSON objects
- For other encoding types, ensure your data structure is compatible
Network and Connection Issues
- Verify broker addresses and network connectivity
- Add appropriate timeouts and retry configurations:
Map<String, String> reliabilityProps = new HashMap<>();
reliabilityProps.put(ProducerConfig.RETRIES_CONFIG, "5");
reliabilityProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "500");
reliabilityProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
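When tuning timeouts, keep in mind that the Kafka producer expects delivery.timeout.ms to be at least linger.ms plus request.timeout.ms, and rejects configurations that violate this. The sketch below checks that relationship on a properties map before it is handed to the sink. The class name TimeoutCheck is hypothetical, and the fallback values are what I believe to be common client defaults; treat them as assumptions and verify them against your Kafka client version.

```java
import java.util.Map;

final class TimeoutCheck {
    // Assumed client defaults; verify against your Kafka client version.
    static final long DEFAULT_DELIVERY_TIMEOUT_MS = 120_000L;
    static final long DEFAULT_REQUEST_TIMEOUT_MS = 30_000L;
    static final long DEFAULT_LINGER_MS = 0L;

    // Reads a numeric property, falling back when it is absent.
    static long get(Map<String, String> props, String key, long fallback) {
        String raw = props.get(key);
        return raw == null ? fallback : Long.parseLong(raw);
    }

    // True when delivery.timeout.ms >= linger.ms + request.timeout.ms.
    static boolean timeoutsConsistent(Map<String, String> props) {
        long delivery = get(props, "delivery.timeout.ms", DEFAULT_DELIVERY_TIMEOUT_MS);
        long request = get(props, "request.timeout.ms", DEFAULT_REQUEST_TIMEOUT_MS);
        long linger = get(props, "linger.ms", DEFAULT_LINGER_MS);
        return delivery >= linger + request;
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
                "retries", "5",
                "retry.backoff.ms", "500",
                "request.timeout.ms", "30000");
        System.out.println(timeoutsConsistent(props)); // prints true
    }
}
```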
Integration with Other Nodes
The Kafka sink node integrates well with other ZephFlow nodes:
// Complete pipeline example
ZephFlow pipeline = ZephFlow.startFlow()
.kafkaSource(kafkaBroker, "input-topic", "processor-group", EncodingType.JSON_OBJECT, null)
.filter("$.status != null")
.eval("dict_merge($, dict(processed_at=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
.assertion("$.processed_at != null")
.kafkaSink(kafkaBroker, "output-topic", null, EncodingType.JSON_OBJECT, null);
// Execute the pipeline
pipeline.execute("data-processor-job", "production", "event-processor");
Related Nodes
- kafkaSource: Read data from Kafka topics