Parser Node
The parser node is a versatile component in ZephFlow that extracts structured data from string fields in your events.
It can parse various log formats into structured key-value pairs, allowing you to transform raw log data into structured
events for easier analysis and processing.
Key Features
- Multi-format Parsing: Support for various log formats including Grok patterns, Syslog, CEF, Windows multiline logs, delimited text (CSV/TSV), JSON, and key-value pairs
- Flexible Field Selection: Parse specific fields within your events
- Field Management: Option to remove original raw fields after parsing
Config Object
The full config object for the parser node (ParserConfigs.ParserConfig):
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| targetField | String | Yes | — | Field to parse. Must be a string field, otherwise processing will fail |
| removeTargetField | boolean | No | false | When true, the original field is removed after parsing |
| extractionConfig | ExtractionConfig | Yes | — | Defines the parsing method and format (see Extraction Configurations below) |
The extractionConfig object uses a type field as a discriminator to select the parsing method. The available type values are:
| type Value | Extraction Method | Description |
|---|---|---|
| grok | Grok Extraction | Pattern-matching with named regular expressions |
| windows_multiline | Windows Multiline Extraction | Multi-line Windows event logs |
| syslog | Syslog Extraction | RFC 3164/5424 syslog format |
| cef | CEF Extraction | Common Event Format (ArcSight) |
| json | JSON Extraction | Embedded JSON string parsing |
| delimited_text | Delimited Text Extraction | CSV/TSV/custom delimiter |
| kv_pair | Key-Value Pair Extraction | Key=value formatted data |
Example YAML config (within a DAG node definition):
- id: "parse_logs"
commandName: "parser"
config:
targetField: "__raw__"
removeTargetField: true
extractionConfig:
type: "grok"
grokExpression: "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"
outputs:
- "next_node"
Equivalent JSON config:
{
"targetField": "__raw__",
"removeTargetField": true,
"extractionConfig": {
"type": "grok",
"grokExpression": "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"
}
}
Extraction Configurations
ZephFlow provides multiple extraction configurations to handle different log formats efficiently. Each extraction method is optimized for specific log structures and conventions.
Grok Extraction
Grok is a powerful pattern-matching syntax that combines named regular expressions to parse unstructured text into structured data.
Configuration
| Parameter | Description |
|---|---|
| grokExpression | A pattern string that defines how to extract fields from the text |
YAML Config
extractionConfig:
type: "grok"
grokExpression: "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"
Grok Pattern Syntax
Grok patterns use the format %{SYNTAX:SEMANTIC} where:
- SYNTAX is the pattern name (like IP, TIMESTAMP, NUMBER)
- SEMANTIC is the field name you want to assign the matched value to
Common Grok Patterns
| Pattern | Description |
|---|---|
| %{NUMBER} | Matches decimal numbers |
| %{IP} | Matches IPv4 addresses |
| %{TIMESTAMP_ISO8601} | Matches ISO8601 timestamps |
| %{LOGLEVEL} | Matches log levels (INFO, ERROR, etc.) |
| %{GREEDYDATA} | Matches everything remaining |
| %{WORD} | Matches word characters (a-z, A-Z, 0-9, _) |
| %{NOTSPACE} | Matches everything until a space |
Example
Grok pattern:
%{IPORHOST:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response_code} %{NUMBER:bytes}
Input:
{
"__raw__": "192.168.1.1 - - [10/Oct/2023:13:55:36 -0700] \"GET /index.html HTTP/1.1\" 200 2326"
}
Output:
{
"client_ip": "192.168.1.1",
"ident": "-",
"auth": "-",
"timestamp": "10/Oct/2023:13:55:36 -0700",
"method": "GET",
"request": "/index.html",
"httpversion": "1.1",
"response_code": "200",
"bytes": "2326"
}
Windows Multiline Extraction
This extraction method is specifically designed to parse Windows event logs that span multiple lines, which is common in Windows application and service logs.
Configuration
| Parameter | Description |
|---|---|
| timestampLocationType | Specifies where to find the timestamp in the log entry |
| config | Additional configuration parameters as key-value pairs |
YAML Config
extractionConfig:
type: "windows_multiline"
timestampLocationType: "FIRST_LINE"
# Using FROM_FIELD to extract timestamp from a specific field
extractionConfig:
type: "windows_multiline"
timestampLocationType: "FROM_FIELD"
config:
target_field: "event_time"
Timestamp Location Types
| Type | Description |
|---|---|
| NO_TIMESTAMP | Log entries don't contain timestamps |
| FIRST_LINE | Timestamp appears in the first line of each log entry |
| FROM_FIELD | Timestamp is found in a specific field (requires setting target_field in config) |
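Putting this together, a full parser node using Windows multiline extraction might look like the sketch below; the node id, output name, and the event_time field are illustrative, not prescribed:

```yaml
# Sketch: complete parser node config with windows_multiline extraction.
# "parse_windows_events", "next_node", and "event_time" are illustrative names.
- id: "parse_windows_events"
  commandName: "parser"
  config:
    targetField: "__raw__"
    removeTargetField: true
    extractionConfig:
      type: "windows_multiline"
      timestampLocationType: "FROM_FIELD"
      config:
        target_field: "event_time"
  outputs:
    - "next_node"
```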
Syslog Extraction
The Syslog extraction configuration parses standard syslog formatted messages, which are widely used in system and network device logging.
Configuration
| Parameter | Description |
|---|---|
| timestampPattern | Java date format pattern for parsing the timestamp component. Required when the TIMESTAMP component is present |
| componentList | Ordered list of syslog components present in the log |
| messageBodyDelimiter | Character that separates the header from the message body (optional) |
YAML Config
extractionConfig:
type: "syslog"
timestampPattern: "MMM d HH:mm:ss"
componentList:
- "TIMESTAMP"
- "DEVICE"
- "APP"
- "PROC_ID"
messageBodyDelimiter: ":"
Syslog Components
| Component | Description | Parsed Field Name | Example |
|---|---|---|---|
| PRIORITY | Log priority enclosed in angle brackets | priority | <13> |
| VERSION | Syslog protocol version | version | 1 |
| TIMESTAMP | Timestamp of the log event. If present, timestampPattern is required | timestamp | Oct 11 22:14:15 |
| DEVICE | Host name or IP address | deviceId | server1 |
| APP | Application or process name | appName | sshd |
| PROC_ID | Process ID | procId | 12345 |
| MSG_ID | Message identifier | msgId | ID47 |
| STRUCTURED_DATA | Structured data in the format [id@domain key="value"] | structuredData | [exampleSDID@32473 iut="3" eventSource="App"] |
| (message body) | Any remaining content after the syslog header | content | Failed password for invalid user admin from 192.168.1.10 port 55279 ssh2 |
Example
Input:
{
"__raw__": "Oct 11 22:14:15 server1 sshd 12345: Failed password for invalid user admin from 192.168.1.10 port 55279 ssh2"
}
Output (with components: TIMESTAMP, DEVICE, APP, PROC_ID and delimiter ':'):
{
"timestamp": "Oct 11 22:14:15",
"deviceId": "server1",
"appName": "sshd",
"procId": "12345",
"content": "Failed password for invalid user admin from 192.168.1.10 port 55279 ssh2"
}
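For RFC 5424 messages, the full header can be described with a longer component list. A sketch, mirroring the components used in the Java RFC5424 example later in this page (the timestamp pattern assumes millisecond precision with a zone offset):

```yaml
extractionConfig:
  type: "syslog"
  timestampPattern: "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
  componentList:
    - "PRIORITY"
    - "VERSION"
    - "TIMESTAMP"
    - "DEVICE"
    - "APP"
    - "PROC_ID"
    - "MSG_ID"
    - "STRUCTURED_DATA"
```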
CEF Extraction
Common Event Format (CEF) is a logging and auditing file format developed by ArcSight, widely used in security information and event management (SIEM) systems.
The CEF extraction config doesn't require any additional parameters.
YAML Config
extractionConfig:
type: "cef"
CEF Format Structure
CEF logs follow this structure:
CEF:Version|Device Vendor|Device Product|Device Version|Signature ID|Name|Severity|Extension
The header part (before the Extension) contains pipe-delimited fields, while the Extension part contains key-value pairs.
Example
When events are ingested as plain strings (e.g., via STRING_LINE encoding), the string content is automatically placed into a field called __raw__. This is why the config targets __raw__ and why the output retains it (since removeTargetField is not set).
Input:
CEF:0|Vendor|Product|1.0|100|Intrusion Detected|10|src=192.168.1.1 dst=10.0.0.1 spt=1234 dpt=80 act=blocked
Output:
{
"severity": 10,
"dst": "10.0.0.1",
"src": "192.168.1.1",
"deviceVendor": "Vendor",
"__raw__": "CEF:0|Vendor|Product|1.0|100|Intrusion Detected|10|src=192.168.1.1 dst=10.0.0.1 spt=1234 dpt=80 act=blocked",
"dpt": "80",
"deviceVersion": "1.0",
"version": 0,
"deviceEventClassId": "100",
"act": "blocked",
"spt": "1234",
"name": "Intrusion Detected",
"deviceProduct": "Product"
}
Delimited Text Extraction
The Delimited Text extraction configuration parses structured text files where values are separated by a specific delimiter, such as CSV (comma-separated values), TSV (tab-separated values), or custom delimiters.
Configuration
| Parameter | Description |
|---|---|
| delimiter | The character(s) used to separate values (e.g., ",", "\t", "\|") |
| columns | Ordered list of column names corresponding to the delimited values |
YAML Config
extractionConfig:
type: "delimited_text"
delimiter: ","
columns:
- "timestamp"
- "user_id"
- "action"
- "resource"
- "status"
Features
- Correctly handles quoted values containing delimiters
- Supports escaped quotes within values
Example
Input:
{
"__raw__": "2023-10-15T14:32:01Z,12345,LOGIN,/dashboard,SUCCESS"
}
Output (with columns: timestamp, user_id, action, resource, status):
{
"timestamp": "2023-10-15T14:32:01Z",
"user_id": "12345",
"action": "LOGIN",
"resource": "/dashboard",
"status": "SUCCESS"
}
Handling Quoted Values
The parser correctly handles quoted values that contain the delimiter:
Input:
{
"__raw__": "101,\"Smith, John\",john@example.com,ACTIVE"
}
Output:
{
"user_id": "101",
"name": "Smith, John",
"email": "john@example.com",
"status": "ACTIVE"
}
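For tab-separated input, the same configuration shape applies with an escaped tab as the delimiter; the column names below are illustrative:

```yaml
extractionConfig:
  type: "delimited_text"
  delimiter: "\t"
  columns:
    - "timestamp"
    - "host"
    - "message"
```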
JSON Extraction
The JSON extraction configuration parses a JSON string and stores the resulting structured object in a specified field.
Configuration
| Parameter | Description |
|---|---|
| outputFieldName | The field name to store the parsed JSON structure |
YAML Config
extractionConfig:
type: "json"
outputFieldName: "event_data"
Example
Input:
{
"json_string": "{\"user\":\"alice\",\"action\":\"login\",\"ip\":\"192.168.1.100\"}"
}
Output (with outputFieldName: "event_data", removeTargetField: true):
{
"event_data": {
"user": "alice",
"action": "login",
"ip": "192.168.1.100"
}
}
Key-Value Pair Extraction
The Key-Value Pair extraction configuration parses strings containing key-value pairs with configurable separators.
Configuration
| Parameter | Description |
|---|---|
| pairSeparator | Character that separates key-value pairs (e.g., ',') |
| kvSeparator | Character that separates keys from values (e.g., '=') |
YAML Config
extractionConfig:
type: "kv_pair"
pairSeparator: " "
kvSeparator: "="
Features
- Handles quoted values containing separators
- Supports escaped quotes within values
- Flexible configuration for different key-value formats
- Escape sequence support: The pairSeparator and kvSeparator fields support escape sequences including \t (tab), \n (newline), \r (carriage return), and \\ (literal backslash)
- Duplicate key aggregation: When the same key appears multiple times, values are aggregated into an array. The first occurrence is stored as a string; subsequent occurrences cause the value to become an array of strings.
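As an example of the escape-sequence support, a tab-separated key-value format could be configured like this:

```yaml
extractionConfig:
  type: "kv_pair"
  pairSeparator: "\t"
  kvSeparator: "="
```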
Duplicate Key Behavior
When a key appears more than once, the parser automatically aggregates the values:
Input:
{
"metadata": "key1=v1,key1=v2,key2=v3"
}
With pairSeparator="," and kvSeparator="=":
Output:
{
"key1": ["v1", "v2"],
"key2": "v3"
}
Example
Input:
{
"metadata": "user=john status=active role=admin last_login=2023-10-15"
}
Output (with pairSeparator: " ", kvSeparator: "=", removeTargetField: true):
{
"user": "john",
"status": "active",
"role": "admin",
"last_login": "2023-10-15"
}
Handling Quoted Values
The parser correctly handles quoted values containing separators:
Input:
{
"metadata": "name=\"Smith, John\" email=john@example.com dept=\"Sales, North\""
}
Output:
{
"name": "Smith, John",
"email": "john@example.com",
"dept": "Sales, North"
}
Multi-Stage Parsing with DAG
For complex log formats that require multiple parsing stages, you can chain multiple parser nodes together in a DAG (Directed Acyclic Graph). Each parser node processes the output of the previous node, enabling sophisticated processing pipelines for nested or multi-layered log formats.
Key Benefits
- Modularity: Each parsing stage is a separate, testable node
- Flexibility: Easy to add, remove, or modify parsing stages
- Conditional Processing: Use filter nodes to branch based on extracted fields
- Reusability: Common parsing stages can be shared across different pipelines
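In YAML DAG form, chaining simply means wiring one parser node's output to the next. A two-stage sketch, following the syslog-then-grok pattern used in the Java pipeline example below; node ids and the grok expression are illustrative:

```yaml
# Stage 1: parse the syslog header; remainder lands in "content"
- id: "parse_syslog_header"
  commandName: "parser"
  config:
    targetField: "__raw__"
    extractionConfig:
      type: "syslog"
      timestampPattern: "MMM d HH:mm:ss"
      componentList:
        - "TIMESTAMP"
        - "DEVICE"
        - "APP"
      messageBodyDelimiter: ":"
  outputs:
    - "parse_message_body"
# Stage 2: parse the application-specific message body
- id: "parse_message_body"
  commandName: "parser"
  config:
    targetField: "content"
    removeTargetField: true
    extractionConfig:
      type: "grok"
      grokExpression: "%{WORD:severity}-%{INT:code}: %{GREEDYDATA:message}"
  outputs:
    - "next_node"
```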
Complete Example
For a comprehensive example of multi-stage parsing, see the Cisco ASA Log Processing Tutorial, which demonstrates:
- Parsing syslog headers
- Extracting application-specific metadata
- Branching based on message types
- Message-specific field extraction
- Transforming to standardized formats (OCSF)
Best Practices
Selecting the Right Extraction Configuration
- Analyze your log format first to determine which extraction configuration best matches your needs:
  - Use Grok for most text-based logs with consistent formats
  - Use Syslog for standard system and network device logs
  - Use Windows Multiline for Windows Event logs
  - Use CEF for security and SIEM-related logs
  - Use Delimited Text for CSV, TSV, or other delimiter-separated logs
  - Use JSON for parsing embedded JSON strings
  - Use Key-Value Pair for logs with key=value formatted data
- Test with sample data to verify your configuration handles all variations in your log format
- Create targeted parsers rather than trying to parse everything with one complex configuration
- Use DAG composition for multi-stage parsing - chain parser nodes together rather than creating overly complex single-stage parsers
Common Pitfalls to Avoid
- Overly complex Grok patterns can be difficult to maintain - break them down into smaller, reusable patterns
- Missing components in Syslog configuration - ensure your component list matches the exact format of your logs
- Incorrect timestamp patterns - test thoroughly with various date formats that appear in your logs
- Performance considerations - very complex parsing on high-volume logs can impact performance; consider pre-filtering or using simpler patterns where possible
Java SDK Usage
Basic Usage
ZephFlow flow = ZephFlow.startFlow();
flow.parse(parserConfig);
Grok Parser
ParserConfigs.ParserConfig parserConfig = ParserConfigs.ParserConfig.builder()
.targetField("message")
.removeTargetField(true)
.extractionConfig(new GrokExtractionConfig("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:content}"))
.build();
ZephFlow flow = ZephFlow.startFlow();
flow.parse(parserConfig);
Apache Access Log Parser
ParserConfigs.ParserConfig apacheConfig = ParserConfigs.ParserConfig.builder()
.targetField("__raw__")
.extractionConfig(new GrokExtractionConfig(
"%{IPORHOST:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \\[%{HTTPDATE:timestamp}\\] \"%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response_code} %{NUMBER:bytes}"
))
.build();
ASA Log Grok Parser
GrokExtractionConfig grokConfig = GrokExtractionConfig.builder()
.grokExpression("%ASA-%{INT:level}-%{INT:message_number}: %{GREEDYDATA:message_text}")
.build();
Windows Multiline Parser
WindowsMultilineExtractionConfig windowsConfig = WindowsMultilineExtractionConfig.builder()
.timestampLocationType(TimestampLocationType.FIRST_LINE)
.config(Map.of("key", "value"))
.build();
Using FROM_FIELD Timestamp Location
WindowsMultilineExtractionConfig windowsConfig = WindowsMultilineExtractionConfig.builder()
.timestampLocationType(WindowsMultilineExtractionConfig.TimestampLocationType.FROM_FIELD)
.config(Map.of("target_field", "event_time"))
.build();
Syslog Parser
BSD Format
ParserConfigs.ParserConfig syslogConfig = ParserConfigs.ParserConfig.builder()
.targetField("__raw__")
.extractionConfig(SyslogExtractionConfig.builder()
.timestampPattern("MMM d HH:mm:ss")
.componentList(List.of(
SyslogExtractionConfig.ComponentType.TIMESTAMP,
SyslogExtractionConfig.ComponentType.DEVICE,
SyslogExtractionConfig.ComponentType.APP,
SyslogExtractionConfig.ComponentType.PROC_ID
))
.messageBodyDelimiter(':')
.build())
.build();
RFC5424 Format
ParserConfigs.ParserConfig syslog5424Config = ParserConfigs.ParserConfig.builder()
.targetField("log_message")
.extractionConfig(SyslogExtractionConfig.builder()
.timestampPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
.componentList(List.of(
SyslogExtractionConfig.ComponentType.PRIORITY,
SyslogExtractionConfig.ComponentType.VERSION,
SyslogExtractionConfig.ComponentType.TIMESTAMP,
SyslogExtractionConfig.ComponentType.DEVICE,
SyslogExtractionConfig.ComponentType.APP,
SyslogExtractionConfig.ComponentType.PROC_ID,
SyslogExtractionConfig.ComponentType.MSG_ID,
SyslogExtractionConfig.ComponentType.STRUCTURED_DATA
))
.build())
.build();
CEF Parser
ParserConfigs.ParserConfig cefConfig = ParserConfigs.ParserConfig.builder()
.targetField("__raw__")
.extractionConfig(new CefExtractionConfig())
.build();
Delimited Text (CSV) Parser
ParserConfigs.ParserConfig csvConfig = ParserConfigs.ParserConfig.builder()
.targetField("__raw__")
.extractionConfig(DelimitedTextExtractionConfig.builder()
.delimiter(",")
.columns(List.of("timestamp", "user_id", "action", "resource", "status"))
.build())
.build();
JSON Parser
ParserConfigs.ParserConfig jsonConfig = ParserConfigs.ParserConfig.builder()
.targetField("json_string")
.removeTargetField(true)
.extractionConfig(JsonExtractionConfig.builder()
.outputFieldName("event_data")
.build())
.build();
Key-Value Pair Parser
ParserConfigs.ParserConfig kvConfig = ParserConfigs.ParserConfig.builder()
.targetField("metadata")
.removeTargetField(true)
.extractionConfig(KvPairExtractionConfig.builder()
.pairSeparator(" ")
.kvSeparator("=")
.build())
.build();
Multi-Stage Parsing Pipeline
ZephFlow flow = ZephFlow.startFlow();
// Stage 1: Parse syslog header
ZephFlow stage1 = flow.fileSource("logs.txt", EncodingType.STRING_LINE)
.parse(ParserConfigs.ParserConfig.builder()
.targetField("__raw__")
.extractionConfig(SyslogExtractionConfig.builder()
.timestampPattern("MMM dd yyyy HH:mm:ss")
.componentList(List.of(
SyslogExtractionConfig.ComponentType.TIMESTAMP,
SyslogExtractionConfig.ComponentType.DEVICE,
SyslogExtractionConfig.ComponentType.APP
))
.messageBodyDelimiter(':')
.build())
.build());
// Stage 2: Parse application-specific format
ZephFlow stage2 = stage1.parse(ParserConfigs.ParserConfig.builder()
.targetField("content")
.removeTargetField(true)
.extractionConfig(GrokExtractionConfig.builder()
.grokExpression("%{WORD:severity}-%{INT:code}: %{GREEDYDATA:message}")
.build())
.build());
// Stage 3: Branch and parse message-specific details
ZephFlow type1Flow = stage2
.filter("$.code == '305011'")
.parse(ParserConfigs.ParserConfig.builder()
.targetField("message")
.extractionConfig(GrokExtractionConfig.builder()
.grokExpression("%{WORD:action} %{WORD:protocol} from %{IP:src_ip}/%{INT:src_port}")
.build())
.build());
ZephFlow type2Flow = stage2
.filter("$.code == '106023'")
.parse(ParserConfigs.ParserConfig.builder()
.targetField("message")
.extractionConfig(/* different pattern for this message type */)
.build());
// Merge branches and output
ZephFlow output = ZephFlow.merge(type1Flow, type2Flow)
.stdoutSink(EncodingType.JSON_OBJECT);