Splunk Source Node
The splunkSource node in ZephFlow enables you to ingest data from Splunk Enterprise or Splunk Cloud Platform by executing SPL search queries against the Splunk REST API.
Overview
The Splunk source node connects to a Splunk instance via its management port, submits a search job using SPL (Search Processing Language), waits for the job to complete, and emits the resulting events into your ZephFlow pipeline. This is a batch source: it runs the query once per pipeline execution and does not stream new events continuously.
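The "wait for the job to complete" step can be pictured as a polling loop with a deadline. The following is a minimal sketch, not ZephFlow's actual implementation: `JobWaitSketch`, `awaitDone`, and the poll interval are hypothetical names, and the `isDone` supplier stands in for a real status check against the Splunk search job.

```java
import java.util.function.BooleanSupplier;

// Hypothetical illustration only: a generic "poll until done or time out" loop.
final class JobWaitSketch {

    /**
     * Polls isDone until it returns true or timeoutMs elapses.
     * Returns the number of polls performed; throws IllegalStateException on timeout.
     */
    static int awaitDone(BooleanSupplier isDone, long timeoutMs, long pollIntervalMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        int polls = 0;
        while (true) {
            polls++;
            if (isDone.getAsBoolean()) {
                return polls;
            }
            if (System.nanoTime() >= deadline) {
                throw new IllegalStateException(
                        "search job did not finish within " + timeoutMs + " ms");
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for search job", e);
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for a real status check: reports "done" on the third poll.
        int[] calls = {0};
        int polls = awaitDone(() -> ++calls[0] >= 3, 1_000, 10);
        System.out.println("job finished after " + polls + " polls");
    }
}
```

Because the source is batch-oriented, a loop like this runs once per pipeline execution; when the deadline passes, failing loudly is usually preferable to emitting a partial result set.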
Key Features
- SPL Query Support: Execute any valid SPL search query against your Splunk instance
- Credential Management: Authenticate with username/password credentials
- SSL Configuration: Optional SSL certificate validation for HTTPS connections
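Splunk's REST API accepts HTTP Basic authentication on the management port, so a username/password pair can be turned into an `Authorization` header as sketched below. Whether the node uses Basic authentication or a session token internally is not stated here; `BasicAuthSketch` and `basicAuthHeader` are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical illustration only: builds a standard HTTP Basic Authorization header value.
final class BasicAuthSketch {

    /** Returns "Basic " followed by base64("username:password"). */
    static String basicAuthHeader(String username, String password) {
        String pair = username + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Placeholder credentials; never hard-code real ones.
        System.out.println("Authorization: " + basicAuthHeader("admin", "changeme"));
    }
}
```

Basic authentication sends the password in a reversible encoding, which is one reason to keep certificate validation enabled on HTTPS connections outside of local testing.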
Parameters
| Parameter | Type | Description | Required |
|---|---|---|---|
| splunkUrl | String | Full URL of the Splunk management endpoint (e.g., https://host:8089) | Yes |
| searchQuery | String | SPL query to execute (e.g., search index=main sourcetype=syslog) | Yes |
| validateCertificates | boolean | Whether to validate SSL certificates for HTTPS connections | No |
| credential | UsernamePasswordCredential | Username and password for Splunk authentication | Yes |
Config Object
The full config object for the Splunk source node (SplunkSourceDto.Config):
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| splunkUrl | String | Yes | — | Full URL of the Splunk management endpoint (e.g., https://host:8089) |
| searchQuery | String | Yes | — | SPL query to execute |
| credentialId | String | Yes | — | Credential ID for Splunk authentication |
| validateCertificates | boolean | No | false | Whether to validate SSL certificates for HTTPS connections |
| batchSize | int | No | 10000 | Number of results to fetch per batch |
| jobInitTimeoutMs | long | No | 300000 | Timeout in milliseconds for search job initialization (default is 5 minutes) |
| earliestTime | String | No | null | Earliest time bound for the search, in Splunk time format (e.g., -24h@h) |
| latestTime | String | No | null | Latest time bound for the search, in Splunk time format (e.g., now) |
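To make the roles of searchQuery, earliestTime, latestTime, and batchSize more concrete, the sketch below shows how such values could map onto Splunk's documented REST search endpoints (`/services/search/jobs` and `/services/search/jobs/{sid}/results`). It is an illustration under assumptions, not the node's implementation: `SplunkRequestSketch` and its methods are hypothetical names, and the mapping from config fields to request parameters is assumed rather than confirmed.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration only: request strings for Splunk's REST search endpoints.
final class SplunkRequestSketch {

    /** Form body for POST {splunkUrl}/services/search/jobs; null time bounds are omitted. */
    static String jobForm(String searchQuery, String earliestTime, String latestTime) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("search", searchQuery);
        if (earliestTime != null) params.put("earliest_time", earliestTime);
        if (latestTime != null) params.put("latest_time", latestTime);
        params.put("output_mode", "json");
        return params.entrySet().stream()
                .map(e -> e.getKey() + "="
                        + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }

    /** Start offset of each page when reading totalResults rows, batchSize at a time. */
    static List<Integer> pageOffsets(int totalResults, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Integer> offsets = new ArrayList<>();
        for (int offset = 0; offset < totalResults; offset += batchSize) {
            offsets.add(offset);
        }
        return offsets;
    }

    /** URL for GET-ting one page of results for the search job with the given sid. */
    static String resultsUrl(String splunkUrl, String sid, int offset, int count) {
        return splunkUrl + "/services/search/jobs/" + sid
                + "/results?output_mode=json&offset=" + offset + "&count=" + count;
    }

    public static void main(String[] args) {
        System.out.println(jobForm("search index=main | head 5", "-24h@h", "now"));
        for (int offset : pageOffsets(25_000, 10_000)) {
            // "1700000000.1" is a made-up search job id.
            System.out.println(resultsUrl("https://host:8089", "1700000000.1", offset, 10_000));
        }
    }
}
```

With the default batchSize of 10000, a job that produced 25,000 results would be read in three pages under this scheme. A smaller batch lowers per-request memory at the cost of more round trips.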
Java SDK Usage
Basic Usage
UsernamePasswordCredential credential = new UsernamePasswordCredential(
    "splunk-username",
    "splunk-password"
);
ZephFlow flow = ZephFlow.startFlow()
    .splunkSource(
        "https://splunk.example.com:8089",                    // Splunk management URL
        "search index=main sourcetype=access_log | head 100", // SPL query
        false,                                                // validateCertificates: false skips SSL certificate validation
        credential                                            // Splunk username/password credential
    );
Log Ingestion Pipeline
UsernamePasswordCredential credential = new UsernamePasswordCredential(
    "splunk-user", "splunk-pass"
);
ZephFlow flow = ZephFlow.startFlow()
    .splunkSource(
        "https://splunk.example.com:8089",
        "search index=main sourcetype=access_log | head 1000",
        false,
        credential
    )
    .eval("dict_merge($, dict(ingested=true))")
    .kafkaSink("kafka:9092", "splunk-events", null, EncodingType.JSON_OBJECT, null);
With SSL Validation
// Assumes `credential` was created as shown in Basic Usage.
ZephFlow flow = ZephFlow.startFlow()
    .splunkSource(
        "https://splunk-prod.internal:8089",
        "search index=security sourcetype=firewall action=blocked",
        true, // validateCertificates: true enforces SSL certificate validation
        credential
    )
    .filter("$.severity == \"high\"")
    .s3Sink("us-east-1", "security-logs", "blocked-events", EncodingType.JSON_OBJECT_LINE);
Related Nodes
- kafkaSource: Consume streaming data from Kafka
- s3Sink: Write data to S3