S3 Sink Node

The s3Sink node in ZephFlow enables you to write processed data to Amazon S3 (or S3-compatible storage) with built-in batching, date-partitioned keys, and multiple encoding formats.

Overview

The S3 sink node uploads pipeline output to an S3 bucket. It uses built-in batching that accumulates records and uploads them in date-partitioned files. You can also connect to S3-compatible endpoints such as MinIO by providing a custom endpoint override.

Key Features

  • S3 Integration: Write data directly to Amazon S3 or S3-compatible storage (e.g., MinIO)
  • Built-in Batching: Automatically accumulates records and flushes at a fixed batch size or time interval
  • Date-Partitioned Keys: Automatically organizes files by year/month/day partitions
  • Multiple Encoding Formats: Support for JSON and CSV output formats
  • Retry with Exponential Backoff: Automatic retries on upload failures with doubling delay
  • Custom Endpoint Support: Connect to any S3-compatible endpoint via endpoint override

Basic Usage

The s3Sink method creates a node that uploads data to an S3 bucket:

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("broker-address:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .eval("dict_merge($, dict(processed=true))")
    .s3Sink(
        "us-east-1",                   // AWS region
        "my-data-bucket",              // S3 bucket name
        "output/events",               // Folder (key prefix)
        EncodingType.JSON_OBJECT_LINE  // Encoding type
    );

Parameters

The s3Sink node provides two method overloads:

Simple Overload

s3Sink(String region, String bucket, String folder, EncodingType encodingType)
Parameter     Type          Description
region        String        AWS region (e.g., "us-east-1")
bucket        String        S3 bucket name
folder        String        Key prefix / folder path for uploaded objects
encodingType  EncodingType  Format for serializing output data

Full Overload

s3Sink(String region, String bucket, String folder, EncodingType encodingType,
       UsernamePasswordCredential credential, String s3EndpointOverride)

Parameter           Type                        Description
region              String                      AWS region (e.g., "us-east-1")
bucket              String                      S3 bucket name
folder              String                      Key prefix / folder path for uploaded objects
encodingType        EncodingType                Format for serializing output data
credential          UsernamePasswordCredential  Access key / secret key credential (null for default AWS credentials)
s3EndpointOverride  String                      Custom S3-compatible endpoint URL (null for standard AWS S3)
Info: More details about encoding type support can be found in the encoding types documentation.

Built-in Batching Behavior

The S3 sink always operates in batching mode with the following internal defaults:

Setting         Default           Description
Batch size      10,000 records    Number of records accumulated before flushing
Flush interval  10,000 ms (10 s)  Maximum time between flushes regardless of batch size

These settings are not configurable through the public API — they are applied automatically.
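The flush rule combines both defaults: a batch is written as soon as either threshold is reached. A minimal sketch of that size-or-interval decision (illustrative only; `BatchFlushSketch` and its method are hypothetical names, not the sink's actual internals):

```java
public class BatchFlushSketch {
    // Internal defaults documented above (not user-configurable).
    static final int BATCH_SIZE = 10_000;
    static final long FLUSH_INTERVAL_MS = 10_000;

    // Flush when the buffer is full OR the interval has elapsed,
    // whichever comes first.
    static boolean shouldFlush(int bufferedRecords, long msSinceLastFlush) {
        return bufferedRecords >= BATCH_SIZE
            || msSinceLastFlush >= FLUSH_INTERVAL_MS;
    }

    public static void main(String[] args) {
        System.out.println(shouldFlush(10_000, 0));  // full batch
        System.out.println(shouldFlush(1, 10_000));  // interval elapsed
        System.out.println(shouldFlush(1, 5_000));   // neither yet
    }
}
```

In practice this means low-traffic pipelines still produce a file at least every 10 seconds, while high-traffic pipelines roll files every 10,000 records.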

Encoding Types

The following encoding types are supported:

Encoding Type     Description
JSON_ARRAY        Array of JSON objects
JSON_OBJECT_LINE  One JSON object per line (JSONL)
CSV               Comma-separated values
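To make the difference between the two JSON shapes concrete, here is an illustrative sketch of how the same records would appear in each format (ZephFlow's own encoders write the actual files; `EncodingShapes` and its methods are hypothetical helpers for demonstration):

```java
import java.util.List;
import java.util.stream.Collectors;

public class EncodingShapes {
    // JSON_OBJECT_LINE: one JSON object per line (JSONL)
    static String toJsonl(List<String> jsonObjects) {
        return String.join("\n", jsonObjects);
    }

    // JSON_ARRAY: a single JSON array wrapping all objects
    static String toJsonArray(List<String> jsonObjects) {
        return jsonObjects.stream().collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        List<String> records = List.of(
            "{\"id\":1,\"level\":\"ERROR\"}",
            "{\"id\":2,\"level\":\"WARN\"}");
        System.out.println(toJsonl(records));      // two lines, one object each
        System.out.println(toJsonArray(records));  // one line, one array
    }
}
```

JSONL files can be processed line by line and appended to, which is why most data-lake tooling prefers them over a single JSON array.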

S3 Key Pattern

Files are date-partitioned using the current date:

{folder}/year=2024/month=01/day=15/{uuid}.{ext}

For example, with folder output/events and JSON_OBJECT_LINE encoding:

output/events/year=2024/month=01/day=15/a3f8b2c1-7d4e-4a9b-b5f6-1c2d3e4f5a6b.jsonl
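The documented layout can be sketched as follows (illustrative only; `S3KeyPattern.buildKey` is a hypothetical helper, not the sink's actual code):

```java
import java.time.LocalDate;
import java.util.UUID;

public class S3KeyPattern {
    // Builds a key matching the documented pattern:
    // {folder}/year=YYYY/month=MM/day=DD/{uuid}.{ext}
    static String buildKey(String folder, LocalDate date, String ext) {
        return String.format("%s/year=%04d/month=%02d/day=%02d/%s.%s",
            folder, date.getYear(), date.getMonthValue(),
            date.getDayOfMonth(), UUID.randomUUID(), ext);
    }

    public static void main(String[] args) {
        System.out.println(
            buildKey("output/events", LocalDate.of(2024, 1, 15), "jsonl"));
    }
}
```

The `year=`/`month=`/`day=` segments follow the Hive-style partitioning convention, so query engines such as Athena or Spark can prune partitions by date.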

Common Configurations

Basic JSON Line Output

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka-broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .s3Sink(
        "us-east-1",
        "my-bucket",
        "data/output",
        EncodingType.JSON_OBJECT_LINE
    );

With Explicit Credentials

UsernamePasswordCredential credential = new UsernamePasswordCredential(
    "AKIAIOSFODNN7EXAMPLE",                     // Access key ID
    "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"  // Secret access key
);

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka-broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .s3Sink(
        "us-east-1",
        "my-bucket",
        "data/output",
        EncodingType.JSON_OBJECT_LINE,
        credential,
        null  // Use standard AWS S3 endpoint
    );

Example Use Cases

Kafka-to-S3 Data Lake Pipeline

ZephFlow pipeline = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "raw-events",
        "s3-archiver-group",
        EncodingType.JSON_OBJECT,
        null
    )
    .eval("dict_merge($, dict(" +
        "event_date=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd\")," +
        "processed=true" +
        "))")
    .s3Sink(
        "us-east-1",
        "data-lake-bucket",
        "events/raw",
        EncodingType.JSON_OBJECT_LINE
    );

Log Archival with Filtering

ZephFlow logArchival = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "application-logs",
        "log-archiver-group",
        EncodingType.JSON_OBJECT,
        null
    )
    .filter("$.level == \"ERROR\" or $.level == \"WARN\"")
    .s3Sink(
        "us-west-2",
        "log-archive-bucket",
        "logs/important",
        EncodingType.CSV
    );

S3-Compatible Storage (MinIO)

UsernamePasswordCredential minioCredential = new UsernamePasswordCredential(
    "minio-access-key",
    "minio-secret-key"
);

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .s3Sink(
        "us-east-1",
        "my-minio-bucket",
        "output/data",
        EncodingType.JSON_OBJECT_LINE,
        minioCredential,
        "http://localhost:9000"  // MinIO endpoint
    );

Best Practices

Encoding Selection

  • JSON_OBJECT_LINE: Best general-purpose choice — one JSON object per line, widely supported by downstream tools
  • JSON_ARRAY: Use when downstream consumers expect a JSON array
  • CSV: Use for tabular data or when integrating with tools that prefer CSV

Security and Credentials

  • When no credential is provided, the S3 sink uses the default AWS credential chain (environment variables, IAM roles, etc.)
  • For explicit credentials, use UsernamePasswordCredential where username is the access key ID and password is the secret access key
  • Prefer IAM roles over explicit credentials in production environments

S3-Compatible Endpoints

  • Use the s3EndpointOverride parameter to connect to MinIO, LocalStack, or other S3-compatible services
  • The region parameter is still required even when using a custom endpoint

Common Issues and Solutions

Credential or Permission Errors

  • Verify the IAM user or role has s3:PutObject permission on the target bucket and key prefix
  • When using explicit credentials, ensure the access key and secret key are correct
  • For S3-compatible endpoints, confirm the endpoint URL is reachable and the credentials match

Upload Failures

The S3 sink automatically retries failed uploads with exponential backoff (up to 3 retries with doubling delay). If uploads continue to fail, check:

  • Network connectivity to the S3 endpoint
  • Bucket existence and region configuration
  • Available disk space for temporary files (batching uses temp files before upload)
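The retry behavior described above follows the standard retry-with-doubling-delay pattern, which can be sketched as below (a minimal illustration; `BackoffSketch.withRetries` is a hypothetical helper, and the sink's internal retry code may differ in detail):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class BackoffSketch {
    // Retries a failing operation up to maxRetries times, doubling the
    // delay between attempts (exponential backoff).
    static <T> T withRetries(Callable<T> upload, int maxRetries, long initialDelayMs)
            throws Exception {
        long delay = initialDelayMs;
        for (int attempt = 0; ; attempt++) {
            try {
                return upload.call();
            } catch (Exception e) {
                if (attempt >= maxRetries) throw e;  // retries exhausted
                Thread.sleep(delay);
                delay *= 2;  // double the delay before the next attempt
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Simulated upload: fails twice, then succeeds on the third call.
        String result = withRetries(() -> {
            if (calls.incrementAndGet() < 3) throw new RuntimeException("transient");
            return "uploaded";
        }, 3, 1);
        System.out.println(result + " after " + calls.get() + " calls");
        // → uploaded after 3 calls
    }
}
```

Because the delay doubles on each attempt, transient failures (throttling, brief network blips) usually resolve within the retry budget, while persistent failures surface quickly.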

Integration with Other Nodes

The S3 sink node integrates well with other ZephFlow nodes:

ZephFlow pipeline = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "input-topic", "processor-group", EncodingType.JSON_OBJECT, null)
    .filter("$.status != null")
    .eval("dict_merge($, dict(processed_at=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
    .assertion("$.processed_at != null")
    .s3Sink("us-east-1", "output-bucket", "processed/events", EncodingType.JSON_OBJECT_LINE);

pipeline.execute("s3-pipeline-job", "production", "event-processor");