Skip to main content

Splunk Source Node

The splunkSource node in ZephFlow enables you to ingest data from Splunk Enterprise or Splunk Cloud Platform by executing SPL search queries against the Splunk REST API.

Overview

The Splunk source node connects to a Splunk instance via its management port, submits a search job using SPL (Search Processing Language), waits for the job to complete, and emits the resulting events into your ZephFlow pipeline. This is a batch source — it runs the query once per pipeline execution.

Key Features

  • SPL Query Support: Execute any valid SPL search query against your Splunk instance
  • Credential Management: Authenticate with username/password credentials
  • SSL Configuration: Optional SSL certificate validation for HTTPS connections

Parameters

ParameterTypeDescriptionRequired
splunkUrlStringFull URL of the Splunk management endpoint (e.g., https://host:8089)Yes
searchQueryStringSPL query to execute (e.g., search index=main sourcetype=syslog)Yes
validateCertificatesbooleanWhether to validate SSL certificates for HTTPS connectionsNo
credentialUsernamePasswordCredentialUsername and password for Splunk authenticationYes

Config Object

The full config object for the Splunk source node (SplunkSourceDto.Config):

FieldTypeRequiredDefaultDescription
splunkUrlStringYesFull URL of the Splunk management endpoint (e.g., https://host:8089)
searchQueryStringYesSPL query to execute
credentialIdStringYesCredential ID for Splunk authentication
validateCertificatesbooleanNofalseWhether to validate SSL certificates for HTTPS connections
batchSizeintNo10000Number of results to fetch per batch
jobInitTimeoutMslongNo300000Timeout in milliseconds for search job initialization
earliestTimeStringNonullEarliest time bound for the search (Splunk time format)
latestTimeStringNonullLatest time bound for the search (Splunk time format)

Java SDK Usage

Basic Usage

UsernamePasswordCredential credential = new UsernamePasswordCredential(
"splunk-username",
"splunk-password"
);

ZephFlow flow = ZephFlow.startFlow()
.splunkSource(
"https://splunk.example.com:8089", // Splunk management URL
"search index=main sourcetype=access_log | head 100", // SPL query
false, // Validate SSL certificates
credential // Credentials
);

Log Ingestion Pipeline

UsernamePasswordCredential credential = new UsernamePasswordCredential(
"splunk-user", "splunk-pass"
);

ZephFlow flow = ZephFlow.startFlow()
.splunkSource(
"https://splunk.example.com:8089",
"search index=main sourcetype=access_log | head 1000",
false,
credential
)
.eval("dict_merge($, dict(ingested=true))")
.kafkaSink("kafka:9092", "splunk-events", null, EncodingType.JSON_OBJECT, null);

With SSL Validation

ZephFlow flow = ZephFlow.startFlow()
.splunkSource(
"https://splunk-prod.internal:8089",
"search index=security sourcetype=firewall action=blocked",
true,
credential
)
.filter("$.severity == \"high\"")
.s3Sink("us-east-1", "security-logs", "blocked-events", EncodingType.JSON_OBJECT_LINE);