Elasticsearch Source Node
The `elasticsearchsource` node queries an Elasticsearch index using the scroll API and emits each matching document as a record in the pipeline. It is a batch source: it pages through every hit until the result set is exhausted.
Typical use cases include reprocessing logs already indexed in Elasticsearch, exporting documents into another data store, or backfilling derived data when an enrichment rule changes.
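Under the hood, the node drives the standard two-step scroll sequence. The sketch below shows the request shapes involved; the endpoints are standard Elasticsearch, while the concrete values (index pattern, page size, timeout) are illustrative:

```yaml
# 1) The initial search opens a scroll context:
#      POST /logs-2026-04-*/_search?scroll=5m
#    "size" maps to batchSize; "query" maps to the query field (match_all when empty).
initial_request_body:
  size: 500
  query: {"match_all": {}}

# 2) Further pages come from the scroll endpoint until a page returns no hits:
#      POST /_search/scroll
#    "scroll" renews the context on every call.
scroll_request_body:
  scroll: "5m"
  scroll_id: "<scroll_id from the previous response>"
```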
Key Features
- Scroll-based paging: uses Elasticsearch's `_search/scroll` API to page through arbitrarily large result sets
- Optional query DSL: supply any valid query DSL fragment, or leave empty for `match_all`
- Multi-index: `index` accepts comma-separated lists and wildcard patterns (`logs-2026-04-*`)
- Document metadata: each emitted record carries the original `_id` and `_index` as event metadata
Configuration
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| host | String | Yes | — | Base URL of the Elasticsearch cluster, e.g. `https://es.example.com:9200` |
| credentialId | String | No | — | ID of a `UsernamePasswordCredential` in `jobContext.otherProperties`. Omit for unauthenticated clusters |
| index | String | Yes | — | Index name, comma-separated list, or wildcard pattern |
| query | String | No | `match_all` | Elasticsearch query DSL fragment as JSON. Embedded under the `query` key of the search request |
| scrollTimeout | String | No | `5m` | How long the scroll context is kept alive between fetches. Elasticsearch duration syntax (`30s`, `1m`, `5m`) |
| batchSize | int | No | 500 | `size` parameter on the search request; number of hits per scroll page. Minimum: 1 |
| encodingType | String | Yes | — | Format used to deserialize each document's `_source`. Typically `JSON_OBJECT` |
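Only `host`, `index`, and `encodingType` are required; everything else falls back to the defaults above. A minimal node config might look like this (host and index are placeholder values):

```yaml
- id: "source"
  commandName: "elasticsearchsource"
  config:
    host: "https://es.example.com:9200"
    index: "app-logs"
    encodingType: "JSON_OBJECT"
    # query, scrollTimeout, and batchSize default to match_all, 5m, and 500
```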
Query DSL
The `query` field is a string containing a valid Elasticsearch query DSL fragment. The fetcher places the parsed JSON under the `query` key of the search body, so any clause that is valid there is accepted:

```json
{"match_all": {}}
{"range": {"@timestamp": {"gte": "now-1d"}}}
{"bool": {"must": [{"term": {"status": "open"}}]}}
```

Leaving `query` blank is equivalent to passing `{"match_all": {}}`.
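For example, the range fragment above expands into a search body where the fragment sits under `query` and `batchSize` supplies `size` (shown with the default of 500):

```json
{
  "size": 500,
  "query": {"range": {"@timestamp": {"gte": "now-1d"}}}
}
```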
Authentication
The connector authenticates using HTTP Basic auth: the username and password are taken from a `UsernamePasswordCredential`. API-key authentication is not supported.
```yaml
jobContext:
  otherProperties:
    es-reader:
      username: "logs-reader"
      password: "secret"
```
The user must have permission to issue `_search` and `_search/scroll` requests against the target index.
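On clusters with Elasticsearch security enabled, the built-in `read` index privilege covers both request types. A minimal role sketch, assuming the standard `_security` role API (role name and index pattern are examples):

```
PUT /_security/role/logs-reader
{
  "indices": [
    {"names": ["logs-2026-04-*"], "privileges": ["read"]}
  ]
}
```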
DAG Example
```yaml
jobContext:
  otherProperties:
    es-reader:
      username: "logs-reader"
      password: "secret"
metricTags: {}
dlqConfig:
dag:
  - id: "source"
    commandName: "elasticsearchsource"
    config:
      host: "https://es.example.com:9200"
      credentialId: "es-reader"
      index: "logs-2026-04-*"
      query: |
        {"bool": {"must": [{"match": {"level": "ERROR"}}, {"range": {"@timestamp": {"gte": "now-24h"}}}]}}
      scrollTimeout: "5m"
      batchSize: 1000
      encodingType: "JSON_OBJECT"
    outputs:
      - "sink"
  - id: "sink"
    commandName: "stdout"
    config:
      encodingType: "JSON_OBJECT"
```
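As written, this pipeline replays every ERROR-level document indexed in the last 24 hours and prints it to stdout. `batchSize` is raised to 1000 to cut HTTP round trips, at the cost of larger responses (see Tuning Notes below).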
Output Format
Each emitted record is the parsed `_source` of an Elasticsearch document. The original document identifiers are attached as event metadata:
| Metadata key | Value |
|---|---|
| `_id` | Document `_id` from Elasticsearch |
| `_index` | Concrete index the document was returned from |
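For instance, a hypothetical error log document would surface roughly like this (all values illustrative):

```yaml
# Parsed _source becomes the record body:
record:
  "@timestamp": "2026-04-12T08:15:00Z"
  level: "ERROR"
  message: "connection refused"
# Document identifiers travel as event metadata:
metadata:
  _id: "kX9aTZQBx1"
  _index: "logs-2026-04-12"
```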
Tuning Notes
- Scroll timeout: must be long enough to cover the time between two `fetch()` calls. Longer values are safer for slow downstream processing but consume more cluster memory (a sketch follows this list).
- Batch size: larger values reduce HTTP round trips but increase per-request memory pressure on the cluster.
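As a starting point for a slow consumer, you might trade throughput for a safer scroll window; the numbers below are illustrative, not benchmarks:

```yaml
config:
  scrollTimeout: "15m"  # generous window so the context survives slow downstream stages
  batchSize: 200        # smaller pages ease per-request memory pressure on the cluster
```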
Related Nodes
- `elasticsearchsink`: Write records back to an Elasticsearch index
- `splunksource`: Batch-ingest events from Splunk
- `jdbcsource`: Read records from a relational database