Kafka Source Node
The kafkaSource node in ZephFlow consumes messages from Kafka topics, providing a seamless integration point for streaming Kafka data into your processing pipelines.
Overview
The Kafka source node connects to a Kafka broker, subscribes to specified topics, and emits messages as events into your ZephFlow pipeline. This allows you to build data processing pipelines that consume and transform data from Kafka in real-time.
Key Features
- Kafka Integration: Connect to Kafka brokers to consume messages from topics
- Consumer Groups: Leverage Kafka's consumer group functionality for scalable processing
- Flexible Encoding: Support for multiple event encoding formats
- Customizable Configuration: Fine-tune Kafka consumer settings with custom properties
Parameters
| Parameter | Type | Description |
|---|---|---|
| broker | String | Kafka broker address with port (e.g., "localhost:9092") |
| topic | String | Name of the Kafka topic to consume from |
| groupId | String | Consumer group ID for tracking consumption progress |
| encodingType | EncodingType | Format of the messages (e.g., JSON_OBJECT, STRING_LINE, etc.) |
| properties | Map<String, String> | Additional Kafka consumer properties (optional) |
More details about encoding type support can be found here.
Config Object
The full config object for the Kafka source node (KafkaSourceDto.Config):
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| broker | String | Yes | — | Kafka broker address with port (e.g., "localhost:9092") |
| topic | String | Yes | — | Name of the Kafka topic to consume from |
| groupId | String | Yes | — | Consumer group ID for tracking consumption progress |
| encodingType | EncodingType | Yes | — | Format of the messages. Supported values: CSV, JSON_OBJECT, JSON_ARRAY, JSON_OBJECT_LINE, STRING_LINE, TEXT, XML, PARQUET |
| properties | Map<String, String> | No | null | Additional Kafka consumer properties |
| commitStrategy | String | No | BATCH | Offset commit strategy. Values: PER_RECORD, BATCH, NONE |
| commitBatchSize | Integer | No | 1000 | Number of records per commit batch (when commitStrategy is BATCH) |
| commitIntervalMs | Long | No | 5000 | Interval in milliseconds between commits (when commitStrategy is BATCH) |
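Under the BATCH strategy, a commit fires when either commitBatchSize records have accumulated or commitIntervalMs has elapsed since the last commit, whichever comes first. That decision logic can be sketched in plain Java (a hypothetical helper for illustration, not part of the ZephFlow SDK):

```java
// Illustrative sketch of the BATCH commit strategy: commit when either
// the batch-size threshold or the time interval is reached.
public class BatchCommitPolicy {
    private final int commitBatchSize;
    private final long commitIntervalMs;
    private int uncommitted = 0;
    private long lastCommitAt;

    public BatchCommitPolicy(int commitBatchSize, long commitIntervalMs, long nowMs) {
        this.commitBatchSize = commitBatchSize;
        this.commitIntervalMs = commitIntervalMs;
        this.lastCommitAt = nowMs;
    }

    /** Record one consumed message; returns true if a commit should happen now. */
    public boolean onRecord(long nowMs) {
        uncommitted++;
        if (uncommitted >= commitBatchSize || nowMs - lastCommitAt >= commitIntervalMs) {
            uncommitted = 0;
            lastCommitAt = nowMs;
            return true;
        }
        return false;
    }
}
```

With the defaults above (commitBatchSize=1000, commitIntervalMs=5000), a quiet topic still gets its offsets committed at least every five seconds, while a busy one commits every thousand records.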
Default Consumer Settings
When no custom properties are provided, the Kafka source node applies the following defaults optimized for high-throughput consumption:
| Property | Default Value | Description |
|---|---|---|
| max.poll.records | 5000 | Maximum records returned per poll |
| fetch.min.bytes | 1048576 (1MB) | Minimum data to fetch per request |
| fetch.max.wait.ms | 1000 | Maximum wait time for fetch.min.bytes to be satisfied |
| session.timeout.ms | 10000 | Consumer session timeout |
| max.partition.fetch.bytes | 10485760 (10MB) | Maximum data per partition per fetch |
| auto.offset.reset | earliest | Start from earliest offset when no committed offset exists |
| enable.auto.commit | false | Offsets are committed by ZephFlow, not by Kafka's auto-commit |
Custom properties passed via the `properties` parameter override these defaults.
The `auto.offset.reset=earliest` default means new consumer groups will process all available messages from the beginning of the topic. Set it to `latest` if you only want to process new messages.
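The precedence rule (defaults first, user properties layered on top) can be illustrated with a plain java.util.Map. This is a sketch of the behavior described above, not the SDK's internal code:

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerProps {
    /** Defaults from the table above, overridden by any user-supplied properties. */
    public static Map<String, String> effectiveProps(Map<String, String> userProps) {
        Map<String, String> props = new HashMap<>();
        props.put("max.poll.records", "5000");
        props.put("fetch.min.bytes", "1048576");
        props.put("fetch.max.wait.ms", "1000");
        props.put("session.timeout.ms", "10000");
        props.put("max.partition.fetch.bytes", "10485760");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        if (userProps != null) {
            props.putAll(userProps); // user-supplied values win over the defaults
        }
        return props;
    }
}
```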
Dead Letter Queue (DLQ)
Deserialization failures (e.g., malformed JSON when using EncodingType.JSON_OBJECT) can be routed to a Dead Letter Queue when DLQ is configured at the job level. Failed events are sent to the DLQ topic instead of halting the pipeline.
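Conceptually, the routing works like this: attempt deserialization, and on failure divert the raw payload to the DLQ topic instead of failing the pipeline. The sketch below illustrates that decision with a deliberately naive stand-in for real JSON deserialization (hypothetical logic, not the SDK's implementation):

```java
public class DlqRouter {
    /** Very loose JSON-object check, standing in for real deserialization. */
    static boolean looksLikeJsonObject(String payload) {
        String t = payload.trim();
        return t.startsWith("{") && t.endsWith("}");
    }

    /** Returns where a payload should go: onward into the pipeline, or to the DLQ. */
    public static String route(String payload, String dlqTopic) {
        return looksLikeJsonObject(payload) ? "pipeline" : dlqTopic;
    }
}
```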
Best Practices
Consumer Group Strategy
- Use meaningful consumer group IDs: Choose descriptive names that identify the application and purpose
- Dedicated consumer groups: Use separate consumer groups for different processing pipelines
- Consider partitioning: Ensure the number of pipeline instances matches or is a divisor of the number of partitions for balanced consumption
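The divisor rule above can be checked numerically: with N partitions and K consumer instances, each instance is assigned either floor(N/K) or ceil(N/K) partitions, and the load is even only when K divides N. A small helper to sanity-check a deployment (illustrative, not SDK code):

```java
public class PartitionBalance {
    /** True when every pipeline instance gets the same number of partitions. */
    public static boolean isBalanced(int partitions, int instances) {
        return instances > 0 && instances <= partitions && partitions % instances == 0;
    }

    /** Largest number of partitions any single instance must consume. */
    public static int maxPartitionsPerInstance(int partitions, int instances) {
        return (partitions + instances - 1) / instances; // ceiling division
    }
}
```

For example, 12 partitions split cleanly across 4 instances (3 each), but with 5 instances two of them sit at 3 partitions while the rest handle 2, so throughput is capped by the busier instances.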
Performance Tuning
- Batch size: Adjust `max.poll.records` based on message size and processing requirements (default: 5000)
- Poll interval: Configure `fetch.max.wait.ms` (default: 1000ms) and `fetch.min.bytes` (default: 1MB) for optimized throughput
- Consumer count: Scale horizontally by running multiple instances of your pipeline
Common Issues and Solutions
Consumer Lag
If your pipeline is falling behind (high consumer lag):
- Increase the number of pipeline instances
- Optimize downstream processing to reduce latency
- Consider increasing fetch sizes and batch processing capacity
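Consumer lag per partition is simply the gap between the partition's latest (end) offset and the group's committed offset; summing across partitions gives the total lag that monitoring tools report. A sketch of the arithmetic (not a live Kafka call):

```java
public class ConsumerLag {
    /** Lag for one partition: end offset minus committed offset, never negative. */
    public static long partitionLag(long endOffset, long committedOffset) {
        return Math.max(0, endOffset - committedOffset);
    }

    /** Total lag across partitions; the two arrays are parallel, one entry per partition. */
    public static long totalLag(long[] endOffsets, long[] committedOffsets) {
        long total = 0;
        for (int i = 0; i < endOffsets.length; i++) {
            total += partitionLag(endOffsets[i], committedOffsets[i]);
        }
        return total;
    }
}
```

A steadily growing total is the signal to apply the remedies above; a large but shrinking lag just means the pipeline is catching up.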
Offset Management
- Use `auto.offset.reset` with caution:
  - `earliest`: Process all available messages (good for data completeness)
  - `latest`: Skip to the most recent messages (good for real-time monitoring)
Java SDK Usage
Basic Usage
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
"broker-address:9092", // Kafka broker address
"topic-name", // Topic to consume from
"consumer-group-id", // Consumer group ID
EncodingType.JSON_OBJECT, // Encoding type of messages
null // Additional Kafka properties (optional)
);
Basic JSON Message Consumption
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
"kafka-broker:9092",
"events-topic",
"zephflow-consumer-group",
EncodingType.JSON_OBJECT,
null
)
.eval("dict_merge($, dict(processed_timestamp=ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ssX\")))");
With Custom Kafka Properties
Map<String, String> kafkaProps = new HashMap<>();
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
kafkaProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
"kafka-broker:9092",
"logs-topic",
"log-processor-group",
EncodingType.STRING_LINE,
kafkaProps
)
.parse(/* parser configuration */);
Do not set `key.deserializer` or `value.deserializer` in the consumer properties: the Kafka source node always reads raw bytes from Kafka and relies on the `encodingType` configuration for deserialization.
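In other words, the source hands the pipeline raw bytes and the encoding type decodes them; for STRING_LINE the decode is conceptually just a UTF-8 conversion. A minimal illustration of that idea (not the SDK's actual decoder):

```java
import java.nio.charset.StandardCharsets;

public class RawDecode {
    /** Conceptual STRING_LINE handling: the source supplies raw bytes, the encoding decodes them. */
    public static String decodeStringLine(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }
}
```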
CSV Message Processing
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
"kafka-broker:9092",
"metrics-topic",
"metrics-processor-group",
EncodingType.CSV,
null
)
.eval("dict(timestamp=$.0, metric_name=$.1, value=parse_float($.2))");
Real-time Log Processing
ZephFlow logProcessor = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"application-logs",
"log-processor-group",
EncodingType.STRING_LINE,
null
)
.parse(ParserConfigs.ParserConfig.builder()
.targetField("$")
.extractionConfig(new GrokExtractionConfig("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"))
.build())
.eval("dict_merge($, dict(is_error=$.level == \"ERROR\"))");
Enriching Event Data
ZephFlow eventEnricher = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"user-events",
"event-enricher-group",
EncodingType.JSON_OBJECT,
null
)
.eval("dict_merge($, dict(" +
"event_day=epoch_to_ts_str(ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\"), \"yyyy-MM-dd\")," +
"event_hour=epoch_to_ts_str(ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\"), \"HH\")," +
"user_region=case(str_contains($.user_id, \"EU\") => \"Europe\", " +
"str_contains($.user_id, \"US\") => \"North America\", " +
"_ => \"Other\")" +
"))");
Filtering and Routing Messages
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"transaction-events",
"transaction-processor-group",
EncodingType.JSON_OBJECT,
null
);
ZephFlow highValueTransactions = flow
.filter("$.amount > 1000")
.kafkaSink(
kafkaBroker,
"high-value-transactions",
null,
EncodingType.JSON_OBJECT,
null
);
ZephFlow regularTransactions = flow
.filter("$.amount <= 1000")
.kafkaSink(
kafkaBroker,
"regular-transactions",
null,
EncodingType.JSON_OBJECT,
null
);
Performance Tuning
Map<String, String> performanceProps = new HashMap<>();
performanceProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
performanceProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
performanceProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "65536");
ZephFlow highPerformanceFlow = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"high-volume-topic",
"high-performance-group",
EncodingType.JSON_OBJECT,
performanceProps
);
Security Configuration
For secure Kafka clusters, provide the necessary security configurations:
Map<String, String> secureProps = new HashMap<>();
secureProps.put("security.protocol", "SASL_SSL");
secureProps.put("sasl.mechanism", "PLAIN");
secureProps.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"your-username\" " +
    "password=\"your-password\";");
ZephFlow secureFlow = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"secure-topic",
"secure-consumer-group",
EncodingType.JSON_OBJECT,
secureProps
);
Complete Pipeline
ZephFlow pipeline = ZephFlow.startFlow()
.kafkaSource(kafkaBroker, "input-topic", "processor-group", EncodingType.JSON_OBJECT, null)
.filter("$.status != null")
.eval("dict_merge($, dict(processed_at=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
.assertion("$.processed_at != null")
.kafkaSink(kafkaBroker, "output-topic", null, EncodingType.JSON_OBJECT, null);
pipeline.execute("data-processor-job", "production", "log-processor");
Related Nodes
- kafkaSink: Send processed data back to Kafka