Filter Node
The filter node allows you to selectively process records in your ZephFlow pipeline based on specific conditions. It
evaluates each incoming record against an expression written in the Fleak Eval Expression Language. Records that evaluate
to true are passed to downstream nodes, otherwise are dropped from the pipeline.
For more details of using FEEL, please refer to FEEL Reference
Key Features
- Conditional Processing: Apply downstream transformations only to records that match specific criteria
- Data Validation: Filter out invalid or incomplete records early in your pipeline
- Conditional Branching: Use multiple filter nodes to create branched pipelines for different record types
- Performance Optimization: Reduce load on downstream processors by dropping unnecessary records early
Attaching Filter Node Using SDK
ZephFlow flow = ZephFlow.startFlow();
flow.filter("your FEEL filter condition");
Basic Usage
# Keep only records where the status field equals "success"
$.status == "success"
# Keep only records with response time under 500ms
$.response_time < 500
# Keep only records that have a non-empty items array
size_of($.items) > 0
# Complex conditions can be combined with logical operators
$.status == "success" and $.response_time < 500
# Non boolean evaluation result will cause record to be dropped
dict(value=$.status)
Config Object
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
expression | String | Yes | — | FEEL expression that evaluates to true to keep the record |
Examples
Basic Filtering
# Process only error records
$.level == "ERROR"
# Filter out null values
$.user_id != null
Complex Conditions
# Process only high-priority errors
$.level == "ERROR" and $.priority > 3
# Process records from specific sources OR with high severity
$.source == "payment-gateway" or $.severity == "critical"
# Process records with complex nested conditions
($.type == "transaction" and $.amount > 1000) or ($.type == "login" and str_contains($.ip, "192.168."))
Using Functions
# Filter based on array content
size_of($.tags) > 0
# Filter based on string content
str_contains(lower($.message), "error")
# Filter using case expressions
case(
$.status == 200 => true,
$.retry_count < 3 => true,
_ => false
)
Type-Specific Filtering
# Numeric comparisons
$.age >= 18 and $.age <= 65
# String operations
str_contains(lower($.user_agent), "mozilla")
# Timestamp filtering
ts_str_to_epoch($.timestamp, "yyyy-MM-dd") > 1609459200000 # After January 1, 2021 12:00:00 AM UTC