SQS Source Node

The sqssource node continuously reads messages from an Amazon SQS queue using long polling and emits them as events into the pipeline. Messages are automatically deleted after successful processing.

Both standard queues and FIFO queues are supported. The source uses the AWS SDK v2 and authenticates via explicit credentials or the default AWS credential chain (environment variables, instance profile, etc.).

Key Features

  • Long polling: reduces empty receive calls and latency by using a configurable wait time (up to 20 seconds)
  • Automatic deletion: successfully processed messages are deleted from the queue
  • Configurable visibility timeout: prevents other consumers from seeing in-flight messages
  • Infinite streaming: the source is never exhausted; it runs until the pipeline is terminated
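
The receive, process, delete cycle described above can be sketched in a few lines. This is an illustration of the general SQS consumption pattern, not the node's actual implementation (which, per this page, is built on the AWS SDK v2). The function name `poll_once` and the `handle` callback are hypothetical; `client` is assumed to expose `receive_message` and `delete_message` with the same keyword arguments as a boto3 SQS client.

```python
from typing import Any, Callable, Dict


def poll_once(
    client: Any,
    queue_url: str,
    handle: Callable[[Dict[str, Any]], None],
    max_messages: int = 10,
    wait_seconds: int = 20,
    visibility_timeout: int = 30,
) -> int:
    """Run one receive/process/delete cycle; return how many messages were deleted."""
    response = client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,      # SQS accepts 1 to 10 per call
        WaitTimeSeconds=wait_seconds,          # a value above 0 enables long polling
        VisibilityTimeout=visibility_timeout,  # hides in-flight messages from others
    )
    deleted = 0
    # The "Messages" key is absent when the poll times out with nothing to read.
    for message in response.get("Messages", []):
        try:
            handle(message)
        except Exception:
            # Assumption: a failed message is left in the queue, so it becomes
            # visible again once the visibility timeout expires.
            continue
        client.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message["ReceiptHandle"],
        )
        deleted += 1
    return deleted
```

Calling `poll_once` inside a loop that only exits on shutdown gives the "infinite streaming" behaviour. With boto3 installed, a real client could be passed in as `boto3.client("sqs", region_name="us-east-1")`.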

Configuration

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| queueUrl | String | Yes | | Full SQS queue URL, e.g. https://sqs.us-east-1.amazonaws.com/123456789012/my-queue |
| regionStr | String | Yes | | AWS region, e.g. us-east-1 |
| encodingType | String | Yes | | Message format. See Encoding Types |
| credentialId | String | No | | ID of the AWS credentials in jobContext.otherProperties. Omit to use the default credential chain |
| maxNumberOfMessages | int | No | 10 | Maximum number of messages to retrieve per poll (max 10, the SQS limit) |
| waitTimeSeconds | int | No | 20 | Long polling wait time in seconds (max 20) |
| visibilityTimeoutSeconds | int | No | 30 | How long, in seconds, a received message is hidden from other consumers |
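
As a rough pre-flight check, the limits in the table can be verified before a pipeline is submitted. The sketch below is hypothetical: `check_sqssource_config` is not part of the product, and whether the node itself rejects out-of-range values is not stated here. The numeric bounds are those of the SQS API (1 to 10 messages, 0 to 20 seconds wait, 0 to 43200 seconds visibility timeout).

```python
from typing import Any, Dict, List

REQUIRED_FIELDS = ("queueUrl", "regionStr", "encodingType")
DEFAULTS = {
    "maxNumberOfMessages": 10,
    "waitTimeSeconds": 20,
    "visibilityTimeoutSeconds": 30,
}


def check_sqssource_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in an sqssource config (empty if none)."""
    errors = []
    for field in REQUIRED_FIELDS:
        if not config.get(field):
            errors.append(f"{field} is required")

    # Apply the documented defaults before range-checking optional fields.
    merged = {**DEFAULTS, **config}
    if not 1 <= merged["maxNumberOfMessages"] <= 10:
        errors.append("maxNumberOfMessages must be between 1 and 10")
    if not 0 <= merged["waitTimeSeconds"] <= 20:
        errors.append("waitTimeSeconds must be between 0 and 20")
    if not 0 <= merged["visibilityTimeoutSeconds"] <= 43200:
        errors.append("visibilityTimeoutSeconds must be between 0 and 43200")
    return errors
```

An empty list means the config passes these checks; it does not guarantee that the queue exists or that the credentials are valid.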

Getting the Queue URL

The full queue URL is required — not just the queue name. Retrieve it with:

aws sqs get-queue-url --queue-name my-queue

Output:

{
  "QueueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
}
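
The queue URL also carries the region, account ID, and queue name, which is convenient when filling in `regionStr` or the queue ARN used in an IAM policy. The following is a minimal sketch, assuming the standard `https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>` URL shape shown above; legacy endpoints, VPC endpoints, and non-`aws` partitions are not handled. `QueueRef` and `parse_queue_url` are hypothetical names.

```python
from typing import NamedTuple
from urllib.parse import urlparse


class QueueRef(NamedTuple):
    region: str
    account_id: str
    queue_name: str

    @property
    def arn(self) -> str:
        # Assumes the standard "aws" partition.
        return f"arn:aws:sqs:{self.region}:{self.account_id}:{self.queue_name}"


def parse_queue_url(queue_url: str) -> QueueRef:
    """Split a standard SQS queue URL into region, account ID, and queue name."""
    parsed = urlparse(queue_url)
    host_parts = (parsed.hostname or "").split(".")
    path_parts = parsed.path.strip("/").split("/")
    is_sqs_host = (
        len(host_parts) == 4
        and host_parts[0] == "sqs"
        and host_parts[2:] == ["amazonaws", "com"]
    )
    if not is_sqs_host or len(path_parts) != 2 or not all(path_parts):
        raise ValueError(f"not a standard SQS queue URL: {queue_url!r}")
    return QueueRef(
        region=host_parts[1],
        account_id=path_parts[0],
        queue_name=path_parts[1],
    )
```

Passing a bare queue name such as `my-queue` raises `ValueError`, which mirrors the requirement that the full URL be supplied.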

DAG Example

jobContext:
  otherProperties:
    aws-cred:
      username: AKIAIOSFODNN7EXAMPLE
      password: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
  metricTags: {}
  dlqConfig:

dag:
  - id: "source"
    commandName: "sqssource"
    config:
      queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
      regionStr: "us-east-1"
      encodingType: "JSON_OBJECT"
      credentialId: "aws-cred"
      maxNumberOfMessages: 10
      waitTimeSeconds: 20
    outputs:
      - "sink"

  - id: "sink"
    commandName: "stdout"
    config:
      encodingType: "JSON_OBJECT"

IAM Permissions

The AWS identity used must have the following SQS permissions on the target queue:

{
  "Effect": "Allow",
  "Action": [
    "sqs:ReceiveMessage",
    "sqs:DeleteMessage",
    "sqs:GetQueueAttributes"
  ],
  "Resource": "arn:aws:sqs:us-east-1:123456789012:my-queue"
}
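
The JSON above is a single policy statement. To attach it to an IAM identity, it normally has to sit inside a policy document with `Version` and `Statement` keys. A minimal sketch of building that document follows; `build_sqssource_policy` is a hypothetical helper, and the action list is copied from the statement above.

```python
import json
from typing import Any, Dict

SQSSOURCE_ACTIONS = [
    "sqs:ReceiveMessage",
    "sqs:DeleteMessage",
    "sqs:GetQueueAttributes",
]


def build_sqssource_policy(queue_arn: str) -> Dict[str, Any]:
    """Wrap the required SQS actions in a complete IAM policy document."""
    return {
        "Version": "2012-10-17",  # current IAM policy language version
        "Statement": [
            {
                "Effect": "Allow",
                "Action": list(SQSSOURCE_ACTIONS),
                "Resource": queue_arn,
            }
        ],
    }


if __name__ == "__main__":
    policy = build_sqssource_policy("arn:aws:sqs:us-east-1:123456789012:my-queue")
    print(json.dumps(policy, indent=2))
```

Scoping `Resource` to a single queue ARN, rather than `*`, keeps the identity limited to the queue the source actually reads.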

Related Nodes

  • sqssink: Write records to an SQS queue
  • kafkasource: Alternative stream source for Kafka topics