Error Handling
Introduction
Error handling is a critical aspect of any data processing pipeline. ZephFlow provides robust mechanisms to handle errors that occur during event processing, ensuring that your pipelines remain resilient and your data is not lost when issues arise.
This document covers how ZephFlow handles errors, how to configure error handling options, and best practices for managing errors in your data pipelines.
Error Handling Overview
When an event flows through a ZephFlow pipeline, various errors can occur during processing. For example:
- Invalid data formats
- Failed parsing operations
- Type conversion errors
- Failed assertions or validations
- Errors in function execution
When an error occurs, ZephFlow marks the affected event as an "error event" but continues processing other events. This ensures that a single error doesn't cause the entire pipeline to break or lose other valid events.
Error Handling Modes
ZephFlow provides different error handling mechanisms depending on how the pipeline is being executed:
Standalone Process Mode
When running ZephFlow in a standalone process (e.g., as a Java application), error events can be sent to a Dead Letter Queue (DLQ). The DLQ serves as a storage location for events that couldn't be processed successfully, allowing you to:
- Investigate the cause of failures
- Potentially reprocess the events after fixing the issue
- Keep a record of problematic data for analysis
To use this mode, you need to configure a Dead Letter Queue through the JobContext when building and executing your DAG.
REST API Backend Mode
When running ZephFlow as a REST API backend, error information can be returned directly in the HTTP response. This is particularly useful during development and testing.
To enable this, set the includeErrorByStep flag to true in your URL parameters when making requests to the API.
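As a minimal sketch, the request below shows where the flag goes. The endpoint path and request body are placeholders (assumptions for illustration, not the documented API surface); only the includeErrorByStep=true query parameter is the documented part.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
// Placeholder endpoint: substitute the actual run URL of your ZephFlow REST backend.
// The relevant part is the includeErrorByStep=true query parameter.
URI uri = URI.create("http://localhost:8080/your-run-endpoint?includeErrorByStep=true");
HttpRequest request = HttpRequest.newBuilder(uri)
    .header("Content-Type", "application/json")
    .POST(HttpRequest.BodyPublishers.ofString("[{\"string_value\": \"not-a-number\"}]")) // illustrative payload
    .build();
HttpResponse<String> response = HttpClient.newHttpClient()
    .send(request, HttpResponse.BodyHandlers.ofString());
// With includeErrorByStep=true, the response reports per-step error information for failed events
System.out.println(response.body());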
For more information on debugging workflow execution, see the Getting Started documentation.
Dead Letter Queue Configuration
Configuring the DLQ
To configure a Dead Letter Queue, create a DlqConfig implementation and add it to your JobContext. Currently, ZephFlow supports S3 as a destination for dead letter events.
S3 Dead Letter Queue Configuration
import io.fleak.commons.api.JobContext;
// Create an S3 DLQ configuration
JobContext.S3DlqConfig dlqConfig = JobContext.S3DlqConfig.builder()
.region("us-west-2") // AWS region
.bucket("my-error-events-bucket") // S3 bucket name
.batchSize(100) // Number of events to batch before writing
.flushIntervalMillis(30000) // Maximum time to wait before writing (30 seconds)
.build();
// Create a JobContext with the DLQ configuration
JobContext jobContext = JobContext.builder()
.dlqConfig(dlqConfig)
.build();
// Use the JobContext when building and executing your flow
AdjacencyListDagDefinition dagDefinition = flow.buildDag(jobContext);
DLQ Configuration Parameters
The S3 Dead Letter Queue configuration includes the following parameters:
Parameter | Description |
---|---|
region | AWS region where the S3 bucket is located |
bucket | Name of the S3 bucket to store error events |
batchSize | Number of error events to accumulate before writing to S3 (triggers a flush when reached) |
flushIntervalMillis | Maximum time in milliseconds to wait before writing accumulated events to S3 (ensures events are written even if batch size isn't reached) |
Dead Letter Format
When an error event is sent to the DLQ, it's stored using the Avro format with the following schema:
{
"namespace": "io.fleak.commons.lib.deadletter",
"type": "record",
"name": "DeadLetter",
"fields": [
{
"name": "processingTimestamp",
"type": "long",
"logicalType": "timestamp-millis"
},
{
"name": "key",
"type": [
"null",
"bytes"
],
"default": null
},
{
"name": "value",
"type": [
"null",
"bytes"
],
"default": null
},
{
"name": "metadata",
"type": [
"null",
{
"type": "map",
"values": "string"
}
],
"default": null
},
{
"name": "errorMessage",
"type": "string"
}
]
}
The dead letter record includes:
- processingTimestamp: When the error occurred
- key: Original event key (if applicable)
- value: Original event value
- metadata: Additional contextual information about the error
- errorMessage: Description of the error that occurred
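If you need to inspect DLQ contents offline, the records can be read back with the standard Apache Avro Java library. The sketch below assumes the DLQ objects are regular Avro data files containing DeadLetter records as described above; the local file path is a placeholder for an object downloaded from the bucket.
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
// Path to a DLQ object downloaded from the S3 bucket (placeholder)
File deadLetterFile = new File("/tmp/dead-letters/part-0000.avro");
// The reader picks up the writer schema embedded in the Avro data file
try (DataFileReader<GenericRecord> reader =
    new DataFileReader<>(deadLetterFile, new GenericDatumReader<>())) {
  for (GenericRecord deadLetter : reader) {
    System.out.println("when:  " + deadLetter.get("processingTimestamp"));
    System.out.println("error: " + deadLetter.get("errorMessage"));
    ByteBuffer value = (ByteBuffer) deadLetter.get("value"); // original event payload, may be null
    if (value != null) {
      byte[] bytes = new byte[value.remaining()];
      value.get(bytes);
      System.out.println("event: " + new String(bytes, StandardCharsets.UTF_8));
    }
  }
}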
Integration with ZephFlow Pipelines
Here's a complete example of configuring error handling in a ZephFlow pipeline:
import io.fleak.commons.api.JobContext;
import io.fleak.commons.lib.serdes.EncodingType;
// Configure DLQ
JobContext.S3DlqConfig dlqConfig = JobContext.S3DlqConfig.builder()
.region("us-west-2")
.bucket("error-events")
.batchSize(50)
.flushIntervalMillis(60000) // 1 minute
.build();
JobContext jobContext = JobContext.builder()
.dlqConfig(dlqConfig)
.build();
// Create a flow with error-prone operations
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource("kafka:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
.eval("dict_merge($, dict(int_value=parse_int($.string_value)))") // Could fail if string_value is not a number
.assertion("$.int_value > 0") // Will mark events as errors if condition is not met
.kafkaSink("kafka:9092", "output-topic", null, EncodingType.JSON_OBJECT, null);
// Execute the flow with error handling configured
flow.execute("my-job-id", "production", "data-processor", jobContext);
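With this configuration, any event whose string_value cannot be parsed as an integer, or whose resulting int_value is not positive, is written to the error-events bucket as a DeadLetter record, while the remaining events continue on to the output topic.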
Use Assertions for Data Validation
The assertion node is a powerful tool for data validation in ZephFlow. Unlike the filter node (which simply drops non-matching events), the assertion node marks events that don't meet the specified condition as error events, which can then be routed to your DLQ.
// Validate that required fields exist and have valid values
flow.assertion("$.user_id != null and $.timestamp != null and $.amount > 0");
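To illustrate the difference, here is a short sketch that assumes a filter(...) builder method alongside the assertion(...) method shown above; the same condition behaves differently in the two nodes.
// filter: events where amount <= 0 are silently dropped from the flow
ZephFlow filtered = flow.filter("$.amount > 0");
// assertion: events where amount <= 0 are marked as error events and,
// if a DLQ is configured on the JobContext, written there for later inspection
ZephFlow validated = flow.assertion("$.amount > 0");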