Quick Start Guide
ZephFlow is a flexible data processing framework that allows you to build and execute powerful data transformation pipelines. This guide will help you get started with ZephFlow and explore its basic functionality.
Introduction
ZephFlow enables you to create data processing workflows using a directed acyclic graph (DAG) structure. You can define data flows that filter, transform, and process data using SQL or the Fleak Eval Expression Language (FEEL), a flexible expression language designed for data manipulation.
With ZephFlow, you can:
- Create data processing pipelines with filtering, transformation, and validation
- Run pipelines in various environments (standalone, as an HTTP service, or embedded)
- Process data in various formats
- Apply complex transformations using a powerful expression language
To understand what a DAG is and how ZephFlow uses DAGs to express processing logic, check the DAG structure docs.
Ways to Use ZephFlow
There are three primary ways to use ZephFlow:
- Within an existing JVM program: Integrate ZephFlow directly into your Java application
- Standalone data pipeline process: Run ZephFlow as an independent process using CLI
- HTTP backend: Run ZephFlow as an HTTP service that processes data through API calls
Let's explore each approach.
Using ZephFlow Within a JVM Program
Project Setup
Add the dependency to your project:
Gradle:
implementation 'io.fleak.zephflow:sdk:0.2.2'
Maven:
<dependency>
    <groupId>io.fleak.zephflow</groupId>
    <artifactId>sdk</artifactId>
    <version>0.2.2</version>
</dependency>
Basic Usage
Here's a simple example that demonstrates how to use ZephFlow within a Java application:
import io.fleak.zephflow.lib.serdes.EncodingType;
import io.fleak.zephflow.sdk.ZephFlow;

// Create a simple flow to filter numbers > 5 and add additional properties
ZephFlow flow = ZephFlow.startFlow();

// Create input flow
ZephFlow inputFlow = flow.stdinSource(EncodingType.JSON_ARRAY);

// Filter and transform
ZephFlow processedFlow =
    inputFlow
        .filter("$.num > 5")
        .eval("dict(original=$.num, doubled=$.num*2, description='High value')");

// Output to stdout
ZephFlow outputFlow = processedFlow.stdoutSink(EncodingType.JSON_OBJECT);

// Execute the flow
outputFlow.execute("test_id", "test_env", "test_service");
This code creates a simple data processing pipeline that:
- Reads JSON arrays from standard input
- Filters elements where the num property is greater than 5
- Transforms each element by adding new properties
- Outputs the results as JSON objects to standard output
The pipeline follows this pattern: Single input source → filter → transform → output
The outputFlow.execute() method runs the entire data flow within the same JVM thread.
Running ZephFlow as a Standalone Process with CLI Starter
The CLI starter provides a convenient way to run ZephFlow pipelines in a Docker container without embedding them in a Java application. This section guides you through creating and running a simple data processing pipeline using the CLI starter.
Overview
The ZephFlow CLI starter:
- Reads a DAG definition file that defines your data pipeline
- Processes data according to the defined pipeline
- Supports various input/output modes defined in your pipeline definition (stdin/stdout, files, Kafka, etc.)
Installation
Pull the CLI starter docker image:
docker pull fleak/zephflow-clistarter:latest
Quick Start Example
Let's create a simple pipeline that reads JSON data from standard input, filters records, transforms them, and outputs the results to standard output.
Step 1: Create a sample DAG definition file
Create a file named simple_pipeline.yml with the following content:
jobContext:
  otherProperties: { }
  metricTags:
    env: "default_env"
    service: "default_service"
  dlqConfig: null
dag:
  - id: "stdin_source"
    commandName: "stdin"
    config: |
      {
        "encodingType": "JSON_ARRAY"
      }
    outputs:
      - "filter_node"
  - id: "filter_node"
    commandName: "filter"
    config: |
      $.value > 10
    outputs:
      - "transform_node"
  - id: "transform_node"
    commandName: "eval"
    config: |
      dict_merge($, dict(
        processed=true,
        timestamp_iso=epoch_to_ts_str($.timestamp, "yyyy-MM-dd'T'HH:mm:ss"),
        doubled_value=$.value * 2
      ))
    outputs:
      - "stdout_sink"
  - id: "stdout_sink"
    commandName: "stdout"
    config: |
      {
        "encodingType": "JSON_OBJECT"
      }
    outputs: [ ]
This DAG reads a collection of JSON objects from your standard input, keeps only the ones with values greater than 10, and enhances them with useful additions. Each record gets a "processed" flag, a human-readable timestamp converted from Unix time, and a "doubled_value" field that's twice the original. The processed output records are then printed out in your terminal.
Step 2: Run the ZephFlow CLI starter
Run the Docker container, mapping your DAG definition file and allowing stdin/stdout to flow through:
docker run -i \
-v $(pwd)/simple_pipeline.yml:/app/simple_pipeline.yml \
fleak/zephflow-clistarter:latest -f /app/simple_pipeline.yml
The -i flag is critical as it keeps stdin open, allowing you to send data to the container.
Step 3: Send test data and observe results
Once the container is running, you can send JSON data via stdin. In your terminal, paste the following JSON and press Enter:
[
  {
    "value": 5,
    "timestamp": 1635724800000
  },
  {
    "value": 15,
    "timestamp": 1635724800000
  },
  {
    "value": 25,
    "timestamp": 1635724800000
  }
]
After entering the data, press Ctrl+D to signal the end of input (EOF).
You should see output similar to:
{"doubled_value":30,"processed":true,"timestamp":1635724800000,"timestamp_iso":"2021-11-01T00:00:00","value":15}
{"doubled_value":50,"processed":true,"timestamp":1635724800000,"timestamp_iso":"2021-11-01T00:00:00","value":25}
Notice that only records with value > 10 appear in the output, and they include the additional fields defined in the transform node.
Generating a DAG Definition with the ZephFlow SDK
If you prefer to build your DAG programmatically, you can use the ZephFlow SDK to generate the YAML definition:
import io.fleak.zephflow.lib.serdes.EncodingType;
import io.fleak.zephflow.runner.dag.AdjacencyListDagDefinition;
import io.fleak.zephflow.sdk.ZephFlow;
public class GenerateDagDefinition {
  public static void main(String[] args) {
    // Create a flow starting point
    ZephFlow flow = ZephFlow.startFlow();

    // Define source: read JSON from stdin
    ZephFlow inputFlow = flow.stdinSource(EncodingType.JSON_OBJECT);

    // Add filter: keep only events with value > 10
    ZephFlow filteredFlow = inputFlow.filter("$.value > 10");

    // Add transformation
    ZephFlow transformedFlow =
        filteredFlow.eval(
            "dict_merge($, dict("
                + " processed=true,"
                + " timestamp_iso=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\"),"
                + " doubled_value=$.value * 2"
                + "))");

    // Define sink: output JSON to stdout
    ZephFlow outputFlow = transformedFlow.stdoutSink(EncodingType.JSON_OBJECT);

    // Generate and print the DAG definition
    AdjacencyListDagDefinition dagDefinition = outputFlow.buildDag();
    System.out.println(dagDefinition); // Prints YAML representation
  }
}
You can save this output to a file and use it with the CLI starter.
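For example, a minimal sketch that appends a save step to the program above (this assumes, per the comment in that example, that the object's string form is the YAML definition):
// At the end of main(), write the YAML to a file that the CLI starter can mount;
// main() would then need to declare "throws java.io.IOException".
java.nio.file.Files.writeString(
    java.nio.file.Path.of("simple_pipeline.yml"), dagDefinition.toString());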
Running ZephFlow as an HTTP Backend
ZephFlow can also run as an HTTP API backend, providing a convenient way to expose data processing pipelines as services:
Installation
Pull the HTTP starter docker image:
docker pull fleak/zephflow-httpstarter:latest
Run the container using docker:
docker run -p 8080:8080 fleak/zephflow-httpstarter
Usage
Once started, you can access the Swagger UI at http://localhost:8080/swagger-ui/index.html
Creating a Workflow
POST to the /api/v1/workflows endpoint to create a workflow. The request body should contain a JSON array of DAG nodes, where each node represents a step in your workflow. For example:
{
  "dag": [
    {
      "id": "node1",
      "commandName": "filter",
      "config": "$.value > 10",
      "outputs": [
        "node2"
      ]
    },
    {
      "id": "node2",
      "commandName": "eval",
      "config": "dict(result=$.value * 2)",
      "outputs": []
    }
  ]
}
Each node in the DAG requires:
- id: Unique identifier for the node
- commandName: The operation to perform (e.g., "filter", "eval")
- config: Configuration string for the operation (usually a FEEL expression)
- outputs: Array of node IDs that this node connects to
You can also use the ZephFlow SDK to submit a workflow:
ZephFlow flow = ZephFlow.startFlow();
ZephFlow outputFlow = ... // code to construct the dag
// For HTTP API usage
String hostUrl = "http://localhost:8080";
String workflowId = outputFlow.submitApiEndpoint(hostUrl);
System.out.println("Created workflow with ID: " + workflowId);
The response will contain the newly created workflow ID that you'll use when submitting data for processing.
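For instance, a complete submission might look like the sketch below; it assumes the processing steps can be chained directly off the starting flow, since (as noted under Processing Data) API-created workflows must not include a source command:
ZephFlow flow = ZephFlow.startFlow();

// Processing steps only; the HTTP backend attaches its own HTTP source in front of the DAG
ZephFlow outputFlow = flow
    .filter("$.value > 10")
    .eval("dict(result=$.value * 2)");

String workflowId = outputFlow.submitApiEndpoint("http://localhost:8080");
System.out.println("Created workflow with ID: " + workflowId);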
Processing Data
Send data for processing to:
/api/v1/execution/run/{workflow_id}/batch
The API supports several input formats:
- JSON array: Each array element is treated as an input event
- CSV: Each row is treated as an input event
- Text lines: Each line is treated as an input event
Make sure to set the appropriate Content-Type header for the API to correctly parse your data:
- JSON array: application/json
- CSV: text/csv
- Text lines: text/plain
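As an illustration, here is a minimal sketch of a batch call using Java's built-in HTTP client; it assumes the backend is running locally on port 8080 and that the placeholder workflow ID is replaced with the one returned at creation:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RunWorkflowBatch {
  public static void main(String[] args) throws Exception {
    String workflowId = "<your-workflow-id>"; // ID returned by the workflow creation call
    String body = "[{\"value\": 15}, {\"value\": 5}]"; // JSON array: each element is one input event

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8080/api/v1/execution/run/" + workflowId + "/batch"))
        .header("Content-Type", "application/json") // tells the API to parse the body as a JSON array
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();

    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode());
    System.out.println(response.body());
  }
}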
Workflows created through the API cannot contain a source command in the DAG. The API backend automatically attaches an HTTP source at the beginning of the DAG.
Debugging Workflow Execution
To debug workflow execution, you can enable additional output parameters:
/api/v1/execution/run/{workflow_id}/batch?includeErrorByStep=true&includeOutputByStep=true
- includeErrorByStep=true: Include errors at each node in the response
- includeOutputByStep=true: Include intermediate processing results at each node in the response
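If you use the Java HTTP client sketch from the previous section, only the request URI changes, for example:
// Same request as before, with the debug flags appended to the batch endpoint
URI debugUri = URI.create(
    "http://localhost:8080/api/v1/execution/run/" + workflowId
        + "/batch?includeErrorByStep=true&includeOutputByStep=true");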
Data Transformation in ZephFlow
ZephFlow provides multiple approaches for data transformation, filtering, and validation. The framework is built with a modular command architecture, where each command in the DAG is a standalone unit that performs a specific function.
Transformation Approaches
Fleak Eval Expression Language (FEEL)
FEEL provides a concise, expressive way to manipulate data structures. Common FEEL-based commands include:
- filter: Selectively process events based on conditions
  $.status == "success" and $.response_time < 500
- eval: Transform data structures
  dict(
    user_id=$.user.id,
    total_amount=$.items[0].price * $.items[0].quantity
  )
- assertion: Validate data against conditions
  $.user_id != null and $.transaction_id != null
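In the SDK, the filter and eval expressions above plug directly into the corresponding flow methods (a short sketch; inputFlow is assumed to be an upstream flow, such as the stdin source from the earlier examples):
// Keep only successful, fast events, then reshape each one
ZephFlow enriched = inputFlow
    .filter("$.status == \"success\" and $.response_time < 500")
    .eval("dict(user_id=$.user.id, total_amount=$.items[0].price * $.items[0].quantity)");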
SQL Transformations
ZephFlow also supports SQL-based transformations for users who prefer SQL syntax for data manipulation.
TODO: Add SQL transformation documentation
Source and Sink Commands
ZephFlow pipelines, unless running as a REST API backend, typically begin with a source command and end with a sink command:
- Source commands bring data into the pipeline (e.g., file, stdin, Kafka)
- Sink commands output the processed data (e.g., file, stdout, Kafka, database)
We currently support the following sources and sinks:
- Kafka source/sink
- AWS S3 sink
- AWS Kinesis sink
- stdin source and stdout sink
Example Pipeline Using Multiple Commands
Here's an example that demonstrates how various commands can be combined in a pipeline:
ZephFlow flow = ZephFlow.startFlow();
ZephFlow pipeline = flow
    .kafkaSource("my_broker", "input_topic", "test_group", EncodingType.JSON_OBJECT, null) // Source command
    .filter("$.level == 'ERROR'") // FEEL filter command
    .eval("dict_merge($, dict(timestamp_ms=ts_str_to_epoch($.timestamp, 'yyyy-MM-dd HH:mm:ss')))") // FEEL transform
    .sql("SELECT level, message, timestamp_ms FROM events WHERE timestamp_ms > 1632268800000") // SQL command
    .kafkaSink("my_broker", "output_topic", null, EncodingType.JSON_OBJECT, null); // Sink command
Next Steps
After getting familiar with the basics of ZephFlow, you might want to:
- Learn more about the available commands and their configurations
- Set up monitoring and error handling for your data pipelines