Kafka Sink Node

The kafkaSink node in ZephFlow publishes processed data to Kafka topics, so a pipeline can hand its results to Kafka for downstream consumers.

Overview

The Kafka sink node connects to a Kafka broker and publishes messages to specified topics. This allows you to build data processing pipelines that transform data and then send the results to Kafka for downstream consumption.

Key Features

  • Kafka Integration: Connect to Kafka brokers to publish messages to topics
  • Flexible Encoding: Support for multiple event encoding formats
  • Partition Key Configuration: Optionally specify a field to use as the partition key
  • Customizable Configuration: Fine-tune Kafka producer settings with custom properties

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| broker | String | Kafka broker address with port, comma-separated if multiple (e.g., "kafka-1:9092,kafka-2:9092") |
| topic | String | Name of the Kafka topic to publish to |
| partitionKeyFieldExpressionStr | String | Expression to determine partition key (null for default partitioning) |
| encodingType | EncodingType | Format for serializing messages. Supported types: CSV, JSON_OBJECT, JSON_ARRAY, JSON_OBJECT_LINE. Each record is published as an individual Kafka message. |
| properties | Map<String, String> | Additional Kafka producer properties (optional) |

Info: More details about encoding type support can be found here.

Config Object

The full config object for the Kafka sink node (KafkaSinkDto.Config):

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| broker | String | Yes |  | Kafka broker address with port, comma-separated if multiple |
| topic | String | Yes |  | Name of the Kafka topic to publish to |
| partitionKeyFieldExpressionStr | String | No | null | Expression to determine partition key (null for default partitioning) |
| encodingType | String | Yes |  | Format for serializing messages. Supported values: CSV, JSON_OBJECT, JSON_ARRAY, JSON_OBJECT_LINE |
| properties | Map<String, String> | No | null | Additional Kafka producer properties |

Default Producer Settings

When no custom properties are provided, the Kafka sink node applies the following defaults optimized for high-throughput production:

| Property | Default Value | Description |
| --- | --- | --- |
| batch.size | 65536 (64KB) | Maximum batch size in bytes |
| linger.ms | 10 | Time in milliseconds to wait for additional messages before sending a batch |
| buffer.memory | 67108864 (64MB) | Total memory available to the producer for buffering |
| compression.type | lz4 | Compression algorithm for message batches |
| acks | 1 | Number of acknowledgments required (leader only) |
| retries | 3 | Number of retry attempts on failure |
| max.in.flight.requests.per.connection | 5 | Maximum unacknowledged requests per connection |

Custom properties passed via the properties parameter will override these defaults.
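
The override behavior can be pictured as a map merge. The sketch below is illustrative only: it assumes overrides apply per key (a custom entry replaces the default with the same name and all other defaults stay in place), and the class and method names are hypothetical, not part of the ZephFlow API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ProducerDefaultsSketch {

    // Default values copied from the table above.
    static Map<String, String> defaults() {
        Map<String, String> d = new LinkedHashMap<>();
        d.put("batch.size", "65536");
        d.put("linger.ms", "10");
        d.put("buffer.memory", "67108864");
        d.put("compression.type", "lz4");
        d.put("acks", "1");
        d.put("retries", "3");
        d.put("max.in.flight.requests.per.connection", "5");
        return d;
    }

    // Hypothetical helper: assumes a per-key merge where custom entries win.
    // A null map means "no overrides".
    static Map<String, String> effectiveProperties(Map<String, String> custom) {
        Map<String, String> merged = defaults();
        if (custom != null) {
            merged.putAll(custom);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> custom = new LinkedHashMap<>();
        custom.put("acks", "all");                 // replaces the default acks=1
        custom.put("request.timeout.ms", "30000"); // added alongside the defaults
        System.out.println(effectiveProperties(custom));
    }
}
```

If the node instead replaced the whole default set whenever custom properties are supplied, every setting you still want would have to be repeated in the custom map, so it is worth confirming the behavior against your ZephFlow version.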

Delivery Model

The Kafka sink delivers messages asynchronously (fire-and-forget): each message is handed to the Kafka producer without waiting for a broker acknowledgment, and the delivery result (success or failure) is recorded later by a callback. This means:

  • Sending a message does not block the pipeline
  • Delivery errors are tracked asynchronously and reported through counters
  • For stronger delivery guarantees, set acks=all and increase retries via custom properties
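
To make the model concrete, here is a small simulation of asynchronous sends whose outcomes are recorded by callbacks and exposed as counters. It uses only the Java standard library and is not ZephFlow's implementation: the class, the simulated `deliver` step, and the counter names are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

class AsyncDeliverySketch {
    private final AtomicLong successCount = new AtomicLong();
    private final AtomicLong errorCount = new AtomicLong();
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    // Simulated broker call: any message containing "bad" fails.
    private static void deliver(String message) {
        if (message.contains("bad")) {
            throw new IllegalStateException("simulated delivery failure: " + message);
        }
    }

    // Returns immediately; the handle() callback records the outcome later,
    // playing the role of a producer delivery callback.
    void send(String message) {
        CompletableFuture<Void> result = CompletableFuture
            .runAsync(() -> deliver(message))
            .<Void>handle((ignored, error) -> {
                if (error == null) {
                    successCount.incrementAndGet();
                } else {
                    errorCount.incrementAndGet();
                }
                return null;
            });
        pending.add(result);
    }

    // Waits until every outstanding callback has run.
    void flush() {
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    }

    long successes() { return successCount.get(); }
    long errors() { return errorCount.get(); }

    public static void main(String[] args) {
        AsyncDeliverySketch sink = new AsyncDeliverySketch();
        sink.send("ok-1");
        sink.send("bad-2");
        sink.send("ok-3");
        sink.flush();
        System.out.println("delivered=" + sink.successes() + " failed=" + sink.errors());
    }
}
```

The point of the sketch is that a failed send never surfaces as an exception in the calling thread; it only shows up in the error counter, which is why monitoring those counters matters with this delivery model.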

Best Practices

Producer Configuration Strategy

  • Durability vs. Performance: Balance between durability and throughput based on your use case

    • For high durability: set acks=all, increase retries, and enable idempotence
    • For high throughput: increase batch.size, linger.ms, and use efficient compression
  • Partition Key Selection: Choose partition keys that provide even distribution across partitions
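
The effect of key cardinality on distribution can be checked with a quick experiment. The sketch below is a simplification: Kafka's default partitioner hashes the serialized key with murmur2, whereas this code uses `String.hashCode()` as a stand-in, so the exact partition numbers will differ from a real cluster. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class PartitionSpreadSketch {

    // Stand-in for key-based partitioning (real Kafka uses murmur2 on the key bytes).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts how many distinct partitions a set of keys would land on.
    static int distinctPartitions(List<String> keys, int numPartitions) {
        Set<Integer> used = new HashSet<>();
        for (String key : keys) {
            used.add(partitionFor(key, numPartitions));
        }
        return used.size();
    }

    public static void main(String[] args) {
        int numPartitions = 6;

        // Low-cardinality key: only two distinct values exist.
        List<String> statusKeys = List.of("error", "success", "error", "success");

        // High-cardinality key: one value per user.
        List<String> userKeys = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            userKeys.add("user-" + i);
        }

        System.out.println("status keys use " + distinctPartitions(statusKeys, numPartitions) + " partition(s)");
        System.out.println("user keys use " + distinctPartitions(userKeys, numPartitions) + " partition(s)");
    }
}
```

A key with only a handful of distinct values can never occupy more partitions than it has values, regardless of the hash function, so a field such as a status flag concentrates traffic while a field such as a user ID spreads it.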

Performance Tuning

  • Batch Size: Increase batch size for higher throughput at the cost of slightly higher latency (default: 64KB)
  • Compression: Compression is enabled by default (lz4). Switch to snappy or zstd if preferred
  • Linger Time: Increase linger time to allow more batching for higher throughput (default: 10ms)

Common Issues and Solutions

Message Serialization Errors

  • Ensure your events match the specified encoding type
  • When using EncodingType.JSON_OBJECT, make sure your events are valid JSON objects
  • For other encoding types, ensure your data structure is compatible

Network and Connection Issues

  • Verify broker addresses and network connectivity
  • Add appropriate timeouts and retry configurations via custom properties
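
A quick pre-flight check can catch malformed broker strings and unreachable hosts before a pipeline starts. The following sketch uses only the Java standard library; the class and its methods are hypothetical helpers, not part of ZephFlow or the Kafka client.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

class BrokerCheckSketch {

    // Splits "host1:9092,host2:9092" into unresolved addresses and rejects
    // entries that are missing a host or a valid port.
    static List<InetSocketAddress> parseBrokers(String brokers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : brokers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
            }
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(trimmed.substring(0, colon), port));
        }
        return result;
    }

    // Opens a plain TCP connection. Success only shows that the port is reachable,
    // not that the broker is healthy or that authentication will pass.
    static boolean canConnect(InetSocketAddress broker, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(broker.getHostString(), broker.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (InetSocketAddress broker : parseBrokers("kafka-1:9092,kafka-2:9092")) {
            System.out.println(broker + " reachable: " + canConnect(broker, 2000));
        }
    }
}
```

A failed TCP check points at DNS, firewall, or listener configuration; a successful one with continued producer errors suggests looking at security settings or timeouts instead.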

Java SDK Usage

Basic Usage

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("broker-address:9092", "input-topic", "consumer-group-id", EncodingType.JSON_OBJECT, null)
    .eval("dict_merge($, dict(processed=true, timestamp=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
    .kafkaSink(
        "broker-address:9092",    // Kafka broker address
        "output-topic",           // Topic to publish to
        null,                     // Partition key expression (optional)
        EncodingType.JSON_OBJECT, // Encoding type for messages
        null                      // Additional Kafka properties (optional)
    );

With Custom Partition Key

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka-broker:9092", "user-events", "consumer-group", EncodingType.JSON_OBJECT, null)
    .kafkaSink(
        "kafka-broker:9092",
        "partitioned-events",
        "$.user_id", // Use user_id field as partition key
        EncodingType.JSON_OBJECT,
        null
    );

With Custom Kafka Properties

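import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;

// Custom producer settings; these override the node's defaults.
Map<String, String> kafkaProps = new HashMap<>();
kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all");         // wait for all in-sync replicas
kafkaProps.put(ProducerConfig.RETRIES_CONFIG, "3");
kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); // 16KB batches
kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, "5");      // wait up to 5 ms to fill a batch

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka-broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .kafkaSink(
        "kafka-broker:9092",
        "output-topic",
        null,
        EncodingType.JSON_OBJECT,
        kafkaProps
    );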

Info: Do not set key.serializer or value.serializer in the producer properties. The KafkaSink node always uses byte array serializers and relies on the EncodingType configuration for serialization.
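
If producer properties come from a shared configuration file that may already contain serializer entries, one option is to strip them before handing the map to the sink. This is a minimal sketch using a hypothetical helper; it is not something ZephFlow provides.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class SerializerGuardSketch {

    // Keys the note above says should not be set on the sink.
    static final Set<String> RESERVED = Set.of("key.serializer", "value.serializer");

    // Returns a copy of the properties without the serializer entries;
    // the input map is left untouched.
    static Map<String, String> withoutSerializers(Map<String, String> props) {
        Map<String, String> cleaned = new LinkedHashMap<>(props);
        cleaned.keySet().removeAll(RESERVED);
        return cleaned;
    }

    public static void main(String[] args) {
        Map<String, String> shared = new LinkedHashMap<>();
        shared.put("acks", "all");
        shared.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        System.out.println(withoutSerializers(shared));
    }
}
```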

Event Transformation and Publishing

ZephFlow eventProcessor = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "raw-events",
        "event-processor-group",
        EncodingType.JSON_OBJECT,
        null
    )
    .eval("dict_merge($, dict(" +
        "event_id=$.id," +
        "event_type=$.type," +
        "normalized_timestamp=ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")," +
        "processed=true" +
        "))")
    .kafkaSink(
        kafkaBroker,
        "processed-events",
        "$.event_id", // Partition by event_id
        EncodingType.JSON_OBJECT,
        null
    );

Message Routing Based on Content

// Shared source; each branch below filters it and writes to its own topic.
ZephFlow sourceFlow = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "incoming-messages",
        "router-group",
        EncodingType.JSON_OBJECT,
        null
    );

ZephFlow errorFlow = sourceFlow
    .filter("$.status == \"error\"")
    .kafkaSink(
        kafkaBroker,
        "error-events",
        null,
        EncodingType.JSON_OBJECT,
        null
    );

ZephFlow successFlow = sourceFlow
    .filter("$.status == \"success\"")
    .kafkaSink(
        kafkaBroker,
        "success-events",
        null,
        EncodingType.JSON_OBJECT,
        null
    );

Enriching and Aggregating Data

ZephFlow enrichmentFlow = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "user-clicks",
        "click-enricher-group",
        EncodingType.JSON_OBJECT,
        null
    )
    .eval("dict_merge($, dict(" +
        "session_length=case($.end_time != null and $.start_time != null => " +
        "ts_str_to_epoch($.end_time, \"yyyy-MM-dd'T'HH:mm:ss\") - " +
        "ts_str_to_epoch($.start_time, \"yyyy-MM-dd'T'HH:mm:ss\"), " +
        "_ => 0)," +
        "click_count=size_of($.clicks)," +
        "has_purchase=$.conversion == true" +
        "))")
    .kafkaSink(
        kafkaBroker,
        "enriched-sessions",
        "$.session_id", // Partition by session_id
        EncodingType.JSON_OBJECT,
        null
    );

High Durability Configuration

Map<String, String> durabilityProps = new HashMap<>();
durabilityProps.put(ProducerConfig.ACKS_CONFIG, "all");
durabilityProps.put(ProducerConfig.RETRIES_CONFIG, "5");
durabilityProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .kafkaSink(kafkaBroker, "output-topic", null, EncodingType.JSON_OBJECT, durabilityProps);

High Throughput Configuration

Map<String, String> performanceProps = new HashMap<>();
performanceProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "131072");       // 128KB batches
performanceProps.put(ProducerConfig.LINGER_MS_CONFIG, "20");            // wait up to 20 ms to fill a batch
performanceProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
performanceProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728"); // 128MB producer buffer

ZephFlow highPerformanceFlow = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .kafkaSink(
        kafkaBroker,
        "high-volume-topic",
        null,
        EncodingType.JSON_OBJECT,
        performanceProps
    );

Security Configuration

For secure Kafka clusters, provide the necessary security configurations:

Map<String, String> secureProps = new HashMap<>();
secureProps.put("security.protocol", "SASL_SSL");
secureProps.put("sasl.mechanism", "PLAIN");
secureProps.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"your-username\" " +
    "password=\"your-password\";");

// The same security settings are passed to both the source and the sink.
ZephFlow secureFlow = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "input-topic", "consumer-group", EncodingType.JSON_OBJECT, secureProps)
    .kafkaSink(
        kafkaBroker,
        "secure-topic",
        null,
        EncodingType.JSON_OBJECT,
        secureProps
    );

Retry Configuration

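Map<String, String> reliabilityProps = new HashMap<>();
reliabilityProps.put(ProducerConfig.RETRIES_CONFIG, "5");               // retry up to 5 times
reliabilityProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "500");    // wait 500 ms between retries
reliabilityProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"); // 30 s request timeout
// Pass reliabilityProps as the last argument of kafkaSink(...) to apply these settings.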

Complete Pipeline

ZephFlow pipeline = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "input-topic", "processor-group", EncodingType.JSON_OBJECT, null)
    .filter("$.status != null")
    .eval("dict_merge($, dict(processed_at=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
    .assertion("$.processed_at != null")
    .kafkaSink(kafkaBroker, "output-topic", null, EncodingType.JSON_OBJECT, null);

pipeline.execute("data-processor-job", "production", "event-processor");