# Supported Encodings

## Introduction
ZephFlow processes data from various sources and destinations, often requiring conversion between byte streams and structured data formats. The `EncodingType` enum defines how ZephFlow interprets incoming data and formats outgoing data when interacting with external systems.
## Purpose of Encoding Types
When ZephFlow interacts with external storage and messaging systems (like Kafka, S3, or Kinesis), data is typically transmitted as bytes. The `EncodingType` configuration tells ZephFlow:
- How to decode incoming byte streams into structured events that can be processed by the pipeline
- How to encode outgoing events back into the appropriate byte format for the destination system
Proper encoding configuration ensures data integrity throughout your data processing pipeline.
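For instance, a single flow can decode one format at its source and encode a different format at its sink. The minimal sketch below illustrates this decode/encode pairing using the stdin/stdout connectors shown later on this page; it assumes only the `stdinSource` and `stdoutSink` signatures that appear in the examples below.

```java
// Decode: each stdin line is parsed as a JSON object event.
// Encode: each event is re-serialized as a CSV row on stdout.
ZephFlow flow = ZephFlow.startFlow()
    .stdinSource(EncodingType.JSON_OBJECT_LINE) // bytes in  -> structured events
    .stdoutSink(EncodingType.CSV);              // events    -> bytes out
```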
## Supported Encoding Types
ZephFlow supports the following encoding types:
| Encoding Type | File Extension | Description |
|---|---|---|
| CSV | csv | Treats each row as a separate event. Appropriate for comma-separated values data. |
| JSON_OBJECT | json | Treats the entire JSON object as a single event. Used when each message contains a complete JSON object. |
| JSON_ARRAY | json | Treats each element in a JSON array as a separate event. Useful for processing arrays of objects in batch. |
| JSON_OBJECT_LINE | jsonl | Treats each line as a separate JSON object event (newline-delimited JSON). Ideal for streaming JSON records. |
| STRING_LINE | txt | Treats each line as a separate string event. Suitable for plain text or log files. |
## Usage Examples

### Kafka Source and Sink

When configuring Kafka source and sink nodes, the `EncodingType` parameter tells ZephFlow how to interpret messages:
```java
// Read JSON objects from a Kafka topic
ZephFlow fromKafka = ZephFlow.startFlow()
    .kafkaSource(
        "kafka:9092",             // broker address
        "input-topic",            // topic name
        "my-consumer-group",      // consumer group ID
        EncodingType.JSON_OBJECT, // treat each message as a JSON object
        null                      // additional properties
    );

// Write processed events as JSON objects to another Kafka topic
fromKafka.kafkaSink(
    "kafka:9092",             // broker address
    "output-topic",           // topic name
    null,                     // partition key expression (null for default partitioning)
    EncodingType.JSON_OBJECT, // encode events as JSON objects
    null                      // additional properties
);
```
### S3 Sink Example

When writing data to S3, the encoding type determines the format of the stored files:
```java
// Process events and store them as CSV files in S3
ZephFlow.startFlow()
    .kafkaSource(
        "kafka:9092",             // broker address
        "input-topic",            // topic name
        "my-consumer-group",      // consumer group ID
        EncodingType.JSON_OBJECT, // treat each message as a JSON object
        null                      // additional properties
    )
    .filter("$.status == 'success'") // only process successful events
    .s3Sink(
        "us-west-2",      // AWS region
        "my-bucket",      // S3 bucket name
        "data/output/",   // S3 key prefix
        EncodingType.CSV, // store data in CSV format
        null              // S3 endpoint override (null for default AWS endpoint)
    );
```
### StdIn/StdOut Example

For command-line applications, you can use encoding types with standard input and output:
```java
// Read string lines from standard input
ZephFlow flow = ZephFlow.startFlow()
    .stdinSource(EncodingType.STRING_LINE)
    .eval("dict(text=$.text, processed_at=to_str($.timestamp))")
    .stdoutSink(EncodingType.JSON_OBJECT_LINE); // output as newline-delimited JSON
```
## Event Processing Behavior
Each encoding type processes events differently:
- CSV: Each row becomes a separate event with columns mapped to fields
- JSON_OBJECT: The entire JSON message is treated as a single event
- JSON_ARRAY: Each element in the array becomes a separate event in the pipeline
- JSON_OBJECT_LINE: Each line is parsed as a separate JSON object event
- STRING_LINE: Each line is treated as a simple string event
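To make the fan-out behavior concrete, the hedged sketch below pairs a JSON_ARRAY source with a JSON_OBJECT_LINE sink. The payloads in the comments are illustrative, and the flow reuses only the stdin/stdout calls shown above.

```java
// Given a stdin payload of:  [{"id": 1}, {"id": 2}, {"id": 3}]
// JSON_ARRAY decoding yields three separate events in the pipeline.
ZephFlow flow = ZephFlow.startFlow()
    .stdinSource(EncodingType.JSON_ARRAY)
    // JSON_OBJECT_LINE encoding writes one JSON object per output line:
    //   {"id":1}
    //   {"id":2}
    //   {"id":3}
    .stdoutSink(EncodingType.JSON_OBJECT_LINE);
```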
## Best Practices
- **Match the encoding type to your data format:** Always ensure the encoding type you configure matches the actual data format you're reading from or writing to.
- **Pay attention to serialization/deserialization:**
  - When using Kafka, the serializers/deserializers should be aligned with your ZephFlow encoding type.
  - For example, with `EncodingType.JSON_OBJECT`, use byte array serializers in your Kafka producers, as shown in the example:
```java
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
```
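The same alignment applies on the consuming side. The sketch below is a complementary example (not from the ZephFlow docs) of a downstream consumer reading the bytes that a `JSON_OBJECT` Kafka sink produces; it uses only standard Kafka client classes, and the broker address, topic, and group ID are placeholders.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Consumer configuration for reading ZephFlow's JSON_OBJECT output as raw bytes
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");          // placeholder broker
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "downstream-consumer");          // placeholder group ID
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
// Each record's value is then a UTF-8 encoded JSON object that can be parsed with any JSON library.
```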