Skip to main content

Kafka Source Node

The kafkaSource node in ZephFlow enables you to consume data from Kafka topics, providing a seamless integration point for processing streaming data from Kafka in your data pipelines.

Overview

The Kafka source node connects to a Kafka broker, subscribes to specified topics, and emits messages as events into your ZephFlow pipeline. This allows you to build data processing pipelines that consume and transform data from Kafka in real-time.

Key Features

  • Kafka Integration: Connect to Kafka brokers to consume messages from topics
  • Consumer Groups: Leverage Kafka's consumer group functionality for scalable processing
  • Flexible Encoding: Support for multiple event encoding formats
  • Customizable Configuration: Fine-tune Kafka consumer settings with custom properties

Parameters

ParameterTypeDescription
brokerStringKafka broker address with port (e.g., "localhost:9092")
topicStringName of the Kafka topic to consume from
groupIdStringConsumer group ID for tracking consumption progress
encodingTypeEncodingTypeFormat of the messages (e.g., JSON_OBJECT, STRING, etc.)
propertiesMap<String, String>Additional Kafka consumer properties (optional)
info

More details about encoding type support can be found here.

Config Object

The full config object for the Kafka source node (KafkaSourceDto.Config):

FieldTypeRequiredDefaultDescription
brokerStringYesKafka broker address with port (e.g., "localhost:9092")
topicStringYesName of the Kafka topic to consume from
groupIdStringYesConsumer group ID for tracking consumption progress
encodingTypeEncodingTypeYesFormat of the messages. Supported values: CSV, JSON_OBJECT, JSON_ARRAY, JSON_OBJECT_LINE, STRING_LINE, TEXT, XML, PARQUET
propertiesMap<String, String>NonullAdditional Kafka consumer properties
commitStrategyStringNoBATCHOffset commit strategy. Values: PER_RECORD, BATCH, NONE
commitBatchSizeIntegerNo1000Number of records per commit batch (when commitStrategy is BATCH)
commitIntervalMsLongNo5000Interval in milliseconds between commits (when commitStrategy is BATCH)

Default Consumer Settings

When no custom properties are provided, the Kafka source node applies the following defaults optimized for high-throughput consumption:

PropertyDefault ValueDescription
max.poll.records5000Maximum records returned per poll
fetch.min.bytes1048576 (1MB)Minimum data to fetch per request
fetch.max.wait.ms1000Maximum wait time for fetch.min.bytes to be satisfied
session.timeout.ms10000Consumer session timeout
max.partition.fetch.bytes10485760 (10MB)Maximum data per partition per fetch
auto.offset.resetearliestStart from earliest offset when no committed offset exists
enable.auto.commitfalseOffsets are committed by ZephFlow, not by Kafka's auto-commit

Custom properties passed via the properties parameter will override these defaults.

tip

The auto.offset.reset=earliest default means new consumer groups will process all available messages from the beginning of the topic. Set it to latest if you only want to process new messages.

Dead Letter Queue (DLQ)

Deserialization failures (e.g., malformed JSON when using EncodingType.JSON_OBJECT) can be routed to a Dead Letter Queue when DLQ is configured at the job level. Failed events are sent to the DLQ topic instead of halting the pipeline.

Best Practices

Consumer Group Strategy

  • Use meaningful consumer group IDs: Choose descriptive names that identify the application and purpose
  • Dedicated consumer groups: Use separate consumer groups for different processing pipelines
  • Consider partitioning: Ensure the number of pipeline instances matches or is a divisor of the number of partitions for balanced consumption

Performance Tuning

  • Batch size: Adjust max.poll.records based on message size and processing requirements (default: 5000)
  • Poll interval: Configure fetch.max.wait.ms (default: 1000ms) and fetch.min.bytes (default: 1MB) for optimized throughput
  • Consumer count: Scale horizontally by running multiple instances of your pipeline

Common Issues and Solutions

Consumer Lag

If your pipeline is falling behind (high consumer lag):

  • Increase the number of pipeline instances
  • Optimize downstream processing to reduce latency
  • Consider increasing fetch sizes and batch processing capacity

