
Pub/Sub Sink Node

The pubsubsink node publishes pipeline records as messages to a Google Cloud Pub/Sub topic in batches.

Records are batched before publishing for efficiency. An optional path expression can derive a message ordering key from each record's fields. Both service account credentials and Application Default Credentials are supported.

Key Features

  • Batched publishing: groups records into publish requests for efficient throughput
  • Optional ordering key: derive a Pub/Sub message ordering key from record fields using a path expression (e.g. $.tenant_id)
  • ADC support: authenticate with a service account JSON credential or Application Default Credentials
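
To make the batching behavior concrete, here is a minimal sketch of grouping records into publish batches. It assumes the sink batches consecutive records and flushes a final partial batch at the end of the stream; the function name batch_records is hypothetical, and any time-based flushing the node may perform is not modeled.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

# Upper bound taken from the batchSize description (max 1000).
MAX_BATCH_SIZE = 1000


def batch_records(records: Iterable[T], batch_size: int = 100) -> Iterator[List[T]]:
    """Yield consecutive lists of at most batch_size records (hypothetical helper)."""
    if not 1 <= batch_size <= MAX_BATCH_SIZE:
        raise ValueError(f"batch_size must be between 1 and {MAX_BATCH_SIZE}")
    batch: List[T] = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # Flush the final, possibly smaller, batch.
        yield batch


if __name__ == "__main__":
    sizes = [len(b) for b in batch_records(range(250), batch_size=100)]
    print(sizes)
```

With 250 records and the default batch size of 100, this produces two full batches and one batch of 50.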

Configuration

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| projectId | String | No | - | GCP project ID. If omitted, the projectId from the resolved GcpCredential is used; an error is raised if neither is set. |
| topic | String | Yes | - | Pub/Sub topic name only (e.g. my-topic). The full path projects/<projectId>/topics/<topic> is constructed automatically. |
| encodingType | String | Yes | - | Message data format. See Encoding Types. |
| credentialId | String | No | - | ID of credentials in jobContext.otherProperties. Omit to use the default credential chain. |
| orderingKeyExpression | String | No | - | Path expression (e.g. $.tenant_id) evaluated against each record to produce the Pub/Sub message ordering key. |
| batchSize | Integer | No | 100 | Records per publish batch (max 1000). |
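
The projectId fallback and topic path construction described in the table can be sketched as follows. This is an illustration only, not the node's actual code: the function name resolve_topic_path and its parameter names are hypothetical, and the error type is an assumption.

```python
from typing import Optional


def resolve_topic_path(
    topic: str,
    project_id: Optional[str] = None,
    credential_project_id: Optional[str] = None,
) -> str:
    """Build the full topic path from a bare topic name (hypothetical helper).

    The node-level projectId wins; otherwise the projectId carried by the
    resolved credential is used. If neither is set, an error is raised.
    """
    effective_project = project_id or credential_project_id
    if not effective_project:
        raise ValueError("projectId is set neither in the node config nor in the credential")
    return f"projects/{effective_project}/topics/{topic}"


if __name__ == "__main__":
    # Node config supplies projectId explicitly.
    print(resolve_topic_path("my-topic", project_id="my-project"))
    # Node config omits projectId; the credential's projectId is used instead.
    print(resolve_topic_path("my-topic", credential_project_id="cred-project"))
```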

GCP Permissions

The service account used must have the pubsub.topics.publish permission on the target topic. The built-in roles/pubsub.publisher role includes this permission.

Minimal IAM binding example (via gcloud):

gcloud pubsub topics add-iam-policy-binding my-topic \
  --member="serviceAccount:my-sa@my-project.iam.gserviceaccount.com" \
  --role="roles/pubsub.publisher"

DAG Example

jobContext:
  otherProperties:
    gcp-cred:
      authType: SERVICE_ACCOUNT_JSON_KEYFILE
      jsonKeyContent: '{"type":"service_account","project_id":"my-project",...}'
      projectId: "my-project"
  metricTags: {}
  dlqConfig:

dag:
  - id: "source"
    commandName: "stdin"
    config:
      encodingType: "JSON_OBJECT"
    outputs:
      - "sink"

  - id: "sink"
    commandName: "pubsubsink"
    config:
      projectId: "my-project"
      topic: "my-topic"
      encodingType: "JSON_OBJECT"
      credentialId: "gcp-cred"
      orderingKeyExpression: "$.tenant_id"
      batchSize: 100
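
As a rough illustration of what the sink configuration in this example does to each record, the sketch below turns a record into a message payload plus an ordering key derived from $.tenant_id. Several things here are assumptions rather than documented behavior: the helper names evaluate_path and to_message are hypothetical, only simple dotted paths of the form $.a.b are handled, JSON_OBJECT is assumed to mean the record is serialized as a UTF-8 JSON object, and a missing field is assumed to yield an empty ordering key.

```python
import json
from typing import Any, Dict, Optional


def evaluate_path(record: Dict[str, Any], expression: str) -> str:
    """Evaluate a simple dotted path such as "$.tenant_id" (hypothetical helper).

    Returns an empty string when the path does not resolve (assumed behavior).
    """
    if not expression.startswith("$."):
        raise ValueError(f"unsupported path expression: {expression!r}")
    value: Any = record
    for part in expression[2:].split("."):
        if not isinstance(value, dict) or part not in value:
            return ""
        value = value[part]
    return str(value)


def to_message(record: Dict[str, Any], ordering_key_expression: Optional[str] = None) -> Dict[str, Any]:
    """Build a message-like dict with encoded data and an ordering key."""
    # Assumption: JSON_OBJECT encoding serializes the record as UTF-8 JSON.
    data = json.dumps(record).encode("utf-8")
    key = evaluate_path(record, ordering_key_expression) if ordering_key_expression else ""
    return {"data": data, "ordering_key": key}


if __name__ == "__main__":
    message = to_message({"tenant_id": "acme", "amount": 5}, "$.tenant_id")
    print(message["ordering_key"], message["data"])
```

Records that share a tenant_id would then carry the same ordering key, which is what lets Pub/Sub deliver them in publish order to subscriptions that have message ordering enabled.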

Related Nodes

  • pubsubsource: Pull messages from a Pub/Sub subscription
  • kafkasink: Publish pipeline records to Apache Kafka topics