GCS Source Node

The gcssource node reads objects from a Google Cloud Storage bucket and emits each object's decoded content as records in the pipeline. It is a batch source — it lists every matching blob in the bucket, downloads each one, and terminates when all blobs are consumed.

Typical use cases include replaying archived events stored as .jsonl exports, ingesting CSV reports dropped into a bucket by another system, and performing one-off backfills from cold storage.

Key Features

  • Prefix filtering: only blobs whose name starts with objectPrefix are read (see the example after this list)
  • Multiple encodings: each blob is decoded with the configured encodingType (CSV, JSON, NDJSON, XML, Parquet, plain text)
  • Credential modes: service account JSON keyfile, OAuth access token, or Application Default Credentials
  • Batch source: runs the listing once and terminates when all blobs are consumed
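
For example, with objectPrefix set to "events/2026/04/" (the object names here are hypothetical), the listing behaves like this:

  events/2026/04/part-0000.jsonl   read
  events/2026/04/part-0001.jsonl   read
  events/2026/03/part-0000.jsonl   skipped (name does not start with the prefix)

The prefix is a literal starts-with match on the blob name, not a glob pattern.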

Configuration

  • bucketName (String, required): GCS bucket to read from.
  • objectPrefix (String, optional): Prefix used to filter blobs. Leave empty to read every object.
  • encodingType (String, required): Format used to decode each object's bytes. Supported values: CSV, JSON_OBJECT, JSON_ARRAY, JSON_OBJECT_LINE, STRING_LINE, TEXT, XML, PARQUET.
  • credentialId (String, optional): ID of a GcpCredential in jobContext.otherProperties. Omit to use Application Default Credentials.
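
A minimal configuration sets only the two required fields; with credentialId omitted, the source falls back to Application Default Credentials (the bucket name below is a placeholder):

config:
  bucketName: "my-bucket"
  encodingType: "JSON_OBJECT_LINE"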

Encoding Types

The encodingType controls how each downloaded blob's bytes are turned into pipeline records. See encoding for the full reference.

  • JSON_OBJECT_LINE: one JSON object per line; most common for archived event streams
  • JSON_ARRAY: a JSON array; each element becomes one record
  • JSON_OBJECT: one JSON object per blob
  • CSV: comma-separated rows; the first row is treated as a header
  • STRING_LINE / TEXT: one record per line of text
  • XML: the blob is parsed as an XML element
  • PARQUET: the blob is read as an Apache Parquet file
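
For example, a blob holding the two lines below (the field names are made up), decoded with JSON_OBJECT_LINE, produces two records:

{"event": "login", "user": "alice", "ts": "2026-04-14T09:00:00Z"}
{"event": "logout", "user": "alice", "ts": "2026-04-14T09:05:00Z"}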

DAG Example

jobContext:
  otherProperties:
    gcs-cred:
      authType: SERVICE_ACCOUNT_JSON_KEYFILE
      jsonKeyContent: |
        {
          "type": "service_account",
          "project_id": "my-project",
          "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
          "client_email": "reader@my-project.iam.gserviceaccount.com"
        }
      projectId: "my-project"
  metricTags: {}
  dlqConfig:

dag:
  - id: "source"
    commandName: "gcssource"
    config:
      bucketName: "my-data-lake"
      objectPrefix: "events/2026/04/"
      encodingType: "JSON_OBJECT_LINE"
      credentialId: "gcs-cred"
    outputs:
      - "sink"

  - id: "sink"
    commandName: "stdout"
    config:
      encodingType: "JSON_OBJECT"
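
Running this DAG lists every blob in my-data-lake whose name starts with events/2026/04/, decodes each line of each blob as one JSON object, and prints the resulting records to stdout; the job terminates once every matching blob has been consumed.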

Credentials

credentialId resolves a GcpCredential from jobContext.otherProperties. The credential's authType determines how authentication is performed:

  • SERVICE_ACCOUNT_JSON_KEYFILE: jsonKeyContent (full keyfile JSON); projectId (optional)
  • ACCESS_TOKEN: accessToken (short-lived ya29.* token); projectId (optional)
  • APPLICATION_DEFAULT: no fields; relies on the runtime's ADC chain

If credentialId is omitted, the source uses Application Default Credentials.
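
For example, a credential entry using a short-lived access token (the token value is truncated):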

jobContext:
  otherProperties:
    gcs-cred:
      authType: ACCESS_TOKEN
      accessToken: "ya29.c.b0AXv..."
      projectId: "my-project"
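
Because ya29.* access tokens expire quickly (typically within about an hour), ACCESS_TOKEN is best suited to one-off runs; recurring jobs should use a service account keyfile or Application Default Credentials.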

The credential's principal needs at least the Storage Object Viewer role (roles/storage.objectViewer) on the bucket.

Related Nodes

  • gcssink: Write records back to a Google Cloud Storage bucket
  • s3sink: Write records to AWS S3
  • kafkasource: Stream-based source for Kafka topics