Pub/Sub Source Node
The pubsubsource node pulls messages from a Google Cloud Pub/Sub subscription and emits each message as a pipeline record.
Messages are automatically acknowledged after successful processing. The node uses long polling by default for low-latency delivery and runs indefinitely until the pipeline is terminated. Both service account credentials and Application Default Credentials are supported.
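A minimal sketch of the pull, process, and acknowledge cycle, in Python using only the standard library. InMemorySubscription and process_batch are hypothetical names. They are not the node's implementation and not the google-cloud-pubsub API. The sketch assumes JSON_OBJECT payloads are UTF-8 encoded JSON objects, and that a message which fails processing is simply left unacknowledged. How the real node handles processing failures is not specified here.

```python
import json
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class InMemorySubscription:
    """Hypothetical stand-in for a subscription; not a Pub/Sub client.

    Unlike real Pub/Sub it has no ack deadline, so an unacknowledged
    message is visible again on the very next pull.
    """
    pending: dict[str, bytes] = field(default_factory=dict)
    acked: set[str] = field(default_factory=set)

    def pull(self, max_messages: int) -> list[tuple[str, bytes]]:
        # Up to max_messages unacknowledged (ack_id, data) pairs.
        return list(self.pending.items())[:max_messages]

    def acknowledge(self, ack_ids: list[str]) -> None:
        # Acknowledged messages are removed and never delivered again.
        for ack_id in ack_ids:
            self.pending.pop(ack_id, None)
            self.acked.add(ack_id)


def process_batch(sub: InMemorySubscription, emit: Callable[[dict], None],
                  max_messages: int = 100) -> list[str]:
    """Pull one batch, emit each decoded record, and ack only the successes."""
    succeeded: list[str] = []
    for ack_id, data in sub.pull(max_messages):
        try:
            # Assumption: JSON_OBJECT means UTF-8 encoded JSON objects.
            record = json.loads(data.decode("utf-8"))
            if not isinstance(record, dict):
                raise ValueError("expected a JSON object")
            emit(record)
        except Exception:
            # Left unacknowledged so Pub/Sub can redeliver it later.
            continue
        succeeded.append(ack_id)
    if succeeded:
        sub.acknowledge(succeeded)
    return succeeded
```

Leaving a message unacknowledged is what allows Pub/Sub to redeliver it once its ack deadline expires. Acknowledging only the successful subset also keeps one bad message from forcing redelivery of the whole batch.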
Key Features
- Configurable batch pull: control how many messages are pulled per request with maxMessages
- Automatic acknowledgement: messages are acked after successful processing
- Long polling by default: each pull request waits for messages to arrive instead of returning an empty response immediately, so new messages are delivered with low latency
- ADC support: authenticate with a service account JSON credential or Application Default Credentials
- Infinite streaming: runs until the pipeline is terminated
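The batch-pull and long-polling behaviour can be modelled with a queue.Queue standing in for the subscription. This is a hypothetical sketch. The pull function and its wait_seconds timeout are illustrative, and the node's actual wait period is not documented here. With return_immediately=False the call blocks until the first message arrives or the wait expires, then drains whatever else is already available, up to max_messages. With return_immediately=True, an empty queue yields an empty batch at once.

```python
import queue


def pull(q: "queue.Queue[bytes]", max_messages: int = 100,
         return_immediately: bool = False, wait_seconds: float = 10.0) -> list[bytes]:
    """Hypothetical model of a single pull request against an in-process queue."""
    batch: list[bytes] = []
    try:
        if return_immediately:
            # Short poll: an empty queue produces an empty batch right away.
            batch.append(q.get_nowait())
        else:
            # Long poll: wait for the first message, up to wait_seconds.
            batch.append(q.get(timeout=wait_seconds))
    except queue.Empty:
        return batch
    # Take whatever else is already queued, up to max_messages in total.
    while len(batch) < max_messages:
        try:
            batch.append(q.get_nowait())
        except queue.Empty:
            break
    return batch
```

Because a long-poll request is already waiting when a message arrives, the message is returned without the delay of a polling interval, and an idle subscription does not produce a stream of empty responses. As with real Pub/Sub, a pull may return fewer than max_messages messages.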
Configuration
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| projectId | String | No | — | GCP project ID. If omitted, the projectId from the resolved GcpCredential is used; an error is raised if neither is set. |
| subscription | String | Yes | — | Subscription name only (e.g. my-sub), not the full resource path. The node builds projects/<projectId>/subscriptions/<subscription> automatically. |
| encodingType | String | Yes | — | Format of the message data. See Encoding Types. |
| credentialId | String | No | — | Key of a credential entry in jobContext.otherProperties. Omit to use Application Default Credentials. |
| maxMessages | Integer | No | 100 | Maximum number of messages pulled per request (at most 1000). |
| returnImmediately | Boolean | No | false | When true, a pull returns immediately even if no messages are available, which disables long polling. |
| ackDeadlineExtensionSeconds | Integer | No | — | Number of seconds by which to extend the acknowledgement deadline while a message is being processed (at most 600). |
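The projectId fallback, the subscription path construction, and the documented limits in the table can be expressed as a single validation step. This is a hypothetical sketch: SourceConfig and resolve_subscription_path are illustrative names. It only models the credentialId fallback, not a project resolved from Application Default Credentials. Rejecting a subscription value that contains a slash, and the lower bounds (maxMessages of at least 1, a positive extension), are assumptions beyond the documented maxima.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SourceConfig:
    """Hypothetical mirror of the configuration table."""
    subscription: str
    encoding_type: str
    project_id: Optional[str] = None
    credential_id: Optional[str] = None
    max_messages: int = 100
    return_immediately: bool = False
    ack_deadline_extension_seconds: Optional[int] = None


def resolve_subscription_path(cfg: SourceConfig, other_properties: dict) -> str:
    """Validate the config and build the full subscription resource path."""
    if "/" in cfg.subscription:
        # Assumption: a full path passed here is treated as a mistake.
        raise ValueError("subscription must be the short name, e.g. my-sub")
    if not 1 <= cfg.max_messages <= 1000:
        raise ValueError("maxMessages must be between 1 and 1000")
    ext = cfg.ack_deadline_extension_seconds
    if ext is not None and not 0 < ext <= 600:
        raise ValueError("ackDeadlineExtensionSeconds must be between 1 and 600")
    project_id = cfg.project_id
    if project_id is None and cfg.credential_id is not None:
        # Fall back to the projectId stored on the referenced credential entry.
        project_id = other_properties.get(cfg.credential_id, {}).get("projectId")
    if not project_id:
        raise ValueError("projectId is set neither in config nor in the credential")
    return f"projects/{project_id}/subscriptions/{cfg.subscription}"
```

For the DAG example on this page, the source node's subscription resolves to projects/my-project/subscriptions/my-sub.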
GCP Permissions
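The identity the node authenticates as (the service account referenced by credentialId, or the Application Default Credentials principal) must have the pubsub.subscriptions.consume permission on the target subscription. The predefined roles/pubsub.subscriber role includes this permission.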
Minimal IAM binding example (via gcloud):
```shell
gcloud pubsub subscriptions add-iam-policy-binding my-sub \
  --member="serviceAccount:my-sa@my-project.iam.gserviceaccount.com" \
  --role="roles/pubsub.subscriber"
```
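To confirm the binding took effect, the policy printed by gcloud pubsub subscriptions get-iam-policy my-sub --format=json can be checked programmatically. A minimal sketch, where has_subscriber_binding is a hypothetical helper. It only looks for a direct roles/pubsub.subscriber binding on the subscription, so access granted through other roles, project-level bindings, or IAM conditions is not detected.

```python
import json


def has_subscriber_binding(policy_json: str, member: str) -> bool:
    """Return True if member is bound to roles/pubsub.subscriber in the policy.

    Only this one role is checked; other roles that also grant
    pubsub.subscriptions.consume are not considered.
    """
    policy = json.loads(policy_json)
    for binding in policy.get("bindings", []):
        if (binding.get("role") == "roles/pubsub.subscriber"
                and member in binding.get("members", [])):
            return True
    return False
```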
DAG Example
```yaml
jobContext:
  otherProperties:
    gcp-cred:
      authType: SERVICE_ACCOUNT_JSON_KEYFILE
      jsonKeyContent: '{"type":"service_account","project_id":"my-project",...}'
      projectId: "my-project"
  metricTags: {}
  dlqConfig:
dag:
  - id: "source"
    commandName: "pubsubsource"
    config:
      projectId: "my-project"
      subscription: "my-sub"
      encodingType: "JSON_OBJECT"
      credentialId: "gcp-cred"
      maxMessages: 100
    outputs:
      - "sink"
  - id: "sink"
    commandName: "stdout"
    config:
      encodingType: "JSON_OBJECT"
```
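The example relies on two cross-references: credentialId must name a key under jobContext.otherProperties, and each entry in outputs must name another node's id. Below is a hypothetical lint for those two rules. It operates on the YAML after it has been loaded into plain dicts and lists (for instance with PyYAML's yaml.safe_load). check_dag is an illustrative name, not part of the pipeline.

```python
def check_dag(job: dict) -> list[str]:
    """Return wiring errors for a loaded job definition (empty list if none)."""
    errors: list[str] = []
    creds = job.get("jobContext", {}).get("otherProperties") or {}
    nodes = job.get("dag") or []
    ids = {node["id"] for node in nodes}
    for node in nodes:
        # Every downstream target must be the id of a node in the same DAG.
        for target in node.get("outputs") or []:
            if target not in ids:
                errors.append(f"{node['id']}: unknown output {target!r}")
        # A credentialId, when given, must point at an otherProperties entry.
        cred = (node.get("config") or {}).get("credentialId")
        if cred is not None and cred not in creds:
            errors.append(f"{node['id']}: credentialId {cred!r} not in otherProperties")
    return errors


# The DAG example above, as it would look after loading (key content omitted).
EXAMPLE_JOB = {
    "jobContext": {
        "otherProperties": {
            "gcp-cred": {"authType": "SERVICE_ACCOUNT_JSON_KEYFILE", "projectId": "my-project"},
        },
        "metricTags": {},
        "dlqConfig": None,
    },
    "dag": [
        {"id": "source", "commandName": "pubsubsource",
         "config": {"projectId": "my-project", "subscription": "my-sub",
                    "encodingType": "JSON_OBJECT", "credentialId": "gcp-cred",
                    "maxMessages": 100},
         "outputs": ["sink"]},
        {"id": "sink", "commandName": "stdout", "config": {"encodingType": "JSON_OBJECT"}},
    ],
}

print(check_dag(EXAMPLE_JOB))  # prints []
```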
Related Nodes
- pubsubsink: Publish pipeline records to a Pub/Sub topic
- kafkasource: Consume messages from Apache Kafka topics