Error Handling

ZephFlow provides robust error handling with Dead Letter Queue support (S3 and GCS) for capturing and analyzing failed events, ensuring data integrity throughout transformation pipelines.

Introduction

Error handling is a critical aspect of any data processing pipeline. ZephFlow provides robust mechanisms to handle errors that occur during event processing, ensuring that your pipelines remain resilient and your data is not lost when issues arise.

This document covers how ZephFlow handles errors, how to configure error handling options, and best practices for managing errors in your data pipelines.

Error Handling Overview

When an event flows through a ZephFlow pipeline, various errors can occur during processing. For example:

  • Invalid data formats
  • Failed parsing operations
  • Type conversion errors
  • Failed assertions or validations
  • Errors in function execution

When an error occurs, ZephFlow marks the affected event as an "error event" but continues processing other events. This ensures that a single error doesn't cause the entire pipeline to break or lose other valid events.

Error Handling Modes

ZephFlow provides different error handling mechanisms depending on how the pipeline is being executed:

Standalone Process Mode

When running ZephFlow in a standalone process (e.g., as a Java application), error events can be sent to a Dead Letter Queue (DLQ). The DLQ serves as a storage location for events that couldn't be processed successfully, allowing you to:

  • Investigate the cause of failures
  • Potentially reprocess the events after fixing the issue
  • Keep a record of problematic data for analysis

To use this mode, you need to configure a Dead Letter Queue through the JobContext when building and executing your DAG.

REST API Backend Mode

When running ZephFlow as a REST API backend, error information can be returned directly in the HTTP response. This is particularly useful during development and testing.

To enable this, set the includeErrorByStep flag to true as a query parameter in the request URL when making requests to the API.

For more information on debugging workflow execution, see the Getting Started documentation.
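As a minimal sketch of using the flag, the helper below appends includeErrorByStep=true to a workflow endpoint URL. The endpoint path in main() is a hypothetical placeholder; use whatever URL your ZephFlow REST deployment actually exposes.

```java
import java.net.URI;

public class ErrorByStepUrl {
    // Appends the includeErrorByStep=true query parameter to an endpoint URL,
    // reusing "&" when the URL already carries a query string.
    static URI withErrorByStep(String workflowEndpoint) {
        String separator = workflowEndpoint.contains("?") ? "&" : "?";
        return URI.create(workflowEndpoint + separator + "includeErrorByStep=true");
    }

    public static void main(String[] args) {
        // Placeholder endpoint; substitute your deployment's workflow URL.
        System.out.println(withErrorByStep("http://localhost:8080/run/my-workflow"));
        // -> http://localhost:8080/run/my-workflow?includeErrorByStep=true
    }
}
```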

Dead Letter Queue Configuration

Configuring the DLQ

To configure a Dead Letter Queue, create a DlqConfig implementation and add it to your JobContext. ZephFlow supports S3 and GCS (Google Cloud Storage) as destinations for dead letter events.

S3 Dead Letter Queue Configuration

import io.fleak.zephflow.api.JobContext;

// Create an S3 DLQ configuration
JobContext.S3DlqConfig dlqConfig = JobContext.S3DlqConfig.builder()
    .region("us-west-2")              // AWS region
    .bucket("my-error-events-bucket") // S3 bucket name
    .batchSize(100)                   // Number of events to batch before writing
    .flushIntervalMillis(30000)       // Maximum time to wait before writing (30 seconds)
    .build();

// Create a JobContext with the DLQ configuration
JobContext jobContext = JobContext.builder()
    .dlqConfig(dlqConfig)
    .build();

// Use the JobContext when building and executing your flow
AdjacencyListDagDefinition dagDefinition = flow.buildDag(jobContext);

GCS Dead Letter Queue Configuration

import io.fleak.zephflow.api.JobContext;

// Create a GCS DLQ configuration
JobContext.GcsDlqConfig dlqConfig = JobContext.GcsDlqConfig.builder()
    .bucket("my-error-events-bucket")       // GCS bucket name
    .serviceAccountJson(gcpCredentialsJson) // GCP service account JSON credentials
    .batchSize(100)                         // Number of events to batch before writing
    .flushIntervalMillis(30000)             // Maximum time to wait before writing (30 seconds)
    .build();

// Create a JobContext with the DLQ configuration
JobContext jobContext = JobContext.builder()
    .dlqConfig(dlqConfig)
    .build();

// Use the JobContext when building and executing your flow
AdjacencyListDagDefinition dagDefinition = flow.buildDag(jobContext);

When using a YAML DAG definition, specify the type field to select the DLQ backend:

# S3 DLQ
jobContext:
  dlqConfig:
    type: "s3"
    region: "us-west-2"
    bucket: "my-error-events-bucket"
    batchSize: 100
    flushIntervalMillis: 30000

# GCS DLQ
jobContext:
  dlqConfig:
    type: "gcs"
    bucket: "my-error-events-bucket"
    serviceAccountJson: "${GCP_SERVICE_ACCOUNT_JSON}"
    batchSize: 100
    flushIntervalMillis: 30000

DLQ Configuration Parameters

S3 Parameters

  • region: AWS region where the S3 bucket is located
  • bucket: Name of the S3 bucket to store error events
  • accessKeyId: AWS access key ID for authentication (optional; uses the default credential chain if omitted)
  • secretAccessKey: AWS secret access key for authentication (optional; uses the default credential chain if omitted)
  • s3EndpointOverride: Custom S3 endpoint URL (optional; useful for S3-compatible storage or local testing)
  • batchSize: Number of error events to accumulate before writing to S3 (triggers a flush when reached)
  • flushIntervalMillis: Maximum time in milliseconds to wait before writing accumulated events to S3 (ensures events are written even if the batch size isn't reached)

GCS Parameters

  • bucket: Name of the GCS bucket to store error events
  • serviceAccountJson: GCP service account JSON credentials for authentication
  • batchSize: Number of error events to accumulate before writing to GCS (triggers a flush when reached)
  • flushIntervalMillis: Maximum time in milliseconds to wait before writing accumulated events to GCS (ensures events are written even if the batch size isn't reached)
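To make the batchSize / flushIntervalMillis interaction concrete, here is a minimal, self-contained sketch of the flush policy the two parameters describe: a buffer that writes out when either the batch fills up or the interval has elapsed. This is an illustration of the policy, not ZephFlow's actual DLQ implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class DlqBuffer {
    private final int batchSize;
    private final long flushIntervalMillis;
    private final List<byte[]> buffer = new ArrayList<>();
    private long lastFlushMillis;
    int flushCount = 0; // counts flushes, for demonstration

    DlqBuffer(int batchSize, long flushIntervalMillis, long nowMillis) {
        this.batchSize = batchSize;
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    void add(byte[] record, long nowMillis) {
        buffer.add(record);
        boolean batchFull = buffer.size() >= batchSize;
        boolean intervalElapsed = nowMillis - lastFlushMillis >= flushIntervalMillis;
        if (batchFull || intervalElapsed) {
            flush(nowMillis);
        }
    }

    private void flush(long nowMillis) {
        // A real DLQ would write the buffered batch to S3/GCS here.
        buffer.clear();
        lastFlushMillis = nowMillis;
        flushCount++;
    }

    public static void main(String[] args) {
        DlqBuffer dlq = new DlqBuffer(3, 30_000, 0);
        dlq.add(new byte[0], 10);     // 1 buffered
        dlq.add(new byte[0], 20);     // 2 buffered
        dlq.add(new byte[0], 30);     // batch full -> flush #1
        dlq.add(new byte[0], 40_000); // interval elapsed -> flush #2
        System.out.println(dlq.flushCount); // prints 2
    }
}
```

Note that flushIntervalMillis is a ceiling, not a schedule: a small trickle of error events still reaches the bucket within roughly one interval, while bursts are written as soon as a batch fills.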

Dead Letter Format

When an error event is sent to the DLQ, it's stored using the Avro format with the following schema:

{
  "namespace": "io.fleak.zephflow.lib.deadletter",
  "type": "record",
  "name": "DeadLetter",
  "fields": [
    {
      "name": "processingTimestamp",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "key",
      "type": ["null", "bytes"],
      "default": null
    },
    {
      "name": "value",
      "type": ["null", "bytes"],
      "default": null
    },
    {
      "name": "metadata",
      "type": ["null", {"type": "map", "values": "string"}],
      "default": null
    },
    {
      "name": "errorMessage",
      "type": "string"
    }
  ]
}

The dead letter record includes:

  • processingTimestamp: When the error occurred
  • key: Original event key (if applicable)
  • value: Original event value
  • metadata: Additional contextual information about the error, including the ID of the pipeline node where the error occurred (nodeId)
  • errorMessage: Description of the error that occurred

Integration with ZephFlow Pipelines

Here's a complete example of configuring error handling in a ZephFlow pipeline:

import io.fleak.zephflow.api.JobContext;
import io.fleak.zephflow.lib.serdes.EncodingType;

// Configure DLQ
JobContext.S3DlqConfig dlqConfig = JobContext.S3DlqConfig.builder()
    .region("us-west-2")
    .bucket("error-events")
    .batchSize(50)
    .flushIntervalMillis(60000) // 1 minute
    .build();

JobContext jobContext = JobContext.builder()
    .dlqConfig(dlqConfig)
    .build();

// Create a flow with error-prone operations
ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .eval("dict_merge($, dict(int_value=parse_int($.string_value)))") // Could fail if string_value is not a number
    .assertion("$.int_value > 0") // Will mark events as errors if condition is not met
    .kafkaSink("kafka:9092", "output-topic", null, EncodingType.JSON_OBJECT, null);

// Execute the flow with error handling configured
flow.execute("my-job-id", "production", "data-processor", jobContext);

Dead Letter Enhancements

File Path Prefix

When a prefix is configured in jobContext, dead letter file paths include this prefix, making it easier to organize DLQ files across different jobs or environments.

Node ID in Error Records

NodeExecutionException errors include the node ID in the dead letter record's metadata, helping you quickly identify which node in the pipeline caused the failure.

Raw Data Sampler

The raw data sampler periodically captures raw input data and writes it to the DLQ destination for debugging and inspection purposes. This is useful for verifying what data is entering your pipeline without modifying the pipeline itself.

Configuration

The sampler is configured via the DLQ config (both S3 and GCS support this):

// S3 example
JobContext.S3DlqConfig dlqConfig = JobContext.S3DlqConfig.builder()
    .region("us-west-2")
    .bucket("my-error-events-bucket")
    .batchSize(100)
    .flushIntervalMillis(30000)
    .rawDataSampleIntervalMs(60000) // Sample raw data every 60 seconds (default)
    .build();

// GCS example
JobContext.GcsDlqConfig dlqConfig = JobContext.GcsDlqConfig.builder()
    .bucket("my-error-events-bucket")
    .serviceAccountJson(gcpCredentialsJson)
    .batchSize(100)
    .flushIntervalMillis(30000)
    .rawDataSampleIntervalMs(60000) // Sample raw data every 60 seconds (default)
    .build();

  • rawDataSampleIntervalMs: Interval in milliseconds between raw data samples (default: 60000)
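The sampling behavior rawDataSampleIntervalMs controls can be sketched as a simple interval gate: the first event seen is always sampled, and after that at most one event per interval is written. This is an illustrative model of the policy, not ZephFlow's actual sampler code.

```java
public class RawDataSampler {
    private final long sampleIntervalMs;
    private Long lastSampleMillis = null; // null means no sample taken yet

    RawDataSampler(long sampleIntervalMs) {
        this.sampleIntervalMs = sampleIntervalMs;
    }

    // Returns true when this event is chosen as a sample.
    boolean maybeSample(byte[] rawInput, long nowMillis) {
        if (lastSampleMillis != null && nowMillis - lastSampleMillis < sampleIntervalMs) {
            return false; // still inside the sampling interval; skip this event
        }
        lastSampleMillis = nowMillis;
        // The real sampler would write rawInput to the configured DLQ destination.
        return true;
    }

    public static void main(String[] args) {
        RawDataSampler sampler = new RawDataSampler(60_000);
        System.out.println(sampler.maybeSample(new byte[0], 0));      // true: first event
        System.out.println(sampler.maybeSample(new byte[0], 30_000)); // false: within interval
        System.out.println(sampler.maybeSample(new byte[0], 60_000)); // true: interval elapsed
    }
}
```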

Use Assertions for Data Validation

The assertion node is a powerful tool for data validation in ZephFlow. Unlike the filter node (which simply drops non-matching events), the assertion node marks events that don't meet the specified condition as error events, which can then be routed to your DLQ.

// Validate that required fields exist and have valid values
flow.assertion("$.user_id != null and $.timestamp != null and $.amount > 0");
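The filter-versus-assertion distinction can be sketched in plain Java, outside of ZephFlow: both apply a predicate, but only assertion keeps the failing events around as error events for the DLQ. This is a conceptual illustration, not ZephFlow's internal implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class FilterVsAssertion {
    record Outcome(List<Integer> passed, List<Integer> deadLettered) {}

    // filter semantics: non-matching events are silently dropped
    static Outcome filter(List<Integer> events, Predicate<Integer> condition) {
        List<Integer> passed = new ArrayList<>();
        for (Integer e : events) {
            if (condition.test(e)) passed.add(e);
        }
        return new Outcome(passed, List.of());
    }

    // assertion semantics: non-matching events become error events (DLQ-bound)
    static Outcome assertion(List<Integer> events, Predicate<Integer> condition) {
        List<Integer> passed = new ArrayList<>();
        List<Integer> deadLettered = new ArrayList<>();
        for (Integer e : events) {
            (condition.test(e) ? passed : deadLettered).add(e);
        }
        return new Outcome(passed, deadLettered);
    }

    public static void main(String[] args) {
        List<Integer> amounts = List.of(5, -1, 7);
        // filter: -1 vanishes; assertion: -1 is preserved as an error event
        System.out.println(filter(amounts, a -> a > 0));
        System.out.println(assertion(amounts, a -> a > 0));
    }
}
```

Use filter when non-matching events are expected and uninteresting; use assertion when a non-matching event indicates a data-quality problem you want captured in the DLQ.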