
Elasticsearch Sink Node

The elasticsearchsink node writes pipeline records to an Elasticsearch index using the Bulk API. Records are buffered in memory and flushed as a single NDJSON bulk request when the batch reaches batchSize.

Typical use cases include indexing enriched events for full-text search, hydrating Kibana dashboards from a streaming pipeline, and mirroring records from another data store into Elasticsearch.

Key Features

  • Bulk API: every flush is a single /_bulk POST containing the whole batch as NDJSON
  • Partial-failure handling: if Elasticsearch reports per-item errors, only the failed items are routed to the configured error/DLQ output; successful items are still committed
  • Configurable batching: tune batchSize to balance throughput against latency

Configuration

Field        | Type    | Required | Default | Description
host         | String  | Yes      |         | Base URL of the Elasticsearch cluster, e.g. https://es.example.com:9200
credentialId | String  | No       |         | ID of a UsernamePasswordCredential in jobContext.otherProperties. Omit for unauthenticated clusters. API-key auth is not supported.
index        | String  | Yes      |         | Destination index name
batchSize    | Integer | No       | 500     | Number of records buffered before a bulk request is sent. Minimum: 1

Authentication

The connector authenticates using HTTP Basic auth. The user must have create and index privileges on the target index.

jobContext:
  otherProperties:
    es-writer:
      username: "events-writer"
      password: "secret"

If credentialId is omitted, no authentication header is sent.
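
If the cluster has security enabled, you can confirm that the credential carries the required privileges before deploying by calling Elasticsearch's _has_privileges API as the writer user (POST /_security/user/_has_privileges). A sketch of the request body, using the index name from the examples on this page:

{"index": [{"names": ["workflow-events"], "privileges": ["create", "index"]}]}

A passing response reports has_all_requested: true:

{
  "username": "events-writer",
  "has_all_requested": true,
  "cluster": {},
  "index": {
    "workflow-events": {"create": true, "index": true}
  },
  "application": {}
}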

Bulk Request Format

Every flush serializes the batch as NDJSON in the form expected by /_bulk:

{"index": {"_index": "workflow-events"}}
{"id": "evt-1", "level": "INFO", "message": "..."}
{"index": {"_index": "workflow-events"}}
{"id": "evt-2", "level": "WARN", "message": "..."}

Each record uses the index action — Elasticsearch assigns a document _id automatically.
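
For illustration, fetching one of the indexed documents afterwards (GET /workflow-events/_doc/<generated id>) returns a response along these lines (abridged); the _id value shown here is made up, since the cluster generates it:

{
  "_index": "workflow-events",
  "_id": "kKxYz4kBv2QwXK1mT9Ab",
  "_version": 1,
  "found": true,
  "_source": {"id": "evt-1", "level": "INFO", "message": "..."}
}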

Error Handling

Outcome                     | Behavior
HTTP non-200 from /_bulk    | The whole batch is reported as failed and routed to the error/DLQ output
HTTP 200 with errors: false | All records are counted as successful
HTTP 200 with errors: true  | Per-item statuses are inspected; successful items are committed and only the failed items are routed to the error output with the cluster-supplied error reason
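
As an illustration of the partial-failure case, an abridged errors: true response might look like the following; the first item would be committed and the second routed to the error output with the reason string shown:

{
  "took": 12,
  "errors": true,
  "items": [
    {"index": {"_index": "workflow-events", "_id": "kKxYz4kBv2QwXK1mT9Ab", "status": 201}},
    {"index": {"_index": "workflow-events", "status": 400,
               "error": {"type": "mapper_parsing_exception", "reason": "failed to parse field [level]"}}}
  ]
}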

DAG Example

jobContext:
  otherProperties:
    es-writer:
      username: "events-writer"
      password: "secret"
  metricTags: {}
  dlqConfig:

dag:
  - id: "source"
    commandName: "kafkasource"
    config:
      broker: "kafka:9092"
      topic: "processed-events"
      groupId: "es-indexer"
      encodingType: "JSON_OBJECT"
    outputs:
      - "sink"

  - id: "sink"
    commandName: "elasticsearchsink"
    config:
      host: "https://es.example.com:9200"
      credentialId: "es-writer"
      index: "workflow-events"
      batchSize: 1000

Tuning Batch Size

batchSize is a tradeoff between throughput and latency:

  • Larger batches reduce request overhead and increase indexing throughput.
  • Smaller batches keep records visible in Elasticsearch sooner and reduce per-request memory usage on the cluster.

The default of 500 is a reasonable starting point; high-volume pipelines can raise it to 2000–5000.