DAG Structure
ZephFlow uses a Directed Acyclic Graph (DAG) to represent data processing pipelines. Understanding the DAG structure is essential for creating and running effective data processing workflows.
DAG Definition Format
Depending on how you use ZephFlow, the DAG is represented in different formats:
- When using ZephFlow as a standalone process with CLI or as a JVM process, the DAG is defined in YAML format
- When using ZephFlow as an HTTP backend, the DAG is defined in JSON format
Both formats contain the same essential information about the nodes and their connections.
Core Components of a DAG
A ZephFlow DAG consists of:
- Nodes: Individual processing steps (commands) in the pipeline
- Connections: Directed edges that define how data flows between nodes
- Optional context: Job-level configuration that applies to the entire pipeline
Node Structure
Each node in a DAG has the following properties:
- id: A unique identifier for the node. No duplicate IDs are allowed in a DAG.
- commandName: The type of operation to perform (e.g., "filter", "eval", "kafka", "stdin").
- config: Configuration specific to the node/command.
- outputs: A list of node IDs that this node connects to (downstream nodes).
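Taken together, a single node entry in YAML looks like the following minimal sketch; the node ID, filter condition, and downstream ID are illustrative, not taken from a real pipeline:

- id: "my_filter"
  commandName: "filter"
  config: |
    $.num > 0
  outputs:
    - "next_node"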
Example DAG Structure
Here's an example of a complete DAG definition in YAML format:
jobContext:
  otherProperties:
  metricTags:
  dlqConfig:
dag:
  #    /-> (even events) -> (add even tag) b -\
  #  a --> (odd events)  -> (add odd tag)  c --> d
  #    \-> (all events) -----------------------/
  - id: "a"
    commandName: "testSource"
    config:
    outputs:
      - "a_to_b_filter"
      - "a_to_c_filter"
      - "d"
  - id: "a_to_b_filter"
    commandName: "filter"
    config: |
      $.num%2 == 0
    outputs:
      - "b"
  - id: "a_to_c_filter"
    commandName: "filter"
    config: |
      $.num%2 == 1
    outputs:
      - "c"
  - id: "b"
    commandName: "eval"
    config: |
      dict(
        type='even',
        num=$.num
      )
    outputs:
      - "d"
  - id: "c"
    commandName: "eval"
    config: |
      dict(
        type='odd',
        num=$.num
      )
    outputs:
      - "d"
  - id: "d"
    commandName: "testSink"
    config:
This DAG represents a pipeline that:
- Receives data at node a
- Filters events into even and odd numbers via a_to_b_filter and a_to_c_filter
- Adds tags in nodes b and c
- Merges all processed events in node d
The same DAG in JSON format (for HTTP API usage) would look like:
{
  "dag": [
    {
      "id": "a",
      "commandName": "testSource",
      "config": "",
      "outputs": ["a_to_b_filter", "a_to_c_filter", "d"]
    },
    {
      "id": "a_to_b_filter",
      "commandName": "filter",
      "config": "$.num%2 == 0",
      "outputs": ["b"]
    },
    {
      "id": "a_to_c_filter",
      "commandName": "filter",
      "config": "$.num%2 == 1",
      "outputs": ["c"]
    },
    {
      "id": "b",
      "commandName": "eval",
      "config": "dict(\n type='even',\n num=$.num\n)",
      "outputs": ["d"]
    },
    {
      "id": "c",
      "commandName": "eval",
      "config": "dict(\n type='odd',\n num=$.num\n)",
      "outputs": ["d"]
    },
    {
      "id": "d",
      "commandName": "testSink",
      "config": "",
      "outputs": []
    }
  ]
}
Creating DAGs Using the ZephFlow SDK
While you can write DAG definitions manually, using the ZephFlow SDK to build your pipelines is recommended. The SDK provides a fluent API that makes it easy to create complex DAGs:
// Create a flow with branching and merging
ZephFlow flow = ZephFlow.startFlow();
ZephFlow inputFlow = flow.stdinSource(EncodingType.JSON_ARRAY);

// Create even flow: filter for even numbers, then add even tag
ZephFlow evenFlow =
    inputFlow
        .filter("$.num%2 == 0") // Apply condition for even numbers
        .eval("dict(type='even', num=$.num)");

// Create odd flow: filter for odd numbers, then add odd tag
ZephFlow oddFlow =
    inputFlow
        .filter("$.num%2 == 1") // Apply condition for odd numbers
        .eval("dict(type='odd', num=$.num)");

// Merge the two flows
ZephFlow mergedFlow = ZephFlow.merge(evenFlow, oddFlow);

// Connect to stdout sink
ZephFlow outputFlow = mergedFlow.stdoutSink(EncodingType.JSON_OBJECT);

// Generate the DAG definition
AdjacencyListDagDefinition dagDef = outputFlow.buildDag();
The SDK ensures that nodes are properly connected and generates unique IDs automatically. Once you've built your flow, you can:
- Execute it directly with outputFlow.execute(jobId, env, service), as sketched below
- Convert it to a DAG definition with outputFlow.buildDag()
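For example, running the flow built above directly might look like this; the job ID, environment, and service names are placeholder strings chosen for illustration, not values required by the SDK:

// Run the merged pipeline built above. The three string arguments
// (job id, environment, service) are illustrative placeholders.
outputFlow.execute("example-job", "dev", "example-service");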
DAG Validation Rules
ZephFlow enforces several rules on DAG structures to ensure valid processing pipelines:
- Entry Points: When not running as a REST API backend, a DAG must have exactly one entry point (source node). When running as a REST API backend, the DAG must not contain any source command nodes.
- Source Nodes: Source nodes cannot have incoming connections.
- Sink Nodes: Sink nodes must be terminal nodes (cannot have outgoing connections).
- Node IDs: Each node must have a unique ID within the DAG.
- Non-empty DAG: A DAG must contain at least one node.
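As an illustration, the following fragment (with made-up node IDs) breaks these rules in two ways: the sink node declares an outgoing connection, and that connection points back at the source, which may not have incoming edges:

dag:
  - id: "source"
    commandName: "testSource"
    config:
    outputs:
      - "sink"
  - id: "sink"
    commandName: "testSink"
    config:
    outputs:
      - "source"   # invalid: sinks must be terminal, and sources cannot have incoming connections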
Common DAG Patterns
Linear Flow
A simple sequence of operations:
Source → Filter → Transform → Sink
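With the SDK, this pattern is a single chain of calls; the filter condition and eval expression below are illustrative:

// Linear flow: stdin source -> filter -> transform -> stdout sink
ZephFlow linearFlow =
    ZephFlow.startFlow()
        .stdinSource(EncodingType.JSON_ARRAY)   // Source
        .filter("$.num > 0")                    // Filter (illustrative condition)
        .eval("dict(num=$.num)")                // Transform (illustrative expression)
        .stdoutSink(EncodingType.JSON_OBJECT);  // Sink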
Branching Flow
Process data differently based on conditions:
         /→ FilterA → TransformA →\
Source →                           → Sink
         \→ FilterB → TransformB →/
Multiple Terminal Sinks
Send different processed data to different destinations:
         /→ FilterA → SinkA
Source →
         \→ FilterB → SinkB
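In the declarative format this is a source whose outputs list two filter nodes, each feeding its own terminal sink; the node IDs and filter conditions here are illustrative:

dag:
  - id: "source"
    commandName: "testSource"
    config:
    outputs:
      - "filter_a"
      - "filter_b"
  - id: "filter_a"
    commandName: "filter"
    config: |
      $.num%2 == 0
    outputs:
      - "sink_a"
  - id: "filter_b"
    commandName: "filter"
    config: |
      $.num%2 == 1
    outputs:
      - "sink_b"
  - id: "sink_a"
    commandName: "testSink"
    config:
  - id: "sink_b"
    commandName: "testSink"
    config: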
Complex Nested Flows
Combine branching and merging at multiple levels:
                      /→ TransformA →\
           /→ FilterA                 \
          /           \→ TransformB →  \
         /                               \
Source →                                  → Merge → Sink
         \                               /
          \           /→ TransformC →  /
           \→ FilterB
                      \→ TransformD → SinkD
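A rough SDK sketch of this shape is shown below. It assumes that merge calls can be nested (only the two-argument form appears in the example earlier on this page) and that a flow graph may contain more than one sink; the filter conditions and eval expressions are illustrative:

// Shared source
ZephFlow input = ZephFlow.startFlow().stdinSource(EncodingType.JSON_ARRAY);

// First branch with two transforms (conditions/expressions are made up)
ZephFlow filterA = input.filter("$.num%2 == 0");
ZephFlow transformA = filterA.eval("dict(kind='a', num=$.num)");
ZephFlow transformB = filterA.eval("dict(kind='b', num=$.num)");

// Second branch with two transforms
ZephFlow filterB = input.filter("$.num%2 == 1");
ZephFlow transformC = filterB.eval("dict(kind='c', num=$.num)");
ZephFlow transformD = filterB.eval("dict(kind='d', num=$.num)");

// Merge three of the transforms into one sink; merge calls are nested here
// on the assumption that merge accepts exactly two flows at a time.
ZephFlow merged = ZephFlow.merge(ZephFlow.merge(transformA, transformB), transformC);
ZephFlow mainSink = merged.stdoutSink(EncodingType.JSON_OBJECT);

// The remaining transform gets its own, separate sink
ZephFlow sideSink = transformD.stdoutSink(EncodingType.JSON_OBJECT);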