Error Handling

Introduction

Error handling is a critical aspect of any data processing pipeline. ZephFlow provides robust mechanisms to handle errors that occur during event processing, ensuring that your pipelines remain resilient and your data is not lost when issues arise.

This document covers how ZephFlow handles errors, how to configure error handling options, and best practices for managing errors in your data pipelines.

Error Handling Overview

When an event flows through a ZephFlow pipeline, various errors can occur during processing. For example:

  • Invalid data formats
  • Failed parsing operations
  • Type conversion errors
  • Failed assertions or validations
  • Errors in function execution

When an error occurs, ZephFlow marks the affected event as an "error event" but continues processing other events. This ensures that a single error doesn't cause the entire pipeline to break or lose other valid events.

Error Handling Modes

ZephFlow provides different error handling mechanisms depending on how the pipeline is being executed:

Standalone Process Mode

When running ZephFlow in a standalone process (e.g., as a Java application), error events can be sent to a Dead Letter Queue (DLQ). The DLQ serves as a storage location for events that couldn't be processed successfully, allowing you to:

  • Investigate the cause of failures
  • Potentially reprocess the events after fixing the issue
  • Keep a record of problematic data for analysis

To use this mode, you need to configure a Dead Letter Queue through the JobContext when building and executing your DAG.

REST API Backend Mode

When running ZephFlow as a REST API backend, error information can be returned directly in the HTTP response. This is particularly useful during development and testing.

To enable this, set the includeErrorByStep query parameter to true when making requests to the API.
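
For example, the parameter is appended to the request URL like this (the host and endpoint path below are placeholders only; use the actual route exposed by your ZephFlow API deployment):

POST https://<your-zephflow-host>/<workflow-endpoint>?includeErrorByStep=true

With the flag set, the response includes the error information for each step, which makes it easier to pinpoint where an event failed.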

For more information on debugging workflow execution, see the Getting Started documentation.

Dead Letter Queue Configuration

Configuring the DLQ

To configure a Dead Letter Queue, create a DlqConfig implementation and add it to your JobContext. Currently, ZephFlow supports S3 as a destination for dead letter events.

S3 Dead Letter Queue Configuration

import io.fleak.commons.api.JobContext;

// Create an S3 DLQ configuration
JobContext.S3DlqConfig dlqConfig =
    JobContext.S3DlqConfig.builder()
        .region("us-west-2")                        // AWS region
        .bucket("my-error-events-bucket")           // S3 bucket name
        .batchSize(100)                             // Number of events to batch before writing
        .flushIntervalMillis(30000)                 // Maximum time to wait before writing (30 seconds)
        .build();

// Create a JobContext with the DLQ configuration
JobContext jobContext = JobContext.builder()
    .dlqConfig(dlqConfig)
    .build();

// Use the JobContext when building and executing your flow
AdjacencyListDagDefinition dagDefinition = flow.buildDag(jobContext);

DLQ Configuration Parameters

The S3 Dead Letter Queue configuration includes the following parameters:

  • region: AWS region where the S3 bucket is located
  • bucket: Name of the S3 bucket to store error events
  • batchSize: Number of error events to accumulate before writing to S3 (triggers a flush when reached)
  • flushIntervalMillis: Maximum time in milliseconds to wait before writing accumulated events to S3 (ensures events are written even if the batch size isn't reached)

Dead Letter Format

When an error event is sent to the DLQ, it's stored using the Avro format with the following schema:

{
  "namespace": "io.fleak.commons.lib.deadletter",
  "type": "record",
  "name": "DeadLetter",
  "fields": [
    {
      "name": "processingTimestamp",
      "type": "long",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "key",
      "type": ["null", "bytes"],
      "default": null
    },
    {
      "name": "value",
      "type": ["null", "bytes"],
      "default": null
    },
    {
      "name": "metadata",
      "type": [
        "null",
        {
          "type": "map",
          "values": "string"
        }
      ],
      "default": null
    },
    {
      "name": "errorMessage",
      "type": "string"
    }
  ]
}

The dead letter record includes:

  • processingTimestamp: When the error occurred
  • key: Original event key (if applicable)
  • value: Original event value
  • metadata: Additional contextual information about the error
  • errorMessage: Description of the error that occurred
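
If you need to inspect dead letter events offline, for example while investigating a failure, you can read them back with the standard Apache Avro Java library. The sketch below is a minimal example; it assumes the DLQ objects have already been downloaded from S3 as regular Avro data files and that the original event payloads were UTF-8 encoded bytes. The file name and class name are illustrative; adjust the decoding to match your pipeline's encoding.

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class DeadLetterInspector {
    public static void main(String[] args) throws Exception {
        // Local copy of a DLQ object downloaded from the S3 bucket (hypothetical file name)
        File deadLetterFile = new File("dead-letters.avro");

        try (DataFileReader<GenericRecord> reader =
                new DataFileReader<>(deadLetterFile, new GenericDatumReader<>())) {
            for (GenericRecord record : reader) {
                long processingTimestamp = (Long) record.get("processingTimestamp");
                String errorMessage = record.get("errorMessage").toString();

                // "value" is a nullable bytes field carrying the original event payload
                ByteBuffer valueBytes = (ByteBuffer) record.get("value");
                String originalEvent = valueBytes == null
                        ? "<no payload>"
                        : StandardCharsets.UTF_8.decode(valueBytes).toString();

                System.out.printf("[%d] %s -> %s%n", processingTimestamp, errorMessage, originalEvent);
            }
        }
    }
}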

Integration with ZephFlow Pipelines

Here's a complete example of configuring error handling in a ZephFlow pipeline:

import io.fleak.commons.api.JobContext;
import io.fleak.commons.lib.serdes.EncodingType;

// Configure DLQ
JobContext.S3DlqConfig dlqConfig =
    JobContext.S3DlqConfig.builder()
        .region("us-west-2")
        .bucket("error-events")
        .batchSize(50)
        .flushIntervalMillis(60000) // 1 minute
        .build();

JobContext jobContext = JobContext.builder()
    .dlqConfig(dlqConfig)
    .build();

// Create a flow with error-prone operations
ZephFlow flow =
    ZephFlow.startFlow()
        .kafkaSource("kafka:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
        .eval("dict_merge($, dict(int_value=parse_int($.string_value)))") // Could fail if string_value is not a number
        .assertion("$.int_value > 0") // Will mark events as errors if condition is not met
        .kafkaSink("kafka:9092", "output-topic", null, EncodingType.JSON_OBJECT, null);

// Execute the flow with error handling configured
flow.execute("my-job-id", "production", "data-processor", jobContext);
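
With this configuration, an event whose string_value cannot be parsed as an integer, or whose int_value fails the assertion, is marked as an error event and written to the error-events bucket as a DeadLetter record, while the remaining valid events continue on to the output Kafka topic.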

Use Assertions for Data Validation

The assertion node is a powerful tool for data validation in ZephFlow. Unlike the filter node (which simply drops non-matching events), the assertion node marks events that don't meet the specified condition as error events, which can then be routed to your DLQ.

// Validate that required fields exist and have valid values
flow.assertion("$.user_id != null and $.timestamp != null and $.amount > 0");
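
The contrast, roughly, looks like this (a minimal sketch; the filter call assumes ZephFlow exposes a filter method that takes the same condition-expression style, so check the API reference for the exact signature):

// filter: events that don't match the condition are silently dropped
flow.filter("$.amount > 0");

// assertion: events that don't match the condition are marked as error events
// and can be routed to the configured DLQ
flow.assertion("$.amount > 0");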