SQS Source Node
The sqssource node continuously reads messages from an Amazon SQS queue using long polling and emits them as events into the pipeline. Messages are automatically deleted after successful processing.
Both standard queues and FIFO queues are supported. The source uses the AWS SDK v2 and authenticates via explicit credentials or the default AWS credential chain (environment variables, instance profile, etc.).
Key Features
- Long polling: reduces empty receive calls and latency using configurable wait time (up to 20 seconds)
- Automatic deletion: successfully processed messages are deleted from the queue
- Configurable visibility timeout: prevents other consumers from seeing in-flight messages
- Infinite streaming: never exhausts — runs until the pipeline is terminated
Configuration
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
queueUrl | String | Yes | — | Full SQS queue URL, e.g. https://sqs.us-east-1.amazonaws.com/123456789/my-queue |
regionStr | String | Yes | — | AWS region, e.g. us-east-1 |
encodingType | String | Yes | — | Message format. See Encoding Types |
credentialId | String | No | — | ID of AWS credentials in jobContext.otherProperties. Omit to use default credential chain |
maxNumberOfMessages | int | No | 10 | Messages to retrieve per poll (max 10, as per SQS limit) |
waitTimeSeconds | int | No | 20 | Long polling wait time in seconds (max 20) |
visibilityTimeoutSeconds | int | No | 30 | How long a received message is hidden from other consumers |
Getting the Queue URL
The full queue URL is required — not just the queue name. Retrieve it with:
aws sqs get-queue-url --queue-name my-queue
Output:
{
"QueueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
}
DAG Example
jobContext:
otherProperties:
aws-cred:
username: AKIAIOSFODNN7EXAMPLE
password: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
metricTags: {}
dlqConfig:
dag:
- id: "source"
commandName: "sqssource"
config:
queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
regionStr: "us-east-1"
encodingType: "JSON_OBJECT"
credentialId: "aws-cred"
maxNumberOfMessages: 10
waitTimeSeconds: 20
outputs:
- "sink"
- id: "sink"
commandName: "stdout"
config:
encodingType: "JSON_OBJECT"
IAM Permissions
The AWS identity used must have the following SQS permissions on the target queue:
{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
],
"Resource": "arn:aws:sqs:us-east-1:123456789012:my-queue"
}
Related Nodes
- sqssink: Write records to an SQS queue
- kafkasource: Alternative stream source for Kafka topics