
Elasticsearch Source Node

The elasticsearchsource node queries an Elasticsearch index using the scroll API and emits each matching document as a record in the pipeline. It is a batch source — it pages through every hit until the result set is exhausted.

Typical use cases include reprocessing logs already indexed in Elasticsearch, exporting documents into another data store, or backfilling derived data when an enrichment rule changes.

Key Features

  • Scroll-based paging: uses Elasticsearch's _search/scroll API to page through arbitrarily large result sets
  • Optional query DSL: supply any valid query DSL fragment, or leave empty for match_all
  • Multi-index: index accepts comma-separated lists and wildcard patterns (logs-2026-04-*)
  • Document metadata: each emitted record carries the original _id and _index as event metadata
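The scroll loop described above can be sketched in a few lines. This is an illustrative sketch, not the node's implementation: `post` stands in for any HTTP client call that POSTs a JSON body and returns the parsed response, and the function name is hypothetical.

```python
# Minimal sketch of scroll-based paging against the _search/scroll API.
# `post` is a placeholder for an HTTP client call (url, json_body) -> parsed
# response dict; it is NOT part of the node's API.
def scroll_documents(post, host, index, query, scroll="5m", size=500):
    """Yield every hit's _source, paging until the result set is exhausted."""
    # The initial search opens the scroll context and returns the first page.
    body = {"size": size, "query": query or {"match_all": {}}}
    resp = post(f"{host}/{index}/_search?scroll={scroll}", body)
    while True:
        hits = resp["hits"]["hits"]
        if not hits:
            break  # empty page: the result set is exhausted
        for hit in hits:
            yield hit["_source"]
        # Subsequent pages are fetched by scroll id, not by re-running the query.
        resp = post(f"{host}/_search/scroll",
                    {"scroll": scroll, "scroll_id": resp["_scroll_id"]})
```

Note that only the first request carries the query; every later page is addressed purely by the scroll id returned with the previous response.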

Configuration

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| host | String | Yes | | Base URL of the Elasticsearch cluster, e.g. https://es.example.com:9200 |
| credentialId | String | No | | ID of a UsernamePasswordCredential in jobContext.otherProperties. Omit for unauthenticated clusters |
| index | String | Yes | | Index name, comma-separated list, or wildcard pattern |
| query | String | No | match_all | Elasticsearch query DSL fragment as JSON. Embedded under the query key of the search request |
| scrollTimeout | String | No | 5m | How long the scroll context is kept alive between fetches. Elasticsearch duration syntax (30s, 1m, 5m) |
| batchSize | int | No | 500 | The size parameter on the search request: number of hits per scroll page. Minimum: 1 |
| encodingType | String | Yes | | Format used to deserialize each document's _source. Typically JSON_OBJECT |

Query DSL

The query field is a string containing a valid Elasticsearch query DSL fragment. The fetcher places the parsed JSON under the query key of the search body, so any clause that is valid there is accepted:

{"match_all": {}}
{"range": {"@timestamp": {"gte": "now-1d"}}}
{"bool": {"must": [{"term": {"status": "open"}}]}}

Leaving query blank is equivalent to passing {"match_all": {}}.
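The embedding rule can be shown with a small helper. This is a sketch of the documented behavior, not the fetcher's actual code; `build_search_body` is a hypothetical name.

```python
import json

def build_search_body(query_field, batch_size=500):
    """Illustrative: embed the configured query DSL fragment under the
    `query` key of the search body; a blank field falls back to match_all."""
    if query_field and query_field.strip():
        query = json.loads(query_field)
    else:
        query = {"match_all": {}}
    return {"size": batch_size, "query": query}
```

For example, a configured query of `{"range": {"@timestamp": {"gte": "now-1d"}}}` produces a search body of `{"size": 500, "query": {"range": {"@timestamp": {"gte": "now-1d"}}}}`.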

Authentication

The connector authenticates using HTTP Basic auth — username and password are taken from a UsernamePasswordCredential. API-key authentication is not supported.

jobContext:
  otherProperties:
    es-reader:
      username: "logs-reader"
      password: "secret"

The user must have permission to issue _search and _search/scroll requests against the target index.
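HTTP Basic auth itself is standard: the username and password are joined with a colon and base64-encoded into the Authorization header. A minimal sketch (the helper name is illustrative, not part of the connector):

```python
import base64

def basic_auth_header(username, password):
    """Build the standard HTTP Basic Authorization header from a
    username/password pair, as the connector does from the credential."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}
```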

DAG Example

jobContext:
  otherProperties:
    es-reader:
      username: "logs-reader"
      password: "secret"
  metricTags: {}
  dlqConfig:

dag:
  - id: "source"
    commandName: "elasticsearchsource"
    config:
      host: "https://es.example.com:9200"
      credentialId: "es-reader"
      index: "logs-2026-04-*"
      query: |
        {"bool": {"must": [{"match": {"level": "ERROR"}}, {"range": {"@timestamp": {"gte": "now-24h"}}}]}}
      scrollTimeout: "5m"
      batchSize: 1000
      encodingType: "JSON_OBJECT"
    outputs:
      - "sink"

  - id: "sink"
    commandName: "stdout"
    config:
      encodingType: "JSON_OBJECT"

Output Format

Each emitted record is the parsed _source of an Elasticsearch document. The original document identifiers are attached as event metadata:

| Metadata key | Value |
|---|---|
| _id | Document _id from Elasticsearch |
| _index | Concrete index the document was returned from |
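The split between record payload and metadata can be sketched as follows. This is illustrative only; `hit_to_record` is a hypothetical name, and the hit shape is the standard Elasticsearch hit object.

```python
def hit_to_record(hit):
    """Illustrative: split an Elasticsearch hit into the emitted record
    (the parsed _source) and its event metadata (_id and _index)."""
    metadata = {"_id": hit["_id"], "_index": hit["_index"]}
    return hit["_source"], metadata
```

Because _index is the concrete index, records from a wildcard query like logs-2026-04-* can still be traced back to the exact daily index they came from.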

Tuning Notes

  • Scroll timeout: must be long enough to cover the time between two fetch() calls. Longer values are safer for slow downstream processing but consume more cluster memory.
  • Batch size: larger values reduce HTTP round trips but increase per-request memory pressure on the cluster.