Quick Start Guide

ZephFlow is a flexible data processing framework that allows you to build and execute powerful data transformation pipelines. This guide will help you get started with ZephFlow and explore its basic functionality.

Introduction

ZephFlow enables you to create data processing workflows using a directed acyclic graph (DAG) structure. You can define data flows that filter, transform, and process data using SQL or the Fleak Eval Expression Language (FEEL), a flexible expression language designed for data manipulation.

With ZephFlow, you can:

  • Create data processing pipelines with filtering, transformation, and validation
  • Run pipelines in various environments (standalone, as an HTTP service, or embedded)
  • Process data in various formats
  • Apply complex transformations using a powerful expression language

Ways to Use ZephFlow

There are three primary ways to use ZephFlow:

  1. Within an existing JVM program: Integrate ZephFlow directly into your Java application
  2. Standalone data pipeline process: Run ZephFlow as an independent process using CLI
  3. HTTP backend: Run ZephFlow as an HTTP service that processes data through API calls

Let's explore each approach.

Using ZephFlow Within a JVM Program

Project Setup

Add the ZephFlow SDK dependency to your project (Gradle):

implementation 'io.fleak.zephflow:sdk:0.2.0'
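
If you build with Maven instead, the same coordinates apply:

<dependency>
  <groupId>io.fleak.zephflow</groupId>
  <artifactId>sdk</artifactId>
  <version>0.2.0</version>
</dependency>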

Basic Usage

Here's a simple example that demonstrates how to use ZephFlow within a Java application:

// Create a simple flow to filter numbers > 5 and add additional properties
ZephFlow flow = ZephFlow.startFlow();

// Create input flow
ZephFlow inputFlow = flow.stdinSource(EncodingType.JSON_ARRAY);

// Filter and transform
ZephFlow processedFlow =
    inputFlow
        .filter("$.num > 5")
        .eval("dict(original=$.num, doubled=$.num*2, description='High value')");

// Output to stdout
ZephFlow outputFlow = processedFlow.stdoutSink(EncodingType.JSON_OBJECT);

// Execute the flow
outputFlow.execute("test_id", "test_env", "test_service");

This code creates a simple data processing pipeline that:

  1. Reads JSON arrays from standard input
  2. Filters elements where the num property is greater than 5
  3. Transforms each element by adding new properties
  4. Outputs the results as JSON objects to standard output

The pipeline follows this pattern: Single input source → filter → transform → output

The outputFlow.execute() method runs the entire data flow within the same JVM thread.
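
As a concrete illustration, with the input below on stdin, only the element with num = 7 passes the filter, so the pipeline emits one event (input values are illustrative; exact output formatting may differ):

Input (stdin):   [{"num": 3}, {"num": 7}]
Output (stdout): {"original": 7, "doubled": 14, "description": "High value"}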

Running ZephFlow as a Standalone Process

You can run ZephFlow as a standalone process using the command line interface:

Installation

Pull the CLI starter docker image:

docker pull fleak/zephflow-clistarter:latest

Usage

The ZephFlow CLI starter expects a DAG definition file as input. Map your local DAG file into the container when running it:

docker run \
  -v <your_local_dag_definition_file>:/app/dag_definition.yml \
  fleak/zephflow-clistarter:latest -f /app/dag_definition.yml
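
For example, with a DAG definition saved at ./dag_definition.yml (an illustrative local path):

docker run \
  -v "$(pwd)/dag_definition.yml":/app/dag_definition.yml \
  fleak/zephflow-clistarter:latest -f /app/dag_definition.yml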

Creating a DAG Definition

info: For more information about ZephFlow DAGs, refer to the DAG explanation.

You can use the ZephFlow SDK to generate a DAG definition:

// Create a simple flow to filter numbers > 5 and add additional properties
ZephFlow flow = ZephFlow.startFlow();

ZephFlow outflow = ... // DAG construction code

// Create a DAG definition object
AdjacencyListDagDefinition dagDef = outflow.buildDag();

// Print the DAG definition as YAML
System.out.println(dagDef);

The generated YAML can be saved to a file and used with the CLI starter.
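
A minimal sketch of writing the definition to a file from Java, assuming (as the println above suggests) that the object's string form is the YAML itself:

// Write the YAML DAG definition to a file for the CLI starter
// (Files.writeString throws IOException)
java.nio.file.Files.writeString(
    java.nio.file.Path.of("dag_definition.yml"), dagDef.toString());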

Running ZephFlow as an HTTP Backend

ZephFlow can also run as an HTTP API backend, providing a convenient way to expose data processing pipelines as services:

Installation

Pull the HTTP starter docker image:

docker pull fleak/zephflow-httpstarter:latest

Run the container using docker:

docker run -p 8080:8080 fleak/zephflow-httpstarter

Usage

Once started, you can access the Swagger UI at http://localhost:8080/swagger-ui/index.html

Creating a Workflow

POST to the /api/v1/workflows endpoint to create a workflow. The request body should contain a JSON array of DAG nodes, where each node represents a step in your workflow. For example:

{
  "dag": [
    {
      "id": "node1",
      "commandName": "filter",
      "config": "$.value > 10",
      "outputs": ["node2"]
    },
    {
      "id": "node2",
      "commandName": "eval",
      "config": "dict(result=$.value * 2)",
      "outputs": []
    }
  ]
}

Each node in the DAG requires:

  • id: Unique identifier for the node
  • commandName: The operation to perform (e.g., "filter", "eval")
  • config: Configuration string for the operation (usually a FEEL expression)
  • outputs: Array of node IDs that this node connects to
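
As a sketch, you can create this workflow with curl against the container started above (assuming it listens on localhost:8080):

curl -X POST http://localhost:8080/api/v1/workflows \
  -H 'Content-Type: application/json' \
  -d '{
    "dag": [
      {"id": "node1", "commandName": "filter", "config": "$.value > 10", "outputs": ["node2"]},
      {"id": "node2", "commandName": "eval", "config": "dict(result=$.value * 2)", "outputs": []}
    ]
  }'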

You can also use the ZephFlow SDK to submit a workflow:

ZephFlow flow = ZephFlow.startFlow();

ZephFlow outputFlow = ... // code to construct the DAG

// For HTTP API usage
String hostUrl = "http://localhost:8080";
String workflowId = outputFlow.submitApiEndpoint(hostUrl);
System.out.println("Created workflow with ID: " + workflowId);

The response will contain the newly created workflow ID that you'll use when submitting data for processing.

Processing Data

Send data for processing to:

/api/v1/execution/run/{workflow_id}/batch

The API supports several input formats:

  • JSON array: Each array element is treated as an input event
  • CSV: Each row is treated as an input event
  • Text lines: Each line is treated as an input event

Make sure to set the appropriate Content-Type header for the API to correctly parse your data.

  • JSON array: application/json
  • CSV: text/csv
  • Text lines: text/plain
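
For example, sending a JSON array to the endpoint (the workflow ID placeholder and input values are illustrative):

curl -X POST "http://localhost:8080/api/v1/execution/run/<workflow_id>/batch" \
  -H 'Content-Type: application/json' \
  -d '[{"value": 12}, {"value": 5}]'

With the example DAG shown earlier, only the first event passes the filter, so the output contains a single event: {"result": 24}.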

info: Workflows created through the API cannot contain a source command in the DAG. The API backend automatically attaches an HTTP source at the beginning of the DAG.

Debugging Workflow Execution

To debug workflow execution, you can enable additional output parameters:

/api/v1/execution/run/{workflow_id}/batch?includeErrorByStep=true&includeOutputByStep=true

  • includeErrorByStep=true: Include errors at each node in the response
  • includeOutputByStep=true: Include intermediate processing results at each node in the response
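
The same call as before with debugging enabled (again with an illustrative workflow ID):

curl -X POST "http://localhost:8080/api/v1/execution/run/<workflow_id>/batch?includeErrorByStep=true&includeOutputByStep=true" \
  -H 'Content-Type: application/json' \
  -d '[{"value": 12}, {"value": 5}]'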

Data Transformation in ZephFlow

ZephFlow provides multiple approaches for data transformation, filtering, and validation. The framework is built with a modular command architecture, where each command in the DAG is a standalone unit that performs a specific function.

Transformation Approaches

Fleak Eval Expression Language (FEEL)

FEEL provides a concise, expressive way to manipulate data structures. Common FEEL-based commands include:

  • filter: Selectively process events based on conditions

    $.status == "success" and $.response_time < 500
  • eval: Transform data structures (a worked example appears after this list)

    dict(
      user_id=$.user.id,
      total_amount=$.items[0].price * $.items[0].quantity
    )
  • assertion: Validate data against conditions

    $.user_id != null and $.transaction_id != null
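
As a worked illustration of the eval example above, with a hypothetical input event:

Input event: {"user": {"id": 42}, "items": [{"price": 10.0, "quantity": 3}]}
Result:      {"user_id": 42, "total_amount": 30.0}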

SQL Transformations

ZephFlow also supports SQL-based transformations for users who prefer SQL syntax for data manipulation.

TODO: Add SQL transformation documentation

Source and Sink Commands

Unless running as a REST API backend, ZephFlow pipelines typically begin with a source command and end with a sink command:

  • Source commands bring data into the pipeline (e.g., file, stdin, Kafka)
  • Sink commands output the processed data (e.g., file, stdout, Kafka, database)

We currently support the following sources and sinks:

  • Kafka source/sink
  • AWS S3 sink
  • AWS Kinesis sink
  • stdin source / stdout sink

Example Pipeline Using Multiple Commands

Here's an example that demonstrates how various commands can be combined in a pipeline:

ZephFlow flow = ZephFlow.startFlow();

ZephFlow pipeline =
    flow
        .kafkaSource("my_broker", "input_topic", "test_group", EncodingType.JSON_OBJECT, null) // Source command
        .filter("$.level == 'ERROR'") // FEEL filter command
        .eval("dict_merge($, dict(timestamp_ms=ts_str_to_epoch($.timestamp, 'yyyy-MM-dd HH:mm:ss')))") // FEEL transform
        .sql("SELECT level, message, timestamp_ms FROM events WHERE timestamp_ms > 1632268800000") // SQL command
        .kafkaSink("my_broker", "output_topic", null, EncodingType.JSON_OBJECT, null); // Sink command
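
As with the earlier stdin example, the pipeline runs only once you call execute on it (argument values here are illustrative):

pipeline.execute("job_id", "env", "service");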

Next Steps

After getting familiar with the basics of ZephFlow, you might want to: