# Elasticsearch Sink Node

The `elasticsearchsink` node writes pipeline records to an Elasticsearch index using the Bulk API. Records are buffered in memory and flushed as a single NDJSON bulk request when the batch reaches `batchSize`.

Typical use cases include indexing enriched events for full-text search, hydrating Kibana dashboards from a streaming pipeline, and mirroring records from another data store into Elasticsearch.
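The buffer-and-flush behavior described above can be sketched in a few lines. This is an illustrative model, not the node's actual implementation; `BulkBuffer` and `send_bulk` are hypothetical names.

```python
class BulkBuffer:
    """Buffers records and flushes them as one batch once batch_size is reached.

    Illustrative sketch only; `send_bulk` stands in for the actual /_bulk request.
    """

    def __init__(self, batch_size, send_bulk):
        self.batch_size = batch_size
        self.send_bulk = send_bulk  # callable that would issue the bulk request
        self.buffer = []

    def add(self, record):
        self.buffer.append(record)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        # Send whatever is buffered as a single batch, then reset.
        if self.buffer:
            self.send_bulk(list(self.buffer))
            self.buffer.clear()
```

With `batch_size=2`, the third record stays buffered until a fourth arrives or `flush()` is called explicitly, which mirrors how a partially filled batch waits for more records.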
## Key Features

- Bulk API: every flush is a single POST to `/_bulk` containing the whole batch as NDJSON
- Partial-failure handling: if Elasticsearch reports per-item errors, only the failed items are routed to the configured error/DLQ output; successful items are still committed
- Configurable batching: tune `batchSize` to balance throughput against latency
## Configuration

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| `host` | String | Yes | — | Base URL of the Elasticsearch cluster, e.g. `https://es.example.com:9200` |
| `credentialId` | String | No | — | ID of a `UsernamePasswordCredential` in `jobContext.otherProperties`. Omit for unauthenticated clusters. API-key auth is not supported |
| `index` | String | Yes | — | Destination index name |
| `batchSize` | Integer | No | 500 | Number of records buffered before a bulk request is sent. Minimum: 1 |
## Authentication

The connector authenticates using HTTP Basic auth. The user must have create and index privileges on the target index.

```yaml
jobContext:
  otherProperties:
    es-writer:
      username: "events-writer"
      password: "secret"
```

If `credentialId` is omitted, no authentication header is sent.
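Since the connector uses HTTP Basic auth, the header it sends can be reproduced with the standard library. `basic_auth_header` is an illustrative helper, not part of the connector's API.

```python
import base64


def basic_auth_header(username, password):
    # RFC 7617: the header value is "Basic " + base64("username:password").
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}
```

For the credential above, this yields an `Authorization: Basic …` header encoding `events-writer:secret`.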
## Bulk Request Format

Every flush serializes the batch as NDJSON in the form expected by `/_bulk`:

```json
{"index": {"_index": "workflow-events"}}
{"id": "evt-1", "level": "INFO", "message": "..."}
{"index": {"_index": "workflow-events"}}
{"id": "evt-2", "level": "WARN", "message": "..."}
```

Each record uses the `index` action; Elasticsearch assigns a document `_id` automatically.
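As a sketch, an NDJSON body of this shape can be built by interleaving action lines and document lines. `to_bulk_ndjson` is a hypothetical helper name, not the connector's own code.

```python
import json


def to_bulk_ndjson(records, index):
    """Serialize records into the NDJSON body expected by /_bulk.

    Each record contributes two lines: an `index` action line naming the
    target index, followed by the document source itself.
    """
    lines = []
    for record in records:
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(record))
    # The Bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"
```

A two-record batch therefore produces four lines plus the mandatory trailing newline.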
## Error Handling

| Outcome | Behavior |
|---|---|
| HTTP non-200 from `/_bulk` | The whole batch is reported as failed and routed to the error/DLQ output |
| HTTP 200 with `errors: false` | All records are counted as successful |
| HTTP 200 with `errors: true` | Per-item statuses are inspected; successful items are committed and only the failed items are routed to the error output with the cluster-supplied error reason |
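The per-item inspection in the last row can be sketched as follows, relying on the fact that the `items` array in a `/_bulk` response aligns one-to-one with the actions in the request. `split_bulk_results` is an illustrative name, not the node's actual code.

```python
def split_bulk_results(records, response):
    """Split a batch into (successes, failures) using a parsed /_bulk response.

    `response` is the JSON body returned by Elasticsearch. When `errors` is
    false the whole batch succeeded; otherwise each item is checked for a
    per-item `error` object and its cluster-supplied `reason` is kept.
    """
    if not response.get("errors"):
        return list(records), []
    ok, failed = [], []
    for record, item in zip(records, response["items"]):
        result = item["index"]  # keyed by the action name used in the request
        error = result.get("error")
        if error:
            failed.append((record, error.get("reason")))
        else:
            ok.append(record)
    return ok, failed
```

Only the records paired with an `error` object are routed to the error/DLQ output; the rest are committed as usual.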
## DAG Example

```yaml
jobContext:
  otherProperties:
    es-writer:
      username: "events-writer"
      password: "secret"
metricTags: {}
dlqConfig:
dag:
  - id: "source"
    commandName: "kafkasource"
    config:
      broker: "kafka:9092"
      topic: "processed-events"
      groupId: "es-indexer"
      encodingType: "JSON_OBJECT"
    outputs:
      - "sink"
  - id: "sink"
    commandName: "elasticsearchsink"
    config:
      host: "https://es.example.com:9200"
      credentialId: "es-writer"
      index: "workflow-events"
      batchSize: 1000
```
## Tuning Batch Size

`batchSize` is a tradeoff between throughput and latency:

- Larger batches reduce request overhead and increase indexing throughput.
- Smaller batches keep records visible in Elasticsearch sooner and reduce per-request memory usage on the cluster.

The default of 500 is a reasonable starting point; high-volume pipelines can raise it to 2000–5000.
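One way to reason about the latency side of this tradeoff: at a steady input rate, the time for a batch to fill (and hence the worst-case delay before a record is sent) grows linearly with `batchSize`. This is a simplified model that ignores request time; `worst_case_batch_latency` is a hypothetical helper for illustration.

```python
def worst_case_batch_latency(batch_size, records_per_sec):
    """Seconds for a batch to fill at a steady input rate.

    Simplified model: ignores the bulk request's own round-trip time.
    """
    return batch_size / records_per_sec
```

At 1000 records/sec, the default batch of 500 fills in about half a second, while a 5000-record batch can hold a record back for roughly five seconds.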
## Related Nodes

- `elasticsearchsource`: Read documents from an Elasticsearch index
- `kafkasink`: Stream records to a Kafka topic
- `s3sink`: Write records to AWS S3