Pub/Sub Sink Node
The pubsubsink node publishes pipeline records as messages to a Google Cloud Pub/Sub topic.
Records are grouped into batches (see batchSize) so that each publish request carries multiple messages. An optional path expression can derive a message ordering key from each record's fields. Authentication uses either service account credentials or Application Default Credentials (ADC).
Key Features
- Batched publishing: groups up to batchSize records into each publish request, reducing per-request overhead
- Optional ordering key: derive a Pub/Sub message ordering key from record fields using a path expression (e.g. $.tenant_id)
- ADC support: authenticate with a service account JSON key credential or with Application Default Credentials
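The batching and ordering-key behavior described above can be sketched in Python. This is a minimal illustration, not the node's implementation: it assumes JSON_OBJECT records are serialized with json.dumps, supports only simple dotted paths of the form $.a.b, and uses an empty ordering key when no expression is configured or the path does not resolve (Pub/Sub treats a message with an empty ordering key as unordered). The names resolve_path and build_batches are hypothetical.

```python
import json
from typing import Any, Iterator, Optional


def resolve_path(record: dict[str, Any], expr: str) -> Optional[Any]:
    """Resolve a simple '$.a.b' path against a record; return None if any segment is missing."""
    if not expr.startswith("$."):
        raise ValueError(f"unsupported path expression: {expr!r}")
    value: Any = record
    for segment in expr[2:].split("."):
        if not isinstance(value, dict) or segment not in value:
            return None
        value = value[segment]
    return value


def build_batches(
    records: list[dict[str, Any]],
    batch_size: int = 100,
    ordering_key_expression: Optional[str] = None,
) -> Iterator[list[tuple[bytes, str]]]:
    """Yield batches of (data, ordering_key) pairs, with at most batch_size pairs per batch."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batch: list[tuple[bytes, str]] = []
    for record in records:
        # Assumption: JSON_OBJECT encoding means compact UTF-8 JSON.
        data = json.dumps(record, separators=(",", ":")).encode("utf-8")
        key = ""
        if ordering_key_expression:
            resolved = resolve_path(record, ordering_key_expression)
            key = "" if resolved is None else str(resolved)
        batch.append((data, key))
        if len(batch) == batch_size:
            yield batch  # a full batch becomes one publish request
            batch = []
    if batch:
        yield batch  # flush the final, possibly smaller, batch
```

For example, five records with batch_size=2 produce three publish batches of 2, 2, and 1 messages, each carrying the record's tenant_id as its ordering key.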
Configuration
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| projectId | String | No | — | GCP project ID. If omitted, the projectId from the resolved GcpCredential is used; an error is raised if neither is set. |
| topic | String | Yes | — | Pub/Sub topic name only (e.g. my-topic). The full path projects/<projectId>/topics/<topic> is constructed automatically. |
| encodingType | String | Yes | — | Message data format. See Encoding Types. |
| credentialId | String | No | — | ID of a credential entry in jobContext.otherProperties. Omit to use Application Default Credentials. |
| orderingKeyExpression | String | No | — | Path expression (e.g. $.tenant_id) evaluated against each record; the result is used as the message's Pub/Sub ordering key. |
| batchSize | Integer | No | 100 | Number of records per publish request (maximum 1000). |
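The field rules in the table (required fields, the projectId fallback, topic path construction, and the batchSize limit) can be expressed as a small validation routine. This is a hypothetical Python sketch; resolve_sink_config and the keys it returns are illustrative names, not part of the node. It assumes each credential entry is a plain mapping with an optional projectId key, and it does not try to discover a project from the ADC environment when credentialId is omitted.

```python
from typing import Any, Optional


def resolve_sink_config(config: dict[str, Any], other_properties: dict[str, Any]) -> dict[str, Any]:
    """Validate a pubsubsink config block and derive the values the publisher needs."""
    for required in ("topic", "encodingType"):
        if not config.get(required):
            raise ValueError(f"missing required field: {required}")

    # credentialId points at an entry in jobContext.otherProperties; absent means ADC.
    credential: Optional[dict[str, Any]] = None
    credential_id = config.get("credentialId")
    if credential_id is not None:
        if credential_id not in other_properties:
            raise ValueError(f"credential {credential_id!r} not found in jobContext.otherProperties")
        credential = other_properties[credential_id]

    # Node-level projectId wins; otherwise fall back to the credential's projectId.
    project_id = config.get("projectId") or (credential or {}).get("projectId")
    if not project_id:
        raise ValueError("projectId must be set on the node or on the credential")

    batch_size = config.get("batchSize", 100)
    if not 1 <= batch_size <= 1000:
        raise ValueError("batchSize must be between 1 and 1000")

    return {
        "topicPath": f"projects/{project_id}/topics/{config['topic']}",
        "batchSize": batch_size,
        "useAdc": credential is None,
        "orderingKeyExpression": config.get("orderingKeyExpression"),
    }
```

Failing fast on these rules at startup surfaces configuration mistakes before any records are read.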
GCP Permissions
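The identity used to publish (the service account from credentialId, or the principal behind Application Default Credentials) must have the pubsub.topics.publish permission on the target topic. The predefined roles/pubsub.publisher role includes this permission.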
Minimal IAM binding example (via gcloud):

```shell
gcloud pubsub topics add-iam-policy-binding my-topic \
  --member="serviceAccount:my-sa@my-project.iam.gserviceaccount.com" \
  --role="roles/pubsub.publisher"
```
DAG Example
```yaml
jobContext:
  otherProperties:
    gcp-cred:
      authType: SERVICE_ACCOUNT_JSON_KEYFILE
      jsonKeyContent: '{"type":"service_account","project_id":"my-project",...}'
      projectId: "my-project"
  metricTags: {}
  dlqConfig:
dag:
  - id: "source"
    commandName: "stdin"
    config:
      encodingType: "JSON_OBJECT"
    outputs:
      - "sink"
  - id: "sink"
    commandName: "pubsubsink"
    config:
      projectId: "my-project"
      topic: "my-topic"
      encodingType: "JSON_OBJECT"
      credentialId: "gcp-cred"
      orderingKeyExpression: "$.tenant_id"
      batchSize: 100
```
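The DAG above wires nodes together by id: the source's outputs list names the sink, and the sink's credentialId names an entry under jobContext.otherProperties. A hypothetical Python consistency check over the same structure might look like the following. It is written against a dict literal so no YAML parser is needed, jsonKeyContent is omitted, and check_dag is an illustrative name. It assumes outputs always list downstream node ids.

```python
from typing import Any

# Dict mirror of the DAG example (jsonKeyContent left out).
JOB: dict[str, Any] = {
    "jobContext": {
        "otherProperties": {
            "gcp-cred": {"authType": "SERVICE_ACCOUNT_JSON_KEYFILE", "projectId": "my-project"},
        },
        "metricTags": {},
        "dlqConfig": None,  # an empty YAML value parses as null
    },
    "dag": [
        {"id": "source", "commandName": "stdin",
         "config": {"encodingType": "JSON_OBJECT"}, "outputs": ["sink"]},
        {"id": "sink", "commandName": "pubsubsink",
         "config": {"projectId": "my-project", "topic": "my-topic",
                    "encodingType": "JSON_OBJECT", "credentialId": "gcp-cred",
                    "orderingKeyExpression": "$.tenant_id", "batchSize": 100}},
    ],
}


def check_dag(job: dict[str, Any]) -> list[str]:
    """Return wiring problems; an empty list means outputs and credentialIds all resolve."""
    problems: list[str] = []
    node_ids = {node["id"] for node in job["dag"]}
    credentials = job["jobContext"].get("otherProperties", {})
    for node in job["dag"]:
        for target in node.get("outputs", []):
            if target not in node_ids:
                problems.append(f"{node['id']}: output {target!r} is not a node id")
        cred_id = node.get("config", {}).get("credentialId")
        if cred_id is not None and cred_id not in credentials:
            problems.append(f"{node['id']}: credentialId {cred_id!r} not in otherProperties")
    return problems
```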
Related Nodes
- pubsubsource: Pull messages from a Pub/Sub subscription
- kafkasink: Publish pipeline records to Apache Kafka topics