
Pub/Sub Source Node

The pubsubsource node pulls messages from a Google Cloud Pub/Sub subscription and emits each message as a pipeline record.

Messages are automatically acknowledged after successful processing. The node uses long polling by default for low-latency delivery and runs indefinitely until the pipeline is terminated. Both service account credentials and Application Default Credentials are supported.
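
The acknowledge-after-success behaviour can be pictured with a toy model. This is a sketch, not the node's implementation. FakeSubscription, run_once and process are hypothetical names. "Successful processing" is modelled as the process callback returning without raising. For simplicity, the fake redelivers any unacknowledged message on the very next pull; real Pub/Sub redelivers only after the acknowledgement deadline expires.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple

Message = Tuple[str, bytes]  # (ack_id, data)


@dataclass
class FakeSubscription:
    """Hypothetical in-memory stand-in for a Pub/Sub subscription.

    Simplification: anything pulled but not acked is redelivered on the next
    pull, as if its ack deadline had already expired.
    """
    pending: List[Message] = field(default_factory=list)
    outstanding: Dict[str, bytes] = field(default_factory=dict)

    def pull(self, max_messages: int) -> List[Message]:
        self.pending.extend(self.outstanding.items())  # requeue unacked messages
        self.outstanding.clear()
        batch, self.pending = self.pending[:max_messages], self.pending[max_messages:]
        self.outstanding.update(batch)  # delivered, now awaiting ack
        return batch

    def acknowledge(self, ack_ids: List[str]) -> None:
        for ack_id in ack_ids:
            self.outstanding.pop(ack_id, None)


def run_once(sub: FakeSubscription, process: Callable[[bytes], None], max_messages: int = 100) -> List[str]:
    """One pull/process/ack cycle that acks only messages whose processing succeeded."""
    acked = []
    for ack_id, data in sub.pull(max_messages):
        try:
            process(data)
        except Exception:
            continue  # left unacked, so it will be redelivered
        acked.append(ack_id)
    if acked:
        sub.acknowledge(acked)
    return acked


sub = FakeSubscription(pending=[("a1", b"ok"), ("a2", b"bad"), ("a3", b"ok")])
failures_left = {b"bad": 1}  # make b"bad" fail exactly once


def process(data: bytes) -> None:
    if failures_left.get(data, 0) > 0:
        failures_left[data] -= 1
        raise ValueError("transient failure")


first = run_once(sub, process)
second = run_once(sub, process)
print(first)   # ['a1', 'a3']  (a2 failed and stays unacked)
print(second)  # ['a2']        (redelivered, succeeds this time)
```

A message whose processing fails is never acknowledged, so delivery is at-least-once. Downstream nodes may therefore see the same record more than once.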

Key Features

  • Configurable batch pull: control how many messages are pulled per request with maxMessages
  • Automatic acknowledgement: messages are acked after successful processing
  • Long polling by default: reduces latency by waiting for messages rather than returning immediately
  • ADC support: authenticate with a service account JSON credential or Application Default Credentials
  • Infinite streaming: runs until the pipeline is terminated
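
The difference between long polling and returnImmediately can be illustrated with a small in-memory model. This is a sketch, not the node's code. A queue.Queue stands in for the subscription, and the pull function and its wait_seconds window are hypothetical. The real wait time applied by Pub/Sub is not something this page documents.

```python
import queue
import threading
from typing import List


def pull(buffer: queue.Queue, max_messages: int, return_immediately: bool, wait_seconds: float) -> List[str]:
    """Hypothetical pull against an in-memory buffer.

    return_immediately=True: return whatever is buffered right now (possibly nothing).
    return_immediately=False (long polling): block up to wait_seconds for the first
    message, then drain anything else already buffered, up to max_messages.
    """
    batch: List[str] = []
    if not return_immediately:
        try:
            batch.append(buffer.get(timeout=wait_seconds))
        except queue.Empty:
            return batch  # nothing arrived within the wait window
    while len(batch) < max_messages:
        try:
            batch.append(buffer.get_nowait())
        except queue.Empty:
            break
    return batch


buf: queue.Queue = queue.Queue()
threading.Timer(0.2, buf.put, args=("hello",)).start()  # a publisher 0.2 s in the future

immediate = pull(buf, 100, return_immediately=True, wait_seconds=2.0)
long_polled = pull(buf, 100, return_immediately=False, wait_seconds=2.0)
print(immediate)    # []
print(long_polled)  # ['hello']
```

With returnImmediately set, an idle subscription produces a stream of empty responses that the caller must re-poll. Long polling instead holds the request open until a message arrives, which is why it is the lower-latency default.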

Configuration

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| projectId | String | No | | GCP project ID. If omitted, the projectId from the resolved GcpCredential is used; an error is raised if neither is set. |
| subscription | String | Yes | | Pub/Sub subscription name only (e.g. my-sub). The full path projects/<projectId>/subscriptions/<subscription> is constructed automatically. |
| encodingType | String | Yes | | Message data format. See Encoding Types. |
| credentialId | String | No | | ID of a credential entry in jobContext.otherProperties. Omit to use the default credential chain (Application Default Credentials). |
| maxMessages | Integer | No | 100 | Maximum number of messages to pull per request (max 1000). |
| returnImmediately | Boolean | No | false | When true, a pull returns immediately even if no messages are available (disables long polling). |
| ackDeadlineExtensionSeconds | Integer | No | | Number of seconds by which to extend the acknowledgement deadline while a message is being processed (max 600). |
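
The defaults, limits and subscription-path rules in the table can be summarised as a small validation sketch. The class, method names and snake_case fields are hypothetical; only the defaults, the upper bounds and the projectId fallback come from the table. The lower bounds (at least 1 message, more than 0 seconds) are assumptions, and the real node may validate differently.

```python
from dataclasses import dataclass
from typing import Optional

MAX_MESSAGES_LIMIT = 1000          # documented maximum for maxMessages
ACK_EXTENSION_LIMIT_SECONDS = 600  # documented maximum for ackDeadlineExtensionSeconds


@dataclass
class PubSubSourceConfig:
    subscription: str
    encoding_type: str
    project_id: Optional[str] = None
    credential_id: Optional[str] = None
    max_messages: int = 100
    return_immediately: bool = False
    ack_deadline_extension_seconds: Optional[int] = None

    @classmethod
    def from_dict(cls, raw: dict) -> "PubSubSourceConfig":
        # Required keys raise KeyError if absent; optional keys fall back to the documented defaults.
        return cls(
            subscription=raw["subscription"],
            encoding_type=raw["encodingType"],
            project_id=raw.get("projectId"),
            credential_id=raw.get("credentialId"),
            max_messages=raw.get("maxMessages", 100),
            return_immediately=raw.get("returnImmediately", False),
            ack_deadline_extension_seconds=raw.get("ackDeadlineExtensionSeconds"),
        )

    def validate(self) -> None:
        # Lower bounds are assumptions; only the upper bounds are documented.
        if not 1 <= self.max_messages <= MAX_MESSAGES_LIMIT:
            raise ValueError(f"maxMessages must be between 1 and {MAX_MESSAGES_LIMIT}")
        ext = self.ack_deadline_extension_seconds
        if ext is not None and not 0 < ext <= ACK_EXTENSION_LIMIT_SECONDS:
            raise ValueError(f"ackDeadlineExtensionSeconds must be between 1 and {ACK_EXTENSION_LIMIT_SECONDS}")

    def subscription_path(self, credential_project_id: Optional[str]) -> str:
        # The node's projectId wins; otherwise fall back to the credential's projectId.
        project_id = self.project_id or credential_project_id
        if not project_id:
            raise ValueError("projectId is set neither in the node config nor in the resolved credential")
        return f"projects/{project_id}/subscriptions/{self.subscription}"


cfg = PubSubSourceConfig.from_dict({"subscription": "my-sub", "encodingType": "JSON_OBJECT"})
cfg.validate()
print(cfg.subscription_path(credential_project_id="my-project"))
# projects/my-project/subscriptions/my-sub
```

Because the full path is built from the bare name, passing a full path such as projects/my-project/subscriptions/my-sub as subscription would produce an invalid, doubly prefixed path in this sketch.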

GCP Permissions

The service account (or other principal) that the node authenticates as must have the pubsub.subscriptions.consume permission on the target subscription. The built-in roles/pubsub.subscriber role includes this permission.

Minimal IAM binding example (via gcloud):

gcloud pubsub subscriptions add-iam-policy-binding my-sub \
--member="serviceAccount:my-sa@my-project.iam.gserviceaccount.com" \
--role="roles/pubsub.subscriber"

DAG Example

jobContext:
  otherProperties:
    gcp-cred:
      authType: SERVICE_ACCOUNT_JSON_KEYFILE
      jsonKeyContent: '{"type":"service_account","project_id":"my-project",...}'
      projectId: "my-project"
  metricTags: {}
  dlqConfig:

dag:
  - id: "source"
    commandName: "pubsubsource"
    config:
      projectId: "my-project"
      subscription: "my-sub"
      encodingType: "JSON_OBJECT"
      credentialId: "gcp-cred"
      maxMessages: 100
    outputs:
      - "sink"

  - id: "sink"
    commandName: "stdout"
    config:
      encodingType: "JSON_OBJECT"
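
How credentialId in the source config points into jobContext.otherProperties can be sketched by transcribing the example's jobContext into a Python dict. The resolve_credential helper is hypothetical. Two behaviours are assumptions, not documented: returning None to mean "use Application Default Credentials", and raising KeyError for an unknown ID. The key content stays elided as in the example above.

```python
from typing import Optional

# The example's jobContext as a Python dict (YAML "dlqConfig:" with no value is null).
job_context = {
    "otherProperties": {
        "gcp-cred": {
            "authType": "SERVICE_ACCOUNT_JSON_KEYFILE",
            "jsonKeyContent": '{"type":"service_account","project_id":"my-project",...}',
            "projectId": "my-project",
        }
    },
    "metricTags": {},
    "dlqConfig": None,
}


def resolve_credential(job_context: dict, credential_id: Optional[str]) -> Optional[dict]:
    """Hypothetical lookup: the named credential entry, or None to fall back to ADC."""
    if credential_id is None:
        return None  # assumption: caller then uses Application Default Credentials
    try:
        return job_context["otherProperties"][credential_id]
    except KeyError:
        raise KeyError(f"credentialId {credential_id!r} not found in jobContext.otherProperties") from None


source_config = {"subscription": "my-sub", "encodingType": "JSON_OBJECT", "credentialId": "gcp-cred"}
cred = resolve_credential(job_context, source_config.get("credentialId"))
print(cred["authType"], cred["projectId"])  # SERVICE_ACCOUNT_JSON_KEYFILE my-project
```

The credential's projectId is the value the node falls back to when the source config omits projectId.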

Related Nodes

  • pubsubsink: Publish pipeline records to a Pub/Sub topic
  • kafkasource: Consume messages from Apache Kafka topics