Quick Start Guide
ZephFlow is a flexible data processing framework that allows you to build and execute powerful data transformation pipelines. This guide will help you get started with ZephFlow and explore its basic functionality.
Introduction
ZephFlow enables you to create data processing workflows using a directed acyclic graph (DAG) structure. You can define data flows that filter, transform, and process data using SQL or the Fleak Eval Expression Language (FEEL), a flexible expression language designed for data manipulation.
With ZephFlow, you can:
- Create data processing pipelines with filtering, transformation, and validation
- Run pipelines in various environments (standalone, as an HTTP service, or embedded)
- Process data in various formats
- Apply complex transformations using a powerful expression language
Ways to Use ZephFlow
There are three primary ways to use ZephFlow:
- Within an existing JVM program: Integrate ZephFlow directly into your Java application
- Standalone data pipeline process: Run ZephFlow as an independent process using CLI
- HTTP backend: Run ZephFlow as an HTTP service that processes data through API calls
Let's explore each approach.
Using ZephFlow Within a JVM Program
Project Setup
Add the ZephFlow SDK dependency to your project:
Gradle:
implementation 'io.fleak.zephflow:sdk:0.2.0'
Maven:
<dependency>
    <groupId>io.fleak.zephflow</groupId>
    <artifactId>sdk</artifactId>
    <version>0.2.0</version>
</dependency>
Basic Usage
Here's a simple example that demonstrates how to use ZephFlow within a Java application:
// Create a simple flow to filter numbers > 5 and add additional properties
ZephFlow flow = ZephFlow.startFlow();
// Create input flow
ZephFlow inputFlow = flow.stdinSource(EncodingType.JSON_ARRAY);
// Filter and transform
ZephFlow processedFlow =
    inputFlow
        .filter("$.num > 5")
        .eval("dict(original=$.num, doubled=$.num*2, description='High value')");
// Output to stdout
ZephFlow outputFlow = processedFlow.stdoutSink(EncodingType.JSON_OBJECT);
// Execute the flow
outputFlow.execute("test_id","test_env","test_service");
This code creates a simple data processing pipeline that:
- Reads JSON arrays from standard input
- Filters elements where the num property is greater than 5
- Transforms each element by adding new properties
- Outputs the results as JSON objects to standard output
The pipeline follows this pattern: Single input source → filter → transform → output
The outputFlow.execute() method runs the entire data flow within the same JVM thread.
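For example, given the input [{"num": 3}, {"num": 7}], only the second element passes the filter, and the pipeline should print a single object along the lines of {"original": 7, "doubled": 14, "description": "High value"}.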
Running ZephFlow as a Standalone Process
You can run ZephFlow as a standalone process using the command line interface:
Installation
Pull the CLI starter docker image:
docker pull fleak/zephflow-clistarter:latest
Usage
The ZephFlow CLI starter expects a DAG definition file as input. Mount your local DAG file into the container when running it:
docker run \
-v <your_local_dag_definition_file>:/app/dag_definition.yml \
fleak/zephflow-clistarter:latest -f /app/dag_definition.yml
Creating a DAG Definition
For more information about ZephFlow DAGs, please refer to the DAG explanation documentation.
You can use the ZephFlow SDK to generate a DAG definition:
// Create a simple flow to filter numbers > 5 and add additional properties
ZephFlow flow = ZephFlow.startFlow();
ZephFlow outflow = ... // dag construction code
// create a dag definition object
AdjacencyListDagDefinition dagDef = outflow.buildDag();
// print out Dag definition in YAML
System.out.println(dagDef);
The generated YAML can be saved to a file and used with the CLI starter.
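As a minimal sketch, the following puts the pieces together: it rebuilds the stdin/filter/eval/stdout flow from the earlier example and saves the YAML to the file name used in the Docker command above. It assumes the toString() output of AdjacencyListDagDefinition is the same YAML that System.out.println shows.
// Rebuild the stdin -> filter -> eval -> stdout flow from the earlier example
ZephFlow flow = ZephFlow.startFlow();
ZephFlow outflow =
    flow.stdinSource(EncodingType.JSON_ARRAY)
        .filter("$.num > 5")
        .eval("dict(original=$.num, doubled=$.num*2, description='High value')")
        .stdoutSink(EncodingType.JSON_OBJECT);
// Build the DAG definition and save it as YAML for the CLI starter
// (Files.writeString throws IOException; handle or declare it in your code)
AdjacencyListDagDefinition dagDef = outflow.buildDag();
java.nio.file.Files.writeString(
    java.nio.file.Path.of("dag_definition.yml"), dagDef.toString());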
Running ZephFlow as an HTTP Backend
ZephFlow can also run as an HTTP API backend, providing a convenient way to expose data processing pipelines as services:
Installation
Pull the HTTP starter docker image:
docker pull fleak/zephflow-httpstarter:latest
Run the container using docker:
docker run -p 8080:8080 fleak/zephflow-httpstarter
Usage
Once started, you can access the Swagger UI at http://localhost:8080/swagger-ui/index.html
Creating a Workflow
POST to the /api/v1/workflows endpoint to create a workflow. The request body should contain a JSON array of DAG nodes, where each node represents a step in your workflow. For example:
{
  "dag": [
    {
      "id": "node1",
      "commandName": "filter",
      "config": "$.value > 10",
      "outputs": ["node2"]
    },
    {
      "id": "node2",
      "commandName": "eval",
      "config": "dict(result=$.value * 2)",
      "outputs": []
    }
  ]
}
Each node in the DAG requires:
- id: Unique identifier for the node
- commandName: The operation to perform (e.g., "filter", "eval")
- config: Configuration string for the operation (usually a FEEL expression)
- outputs: Array of node IDs that this node connects to
You can also use the ZephFlow SDK to submit a workflow:
ZephFlow flow = ZephFlow.startFlow();
ZephFlow outputFlow = ... // code to construct the dag
// For HTTP API usage
String hostUrl = "http://localhost:8080";
String workflowId = outputFlow.submitApiEndpoint(hostUrl);
System.out.println("Created workflow with ID: " + workflowId);
The response will contain the newly created workflow ID that you'll use when submitting data for processing.
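As a sketch, the two-node workflow from the JSON example above could be built and submitted the same way. This assumes filter and eval can be chained directly off startFlow() without a source command, since API-submitted workflows must not contain one (see the note below).
ZephFlow flow = ZephFlow.startFlow();
// Same two nodes as the JSON example: a filter followed by an eval
ZephFlow outputFlow =
    flow.filter("$.value > 10")
        .eval("dict(result=$.value * 2)");
// Submit the DAG to a locally running HTTP backend
String workflowId = outputFlow.submitApiEndpoint("http://localhost:8080");
System.out.println("Created workflow with ID: " + workflowId);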
Processing Data
Send data for processing to:
/api/v1/execution/run/{workflow_id}/batch
The API supports several input formats:
- JSON array: Each array element is treated as an input event
- CSV: Each row is treated as an input event
- Text lines: Each line is treated as an input event
Make sure to set the appropriate Content-Type header for the API to correctly parse your data:
- JSON array: application/json
- CSV: text/csv
- Text lines: text/plain
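For example, a minimal client sketch using the JDK's java.net.http package (the workflow ID and payload below are placeholders):
// Post a JSON array batch to a locally running ZephFlow HTTP backend
HttpClient client = HttpClient.newHttpClient();
String workflowId = "<your_workflow_id>";
String payload = "[{\"value\": 5}, {\"value\": 42}]";
HttpRequest request =
    HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8080/api/v1/execution/run/" + workflowId + "/batch"))
        .header("Content-Type", "application/json") // must match the input format
        .POST(HttpRequest.BodyPublishers.ofString(payload))
        .build();
// send() throws IOException and InterruptedException; handle or declare them
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
System.out.println(response.body());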
Workflows created through the API cannot contain a source command in the DAG. The API backend automatically attaches an HTTP source at the beginning of the DAG.
Debugging Workflow Execution
To debug workflow execution, you can enable additional output parameters:
/api/v1/execution/run/{workflow_id}/batch?includeErrorByStep=true&includeOutputByStep=true
- includeErrorByStep=true: Include errors at each node in the response
- includeOutputByStep=true: Include intermediate processing results at each node in the response
Data Transformation in ZephFlow
ZephFlow provides multiple approaches for data transformation, filtering, and validation. The framework is built with a modular command architecture, where each command in the DAG is a standalone unit that performs a specific function.
Transformation Approaches
Fleak Eval Expression Language (FEEL)
FEEL provides a concise, expressive way to manipulate data structures. Common FEEL-based commands include:
- filter: Selectively process events based on conditions. Example:
  $.status == "success" and $.response_time < 500
- eval: Transform data structures. Example:
  dict(
    user_id=$.user.id,
    total_amount=$.items[0].price * $.items[0].quantity
  )
- assertion: Validate data against conditions. Example:
  $.user_id != null and $.transaction_id != null
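As a rough sketch of how such FEEL snippets plug into the SDK shown earlier (the assertion command is omitted because this guide does not cover its SDK method):
ZephFlow flow = ZephFlow.startFlow();
ZephFlow enriched =
    flow.stdinSource(EncodingType.JSON_ARRAY)
        // keep only successful, fast responses
        .filter("$.status == \"success\" and $.response_time < 500")
        // reshape each event into a smaller record
        .eval("dict(user_id=$.user.id, total_amount=$.items[0].price * $.items[0].quantity)")
        .stdoutSink(EncodingType.JSON_OBJECT);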
SQL Transformations
ZephFlow also supports SQL-based transformations for users who prefer SQL syntax for data manipulation.
TODO: Add SQL transformation documentation
Source and Sink Commands
Unless running as a REST API backend, ZephFlow pipelines typically begin with a source command and end with a sink command:
- Source commands bring data into the pipeline (e.g., file, stdin, Kafka)
- Sink commands output the processed data (e.g., file, stdout, Kafka, database)
We currently support the following sources and sinks:
- Kafka source/sink
- AWS S3 sink
- AWS Kinesis sink
- stdin source / stdout sink
Example Pipeline Using Multiple Commands
Here's an example that demonstrates how various commands can be combined in a pipeline:
ZephFlow flow = ZephFlow.startFlow();
ZephFlow pipeline = flow
    .kafkaSource("my_broker", "input_topic", "test_group", EncodingType.JSON_OBJECT, null) // Source command
    .filter("$.level == 'ERROR'") // FEEL filter command
    .eval("dict_merge($, dict(timestamp_ms=ts_str_to_epoch($.timestamp, 'yyyy-MM-dd HH:mm:ss')))") // FEEL transform
    .sql("SELECT level, message, timestamp_ms FROM events WHERE timestamp_ms > 1632268800000") // SQL command
    .kafkaSink("my_broker", "output_topic", null, EncodingType.JSON_OBJECT, null); // Sink command
Next Steps
After getting familiar with the basics of ZephFlow, you might want to:
- Learn more about the available commands and their configurations
- Set up monitoring and error handling for your data pipelines