SQS Sink Node
The sqssink node sends pipeline records as messages to an Amazon SQS queue using the batch send API. Each record becomes one SQS message. Both standard and FIFO queues are supported.
The sink uses the AWS SDK v2 and authenticates via explicit credentials or the default AWS credential chain (environment variables, instance profile, etc.).
Key Features
- Batch sending: up to 10 messages per SQS SendMessageBatch call for efficiency
- FIFO queue support: optionally compute MessageGroupId and MessageDeduplicationId from record fields via path expressions
- Flexible encoding: serialize records as JSON objects, JSON lines, or other supported formats
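
The batching behaviour described above can be sketched as follows. This is a minimal illustration, not the sink's actual implementation: the helper names `chunked` and `to_entries` are hypothetical, and the entries use the `Id` and `MessageBody` fields of an SQS SendMessageBatch request entry.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List

SQS_MAX_BATCH = 10  # SendMessageBatch accepts at most 10 entries per call


def chunked(records: Iterable[Dict[str, Any]],
            batch_size: int = SQS_MAX_BATCH) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive lists of at most batch_size records (hypothetical helper)."""
    if not 1 <= batch_size <= SQS_MAX_BATCH:
        raise ValueError("batch_size must be between 1 and 10")
    batch: List[Dict[str, Any]] = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the final, possibly smaller, batch
        yield batch


def to_entries(batch: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """Turn each record into one batch entry; Id only has to be unique within a batch."""
    return [
        {"Id": str(index), "MessageBody": json.dumps(record, sort_keys=True)}
        for index, record in enumerate(batch)
    ]


records = [{"n": i} for i in range(23)]
batches = [to_entries(batch) for batch in chunked(records)]
print([len(batch) for batch in batches])  # prints [10, 10, 3]
```

With 23 records, the sketch produces three batches, so three SendMessageBatch calls would be needed instead of 23 single sends.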
Configuration
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| queueUrl | String | Yes | — | Full SQS queue URL, e.g. https://sqs.us-east-1.amazonaws.com/123456789012/my-queue |
| regionStr | String | Yes | — | AWS region, e.g. us-east-1 |
| encodingType | String | Yes | — | Validated against the supported Encoding Types. Note that message bodies are always serialized as JSON objects |
| credentialId | String | No | — | ID of AWS credentials in jobContext.otherProperties. Omit to use the default credential chain |
| messageGroupIdExpression | String | No | — | Path expression evaluated against each record to produce the FIFO MessageGroupId |
| deduplicationIdExpression | String | No | — | Path expression evaluated against each record to produce the MessageDeduplicationId |
| batchSize | Integer | No | 10 | Messages per SendMessageBatch call (maximum 10, the SQS limit) |
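
As a rough illustration of the constraints in the table, the check below flags missing required fields and an out-of-range batchSize. It is only a sketch: `validate_config` is a hypothetical helper, and the node's real validation (for example of encodingType against the supported Encoding Types) is not shown here.

```python
from typing import Any, Dict, List

REQUIRED_FIELDS = ("queueUrl", "regionStr", "encodingType")
SQS_MAX_BATCH = 10  # upper bound imposed by SendMessageBatch


def validate_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the config passes these checks."""
    problems = [
        f"missing required field: {name}"
        for name in REQUIRED_FIELDS
        if not config.get(name)
    ]
    batch_size = config.get("batchSize", SQS_MAX_BATCH)  # default from the table
    is_int = isinstance(batch_size, int) and not isinstance(batch_size, bool)
    if not is_int or not 1 <= batch_size <= SQS_MAX_BATCH:
        problems.append("batchSize must be an integer between 1 and 10")
    return problems


good = {
    "queueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue",
    "regionStr": "us-east-1",
    "encodingType": "JSON_OBJECT",
}
print(validate_config(good))                                 # prints []
print(validate_config({"queueUrl": "x", "batchSize": 25}))   # three problems reported
```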
FIFO Queues
For FIFO queues (queue name ending in .fifo), set messageGroupIdExpression to a path expression that returns a string group identifier; SQS requires a MessageGroupId on every FIFO message. If content-based deduplication is not enabled on the queue, also set deduplicationIdExpression, because SQS then requires an explicit MessageDeduplicationId.
config:
  queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/alerts.fifo"
  regionStr: "us-east-1"
  encodingType: "JSON_OBJECT"
  messageGroupIdExpression: "$.tenant_id"
  deduplicationIdExpression: "$.event_id"
For standard queues, omit both FIFO fields.
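
To make the effect of the two FIFO fields concrete, the sketch below resolves expressions such as `$.tenant_id` against a record and returns the resulting message attributes. It assumes a deliberately tiny dotted-path syntax (`$.a.b`); the expression language the node actually supports may be richer, and `resolve_path` and `fifo_attributes` are hypothetical names.

```python
from typing import Any, Dict, Optional


def resolve_path(record: Dict[str, Any], expression: str) -> str:
    """Resolve a dotted path such as "$.tenant_id" or "$.meta.id" against a record."""
    if not expression.startswith("$."):
        raise ValueError(f"unsupported expression: {expression}")
    value: Any = record
    for key in expression[2:].split("."):
        value = value[key]  # a missing dict key raises KeyError
    return str(value)  # SQS expects both IDs as strings


def fifo_attributes(record: Dict[str, Any],
                    group_expr: Optional[str] = None,
                    dedup_expr: Optional[str] = None) -> Dict[str, str]:
    """Return only the FIFO attributes whose expressions are configured."""
    attrs: Dict[str, str] = {}
    if group_expr:
        attrs["MessageGroupId"] = resolve_path(record, group_expr)
    if dedup_expr:
        attrs["MessageDeduplicationId"] = resolve_path(record, dedup_expr)
    return attrs


record = {"tenant_id": "acme", "event_id": 42, "payload": "disk full"}
print(fifo_attributes(record, "$.tenant_id", "$.event_id"))
# prints {'MessageGroupId': 'acme', 'MessageDeduplicationId': '42'}
print(fifo_attributes(record))  # standard queue: prints {}
```

In this sketch, all records with the same tenant_id share a message group, so SQS preserves their relative order, while event_id guards against duplicate deliveries of the same event.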
DAG Example
jobContext:
  otherProperties:
    aws-cred:
      username: AKIAIOSFODNN7EXAMPLE
      password: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
  metricTags: {}
  dlqConfig:
dag:
  - id: "source"
    commandName: "stdin"
    config:
      encodingType: "JSON_OBJECT"
    outputs:
      - "sink"
  - id: "sink"
    commandName: "sqssink"
    config:
      queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
      regionStr: "us-east-1"
      encodingType: "JSON_OBJECT"
      credentialId: "aws-cred"
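
The credentialId lookup in this example can be pictured roughly as below. This is a sketch under assumptions: judging from the example values, username appears to hold the AWS access key ID and password the secret access key, but that mapping is inferred rather than documented here, and `resolve_credentials` is a hypothetical helper.

```python
from typing import Any, Dict, Optional, Tuple


def resolve_credentials(job_context: Dict[str, Any],
                        credential_id: Optional[str]) -> Optional[Tuple[str, str]]:
    """Return (access_key_id, secret_access_key), or None to use the default chain."""
    if not credential_id:
        return None  # credentialId omitted: fall back to the default credential chain
    entry = job_context.get("otherProperties", {}).get(credential_id)
    if entry is None:
        raise KeyError(f"credentialId {credential_id!r} not found in jobContext.otherProperties")
    # Assumed mapping: username -> access key ID, password -> secret access key
    return entry["username"], entry["password"]


job_context = {
    "otherProperties": {
        "aws-cred": {
            "username": "AKIAIOSFODNN7EXAMPLE",
            "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
        }
    }
}
access_key_id, secret_key = resolve_credentials(job_context, "aws-cred")
print(access_key_id)                            # prints AKIAIOSFODNN7EXAMPLE
print(resolve_credentials(job_context, None))   # prints None
```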
IAM Permissions
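The AWS identity used must have sqs:SendMessage on the target queue; this permission also covers SendMessageBatch. The example statement below additionally grants sqs:GetQueueUrl: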
{
  "Effect": "Allow",
  "Action": [
    "sqs:SendMessage",
    "sqs:GetQueueUrl"
  ],
  "Resource": "arn:aws:sqs:us-east-1:123456789012:my-queue"
}
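
The Resource ARN in the statement corresponds to the configured queueUrl. A small sketch of that mapping follows; it assumes the standard `aws` partition and a queue URL of the form shown in this document, and `queue_arn_from_url` is a hypothetical helper rather than part of the node.

```python
from urllib.parse import urlparse


def queue_arn_from_url(queue_url: str) -> str:
    """Derive the queue ARN from a URL like https://sqs.<region>.amazonaws.com/<account>/<queue>."""
    parsed = urlparse(queue_url)
    host_parts = parsed.netloc.split(".")            # ["sqs", "us-east-1", "amazonaws", "com"]
    path_parts = parsed.path.strip("/").split("/")   # ["123456789012", "my-queue"]
    if len(host_parts) < 4 or host_parts[0] != "sqs" or len(path_parts) != 2:
        raise ValueError(f"unexpected SQS queue URL: {queue_url}")
    region = host_parts[1]
    account_id, queue_name = path_parts
    return f"arn:aws:sqs:{region}:{account_id}:{queue_name}"


print(queue_arn_from_url("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"))
# prints arn:aws:sqs:us-east-1:123456789012:my-queue
```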