Kafka Sink Node

The kafkaSink node in ZephFlow publishes processed data to Kafka topics, providing an integration point for sending results from your pipeline to downstream Kafka consumers.

Overview

The Kafka sink node connects to a Kafka broker and publishes messages to specified topics. This allows you to build data processing pipelines that transform data and then send the results to Kafka for downstream consumption.

Key Features

  • Kafka Integration: Connect to Kafka brokers to publish messages to topics
  • Flexible Encoding: Support for multiple event encoding formats
  • Partition Key Configuration: Optionally specify a field to use as the partition key
  • Customizable Configuration: Fine-tune Kafka producer settings with custom properties

Basic Usage

The kafkaSink method creates a node that publishes messages to a Kafka topic:

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("broker-address:9092", "input-topic", "consumer-group-id", EncodingType.JSON_OBJECT, null)
    .eval("dict_merge($, dict(processed=true, timestamp=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
    .kafkaSink(
        "broker-address:9092",    // Kafka broker address
        "output-topic",           // Topic to publish to
        null,                     // Partition key expression (optional)
        EncodingType.JSON_OBJECT, // Encoding type for messages
        null                      // Additional Kafka properties (optional)
    );

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| broker | String | Kafka broker address with port (e.g., "localhost:9092") |
| topic | String | Name of the Kafka topic to publish to |
| partitionKeyFieldExpressionStr | String | Expression to determine the partition key (null for default partitioning) |
| encodingType | EncodingType | Format for serializing messages (e.g., JSON_OBJECT, STRING) |
| properties | Map<String, String> | Additional Kafka producer properties (optional) |
info

More details about encoding type support can be found here.

Default Producer Settings

When no custom properties are provided, the Kafka sink node applies the following defaults optimized for high-throughput production:

| Property | Default Value | Description |
| --- | --- | --- |
| batch.size | 65536 (64 KB) | Maximum batch size in bytes |
| linger.ms | 10 | Time to wait for additional messages before sending a batch |
| buffer.memory | 67108864 (64 MB) | Total memory available to the producer for buffering |
| compression.type | lz4 | Compression algorithm for message batches |
| acks | 1 | Number of acknowledgments required (leader only) |
| retries | 3 | Number of retry attempts on failure |
| max.in.flight.requests.per.connection | 5 | Maximum unacknowledged requests per connection |

Custom properties passed via the properties parameter will override these defaults.
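If overrides are applied per key (as the sentence above suggests), a single-entry map is enough to change one setting while keeping the rest. A minimal sketch using the standard Kafka ProducerConfig constants:

Map<String, String> overrides = new HashMap<>();
// Replace only the lz4 default; the other defaults in the table above remain in effect
overrides.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");

flow.kafkaSink("broker-address:9092", "output-topic", null, EncodingType.JSON_OBJECT, overrides);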

Delivery Model

The Kafka sink uses an asynchronous fire-and-forget delivery model. Messages are sent to the Kafka producer asynchronously, and delivery results (success or failure) are tracked via callbacks. This means:

  • Sending a message does not block the pipeline
  • Delivery errors are tracked asynchronously and reported through counters
  • For stronger delivery guarantees, set acks=all and increase retries via custom properties (see the sketch below)
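
A minimal sketch of that stronger-guarantee setup, using standard Kafka producer property names (end-to-end semantics still depend on your broker and topic configuration):

Map<String, String> reliableProps = new HashMap<>();
reliableProps.put(ProducerConfig.ACKS_CONFIG, "all");   // wait for all in-sync replicas
reliableProps.put(ProducerConfig.RETRIES_CONFIG, "10"); // retry transient send failures

ZephFlow reliableFlow = ZephFlow.startFlow()
    .kafkaSource("broker-address:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .kafkaSink("broker-address:9092", "output-topic", null, EncodingType.JSON_OBJECT, reliableProps);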

Common Configurations

Basic JSON Message Publishing

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka-broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .eval("dict_merge($, dict(processed_timestamp=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
    .kafkaSink(
        "kafka-broker:9092",
        "output-topic",
        null,
        EncodingType.JSON_OBJECT,
        null
    );

With Custom Partition Key

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka-broker:9092", "user-events", "consumer-group", EncodingType.JSON_OBJECT, null)
    .kafkaSink(
        "kafka-broker:9092",
        "partitioned-events",
        "$.user_id", // Use the user_id field as the partition key
        EncodingType.JSON_OBJECT,
        null
    );

With Custom Kafka Properties

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;

Map<String, String> kafkaProps = new HashMap<>();
kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all");
kafkaProps.put(ProducerConfig.RETRIES_CONFIG, "3");
kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, "5");

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka-broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .kafkaSink(
        "kafka-broker:9092",
        "output-topic",
        null,
        EncodingType.JSON_OBJECT,
        kafkaProps
    );
info

Do not set key.serializer or value.serializer in the producer properties: the Kafka sink node always uses byte array serializers and relies on the encodingType configuration for serialization.
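
In other words, choose the wire format through the encodingType parameter rather than through serializer properties. A short illustrative sketch:

// Don't set key.serializer / value.serializer; the sink always uses byte array serializers.
// Do pick the wire format via encodingType instead:
flow.kafkaSink("kafka-broker:9092", "output-topic", null, EncodingType.STRING, kafkaProps);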

Example Use Cases

Event Transformation and Publishing

// Process and transform events before publishing to output topic
ZephFlow eventProcessor = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "raw-events",
        "event-processor-group",
        EncodingType.JSON_OBJECT,
        null
    )
    .eval("dict_merge($, dict(" +
        "event_id=$.id," +
        "event_type=$.type," +
        "normalized_timestamp=ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")," +
        "processed=true" +
        "))")
    .kafkaSink(
        kafkaBroker,
        "processed-events",
        "$.event_id", // Partition by event_id
        EncodingType.JSON_OBJECT,
        null
    );

Message Routing Based on Content

// Source flow
ZephFlow sourceFlow = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "incoming-messages",
        "router-group",
        EncodingType.JSON_OBJECT,
        null
    );

// Route error messages to the error topic
ZephFlow errorFlow = sourceFlow
    .filter("$.status == \"error\"")
    .kafkaSink(
        kafkaBroker,
        "error-events",
        null,
        EncodingType.JSON_OBJECT,
        null
    );

// Route success messages to the success topic
ZephFlow successFlow = sourceFlow
    .filter("$.status == \"success\"")
    .kafkaSink(
        kafkaBroker,
        "success-events",
        null,
        EncodingType.JSON_OBJECT,
        null
    );

Enriching and Aggregating Data

// Enrich events with additional calculated fields
ZephFlow enrichmentFlow = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "user-clicks",
        "click-enricher-group",
        EncodingType.JSON_OBJECT,
        null
    )
    .eval("dict_merge($, dict(" +
        "session_length=case($.end_time != null and $.start_time != null => " +
        "ts_str_to_epoch($.end_time, \"yyyy-MM-dd'T'HH:mm:ss\") - " +
        "ts_str_to_epoch($.start_time, \"yyyy-MM-dd'T'HH:mm:ss\"), " +
        "_ => 0)," +
        "click_count=size_of($.clicks)," +
        "has_purchase=$.conversion == true" +
        "))")
    .kafkaSink(
        kafkaBroker,
        "enriched-sessions",
        "$.session_id",
        EncodingType.JSON_OBJECT,
        null
    );

Best Practices

Producer Configuration Strategy

  • Durability vs. Performance: Balance durability against throughput based on your use case

    // High durability configuration (defaults: acks=1, retries=3)
    Map<String, String> durabilityProps = new HashMap<>();
    durabilityProps.put(ProducerConfig.ACKS_CONFIG, "all");
    durabilityProps.put(ProducerConfig.RETRIES_CONFIG, "5");
    durabilityProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

    // High throughput configuration (defaults already include batch.size=65536, linger.ms=10, compression=lz4)
    Map<String, String> throughputProps = new HashMap<>();
    throughputProps.put(ProducerConfig.ACKS_CONFIG, "1");
    throughputProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "131072");
    throughputProps.put(ProducerConfig.LINGER_MS_CONFIG, "20");
    throughputProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
  • Partition Key Selection: Choose partition keys that provide even distribution

    // Good partition key for user-related events
    flow.kafkaSink(kafkaBroker, "user-events", "$.user_id", EncodingType.JSON_OBJECT, null);

    // Good partition key for time-series data (hour of day)
    flow.kafkaSink(kafkaBroker, "metrics", "epoch_to_ts_str($.timestamp, \"HH\")", EncodingType.JSON_OBJECT, null);

Performance Tuning

  • Batch Size: Increase batch size for higher throughput at the cost of slightly higher latency (default: 64KB)
  • Compression: Compression is enabled by default (lz4). Switch to snappy or zstd if preferred
  • Linger Time: Increase linger time to allow more batching for higher throughput (default: 10ms)

Map<String, String> performanceProps = new HashMap<>();
performanceProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "131072");       // 128 KB batches
performanceProps.put(ProducerConfig.LINGER_MS_CONFIG, "20");
performanceProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
performanceProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728"); // 128 MB buffer

ZephFlow highPerformanceFlow = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .kafkaSink(
        kafkaBroker,
        "high-volume-topic",
        null,
        EncodingType.JSON_OBJECT,
        performanceProps
    );

Security Configuration

For secure Kafka clusters, provide the necessary security configurations:

Map<String, String> secureProps = new HashMap<>();
secureProps.put("security.protocol", "SASL_SSL");
secureProps.put("sasl.mechanism", "PLAIN");
secureProps.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"your-username\" " +
    "password=\"your-password\";");

ZephFlow secureFlow = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "input-topic", "consumer-group", EncodingType.JSON_OBJECT, secureProps)
    .kafkaSink(
        kafkaBroker,
        "secure-topic",
        null,
        EncodingType.JSON_OBJECT,
        secureProps
    );

Common Issues and Solutions

Message Serialization Errors

  • Ensure your events match the specified encoding type
  • When using EncodingType.JSON_OBJECT, make sure your events are valid JSON objects (see the guard sketch after this list)
  • For other encoding types, ensure your data structure is compatible
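
One way to catch shape problems before they reach the sink is to guard with the filter and assertion operators shown earlier on this page; in this sketch the required fields (id, payload) are hypothetical:

ZephFlow guardedFlow = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .filter("$.id != null")         // drop events missing a required field (hypothetical field)
    .assertion("$.payload != null") // fail fast on unexpected shapes (hypothetical field)
    .kafkaSink(kafkaBroker, "output-topic", null, EncodingType.JSON_OBJECT, null);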

Network and Connection Issues

  • Verify broker addresses and network connectivity
  • Add appropriate timeouts and retry configurations:
    Map<String, String> reliabilityProps = new HashMap<>();
    reliabilityProps.put(ProducerConfig.RETRIES_CONFIG, "5");
    reliabilityProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "500");
    reliabilityProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
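
    Pass the map as the final kafkaSink argument, as in the earlier examples:

    flow.kafkaSink(kafkaBroker, "output-topic", null, EncodingType.JSON_OBJECT, reliabilityProps);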

Integration with Other Nodes

The Kafka sink node integrates well with other ZephFlow nodes:

// Complete pipeline example
ZephFlow pipeline = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "input-topic", "processor-group", EncodingType.JSON_OBJECT, null)
    .filter("$.status != null")
    .eval("dict_merge($, dict(processed_at=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
    .assertion("$.processed_at != null")
    .kafkaSink(kafkaBroker, "output-topic", null, EncodingType.JSON_OBJECT, null);

// Execute the pipeline
pipeline.execute("data-processor-job", "production", "event-processor");