# S3 Sink Node

The `s3Sink` node in ZephFlow enables you to write processed data to Amazon S3 (or S3-compatible storage) with built-in batching, date-partitioned keys, and multiple encoding formats.
## Overview
The S3 sink node uploads pipeline output to an S3 bucket. It uses built-in batching that accumulates records and uploads them in date-partitioned files. You can also connect to S3-compatible endpoints such as MinIO by providing a custom endpoint override.
## Key Features

- S3 Integration: Write data directly to Amazon S3 or S3-compatible storage (e.g., MinIO)
- Built-in Batching: Automatically accumulates records and flushes at a fixed batch size or time interval
- Date-Partitioned Keys: Automatically organizes files into year/month/day partitions
- Multiple Encoding Formats: Support for JSON and CSV output formats
- Retry with Exponential Backoff: Automatic retries on upload failures with doubling delay
- Custom Endpoint Support: Connect to any S3-compatible endpoint via endpoint override
## Basic Usage

The `s3Sink` method creates a node that uploads data to an S3 bucket:

```java
ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("broker-address:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .eval("dict_merge($, dict(processed=true))")
    .s3Sink(
        "us-east-1",                   // AWS region
        "my-data-bucket",              // S3 bucket name
        "output/events",               // Folder (key prefix)
        EncodingType.JSON_OBJECT_LINE  // Encoding type
    );
```
## Parameters

The `s3Sink` node provides two method overloads:

### Simple Overload

```java
s3Sink(String region, String bucket, String folder, EncodingType encodingType)
```

| Parameter | Type | Description |
|---|---|---|
| `region` | `String` | AWS region (e.g., `"us-east-1"`) |
| `bucket` | `String` | S3 bucket name |
| `folder` | `String` | Key prefix / folder path for uploaded objects |
| `encodingType` | `EncodingType` | Format for serializing output data |

### Full Overload

```java
s3Sink(String region, String bucket, String folder, EncodingType encodingType,
       UsernamePasswordCredential credential, String s3EndpointOverride)
```

| Parameter | Type | Description |
|---|---|---|
| `region` | `String` | AWS region (e.g., `"us-east-1"`) |
| `bucket` | `String` | S3 bucket name |
| `folder` | `String` | Key prefix / folder path for uploaded objects |
| `encodingType` | `EncodingType` | Format for serializing output data |
| `credential` | `UsernamePasswordCredential` | Access key / secret key credential (`null` for default AWS credentials) |
| `s3EndpointOverride` | `String` | Custom S3-compatible endpoint URL (`null` for standard AWS S3) |
More details about encoding type support can be found here.
## Built-in Batching Behavior

The S3 sink always operates in batching mode with the following internal defaults:
| Setting | Default | Description |
|---|---|---|
| Batch size | 10,000 records | Number of records accumulated before flushing |
| Flush interval | 10,000ms (10s) | Maximum time between flushes regardless of batch size |
These settings are applied automatically and are not configurable through the public API.
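The flush condition can be pictured as a simple predicate over record count and elapsed time. The sketch below is illustrative only, using the defaults documented above; `BatchFlushPolicy` and `shouldFlush` are hypothetical names, not part of the ZephFlow API:

```java
// Illustrative sketch of the batching defaults described above.
// A batch is flushed when it reaches the batch size OR when the
// flush interval has elapsed since the last flush.
public class BatchFlushPolicy {
    static final int BATCH_SIZE = 10_000;          // records per flush (documented default)
    static final long FLUSH_INTERVAL_MS = 10_000;  // max time between flushes (documented default)

    // Returns true when the accumulated batch should be uploaded.
    static boolean shouldFlush(int recordCount, long msSinceLastFlush) {
        return recordCount >= BATCH_SIZE || msSinceLastFlush >= FLUSH_INTERVAL_MS;
    }

    public static void main(String[] args) {
        System.out.println(shouldFlush(10_000, 500));  // size reached
        System.out.println(shouldFlush(42, 12_000));   // interval elapsed
        System.out.println(shouldFlush(42, 500));      // neither: keep accumulating
    }
}
```

Either condition alone triggers a flush, so a slow trickle of records still reaches S3 within roughly ten seconds.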
## Encoding Types

The following encoding types are supported:

| Encoding Type | Description |
|---|---|
| `JSON_ARRAY` | Array of JSON objects |
| `JSON_OBJECT_LINE` | One JSON object per line (JSONL) |
| `CSV` | Comma-separated values |
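To make the difference between the two JSON shapes concrete, the toy snippet below assembles both outputs by hand from pre-serialized records. Real serialization is handled by ZephFlow's encoders; `EncodingShapes` and its methods are purely illustrative:

```java
import java.util.List;

// Toy illustration of the two JSON output shapes described above.
public class EncodingShapes {
    // JSON_OBJECT_LINE (JSONL): one JSON object per line.
    static String asJsonLines(List<String> records) {
        return String.join("\n", records);
    }

    // JSON_ARRAY: a single JSON array containing all objects.
    static String asJsonArray(List<String> records) {
        return "[" + String.join(",", records) + "]";
    }

    public static void main(String[] args) {
        List<String> records = List.of("{\"id\":1}", "{\"id\":2}");
        System.out.println(asJsonLines(records));
        System.out.println(asJsonArray(records));
    }
}
```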
## S3 Key Pattern

Files are date-partitioned using the current date:

```
{folder}/year=2024/month=01/day=15/{uuid}.{ext}
```

For example, with folder `output/events` and `JSON_OBJECT_LINE` encoding:

```
output/events/year=2024/month=01/day=15/a3f8b2c1-7d4e-4a9b-b5f6-1c2d3e4f5a6b.jsonl
```
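The pattern above can be reproduced with standard Java date handling and a random UUID. This is an illustrative reconstruction, not ZephFlow's actual internal code; `S3KeyPattern` and `buildKey` are hypothetical names:

```java
import java.time.LocalDate;
import java.util.UUID;

// Illustrative reconstruction of the date-partitioned key pattern.
public class S3KeyPattern {
    // Builds a key like: {folder}/year=2024/month=01/day=15/{uuid}.{ext}
    static String buildKey(String folder, String ext, LocalDate date) {
        return String.format("%s/year=%04d/month=%02d/day=%02d/%s.%s",
                folder, date.getYear(), date.getMonthValue(), date.getDayOfMonth(),
                UUID.randomUUID(), ext);
    }

    public static void main(String[] args) {
        System.out.println(buildKey("output/events", "jsonl", LocalDate.of(2024, 1, 15)));
    }
}
```

The `year=`/`month=`/`day=` naming follows the Hive partitioning convention, so query engines such as Athena or Spark can prune partitions by date.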
## Common Configurations

### Basic JSON Line Output

```java
ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka-broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .s3Sink(
        "us-east-1",
        "my-bucket",
        "data/output",
        EncodingType.JSON_OBJECT_LINE
    );
```
### With Explicit Credentials

```java
UsernamePasswordCredential credential = new UsernamePasswordCredential(
    "AKIAIOSFODNN7EXAMPLE",                     // Access key ID
    "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"  // Secret access key
);

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka-broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .s3Sink(
        "us-east-1",
        "my-bucket",
        "data/output",
        EncodingType.JSON_OBJECT_LINE,
        credential,
        null  // Use the standard AWS S3 endpoint
    );
```
## Example Use Cases

### Kafka-to-S3 Data Lake Pipeline

```java
ZephFlow pipeline = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "raw-events",
        "s3-archiver-group",
        EncodingType.JSON_OBJECT,
        null
    )
    .eval("dict_merge($, dict(" +
        "event_date=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd\")," +
        "processed=true" +
        "))")
    .s3Sink(
        "us-east-1",
        "data-lake-bucket",
        "events/raw",
        EncodingType.JSON_OBJECT_LINE
    );
```
### Log Archival with Filtering

```java
ZephFlow logArchival = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "application-logs",
        "log-archiver-group",
        EncodingType.JSON_OBJECT,
        null
    )
    .filter("$.level == \"ERROR\" or $.level == \"WARN\"")
    .s3Sink(
        "us-west-2",
        "log-archive-bucket",
        "logs/important",
        EncodingType.CSV
    );
```
### S3-Compatible Storage (MinIO)

```java
UsernamePasswordCredential minioCredential = new UsernamePasswordCredential(
    "minio-access-key",
    "minio-secret-key"
);

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .s3Sink(
        "us-east-1",
        "my-minio-bucket",
        "output/data",
        EncodingType.JSON_OBJECT_LINE,
        minioCredential,
        "http://localhost:9000"  // MinIO endpoint
    );
```
## Best Practices

### Encoding Selection

- `JSON_OBJECT_LINE`: Best general-purpose choice; one JSON object per line, widely supported by downstream tools
- `JSON_ARRAY`: Use when downstream consumers expect a JSON array
- `CSV`: Use for tabular data or when integrating with tools that prefer CSV
### Security and Credentials

- When no credential is provided, the S3 sink uses the default AWS credential chain (environment variables, IAM roles, etc.)
- For explicit credentials, use `UsernamePasswordCredential`, where `username` is the access key ID and `password` is the secret access key
- Prefer IAM roles over explicit credentials in production environments
### S3-Compatible Endpoints

- Use the `s3EndpointOverride` parameter to connect to MinIO, LocalStack, or other S3-compatible services
- The `region` parameter is still required even when using a custom endpoint
## Common Issues and Solutions

### Credential or Permission Errors

- Verify the IAM user or role has `s3:PutObject` permission on the target bucket and key prefix
- When using explicit credentials, ensure the access key and secret key are correct
- For S3-compatible endpoints, confirm the endpoint URL is reachable and the credentials match
### Upload Failures
The S3 sink automatically retries failed uploads with exponential backoff (up to 3 retries with doubling delay). If uploads continue to fail, check:
- Network connectivity to the S3 endpoint
- Bucket existence and region configuration
- Available disk space for temporary files (batching uses temp files before upload)
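The retry behavior described above (a capped number of attempts with a doubling delay) can be sketched as follows. The base delay and the `RetryWithBackoff`/`callWithRetry` names here are assumptions for illustration, not ZephFlow's actual internals:

```java
import java.util.concurrent.Callable;

// Sketch of retry with exponential backoff: up to maxRetries retries,
// doubling the delay after each failed attempt.
public class RetryWithBackoff {
    static <T> T callWithRetry(Callable<T> upload, int maxRetries, long baseDelayMs)
            throws Exception {
        long delay = baseDelayMs;
        for (int attempt = 0; ; attempt++) {
            try {
                return upload.call();
            } catch (Exception e) {
                if (attempt >= maxRetries) throw e;  // retries exhausted: propagate
                Thread.sleep(delay);
                delay *= 2;  // double the delay before the next attempt
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] failures = {2};  // simulate two transient failures, then success
        String result = callWithRetry(() -> {
            if (failures[0]-- > 0) throw new RuntimeException("transient upload error");
            return "uploaded";
        }, 3, 10);
        System.out.println(result);
    }
}
```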
## Integration with Other Nodes

The S3 sink node integrates well with other ZephFlow nodes:

```java
ZephFlow pipeline = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "input-topic", "processor-group", EncodingType.JSON_OBJECT, null)
    .filter("$.status != null")
    .eval("dict_merge($, dict(processed_at=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
    .assertion("$.processed_at != null")
    .s3Sink("us-east-1", "output-bucket", "processed/events", EncodingType.JSON_OBJECT_LINE);

pipeline.execute("s3-pipeline-job", "production", "event-processor");
```
## Related Nodes

- `kafkaSource`: Read data from Kafka topics
- `kafkaSink`: Write data to Kafka topics