Kafka Source Node

The kafkaSource node in ZephFlow lets you consume data from Kafka topics, providing an integration point for processing streaming Kafka data in your pipelines.

Overview

The Kafka source node connects to a Kafka broker, subscribes to specified topics, and emits messages as events into your ZephFlow pipeline. This allows you to build data processing pipelines that consume and transform data from Kafka in real time.

Key Features

  • Kafka Integration: Connect to Kafka brokers to consume messages from topics
  • Consumer Groups: Leverage Kafka's consumer group functionality for scalable processing
  • Flexible Encoding: Support for multiple event encoding formats
  • Customizable Configuration: Fine-tune Kafka consumer settings with custom properties

Basic Usage

The kafkaSource method creates a node that consumes messages from a Kafka topic:

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource(
        "broker-address:9092",    // Kafka broker address
        "topic-name",             // Topic to consume from
        "consumer-group-id",      // Consumer group ID
        EncodingType.JSON_OBJECT, // Encoding type of messages
        null                      // Additional Kafka properties (optional)
    );

Parameters

Parameter    | Type                | Description
-------------|---------------------|----------------------------------------------------------
broker       | String              | Kafka broker address with port (e.g., "localhost:9092")
topic        | String              | Name of the Kafka topic to consume from
groupId      | String              | Consumer group ID for tracking consumption progress
encodingType | EncodingType        | Format of the messages (e.g., JSON_OBJECT, STRING)
properties   | Map<String, String> | Additional Kafka consumer properties (optional)
Info: More details about encoding type support can be found here.

Common Configurations

Basic JSON Message Consumption

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource(
        "kafka-broker:9092",
        "events-topic",
        "zephflow-consumer-group",
        EncodingType.JSON_OBJECT,
        null
    )
    .eval("dict_merge($, dict(processed_timestamp=ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ssX\")))");

With Custom Kafka Properties

Map<String, String> kafkaProps = new HashMap<>();
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
kafkaProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource(
        "kafka-broker:9092",
        "logs-topic",
        "log-processor-group",
        EncodingType.STRING,
        kafkaProps
    )
    .parse(/* parser configuration */);
Info: Do not set key.deserializer or value.deserializer in the consumer properties. The Kafka source node always reads raw bytes from Kafka and relies on the EncodingType configuration for deserialization.

CSV Message Processing

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource(
        "kafka-broker:9092",
        "metrics-topic",
        "metrics-processor-group",
        EncodingType.CSV,
        null
    )
    .eval("dict(timestamp=$.0, metric_name=$.1, value=parse_float($.2))");

Example Use Cases

Real-time Log Processing

// Process logs from Kafka and extract structured data
ZephFlow logProcessor = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "application-logs",
        "log-processor-group",
        EncodingType.STRING,
        null
    )
    .parse(ParserConfigs.ParserConfig.builder()
        .targetField("$")
        .extractionConfig(new GrokExtractionConfig("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"))
        .build())
    .eval("dict_merge($, dict(is_error=$.level == \"ERROR\"))");

Enriching Event Data

// Consume events and enrich with additional computed fields
ZephFlow eventEnricher = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "user-events",
        "event-enricher-group",
        EncodingType.JSON_OBJECT,
        null
    )
    .eval("dict_merge($, dict(" +
        "event_day=epoch_to_ts_str(ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\"), \"yyyy-MM-dd\")," +
        "event_hour=epoch_to_ts_str(ts_str_to_epoch($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\"), \"HH\")," +
        "user_region=case(str_contains($.user_id, \"EU\") => \"Europe\", " +
        "str_contains($.user_id, \"US\") => \"North America\", " +
        "_ => \"Other\")" +
        "))");

Filtering and Routing Messages

// Filter messages based on condition and route to different sinks
ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "transaction-events",
        "transaction-processor-group",
        EncodingType.JSON_OBJECT,
        null
    );

// Filter high-value transactions
ZephFlow highValueTransactions = flow
    .filter("$.amount > 1000")
    .kafkaSink(
        kafkaBroker,
        "high-value-transactions",
        null,
        EncodingType.JSON_OBJECT,
        null
    );

// Process regular transactions
ZephFlow regularTransactions = flow
    .filter("$.amount <= 1000")
    .kafkaSink(
        kafkaBroker,
        "regular-transactions",
        null,
        EncodingType.JSON_OBJECT,
        null
    );

Best Practices

Consumer Group Strategy

  • Use meaningful consumer group IDs: Choose descriptive names that identify the application and purpose
  • Dedicated consumer groups: Use separate consumer groups for different processing pipelines (see the sketch below)
  • Consider partitioning: Ensure the number of pipeline instances matches or is a divisor of the number of partitions for balanced consumption

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, inputTopic, groupId, EncodingType.JSON_OBJECT, null)
    .assertion("$.id != null and $.timestamp != null")
    .eval("dict_merge($, dict(processed=true))");

Performance Tuning

  • Batch size: Adjust MAX_POLL_RECORDS_CONFIG based on message size and processing requirements
  • Poll interval: Configure FETCH_MAX_WAIT_MS_CONFIG and FETCH_MIN_BYTES_CONFIG for optimized throughput
  • Consumer count: Scale horizontally by running multiple instances of your pipeline
Map<String, String> performanceProps = new HashMap<>();
performanceProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
performanceProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
performanceProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "65536");

ZephFlow highPerformanceFlow = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "high-volume-topic",
        "high-performance-group",
        EncodingType.JSON_OBJECT,
        performanceProps
    );

Security Configuration

For secure Kafka clusters, provide the necessary security configurations:

Map<String, String> secureProps = new HashMap<>();
secureProps.put("security.protocol","SASL_SSL");
secureProps.put("sasl.mechanism","PLAIN");
secureProps.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required "+
"username=\"your-username\" "+
"password=\"your-password\";");

ZephFlow secureFlow = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "secure-topic",
        "secure-consumer-group",
        EncodingType.JSON_OBJECT,
        secureProps
    );

Common Issues and Solutions

Consumer Lag

If your pipeline is falling behind (high consumer lag):

  • Increase the number of pipeline instances
  • Optimize downstream processing to reduce latency
  • Consider increasing fetch sizes and batch processing capacity
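
Scaling out is typically just a matter of running more identical instances of the pipeline under the same consumer group; Kafka then rebalances partitions across them. A minimal sketch, assuming the kafkaBroker variable from earlier examples (job, topic, and group names are illustrative):

// Deploy this same program as multiple processes or hosts. Because every
// instance uses the same consumer group ID, Kafka assigns each one a distinct
// subset of the topic's partitions (more instances than partitions gain nothing).
ZephFlow instance = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "high-volume-topic", "high-performance-group", EncodingType.JSON_OBJECT, null)
    .eval("dict_merge($, dict(processed=true))");

instance.execute("high-volume-processor", "production", "worker-1");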

Offset Management

  • Use AUTO_OFFSET_RESET_CONFIG with caution:
    • earliest: Process all available messages (good for data completeness)
    • latest: Skip to most recent messages (good for real-time monitoring)
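
Note that auto.offset.reset only takes effect when the consumer group has no committed offsets yet. For example, a backfill pipeline can start from the beginning of the topic by setting the property shown earlier; a sketch, with illustrative topic and group names:

Map<String, String> offsetProps = new HashMap<>();
// "earliest" replays the topic from the beginning on first run;
// "latest" would skip straight to new messages instead.
offsetProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

ZephFlow backfillFlow = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "events-topic", "backfill-group", EncodingType.JSON_OBJECT, offsetProps);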

Integration with Other Nodes

The Kafka source node integrates well with other ZephFlow nodes:

  • kafkaSink: Send processed data back to Kafka

// Complete pipeline example
ZephFlow pipeline = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "input-topic", "processor-group", EncodingType.JSON_OBJECT, null)
    .filter("$.status != null")
    .eval("dict_merge($, dict(processed_at=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
    .assertion("$.processed_at != null")
    .kafkaSink(kafkaBroker, "output-topic", null, EncodingType.JSON_OBJECT, null);

// Execute the pipeline
pipeline.execute("data-processor-job", "production", "log-processor");