Kafka Source Node
The `kafkaSource` node in ZephFlow consumes data from Kafka topics, providing an integration point for processing streaming Kafka data in your data pipelines.
Overview
The Kafka source node connects to a Kafka broker, subscribes to the specified topic, and emits each message as an event into your ZephFlow pipeline. This lets you build data processing pipelines that consume and transform Kafka data in real time.
Key Features
- Kafka Integration: Connect to Kafka brokers to consume messages from topics
- Consumer Groups: Leverage Kafka's consumer group functionality for scalable processing
- Flexible Encoding: Support for multiple event encoding formats
- Customizable Configuration: Fine-tune Kafka consumer settings with custom properties
Basic Usage
The `kafkaSource` method creates a node that consumes messages from a Kafka topic:
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
"broker-address:9092", // Kafka broker address
"topic-name", // Topic to consume from
"consumer-group-id", // Consumer group ID
EncodingType.JSON_OBJECT, // Encoding type of messages
null // Additional Kafka properties (optional)
);
Parameters
| Parameter | Type | Description |
|---|---|---|
| `broker` | `String` | Kafka broker address with port (e.g., "localhost:9092") |
| `topic` | `String` | Name of the Kafka topic to consume from |
| `groupId` | `String` | Consumer group ID for tracking consumption progress |
| `encodingType` | `EncodingType` | Format of the messages (e.g., `JSON_OBJECT`, `STRING`, `CSV`) |
| `properties` | `Map<String, String>` | Additional Kafka consumer properties (optional; may be `null`) |
For more details on supported encoding types, see the encoding type documentation.
Common Configurations
Basic JSON Message Consumption
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
"kafka-broker:9092",
"events-topic",
"zephflow-consumer-group",
EncodingType.JSON_OBJECT,
null
)
.eval("dict_merge($, dict(processed_timestamp=ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ssX\")))");
With Custom Kafka Properties
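import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Start from the earliest offset when the group has no committed offset,
// cap each poll at 500 records, and allow a 500 ms maximum fetch wait.
Map<String, String> kafkaProps = new HashMap<>();
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
kafkaProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");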
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
"kafka-broker:9092",
"logs-topic",
"log-processor-group",
EncodingType.STRING,
kafkaProps
)
.parse(/* parser configuration */);
Do not set `key.deserializer` or `value.deserializer` in the consumer properties. The Kafka source node always reads raw bytes from Kafka and relies on the `EncodingType` configuration for deserialization.
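One way to guard against accidentally passing these settings is to drop them from the property map before handing it to `kafkaSource`. The following is a minimal sketch; `KafkaProps.withoutDeserializers` is a hypothetical helper and not part of ZephFlow. It relies only on the standard Kafka property names `key.deserializer` and `value.deserializer`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of ZephFlow): returns a copy of the consumer
// properties with any deserializer overrides removed.
final class KafkaProps {
    private KafkaProps() {}

    static Map<String, String> withoutDeserializers(Map<String, String> props) {
        Map<String, String> cleaned = new HashMap<>();
        if (props != null) {
            cleaned.putAll(props); // copy so the caller's map is left untouched
        }
        cleaned.remove("key.deserializer");
        cleaned.remove("value.deserializer");
        return cleaned;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("auto.offset.reset", "earliest");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        System.out.println(withoutDeserializers(props)); // prints {auto.offset.reset=earliest}
    }
}
```

The cleaned map can then be passed as the last argument of `kafkaSource` in place of the original one.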
CSV Message Processing
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
"kafka-broker:9092",
"metrics-topic",
"metrics-processor-group",
EncodingType.CSV,
null
)
.eval("dict(timestamp=$.0, metric_name=$.1, value=parse_float($.2))");
Example Use Cases
Real-time Log Processing
// Process logs from Kafka and extract structured data
ZephFlow logProcessor = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"application-logs",
"log-processor-group",
EncodingType.STRING,
null
)
.parse(ParserConfigs.ParserConfig.builder()
.targetField("$")
.extractionConfig(new GrokExtractionConfig("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"))
.build())
.eval("dict_merge($, dict(is_error=$.level == \"ERROR\"))");
Enriching Event Data
// Consume events and enrich with additional computed fields
ZephFlow eventEnricher = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"user-events",
"event-enricher-group",
EncodingType.JSON_OBJECT,
null
)
.eval("dict_merge($, dict(" +
"event_day=epoch_to_ts_str(ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\"), \"yyyy-MM-dd\")," +
"event_hour=epoch_to_ts_str(ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\"), \"HH\")," +
"user_region=case(str_contains($.user_id, \"EU\") => \"Europe\", " +
"str_contains($.user_id, \"US\") => \"North America\", " +
"_ => \"Other\")" +
"))");
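To make the expected values of the derived fields concrete, here is a plain-Java sketch of the same logic, which can be handy when writing test expectations for the pipeline. It is not ZephFlow code. It assumes that the date patterns follow the conventions of `java.time.format.DateTimeFormatter`, that `case` picks the first matching branch, and it ignores time zones entirely. The class and method names are made up for illustration.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Plain-Java illustration of the derived fields; not ZephFlow code.
final class EventEnrichmentSketch {
    private static final DateTimeFormatter INPUT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter HOUR = DateTimeFormatter.ofPattern("HH");

    // Counterpart of event_day: the date part of the timestamp.
    static String eventDay(String timestamp) {
        return LocalDateTime.parse(timestamp, INPUT).format(DAY);
    }

    // Counterpart of event_hour: the two-digit hour (00-23).
    static String eventHour(String timestamp) {
        return LocalDateTime.parse(timestamp, INPUT).format(HOUR);
    }

    // Counterpart of user_region: first matching branch wins (assumption).
    static String userRegion(String userId) {
        if (userId.contains("EU")) return "Europe";
        if (userId.contains("US")) return "North America";
        return "Other";
    }

    public static void main(String[] args) {
        System.out.println(eventDay("2024-03-15T14:30:00"));  // prints 2024-03-15
        System.out.println(eventHour("2024-03-15T14:30:00")); // prints 14
        System.out.println(userRegion("EU-1042"));            // prints Europe
    }
}
```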
Filtering and Routing Messages
// Filter messages based on condition and route to different sinks
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"transaction-events",
"transaction-processor-group",
EncodingType.JSON_OBJECT,
null
);
// Filter high-value transactions
ZephFlow highValueTransactions = flow
.filter("$.amount > 1000")
.kafkaSink(
kafkaBroker,
"high-value-transactions",
null,
EncodingType.JSON_OBJECT,
null
);
// Process regular transactions
ZephFlow regularTransactions = flow
.filter("$.amount <= 1000")
.kafkaSink(
kafkaBroker,
"regular-transactions",
null,
EncodingType.JSON_OBJECT,
null
);
Best Practices
Consumer Group Strategy
- Use meaningful consumer group IDs: Choose descriptive names that identify the application and purpose
- Dedicated consumer groups: Use separate consumer groups for different processing pipelines
- Consider partitioning: Ensure the number of pipeline instances matches or is a divisor of the number of partitions for balanced consumption
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(kafkaBroker, inputTopic, groupId, EncodingType.JSON_OBJECT, null)
.assertion("$.id != null and $.timestamp != null")
.eval("dict_merge($, dict(processed=true))");
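The partitioning guideline above can be checked with simple arithmetic. The sketch below assumes that partitions of a single topic are spread as evenly as possible across the instances in a consumer group, which is what Kafka's built-in assignors aim for. `PartitionBalance` is a hypothetical helper, not a ZephFlow or Kafka class.

```java
import java.util.Arrays;

// Hypothetical helper: estimates how many partitions each pipeline instance
// receives when partitions are spread as evenly as possible.
final class PartitionBalance {
    private PartitionBalance() {}

    // The first (partitions % instances) instances get one extra partition.
    static int[] spread(int partitions, int instances) {
        if (partitions < 0 || instances <= 0) {
            throw new IllegalArgumentException("partitions must be >= 0 and instances > 0");
        }
        int[] counts = new int[instances];
        int base = partitions / instances;
        int extra = partitions % instances;
        for (int i = 0; i < instances; i++) {
            counts[i] = base + (i < extra ? 1 : 0);
        }
        return counts;
    }

    // Balanced means every instance gets the same non-zero number of partitions.
    static boolean isBalanced(int partitions, int instances) {
        if (instances <= 0) {
            throw new IllegalArgumentException("instances must be > 0");
        }
        return instances <= partitions && partitions % instances == 0;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(spread(12, 4))); // prints [3, 3, 3, 3]
        System.out.println(Arrays.toString(spread(12, 5))); // prints [3, 3, 2, 2, 2]
        System.out.println(isBalanced(12, 5));              // prints false
    }
}
```

With more instances than partitions, the surplus instances receive no partitions and sit idle, so adding instances beyond the partition count does not increase throughput.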
Performance Tuning
- Batch size: Adjust `MAX_POLL_RECORDS_CONFIG` based on message size and processing requirements
- Poll interval: Configure `FETCH_MAX_WAIT_MS_CONFIG` and `FETCH_MIN_BYTES_CONFIG` for optimized throughput
- Consumer count: Scale horizontally by running multiple instances of your pipeline
Map<String, String> performanceProps = new HashMap<>();
performanceProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
performanceProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
performanceProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "65536");
ZephFlow highPerformanceFlow = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"high-volume-topic",
"high-performance-group",
EncodingType.JSON_OBJECT,
performanceProps
);
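If the Kafka client's `ConsumerConfig` class is not on your compile classpath, the same settings can be expressed with their plain property names (`max.poll.records`, `fetch.max.wait.ms`, `fetch.min.bytes`). The sketch below builds the map with basic range checks. `PerformanceProps` is a hypothetical helper, and the checks mirror the lower bounds Kafka applies to these settings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds tuning properties using plain Kafka property names.
final class PerformanceProps {
    private PerformanceProps() {}

    static Map<String, String> build(int maxPollRecords, int fetchMaxWaitMs, int fetchMinBytes) {
        if (maxPollRecords < 1) {
            throw new IllegalArgumentException("max.poll.records must be at least 1");
        }
        if (fetchMaxWaitMs < 0 || fetchMinBytes < 0) {
            throw new IllegalArgumentException("fetch settings must not be negative");
        }
        Map<String, String> props = new LinkedHashMap<>(); // keeps insertion order
        props.put("max.poll.records", String.valueOf(maxPollRecords));
        props.put("fetch.max.wait.ms", String.valueOf(fetchMaxWaitMs));
        props.put("fetch.min.bytes", String.valueOf(fetchMinBytes));
        return props;
    }

    public static void main(String[] args) {
        // prints {max.poll.records=1000, fetch.max.wait.ms=500, fetch.min.bytes=65536}
        System.out.println(build(1000, 500, 65536));
    }
}
```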
Security Configuration
For secure Kafka clusters, provide the necessary security configurations:
Map<String, String> secureProps = new HashMap<>();
secureProps.put("security.protocol", "SASL_SSL");
secureProps.put("sasl.mechanism", "PLAIN");
secureProps.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"your-username\" " +
    "password=\"your-password\";");
ZephFlow secureFlow = ZephFlow.startFlow()
.kafkaSource(
kafkaBroker,
"secure-topic",
"secure-consumer-group",
EncodingType.JSON_OBJECT,
secureProps
);
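In practice, credentials are usually kept out of source code. The sketch below builds the same three properties from environment variables. The variable names `KAFKA_USERNAME` and `KAFKA_PASSWORD` and the `SecureProps` class are assumptions made for illustration. Values containing a double quote or backslash are rejected rather than escaped, so the sketch makes no assumptions about how the JAAS parser handles escaping.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: assembles SASL/PLAIN properties without hardcoded credentials.
final class SecureProps {
    private SecureProps() {}

    // Builds the sasl.jaas.config value for the PLAIN login module.
    static String plainJaasConfig(String username, String password) {
        requireSafe("username", username);
        requireSafe("password", password);
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" "
                + "password=\"" + password + "\";";
    }

    private static void requireSafe(String name, String value) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(name + " must be set");
        }
        if (value.indexOf('"') >= 0 || value.indexOf('\\') >= 0) {
            throw new IllegalArgumentException(name + " must not contain quotes or backslashes");
        }
    }

    // Reads credentials from the (assumed) KAFKA_USERNAME / KAFKA_PASSWORD variables.
    static Map<String, String> fromEnvironment() {
        Map<String, String> props = new HashMap<>();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                plainJaasConfig(System.getenv("KAFKA_USERNAME"), System.getenv("KAFKA_PASSWORD")));
        return props;
    }
}
```

The map returned by `SecureProps.fromEnvironment()` can be passed to `kafkaSource` in place of `secureProps`.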
Common Issues and Solutions
Consumer Lag
If your pipeline is falling behind (high consumer lag):
- Increase the number of pipeline instances
- Optimize downstream processing to reduce latency
- Consider increasing fetch sizes and batch processing capacity
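Consumer lag for a partition is the difference between the partition's log end offset and the group's committed offset. The sketch below sums lag across partitions and estimates how long catching up would take at given rates. `ConsumerLag` is a hypothetical helper working on plain maps of partition number to offset; how you obtain those offsets (for example from your monitoring tooling) is outside its scope. Treating a partition with no committed offset as starting from offset 0 is a simplifying assumption.

```java
import java.util.Map;

// Hypothetical helper: computes consumer lag from per-partition offsets.
final class ConsumerLag {
    private ConsumerLag() {}

    // Total lag = sum over partitions of (log end offset - committed offset).
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            // Assumption: a missing committed offset counts as offset 0.
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            total += Math.max(0L, entry.getValue() - committed);
        }
        return total;
    }

    // Seconds needed to drain the lag; infinite if consumption does not outpace production.
    static double secondsToCatchUp(long lag, double consumedPerSecond, double producedPerSecond) {
        double net = consumedPerSecond - producedPerSecond;
        if (net <= 0) {
            return Double.POSITIVE_INFINITY;
        }
        return lag / net;
    }

    public static void main(String[] args) {
        long lag = totalLag(Map.of(0, 1000L, 1, 1500L), Map.of(0, 900L, 1, 1200L));
        System.out.println(lag);                                  // prints 400
        System.out.println(secondsToCatchUp(lag, 300.0, 200.0));  // prints 4.0
    }
}
```

An infinite catch-up time indicates that the pipeline cannot keep up at its current capacity, which is the case where adding instances or speeding up downstream processing is needed.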
Offset Management
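- Use `AUTO_OFFSET_RESET_CONFIG` with caution. It only takes effect when the consumer group has no valid committed offset for a partition:
  - `earliest`: Process all available messages (good for data completeness)
  - `latest`: Skip to the most recent messages (good for real-time monitoring)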
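The effect of the reset policy can be summarized as a small decision function. The sketch below is a simplified model, not Kafka or ZephFlow code: `OffsetReset.startPosition` is a hypothetical name, and the model ignores the case where a committed offset exists but has fallen out of the partition's retained range.

```java
import java.util.OptionalLong;

// Simplified model of where a consumer starts reading a partition.
final class OffsetReset {
    private OffsetReset() {}

    static long startPosition(OptionalLong committed, String policy,
                              long beginningOffset, long endOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // a committed offset wins over the policy
        }
        switch (policy) {
            case "earliest":
                return beginningOffset;   // replay everything still retained
            case "latest":
                return endOffset;         // only messages produced from now on
            default:
                throw new IllegalStateException(
                        "No committed offset and reset policy is '" + policy + "'");
        }
    }

    public static void main(String[] args) {
        System.out.println(startPosition(OptionalLong.empty(), "earliest", 100, 5000)); // prints 100
        System.out.println(startPosition(OptionalLong.empty(), "latest", 100, 5000));   // prints 5000
        System.out.println(startPosition(OptionalLong.of(4200), "earliest", 100, 5000)); // prints 4200
    }
}
```

In other words, changing the policy has no effect on a consumer group that has already committed offsets. To reprocess from the beginning, a new consumer group ID or an explicit offset reset is needed.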
Integration with Other Nodes
The Kafka source node integrates well with other ZephFlow nodes:
// Complete pipeline example
ZephFlow pipeline = ZephFlow.startFlow()
.kafkaSource(kafkaBroker, "input-topic", "processor-group", EncodingType.JSON_OBJECT, null)
.filter("$.status != null")
.eval("dict_merge($, dict(processed_at=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
.assertion("$.processed_at != null")
.kafkaSink(kafkaBroker, "output-topic", null, EncodingType.JSON_OBJECT, null);
// Execute the pipeline
pipeline.execute("data-processor-job", "production", "log-processor");
Related Nodes
- kafkaSink: Send processed data back to Kafka