Offset Management

  • Use auto.offset.reset with caution:
    • earliest: Process all available messages (good for data completeness)
    • latest: Skip to most recent messages (good for real-time monitoring)

Java SDK Usage

Basic Usage

ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
"broker-address:9092", // Kafka broker address
"topic-name", // Topic to consume from
"consumer-group-id", // Consumer group ID
EncodingType.JSON_OBJECT, // Encoding type of messages
null // Additional Kafka properties (optional)
);

Basic JSON Message Consumption

ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
"kafka-broker:9092",
"events-topic",
"zephflow-consumer-group",
EncodingType.JSON_OBJECT,
null
)
.eval("dict_merge($, dict(processed_timestamp=ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ssX\")))");

With Custom Kafka Properties

Map<String, String> kafkaProps = new HashMap<>();
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
kafkaProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");

ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
"kafka-broker:9092",
"logs-topic",
"log-processor-group",
EncodingType.STRING_LINE,
kafkaProps
)
.parse(/* parser configuration */);
info

Do not set key.deserializer and value.deserializer in the consumer properties because KafkaSource node will always read raw bytes from kafka and rely on EncodingType configuration for deserialization.

CSV Message Processing

ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
"kafka-broker:9092",
"metrics-topic",
"metrics-processor-group",
EncodingType.CSV,
null
)
.eval("dict(timestamp=$.0, metric_name=$.1, value=parse_float($.2))");

Real-time Log Processing

ZephFlow logProcessor = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"application-logs",
"log-processor-group",
EncodingType.STRING_LINE,
null
)
.parse(ParserConfigs.ParserConfig.builder()
.targetField("$")
.extractionConfig(new GrokExtractionConfig("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"))
.build())
.eval("dict_merge($, dict(is_error=$.level == \"ERROR\"))");

Enriching Event Data

ZephFlow eventEnricher = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"user-events",
"event-enricher-group",
EncodingType.JSON_OBJECT,
null
)
.eval("dict_merge($, dict(" +
"event_day=epoch_to_ts_str(ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\"), \"yyyy-MM-dd\")," +
"event_hour=epoch_to_ts_str(ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\"), \"HH\")," +
"user_region=case(str_contains($.user_id, \"EU\") => \"Europe\", " +
"str_contains($.user_id, \"US\") => \"North America\", " +
"_ => \"Other\")" +
"))");

Filtering and Routing Messages

ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"transaction-events",
"transaction-processor-group",
EncodingType.JSON_OBJECT,
null
);

ZephFlow highValueTransactions = flow
.filter("$.amount > 1000")
.kafkaSink(
kafkaBroker,
"high-value-transactions",
null,
EncodingType.JSON_OBJECT,
null
);

ZephFlow regularTransactions = flow
.filter("$.amount <= 1000")
.kafkaSink(
kafkaBroker,
"regular-transactions",
null,
EncodingType.JSON_OBJECT,
null
);

Performance Tuning

Map<String, String> performanceProps = new HashMap<>();
performanceProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
performanceProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
performanceProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "65536");

ZephFlow highPerformanceFlow = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"high-volume-topic",
"high-performance-group",
EncodingType.JSON_OBJECT,
performanceProps
);

Security Configuration

For secure Kafka clusters, provide the necessary security configurations:

Map<String, String> secureProps = new HashMap<>();
secureProps.put("security.protocol","SASL_SSL");
secureProps.put("sasl.mechanism","PLAIN");
secureProps.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required "+
"username=\"your-username\" "+
"password=\"your-password\";");

ZephFlow secureFlow = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"secure-topic",
"secure-consumer-group",
EncodingType.JSON_OBJECT,
secureProps
);

Complete Pipeline

ZephFlow pipeline = ZephFlow.startFlow()
.kafkaSource(kafkaBroker, "input-topic", "processor-group", EncodingType.JSON_OBJECT, null)
.filter("$.status != null")
.eval("dict_merge($, dict(processed_at=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
.assertion("$.processed_at != null")
.kafkaSink(kafkaBroker, "output-topic", null, EncodingType.JSON_OBJECT, null);

pipeline.execute("data-processor-job","production","log-processor");
  • kafkaSink: Send processed data back to Kafka