SQS Sink Node

The sqssink node sends pipeline records as messages to an Amazon SQS queue using the batch send API. Each record becomes one SQS message. Both standard and FIFO queues are supported.

The sink uses the AWS SDK v2 and authenticates via explicit credentials or the default AWS credential chain (environment variables, instance profile, etc.).

Key Features

  • Batch sending: up to 10 messages per SQS SendMessageBatch call for efficiency
  • FIFO queue support: optionally compute MessageGroupId and MessageDeduplicationId from record fields via path expressions
  • Validated encoding: encodingType is checked against the supported Encoding Types, while message bodies are always serialized as JSON objects
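The batching behaviour described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not the node's actual implementation: the helper names chunk and to_entries are hypothetical, and the entry layout (Id, MessageBody) simply follows what the SQS SendMessageBatch API expects.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List

MAX_BATCH = 10  # SQS limit on entries per SendMessageBatch call


def chunk(records: Iterable[Dict[str, Any]],
          size: int = MAX_BATCH) -> Iterator[List[Dict[str, Any]]]:
    """Yield lists of at most `size` records, preserving input order."""
    if not 1 <= size <= MAX_BATCH:
        raise ValueError(f"size must be between 1 and {MAX_BATCH}")
    batch: List[Dict[str, Any]] = []
    for record in records:
        batch.append(record)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # flush the final, possibly shorter, batch
        yield batch


def to_entries(batch: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """Build one SendMessageBatch entry per record (hypothetical helper).

    Entry Ids only need to be unique within a single batch, so the
    position of the record in the batch is used.
    """
    return [
        {"Id": str(i), "MessageBody": json.dumps(record)}
        for i, record in enumerate(batch)
    ]
```

With boto3, each entries list could then be passed to send_message_batch(QueueUrl=..., Entries=entries) on an SQS client. The response reports per-entry failures in its Failed list, so a caller would normally inspect that list rather than assume the whole batch succeeded.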

Configuration

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| queueUrl | String | Yes | | Full SQS queue URL, e.g. https://sqs.us-east-1.amazonaws.com/123456789012/my-queue |
| regionStr | String | Yes | | AWS region, e.g. us-east-1 |
| encodingType | String | Yes | | Must be one of the supported Encoding Types. Message bodies are always serialized as JSON objects |
| credentialId | String | No | | ID of AWS credentials in jobContext.otherProperties. Omit to use the default credential chain |
| messageGroupIdExpression | String | No | | Path expression evaluated against each record to produce the FIFO MessageGroupId |
| deduplicationIdExpression | String | No | | Path expression evaluated against each record to produce the MessageDeduplicationId |
| batchSize | Integer | No | 10 | Messages per SendMessageBatch call (maximum 10, the SQS limit) |
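The required fields and the batchSize limit listed above can be expressed as a small validation routine. The following Python sketch is an assumption about how such checks might look. The function name normalize_config is hypothetical, and the node's real validation (including the check of encodingType against the supported values) is not documented here.

```python
from typing import Any, Dict

REQUIRED_FIELDS = ("queueUrl", "regionStr", "encodingType")
MAX_BATCH_SIZE = 10  # SQS limit per SendMessageBatch call


def normalize_config(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `config` with defaults applied, or raise ValueError."""
    missing = [name for name in REQUIRED_FIELDS if not config.get(name)]
    if missing:
        raise ValueError("missing required field(s): " + ", ".join(missing))

    result = dict(config)
    # batchSize is optional and defaults to the SQS maximum of 10.
    result.setdefault("batchSize", MAX_BATCH_SIZE)
    if not 1 <= result["batchSize"] <= MAX_BATCH_SIZE:
        raise ValueError(f"batchSize must be between 1 and {MAX_BATCH_SIZE}")
    return result
```

Rejecting a batchSize above 10 at configuration time is a design choice in this sketch. It surfaces the mistake before any record is sent, instead of failing on the first SendMessageBatch call.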

FIFO Queues

For FIFO queues (queue name ending in .fifo), set messageGroupIdExpression to a path expression that returns a string group identifier. If content-based deduplication is not enabled on the queue, also set deduplicationIdExpression, because SQS then requires a MessageDeduplicationId on every message.

config:
  queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/alerts.fifo"
  regionStr: "us-east-1"
  encodingType: "JSON_OBJECT"
  messageGroupIdExpression: "$.tenant_id"
  deduplicationIdExpression: "$.event_id"

For standard queues, omit both FIFO fields.
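To make the effect of the two FIFO expressions concrete, the Python sketch below evaluates them against a record and attaches the results to a batch entry. It is an illustration only: resolve_path handles just simple dotted paths such as $.tenant_id, which is an assumption, since the full syntax of the node's path expressions is not described here. The names resolve_path and fifo_entry are hypothetical.

```python
import json
from typing import Any, Dict, Optional


def resolve_path(record: Dict[str, Any], expression: str) -> str:
    """Resolve a simple `$.a.b` path against a record (assumed subset)."""
    if not expression.startswith("$."):
        raise ValueError(f"unsupported expression: {expression!r}")
    value: Any = record
    for key in expression[2:].split("."):
        value = value[key]  # raises if the field is absent
    return str(value)  # SQS expects both IDs as strings


def fifo_entry(entry_id: str,
               record: Dict[str, Any],
               group_expr: str,
               dedup_expr: Optional[str] = None) -> Dict[str, str]:
    """Build a SendMessageBatch entry carrying the FIFO attributes."""
    entry = {
        "Id": entry_id,
        "MessageBody": json.dumps(record),
        "MessageGroupId": resolve_path(record, group_expr),
    }
    # Only set a deduplication ID when an expression is configured;
    # otherwise the queue's content-based deduplication must be enabled.
    if dedup_expr is not None:
        entry["MessageDeduplicationId"] = resolve_path(record, dedup_expr)
    return entry
```

For a record like {"tenant_id": "acme", "event_id": 42}, the expressions from the configuration above would place all of that tenant's messages in one message group, so SQS delivers them in order relative to each other.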

DAG Example

jobContext:
  otherProperties:
    aws-cred:
      username: AKIAIOSFODNN7EXAMPLE
      password: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
  metricTags: {}
  dlqConfig:

dag:
  - id: "source"
    commandName: "stdin"
    config:
      encodingType: "JSON_OBJECT"
    outputs:
      - "sink"

  - id: "sink"
    commandName: "sqssink"
    config:
      queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
      regionStr: "us-east-1"
      encodingType: "JSON_OBJECT"
      credentialId: "aws-cred"
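The credential lookup in this example can be sketched as follows. This Python snippet is an assumption about the mapping, not the node's code: it treats username as the AWS access key ID and password as the secret access key, which is suggested by the example values but not stated explicitly. The function name sqs_client_kwargs is hypothetical, and boto3 stands in for whichever AWS SDK the node uses.

```python
from typing import Any, Dict, Optional


def sqs_client_kwargs(region: str,
                      other_properties: Dict[str, Dict[str, str]],
                      credential_id: Optional[str] = None) -> Dict[str, Any]:
    """Build keyword arguments for an SQS client.

    With no credential_id, only the region is set and the SDK falls back
    to its default credential chain (environment, instance profile, ...).
    """
    kwargs: Dict[str, Any] = {"region_name": region}
    if credential_id is not None:
        cred = other_properties[credential_id]  # KeyError if the ID is unknown
        # Assumed mapping: username -> access key ID, password -> secret key.
        kwargs["aws_access_key_id"] = cred["username"]
        kwargs["aws_secret_access_key"] = cred["password"]
    return kwargs
```

The resulting dictionary could be passed to boto3 as boto3.client("sqs", **kwargs). Omitting credentialId in the sink config corresponds to calling the function without a credential_id.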

IAM Permissions

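The AWS identity used must have sqs:SendMessage on the target queue; this permission also authorizes SendMessageBatch calls. The example statement below additionally grants sqs:GetQueueUrl: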

{
  "Effect": "Allow",
  "Action": [
    "sqs:SendMessage",
    "sqs:GetQueueUrl"
  ],
  "Resource": "arn:aws:sqs:us-east-1:123456789012:my-queue"
}
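The statement above is a fragment; IAM expects it inside a policy document with a Version and a Statement list. The Python sketch below wraps it accordingly and derives the Resource ARN from a queue URL. It assumes the standard aws partition and the regional URL form used in this page's examples. Both function names are hypothetical.

```python
import json
from urllib.parse import urlparse


def queue_arn_from_url(queue_url: str) -> str:
    """Derive the queue ARN from a URL of the form
    https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>."""
    parsed = urlparse(queue_url)
    region = parsed.netloc.split(".")[1]
    account_id, queue_name = parsed.path.strip("/").split("/")
    return f"arn:aws:sqs:{region}:{account_id}:{queue_name}"


def send_policy(queue_url: str) -> str:
    """Return a complete IAM policy document (JSON) for the sink."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["sqs:SendMessage", "sqs:GetQueueUrl"],
                "Resource": queue_arn_from_url(queue_url),
            }
        ],
    }
    return json.dumps(policy, indent=2)
```

Scoping Resource to a single queue ARN, rather than a wildcard, keeps the identity limited to the queue the sink actually writes to.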

Related Nodes

  • sqssource: Read messages from an SQS queue
  • kafkasink: Alternative stream sink for Kafka topics