
S3 Sink Node

The s3Sink node in ZephFlow enables you to write processed data to Amazon S3 (or S3-compatible storage) with built-in batching, date-partitioned keys, and multiple encoding formats.

Overview

The S3 sink node uploads pipeline output to an S3 bucket. It uses built-in batching that accumulates records and uploads them in date-partitioned files. You can also connect to S3-compatible endpoints such as MinIO by providing a custom endpoint override.

Key Features

  • S3 Integration: Write data directly to Amazon S3 or S3-compatible storage (e.g., MinIO)
  • Built-in Batching: Automatically accumulates records and flushes at a fixed batch size or time interval
  • Date-Partitioned Keys: Automatically organizes files by year/month/day partitions
  • Multiple Encoding Formats: Support for JSON, CSV, and Parquet output formats
  • Retry with Exponential Backoff: Automatic retries on upload failures with doubling delay
  • Custom Endpoint Support: Connect to any S3-compatible endpoint via endpoint override

Parameters

The s3Sink node provides two method overloads:

Simple Overload

| Parameter    | Type         | Description                                   |
| ------------ | ------------ | --------------------------------------------- |
| region       | String       | AWS region (e.g., "us-east-1")                |
| bucket       | String       | S3 bucket name                                |
| folder       | String       | Key prefix / folder path for uploaded objects |
| encodingType | EncodingType | Format for serializing output data            |

Full Overload

| Parameter          | Type                       | Description                                                       |
| ------------------ | -------------------------- | ----------------------------------------------------------------- |
| region             | String                     | AWS region (e.g., "us-east-1")                                    |
| bucket             | String                     | S3 bucket name                                                    |
| folder             | String                     | Key prefix / folder path for uploaded objects                     |
| encodingType       | EncodingType               | Format for serializing output data                                |
| credential         | UsernamePasswordCredential | Access key / secret key credential (null for default AWS credentials) |
| s3EndpointOverride | String                     | Custom S3-compatible endpoint URL (null for standard AWS S3)      |
Info: more details about encoding type support can be found in the encoding type documentation.

Config Object

The full config object for the S3 sink node (S3SinkDto.Config):

| Field               | Type                | Required | Default | Description                                                     |
| ------------------- | ------------------- | -------- | ------- | --------------------------------------------------------------- |
| regionStr           | String              | Yes      | –       | AWS region (e.g., "us-east-1")                                  |
| bucketName          | String              | Yes      | –       | S3 bucket name                                                  |
| keyName             | String              | Yes      | –       | Key prefix / folder path for uploaded objects                   |
| encodingType        | String              | Yes      | –       | Format for serializing output data. Supported values: CSV, JSON_OBJECT, JSON_ARRAY, JSON_OBJECT_LINE, PARQUET. See encoding constraints below |
| credentialId        | String              | No       | null    | Credential ID for authentication                                |
| s3EndpointOverride  | String              | No       | null    | Custom S3-compatible endpoint URL (e.g., MinIO)                 |
| batching            | boolean             | No       | true    | Enable batching mode                                            |
| batchSize           | int                 | No       | 10000   | Number of records accumulated before flushing                   |
| flushIntervalMillis | long                | No       | 10000   | Maximum time in milliseconds between flushes                    |
| avroSchema          | Map<String, Object> | No       | null    | Avro schema definition for the record structure                 |

Built-in Batching Behavior

The S3 sink operates in batching mode by default with the following internal defaults:

| Setting        | Default          | Description                                            |
| -------------- | ---------------- | ------------------------------------------------------ |
| Batch size     | 10,000 records   | Number of records accumulated before flushing          |
| Flush interval | 10,000 ms (10 s) | Maximum time between flushes regardless of batch size  |

These defaults can be overridden via the batchSize and flushIntervalMillis config fields.
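The flush condition described above can be sketched as a small predicate. This is an illustrative reconstruction, not the sink's internal code: a batch is written out when either the record count reaches batchSize or flushIntervalMillis has elapsed since the last flush, whichever comes first.

```java
// Illustrative sketch of the batching flush policy (not ZephFlow internals).
public class FlushPolicy {
    static final int BATCH_SIZE = 10_000;          // default batchSize
    static final long FLUSH_INTERVAL_MS = 10_000;  // default flushIntervalMillis

    // Returns true when the buffered batch should be uploaded to S3.
    public static boolean shouldFlush(int bufferedRecords, long msSinceLastFlush) {
        return bufferedRecords >= BATCH_SIZE || msSinceLastFlush >= FLUSH_INTERVAL_MS;
    }
}
```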

Encoding Types

The following encoding types are supported:

| Encoding Type    | Description                                                               |
| ---------------- | ------------------------------------------------------------------------- |
| CSV              | Comma-separated values                                                    |
| JSON_OBJECT      | Single JSON object. Not compatible with batching mode                     |
| JSON_ARRAY       | Array of JSON objects                                                     |
| JSON_OBJECT_LINE | One JSON object per line (JSONL)                                          |
| PARQUET          | Apache Parquet format. Requires batching enabled and avroSchema provided  |
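Since PARQUET requires an avroSchema, it helps to see one expressed as the nested Map<String, Object> structure the config field takes. This is a hedged sketch: the record and field names ("Event", "event_id", "timestamp") are illustrative, not from the ZephFlow docs, and follow the standard Avro record-schema layout.

```java
import java.util.List;
import java.util.Map;

// Sketch of a minimal Avro record schema as a Map<String, Object>,
// suitable for the avroSchema config field. Names are illustrative.
public class AvroSchemaExample {
    public static Map<String, Object> eventSchema() {
        return Map.of(
            "type", "record",
            "name", "Event",
            "fields", List.of(
                Map.of("name", "event_id", "type", "string"),
                Map.of("name", "timestamp", "type", "long")
            )
        );
    }
}
```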

S3 Key Pattern

Files are date-partitioned using the upload date:

{folder}/year={yyyy}/month={MM}/day={dd}/{uuid}.{ext}

For example, with folder output/events and JSON_OBJECT_LINE encoding:

output/events/year=2024/month=01/day=15/a3f8b2c1-7d4e-4a9b-b5f6-1c2d3e4f5a6b.jsonl
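The key layout above can be sketched in Java with java.time and UUID. This is an illustrative reconstruction of the pattern, not the sink's actual implementation:

```java
import java.time.LocalDate;
import java.util.UUID;

// Reconstructs the date-partitioned key shape the S3 sink produces.
public class S3KeyPattern {
    public static String buildKey(String folder, LocalDate date, UUID id, String ext) {
        return String.format(
            "%s/year=%04d/month=%02d/day=%02d/%s.%s",
            folder, date.getYear(), date.getMonthValue(), date.getDayOfMonth(), id, ext);
    }
}
```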

Best Practices

Encoding Selection

  • JSON_OBJECT_LINE: Best general-purpose choice — one JSON object per line, widely supported by downstream tools
  • JSON_ARRAY: Use when downstream consumers expect a JSON array
  • CSV: Use for tabular data or when integrating with tools that prefer CSV

Security and Credentials

  • When no credential is provided, the S3 sink uses the default AWS credential chain (environment variables, IAM roles, etc.)
  • For explicit credentials, use UsernamePasswordCredential where username is the access key ID and password is the secret access key
  • Prefer IAM roles over explicit credentials in production environments

S3-Compatible Endpoints

  • Use the s3EndpointOverride parameter to connect to MinIO, LocalStack, or other S3-compatible services
  • The region parameter is still required even when using a custom endpoint

Common Issues and Solutions

Credential or Permission Errors

  • Verify the IAM user or role has s3:PutObject permission on the target bucket and key prefix
  • When using explicit credentials, ensure the access key and secret key are correct
  • For S3-compatible endpoints, confirm the endpoint URL is reachable and the credentials match

Upload Failures

The S3 sink automatically retries failed uploads with exponential backoff (up to 3 retries with doubling delay). If uploads continue to fail, check:

  • Network connectivity to the S3 endpoint
  • Bucket existence and region configuration
  • Available disk space for temporary files (batching uses temp files before upload)
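The doubling-delay retry schedule can be sketched as follows. The base delay here is an assumption for illustration; the docs only state that the delay doubles across up to 3 retries.

```java
// Illustrative backoff schedule: with a doubling delay, attempt 0 waits the
// base delay, attempt 1 waits 2x, attempt 2 waits 4x. The base value is an
// assumption, not documented ZephFlow behavior.
public class Backoff {
    public static long delayForAttempt(long baseMillis, int attempt) {
        return baseMillis << attempt; // base * 2^attempt
    }
}
```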

Java SDK Usage

Basic Usage

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("broker-address:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .eval("dict_merge($, dict(processed=true))")
    .s3Sink(
        "us-east-1",                   // AWS region
        "my-data-bucket",              // S3 bucket name
        "output/events",               // Folder (key prefix)
        EncodingType.JSON_OBJECT_LINE  // Encoding type
    );

With Explicit Credentials

UsernamePasswordCredential credential = new UsernamePasswordCredential(
    "AKIAIOSFODNN7EXAMPLE",                      // Access key ID
    "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"   // Secret access key
);

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource("kafka-broker:9092", "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .s3Sink(
        "us-east-1",
        "my-bucket",
        "data/output",
        EncodingType.JSON_OBJECT_LINE,
        credential,
        null  // Use standard AWS S3 endpoint
    );

Kafka-to-S3 Data Lake Pipeline

ZephFlow pipeline = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "raw-events",
        "s3-archiver-group",
        EncodingType.JSON_OBJECT,
        null
    )
    .eval("dict_merge($, dict(" +
        "event_date=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd\")," +
        "processed=true" +
        "))")
    .s3Sink(
        "us-east-1",
        "data-lake-bucket",
        "events/raw",
        EncodingType.JSON_OBJECT_LINE
    );

Log Archival with Filtering

ZephFlow logArchival = ZephFlow.startFlow()
    .kafkaSource(
        kafkaBroker,
        "application-logs",
        "log-archiver-group",
        EncodingType.JSON_OBJECT,
        null
    )
    .filter("$.level == \"ERROR\" or $.level == \"WARN\"")
    .s3Sink(
        "us-west-2",
        "log-archive-bucket",
        "logs/important",
        EncodingType.CSV
    );

S3-Compatible Storage (MinIO)

UsernamePasswordCredential minioCredential = new UsernamePasswordCredential(
    "minio-access-key",
    "minio-secret-key"
);

ZephFlow flow = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "input-topic", "consumer-group", EncodingType.JSON_OBJECT, null)
    .s3Sink(
        "us-east-1",
        "my-minio-bucket",
        "output/data",
        EncodingType.JSON_OBJECT_LINE,
        minioCredential,
        "http://localhost:9000"  // MinIO endpoint
    );

Complete Pipeline

ZephFlow pipeline = ZephFlow.startFlow()
    .kafkaSource(kafkaBroker, "input-topic", "processor-group", EncodingType.JSON_OBJECT, null)
    .filter("$.status != null")
    .eval("dict_merge($, dict(processed_at=epoch_to_ts_str($.timestamp, \"yyyy-MM-dd'T'HH:mm:ss\")))")
    .assertion("$.processed_at != null")
    .s3Sink("us-east-1", "output-bucket", "processed/events", EncodingType.JSON_OBJECT_LINE);

pipeline.execute("s3-pipeline-job", "production", "event-processor");