Skip to main content

SQS Sink Node

The sqssink node sends pipeline records as messages to an Amazon SQS queue using the batch send API. Each record becomes one SQS message. Both standard and FIFO queues are supported.

The sink uses the AWS SDK v2 and authenticates via explicit credentials or the default AWS credential chain (environment variables, instance profile, etc.).

Key Features

  • Batch sending: up to 10 messages per SQS SendMessageBatch call for efficiency
  • FIFO queue support: optionally compute MessageGroupId and MessageDeduplicationId from event fields via FEEL expressions
  • Flexible encoding: serialize events as JSON objects, JSON lines, or other supported formats

Configuration

FieldTypeRequiredDefaultDescription
queueUrlStringYesFull SQS queue URL, e.g. https://sqs.us-east-1.amazonaws.com/123456789/my-queue
regionStrStringYesAWS region, e.g. us-east-1
encodingTypeStringYesMessage body format. See Encoding Types
credentialIdStringNoID of AWS credentials in jobContext.otherProperties. Omit to use default credential chain
messageGroupIdExpressionStringNoFEEL expression evaluated against each record to produce the FIFO MessageGroupId
deduplicationIdExpressionStringNoFEEL expression evaluated against each record to produce the MessageDeduplicationId
batchSizeintNo10Messages per SendMessageBatch call (max 10, as per SQS limit)

FIFO Queues

For FIFO queues (queue name ending in .fifo), set messageGroupIdExpression to a FEEL expression that returns a string group identifier. Optionally set deduplicationIdExpression if content-based deduplication is not enabled on the queue.

config:
queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/alerts.fifo"
regionStr: "us-east-1"
encodingType: "JSON_OBJECT"
messageGroupIdExpression: "$.tenant_id"
deduplicationIdExpression: "$.event_id"

For standard queues, omit both FIFO fields.

DAG Example

jobContext:
otherProperties:
aws-cred:
username: AKIAIOSFODNN7EXAMPLE
password: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
metricTags: {}
dlqConfig:

dag:
- id: "source"
commandName: "stdin"
config:
encodingType: "JSON_OBJECT"
outputs:
- "sink"

- id: "sink"
commandName: "sqssink"
config:
queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
regionStr: "us-east-1"
encodingType: "JSON_OBJECT"
credentialId: "aws-cred"

IAM Permissions

The AWS identity used must have sqs:SendMessage on the target queue:

{
"Effect": "Allow",
"Action": [
"sqs:SendMessage",
"sqs:GetQueueUrl"
],
"Resource": "arn:aws:sqs:us-east-1:123456789012:my-queue"
}
  • sqssource: Read messages from an SQS queue
  • kafkasink: Alternative stream sink for Kafka topics