Supported Encodings

Introduction

ZephFlow reads from and writes to a variety of external systems, which often requires converting between raw byte streams and structured data. The EncodingType enum defines how ZephFlow interprets incoming data and formats outgoing data when interacting with those systems.

Purpose of Encoding Types

When ZephFlow interacts with external storage and messaging systems (like Kafka, S3, or Kinesis), data is typically transmitted as bytes. The EncodingType configuration tells ZephFlow:

  1. How to decode incoming byte streams into structured events that can be processed by the pipeline
  2. How to encode outgoing events back into the appropriate byte format for the destination system

Proper encoding configuration ensures data integrity throughout your data processing pipeline.
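
Both directions can appear in a single pipeline: the source encoding controls how incoming bytes are decoded, and the sink encoding controls how outgoing events are serialized. As a minimal sketch combining calls shown in full later on this page:

// Decode Kafka messages as JSON objects, re-encode events as newline-delimited JSON
ZephFlow.startFlow()
    .kafkaSource("kafka:9092", "input-topic", "my-consumer-group", EncodingType.JSON_OBJECT, null)
    .stdoutSink(EncodingType.JSON_OBJECT_LINE);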

Supported Encoding Types

ZephFlow supports the following encoding types:

Encoding Type    | File Extension | Description
-----------------|----------------|------------
CSV              | csv            | Treats each row as a separate event. Appropriate for comma-separated values data.
JSON_OBJECT      | json           | Treats the entire JSON object as a single event. Used when each message contains a complete JSON object.
JSON_ARRAY       | json           | Treats each element in a JSON array as a separate event. Useful for processing arrays of objects in batch.
JSON_OBJECT_LINE | jsonl          | Treats each line as a separate JSON object event (newline-delimited JSON). Ideal for streaming JSON records.
STRING_LINE      | txt            | Treats each line as a separate string event. Suitable for plain text or log files.
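
To make the JSON variants concrete, here are illustrative payloads (field names invented for the example). A JSON_OBJECT message carries exactly one event:

{"user": "alice", "status": "success"}

The same two records as JSON_OBJECT_LINE produce two events, one per line:

{"user": "alice", "status": "success"}
{"user": "bob", "status": "failed"}

and as JSON_ARRAY they also produce two events, one per array element:

[{"user": "alice", "status": "success"}, {"user": "bob", "status": "failed"}]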

Usage Examples

Kafka Source and Sink

When configuring Kafka source and sink nodes, the EncodingType parameter tells ZephFlow how to decode incoming messages and how to encode outgoing events:

// Read JSON objects from a Kafka topic
ZephFlow fromKafka = ZephFlow.startFlow()
    .kafkaSource(
        "kafka:9092",             // broker address
        "input-topic",            // topic name
        "my-consumer-group",      // consumer group ID
        EncodingType.JSON_OBJECT, // treat each message as a JSON object
        null                      // additional properties
    );

// Write processed events as JSON objects to another Kafka topic
fromKafka.kafkaSink(
    "kafka:9092",             // broker address
    "output-topic",           // topic name
    null,                     // partition key expression (null for default partitioning)
    EncodingType.JSON_OBJECT, // encode events as JSON objects
    null                      // additional properties
);

S3 Sink Example

When writing data to S3, the encoding type determines the format of the stored files:

// Process events and store them as CSV files in S3
ZephFlow.startFlow()
    .kafkaSource(
        "kafka:9092",             // broker address
        "input-topic",            // topic name
        "my-consumer-group",      // consumer group ID
        EncodingType.JSON_OBJECT, // treat each message as a JSON object
        null                      // additional properties
    )
    .filter("$.status == 'success'") // only process successful events
    .s3Sink(
        "us-west-2",      // AWS region
        "my-bucket",      // S3 bucket name
        "data/output/",   // S3 key prefix
        EncodingType.CSV, // store data in CSV format
        null              // S3 endpoint override (null for default AWS endpoint)
    );

StdIn/StdOut Example

For command-line applications, you can use encoding types with standard input and output:

// Read string lines from standard input
ZephFlow flow = ZephFlow.startFlow()
    .stdinSource(EncodingType.STRING_LINE)
    .eval("dict(text=$.text, processed_at=to_str($.timestamp))")
    .stdoutSink(EncodingType.JSON_OBJECT_LINE); // output as newline-delimited JSON

Event Processing Behavior

Each encoding type processes events differently:

  • CSV: Each row becomes a separate event with columns mapped to fields (see the sketch after this list)
  • JSON_OBJECT: The entire JSON message is treated as a single event
  • JSON_ARRAY: Each element in the array becomes a separate event in the pipeline
  • JSON_OBJECT_LINE: Each line is parsed as a separate JSON object event
  • STRING_LINE: Each line is treated as a simple string event
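
As a rough sketch of the CSV mapping (assuming the first row is a column header; the exact column-naming behavior may vary by ZephFlow version), the input

id,status
1,success
2,failed

would yield two events, one per data row, each carrying id and status fields.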

Best Practices

  1. Match the encoding type to your data format: Always ensure the encoding type you configure matches the actual format of the data you read or write.

  2. Pay attention to serialization/deserialization:

    • When using Kafka, the serializers/deserializers should be aligned with your ZephFlow encoding type.
    • For example, with EncodingType.JSON_OBJECT, use byte array serializers in your Kafka producers, as shown in the example:
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
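
    For completeness, below is a minimal, self-contained producer sketch under those settings (the broker address and topic name are reused from the earlier examples; the payload is illustrative):

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    // Each record value is a complete JSON object, matching EncodingType.JSON_OBJECT on the ZephFlow side
    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
        byte[] value = "{\"status\": \"success\"}".getBytes(StandardCharsets.UTF_8);
        producer.send(new ProducerRecord<>("input-topic", value));
    }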