GCS Sink Node

The gcssink node writes pipeline records to a Google Cloud Storage bucket as newline-delimited JSON (.jsonl) blobs. It batches records in memory and uploads each batch as a single object, organized into date-partitioned virtual folders.

Typical use cases include archiving streaming events to cold storage, producing date-partitioned data lake landings consumed by BigQuery or Dataflow, and regulatory retention of processed records.

Key Features

  • Batched NDJSON writes: records accumulate until the batch reaches batchSize, then are uploaded as a single .jsonl object
  • Date-partitioned naming: every object name embeds a yyyy/MM/dd/HH-mm-ss path component plus a UUID, ready for tools that scan date-partitioned paths
  • Credential modes: service account JSON keyfile, OAuth access token, or Application Default Credentials

Configuration

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| bucketName | String | Yes | | GCS bucket to write to |
| objectPrefix | String | No | events/ | Prefix prepended to every generated object name. Include a trailing / to act as a virtual folder |
| credentialId | String | No | | ID of a GcpCredential in jobContext.otherProperties. Omit to use Application Default Credentials |
| batchSize | Integer | No | 100 | Number of records buffered in memory before flushing as a single object. Minimum: 1 |
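
A minimal sketch of the smallest working configuration, assuming defaults for every optional field (the node id "archive" is illustrative):

```yaml
# Minimal gcssink node: only bucketName is required. Records land under
# the default events/ prefix in batches of 100, authenticated via
# Application Default Credentials.
- id: "archive"
  commandName: "gcssink"
  config:
    bucketName: "my-data-lake"
```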

Object Naming

Every object the sink writes follows the pattern:

{objectPrefix}{yyyy/MM/dd/HH-mm-ss}-{uuid}.jsonl

For example, with objectPrefix: "events/":

events/2026/04/27/12-34-56-3f1a8b9d-2c5e-4a8c-9d6e-7b1c2a3d4e5f.jsonl

The content type is set to application/x-ndjson and each line in the blob is one record from the workflow batch.
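
For instance, a flushed batch of two records (the payloads here are hypothetical) produces a two-line blob:

```
{"orderId": "A-1001", "status": "SHIPPED"}
{"orderId": "A-1002", "status": "PENDING"}
```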

DAG Example

```yaml
jobContext:
  otherProperties:
    gcs-cred:
      authType: SERVICE_ACCOUNT_JSON_KEYFILE
      jsonKeyContent: |
        {
          "type": "service_account",
          "project_id": "my-project",
          "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
          "client_email": "writer@my-project.iam.gserviceaccount.com"
        }
      projectId: "my-project"
  metricTags: {}
  dlqConfig:

dag:
  - id: "source"
    commandName: "kafkasource"
    config:
      broker: "kafka:9092"
      topic: "raw-events"
      groupId: "gcs-archiver"
      encodingType: "JSON_OBJECT"
    outputs:
      - "sink"

  - id: "sink"
    commandName: "gcssink"
    config:
      bucketName: "my-data-lake"
      objectPrefix: "archive/orders/"
      credentialId: "gcs-cred"
      batchSize: 500
```

Credentials

credentialId resolves a GcpCredential from jobContext.otherProperties:

| authType | Required fields |
| --- | --- |
| SERVICE_ACCOUNT_JSON_KEYFILE | jsonKeyContent, projectId (optional) |
| ACCESS_TOKEN | accessToken, projectId (optional) |
| APPLICATION_DEFAULT | None (relies on the runtime's ADC chain) |

If credentialId is omitted, the sink uses Application Default Credentials. The credential principal needs at least the Storage Object Creator role (roles/storage.objectCreator) on the bucket.
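
For reference, a token-based credential might look like the following sketch; the entry name gcs-token-cred and the token value are placeholders:

```yaml
jobContext:
  otherProperties:
    gcs-token-cred:
      authType: ACCESS_TOKEN
      accessToken: "ya29...."   # short-lived OAuth 2.0 access token
      projectId: "my-project"   # optional
# For APPLICATION_DEFAULT, simply omit credentialId on the sink node;
# the runtime's ADC chain resolves the credentials.
```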

Tuning Batch Size

batchSize is a tradeoff between write frequency and memory:

  • Larger batches reduce GCS write requests and produce fewer, larger objects (cheaper to scan downstream).
  • Smaller batches lower memory usage and reduce time-to-availability of records in storage.

The default of 100 is conservative; for high-throughput pipelines a batchSize of 1000 to 10000 is typical, as in the sketch below.
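
As a rough sizing sketch (the 1 KB average record size is an assumption, not a measured figure), here is a high-throughput variant of the sink node from the DAG example above:

```yaml
# With ~1 KB records, batchSize 5000 buffers roughly 5 MB in memory per
# flush and emits one ~5 MB object per batch instead of fifty 100 KB ones.
- id: "sink"
  commandName: "gcssink"
  config:
    bucketName: "my-data-lake"
    objectPrefix: "archive/orders/"
    batchSize: 5000
```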

Related Nodes

  • gcssource: Read objects from a Google Cloud Storage bucket
  • s3sink: Write records to AWS S3
  • kafkasink: Stream records to a Kafka topic