# Error Handling
ZephFlow provides robust error handling with Dead Letter Queue support (S3 and GCS) for capturing and analyzing failed events, ensuring data integrity throughout transformation pipelines.
## Introduction
Error handling is a critical aspect of any data processing pipeline. ZephFlow provides robust mechanisms to handle errors that occur during event processing, ensuring that your pipelines remain resilient and your data is not lost when issues arise.
This document covers how ZephFlow handles errors, how to configure error handling options, and best practices for managing errors in your data pipelines.
## Error Handling Overview
When an event flows through a ZephFlow pipeline, various errors can occur during processing. For example:
- Invalid data formats
- Failed parsing operations
- Type conversion errors
- Failed assertions or validations
- Errors in function execution
When an error occurs, ZephFlow marks the affected event as an "error event" but continues processing other events. This ensures that a single error doesn't cause the entire pipeline to break or lose other valid events.
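To make this behavior concrete, here is a minimal, framework-free Java sketch of the same idea; the event type and the parsing "transformation" are invented for illustration. A failure tags the individual event as an error instead of aborting the batch:

```java
import java.util.ArrayList;
import java.util.List;

public class ErrorIsolationSketch {
    // An event is either a successfully transformed value or an error event.
    record Result(String event, String error) {}

    // Process each event independently; a failure marks that single event
    // as an "error event" while later events are still processed normally.
    static List<Result> processAll(List<String> events) {
        List<Result> results = new ArrayList<>();
        for (String event : events) {
            try {
                int parsed = Integer.parseInt(event); // the "transformation"
                results.add(new Result(String.valueOf(parsed * 2), null));
            } catch (NumberFormatException e) {
                results.add(new Result(event, e.getMessage())); // error event
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Result> out = processAll(List.of("1", "oops", "3"));
        System.out.println(out.get(1).error() != null); // bad event is isolated
        System.out.println(out.get(2).event());         // later event still processed
    }
}
```

In a real pipeline, the error branch is where an event would be handed to the DLQ writer described below.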
## Error Handling Modes
ZephFlow provides different error handling mechanisms depending on how the pipeline is being executed:
### Standalone Process Mode
When running ZephFlow in a standalone process (e.g., as a Java application), error events can be sent to a Dead Letter Queue (DLQ). The DLQ serves as a storage location for events that couldn't be processed successfully, allowing you to:
- Investigate the cause of failures
- Potentially reprocess the events after fixing the issue
- Keep a record of problematic data for analysis
To use this mode, you need to configure a Dead Letter Queue through the JobContext when building and executing your DAG.
### REST API Backend Mode
When running ZephFlow as a REST API backend, error information can be returned directly in the HTTP response. This is particularly useful during development and testing.
To enable this, set the includeErrorByStep flag to true in your URL parameters when making requests to the API.
For more information on debugging workflow execution, see the Getting Started documentation.
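As a sketch of what such a debug request can look like from Java: the host, port, and endpoint path below are hypothetical placeholders; the includeErrorByStep=true query parameter is the part this section describes.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class DebugRequestSketch {
    public static void main(String[] args) {
        // Hypothetical host and path; substitute your actual workflow endpoint.
        // The query parameter asks the backend to report errors per pipeline step.
        URI uri = URI.create(
            "http://localhost:8080/api/v1/execution/run/my-workflow"
                + "?includeErrorByStep=true");
        HttpRequest request = HttpRequest.newBuilder(uri)
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString("{\"string_value\":\"abc\"}"))
            .build();
        System.out.println(request.uri().getQuery()); // includeErrorByStep=true
    }
}
```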
## Dead Letter Queue Configuration
### Configuring the DLQ

To configure a Dead Letter Queue, create a DlqConfig implementation and add it to your JobContext. ZephFlow supports S3 and GCS (Google Cloud Storage) as destinations for dead letter events.
#### S3 Dead Letter Queue Configuration

```java
import io.fleak.zephflow.api.JobContext;

// Create an S3 DLQ configuration
JobContext.S3DlqConfig dlqConfig = JobContext.S3DlqConfig.builder()
    .region("us-west-2")              // AWS region
    .bucket("my-error-events-bucket") // S3 bucket name
    .batchSize(100)                   // Number of events to batch before writing
    .flushIntervalMillis(30000)       // Maximum time to wait before writing (30 seconds)
    .build();

// Create a JobContext with the DLQ configuration
JobContext jobContext = JobContext.builder()
    .dlqConfig(dlqConfig)
    .build();

// Use the JobContext when building and executing your flow
AdjacencyListDagDefinition dagDefinition = flow.buildDag(jobContext);
```
#### GCS Dead Letter Queue Configuration

```java
import io.fleak.zephflow.api.JobContext;

// Create a GCS DLQ configuration
JobContext.GcsDlqConfig dlqConfig = JobContext.GcsDlqConfig.builder()
    .bucket("my-error-events-bucket")       // GCS bucket name
    .serviceAccountJson(gcpCredentialsJson) // GCP service account JSON credentials
    .batchSize(100)                         // Number of events to batch before writing
    .flushIntervalMillis(30000)             // Maximum time to wait before writing (30 seconds)
    .build();

// Create a JobContext with the DLQ configuration
JobContext jobContext = JobContext.builder()
    .dlqConfig(dlqConfig)
    .build();

// Use the JobContext when building and executing your flow
AdjacencyListDagDefinition dagDefinition = flow.buildDag(jobContext);
```
When using a YAML DAG definition, specify the type field to select the DLQ backend:
```yaml
# S3 DLQ
jobContext:
  dlqConfig:
    type: "s3"
    region: "us-west-2"
    bucket: "my-error-events-bucket"
    batchSize: 100
    flushIntervalMillis: 30000
```

```yaml
# GCS DLQ
jobContext:
  dlqConfig:
    type: "gcs"
    bucket: "my-error-events-bucket"
    serviceAccountJson: "${GCP_SERVICE_ACCOUNT_JSON}"
    batchSize: 100
    flushIntervalMillis: 30000
```
### DLQ Configuration Parameters

#### S3 Parameters

| Parameter | Description |
|---|---|
| region | AWS region where the S3 bucket is located |
| bucket | Name of the S3 bucket to store error events |
| accessKeyId | AWS access key ID for authentication (optional; uses the default credential chain if omitted) |
| secretAccessKey | AWS secret access key for authentication (optional; uses the default credential chain if omitted) |
| s3EndpointOverride | Custom S3 endpoint URL (optional; useful for S3-compatible storage or local testing) |
| batchSize | Number of error events to accumulate before writing to S3 (triggers a flush when reached) |
| flushIntervalMillis | Maximum time in milliseconds to wait before writing accumulated events to S3 (ensures events are written even if the batch size isn't reached) |
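The interaction between batchSize and flushIntervalMillis can be modeled with a small sketch. This is a toy model of the buffering semantics described above, not ZephFlow's actual writer: a flush happens when the batch fills up or when the interval elapses, whichever comes first.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of DLQ buffering: flush when batchSize is reached OR when
// flushIntervalMillis has elapsed since the last flush.
public class FlushPolicySketch {
    private final int batchSize;
    private final long flushIntervalMillis;
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushAt;
    final List<List<String>> flushed = new ArrayList<>();

    FlushPolicySketch(int batchSize, long flushIntervalMillis, long now) {
        this.batchSize = batchSize;
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlushAt = now;
    }

    void add(String event, long now) {
        buffer.add(event);
        if (buffer.size() >= batchSize || now - lastFlushAt >= flushIntervalMillis) {
            flushed.add(new ArrayList<>(buffer)); // in real use: one object written to S3/GCS
            buffer.clear();
            lastFlushAt = now;
        }
    }

    public static void main(String[] args) {
        FlushPolicySketch dlq = new FlushPolicySketch(3, 30_000, 0);
        dlq.add("e1", 1_000);
        dlq.add("e2", 2_000);
        dlq.add("e3", 3_000);   // size trigger: first flush of 3 events
        dlq.add("e4", 40_000);  // interval trigger: flush even though the batch is small
        System.out.println(dlq.flushed.size()); // 2
    }
}
```

The interval bound matters for low-volume pipelines: without it, a rarely failing pipeline could hold error events in memory indefinitely while waiting to fill a batch.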
#### GCS Parameters

| Parameter | Description |
|---|---|
| bucket | Name of the GCS bucket to store error events |
| serviceAccountJson | GCP service account JSON credentials for authentication |
| batchSize | Number of error events to accumulate before writing to GCS (triggers a flush when reached) |
| flushIntervalMillis | Maximum time in milliseconds to wait before writing accumulated events to GCS (ensures events are written even if the batch size isn't reached) |
## Dead Letter Format
When an error event is sent to the DLQ, it's stored using the Avro format with the following schema:
```json
{
  "namespace": "io.fleak.zephflow.lib.deadletter",
  "type": "record",
  "name": "DeadLetter",
  "fields": [
    {
      "name": "processingTimestamp",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "key",
      "type": ["null", "bytes"],
      "default": null
    },
    {
      "name": "value",
      "type": ["null", "bytes"],
      "default": null
    },
    {
      "name": "metadata",
      "type": ["null", {"type": "map", "values": "string"}],
      "default": null
    },
    {
      "name": "errorMessage",
      "type": "string"
    }
  ]
}
```
The dead letter record includes:
- processingTimestamp: When the error occurred
- key: Original event key (if applicable)
- value: Original event value
- metadata: Additional contextual information about the error
- errorMessage: Description of the error that occurred
- nodeId: The ID of the pipeline node where the error occurred (carried in the metadata map)
## Integration with ZephFlow Pipelines
Here's a complete example of configuring error handling in a ZephFlow pipeline:
```java
import io.fleak.zephflow.api.JobContext;
import io.fleak.zephflow.lib.serdes.EncodingType;

// Configure DLQ
JobContext.S3DlqConfig dlqConfig = JobContext.S3DlqConfig.builder()
    .region("us-west-2")
    .bucket("error-events")
    .batchSize(50)
    .flushIntervalMillis(60000) // 1 minute
    .build();

JobContext jobContext = JobContext.builder()
    .dlqConfig(dlqConfig)
    .build();

// Create a flow with error-prone operations
ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .eval("dict_merge($, dict(int_value=parse_int($.string_value)))") // Could fail if string_value is not a number
    .assertion("$.int_value > 0") // Marks events as errors if the condition is not met
    .kafkaSink("kafka:9092", "output-topic", null, EncodingType.JSON_OBJECT, null);

// Execute the flow with error handling configured
flow.execute("my-job-id", "production", "data-processor", jobContext);
```
## Dead Letter Enhancements

### File Path Prefix
When a prefix is configured in jobContext, dead letter file paths include this prefix, making it easier to organize DLQ files across different jobs or environments.
### Node ID in Error Records
NodeExecutionException errors include the node ID in the dead letter record's metadata, helping you quickly identify which node in the pipeline caused the failure.
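For example, given a dead letter's metadata map, the failing node can be looked up by key. The exact key name (nodeId here) and the sample values are assumptions for illustration; verify the keys present in your own dead letter records.

```java
import java.util.Map;

public class NodeIdLookupSketch {
    public static void main(String[] args) {
        // Hypothetical metadata contents; "nodeId" as the key name is an
        // assumption — inspect a real dead letter record to confirm.
        Map<String, String> metadata = Map.of(
            "nodeId", "parse_step_1",
            "commandName", "eval");
        String failingNode = metadata.getOrDefault("nodeId", "<unknown>");
        System.out.println(failingNode); // parse_step_1
    }
}
```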
### Raw Data Sampler
The raw data sampler periodically captures raw input data and writes it to the DLQ destination for debugging and inspection purposes. This is useful for verifying what data is entering your pipeline without modifying the pipeline itself.
#### Configuration
The sampler is configured via the DLQ config (both S3 and GCS support this):
```java
// S3 example
JobContext.S3DlqConfig dlqConfig = JobContext.S3DlqConfig.builder()
    .region("us-west-2")
    .bucket("my-error-events-bucket")
    .batchSize(100)
    .flushIntervalMillis(30000)
    .rawDataSampleIntervalMs(60000) // Sample raw data every 60 seconds (default)
    .build();
```

```java
// GCS example
JobContext.GcsDlqConfig dlqConfig = JobContext.GcsDlqConfig.builder()
    .bucket("my-error-events-bucket")
    .serviceAccountJson(gcpCredentialsJson)
    .batchSize(100)
    .flushIntervalMillis(30000)
    .rawDataSampleIntervalMs(60000) // Sample raw data every 60 seconds (default)
    .build();
```
| Parameter | Description | Default |
|---|---|---|
| rawDataSampleIntervalMs | Interval in milliseconds between raw data samples | 60000 |
## Use Assertions for Data Validation

The assertion node is a powerful tool for data validation in ZephFlow. Unlike the filter node (which simply drops non-matching events), the assertion node marks events that don't meet the specified condition as error events, which can then be routed to your DLQ.
```java
// Validate that required fields exist and have valid values
flow.assertion("$.user_id != null and $.timestamp != null and $.amount > 0");
```
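The contrast between the two semantics (drop vs. mark-as-error) can be sketched in a toy, framework-free form; the Event type below is invented for illustration and is not a ZephFlow class.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Toy contrast: filter silently drops non-matching events, while
// assertion keeps every event but marks failures as error events.
public class FilterVsAssertionSketch {
    record Event(int amount, boolean error) {}

    static List<Event> filter(List<Event> in, Predicate<Event> p) {
        return in.stream().filter(p).collect(Collectors.toList());
    }

    static List<Event> assertion(List<Event> in, Predicate<Event> p) {
        return in.stream()
            .map(e -> p.test(e) ? e : new Event(e.amount(), true)) // mark, don't drop
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Event> events = List.of(new Event(10, false), new Event(-5, false));
        Predicate<Event> valid = e -> e.amount() > 0;
        System.out.println(filter(events, valid).size());            // bad event dropped
        System.out.println(assertion(events, valid).size());         // bad event kept...
        System.out.println(assertion(events, valid).get(1).error()); // ...but marked
    }
}
```

Prefer assertion over filter when a non-matching event indicates a data problem you want to investigate via the DLQ, rather than data you intend to discard.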