Parser Node

The parser node is a versatile component in ZephFlow that extracts structured data from string fields in your events. It can parse various log formats into structured key-value pairs, allowing you to transform raw log data into structured events for easier analysis and processing.

Key Features

  • Multi-format Parsing: Support for various log formats including Grok patterns, Syslog, CEF, Windows multiline logs, delimited text (CSV/TSV), JSON, and key-value pairs
  • Flexible Field Selection: Parse specific fields within your events
  • Field Management: Option to remove original raw fields after parsing

Config Object

The full config object for the parser node (ParserConfigs.ParserConfig):

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| targetField | String | Yes | — | Field to parse. Must be a string field; otherwise processing fails |
| removeTargetField | boolean | No | false | When true, the original field is removed after parsing |
| extractionConfig | ExtractionConfig | Yes | — | Defines the parsing method and format (see Extraction Configurations below) |

The extractionConfig object uses a type field as a discriminator to select the parsing method. The available type values are:

| type Value | Extraction Method | Description |
| --- | --- | --- |
| grok | Grok Extraction | Pattern-matching with named regular expressions |
| windows_multiline | Windows Multiline Extraction | Multi-line Windows event logs |
| syslog | Syslog Extraction | RFC 3164/5424 syslog format |
| cef | CEF Extraction | Common Event Format (ArcSight) |
| json | JSON Extraction | Embedded JSON string parsing |
| delimited_text | Delimited Text Extraction | CSV/TSV/custom delimiter |
| kv_pair | Key-Value Pair Extraction | Key=value formatted data |

Example YAML config (within a DAG node definition):

- id: "parse_logs"
  commandName: "parser"
  config:
    targetField: "__raw__"
    removeTargetField: true
    extractionConfig:
      type: "grok"
      grokExpression: "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"
  outputs:
    - "next_node"

Equivalent JSON config:

{
  "targetField": "__raw__",
  "removeTargetField": true,
  "extractionConfig": {
    "type": "grok",
    "grokExpression": "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"
  }
}

Extraction Configurations

ZephFlow provides multiple extraction configurations to handle different log formats efficiently. Each extraction method is optimized for specific log structures and conventions.

Grok Extraction

Grok is a powerful pattern-matching syntax that combines named regular expressions to parse unstructured text into structured data.

Configuration

| Parameter | Description |
| --- | --- |
| grokExpression | A pattern string that defines how to extract fields from the text |

YAML Config

extractionConfig:
  type: "grok"
  grokExpression: "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"

Grok Pattern Syntax

Grok patterns use the format %{SYNTAX:SEMANTIC} where:

  • SYNTAX is the pattern name (like IP, TIMESTAMP, NUMBER)
  • SEMANTIC is the field name you want to assign the matched value to
Common Grok Patterns

| Pattern | Description |
| --- | --- |
| %{NUMBER} | Matches decimal numbers |
| %{IP} | Matches IPv4 addresses |
| %{TIMESTAMP_ISO8601} | Matches ISO8601 timestamps |
| %{LOGLEVEL} | Matches log levels (INFO, ERROR, etc.) |
| %{GREEDYDATA} | Matches everything remaining |
| %{WORD} | Matches word characters (a-z, A-Z, 0-9, _) |
| %{NOTSPACE} | Matches everything until a space |

Example

Grok pattern:

%{IPORHOST:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response_code} %{NUMBER:bytes}

Input:

{
  "__raw__": "192.168.1.1 - - [10/Oct/2023:13:55:36 -0700] \"GET /index.html HTTP/1.1\" 200 2326"
}

Output:

{
  "client_ip": "192.168.1.1",
  "ident": "-",
  "auth": "-",
  "timestamp": "10/Oct/2023:13:55:36 -0700",
  "method": "GET",
  "request": "/index.html",
  "httpversion": "1.1",
  "response_code": "200",
  "bytes": "2326"
}
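
The expansion step can be sketched outside ZephFlow. The snippet below uses a simplified, hypothetical subset of the grok pattern library (the real built-in definitions are stricter regexes) purely to show how `%{SYNTAX:SEMANTIC}` compiles into named capture groups:

```python
import re

# Simplified stand-ins for grok's built-in patterns (assumption: the real
# library uses much stricter regexes for IPORHOST, HTTPDATE, etc.)
PATTERNS = {
    "IPORHOST": r"\S+",
    "NOTSPACE": r"\S+",
    "HTTPDATE": r"[^\]]+",
    "WORD": r"\w+",
    "NUMBER": r"\d+(?:\.\d+)?",
}

def grok_to_regex(expr):
    # Replace each %{SYNTAX:SEMANTIC} with a named capture group
    return re.sub(
        r"%\{(\w+):(\w+)\}",
        lambda m: f"(?P<{m.group(2)}>{PATTERNS[m.group(1)]})",
        expr,
    )

grok = (r'%{IPORHOST:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} '
        r'\[%{HTTPDATE:timestamp}\] "%{WORD:method} %{NOTSPACE:request} '
        r'HTTP/%{NUMBER:httpversion}" %{NUMBER:response_code} %{NUMBER:bytes}')
line = '192.168.1.1 - - [10/Oct/2023:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 2326'
fields = re.match(grok_to_regex(grok), line).groupdict()
```

Running this against the Apache access line above recovers the same field map shown in the example output.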

Windows Multiline Extraction

This extraction method is specifically designed to parse Windows event logs that span multiple lines, which is common in Windows applications and services logs.

Configuration

| Parameter | Description |
| --- | --- |
| timestampLocationType | Specifies where to find the timestamp in the log entry |
| config | Additional configuration parameters as key-value pairs |

YAML Config

extractionConfig:
  type: "windows_multiline"
  timestampLocationType: "FIRST_LINE"

# Using FROM_FIELD to extract the timestamp from a specific field
extractionConfig:
  type: "windows_multiline"
  timestampLocationType: "FROM_FIELD"
  config:
    target_field: "event_time"

Timestamp Location Types

| Type | Description |
| --- | --- |
| NO_TIMESTAMP | Log entries don't contain timestamps |
| FIRST_LINE | Timestamp appears in the first line of each log entry |
| FROM_FIELD | Timestamp is found in a specific field (requires setting target_field in config) |

Syslog Extraction

The Syslog extraction configuration parses standard syslog formatted messages, which are widely used in system and network device logging.

Configuration

| Parameter | Description |
| --- | --- |
| timestampPattern | Java date format pattern for parsing the timestamp component. Required when the TIMESTAMP component is present |
| componentList | Ordered list of syslog components present in the log |
| messageBodyDelimiter | Character that separates the header from the message body (optional) |

YAML Config

extractionConfig:
  type: "syslog"
  timestampPattern: "MMM d HH:mm:ss"
  componentList:
    - "TIMESTAMP"
    - "DEVICE"
    - "APP"
    - "PROC_ID"
  messageBodyDelimiter: ":"

Syslog Components

| Component | Description | Parsed Field Name | Example |
| --- | --- | --- | --- |
| PRIORITY | Log priority enclosed in angle brackets | priority | <13> |
| VERSION | Syslog protocol version | version | 1 |
| TIMESTAMP | Timestamp of the log event. If present, timestampPattern is required | timestamp | Oct 11 22:14:15 |
| DEVICE | Host name or IP address | deviceId | server1 |
| APP | Application or process name | appName | sshd |
| PROC_ID | Process ID | procId | 12345 |
| MSG_ID | Message identifier | msgId | ID47 |
| STRUCTURED_DATA | Structured data in the format [id@domain key="value"] | structuredData | [exampleSDID@32473 iut="3" eventSource="App"] |
| (remaining log) | Any remaining content after the syslog header | content | Failed password for invalid user admin from 192.168.1.10 port 55279 ssh2 |
Example

Input:

{
  "__raw__": "Oct 11 22:14:15 server1 sshd 12345: Failed password for invalid user admin from 192.168.1.10 port 55279 ssh2"
}

Output (with components: TIMESTAMP, DEVICE, APP, PROC_ID and delimiter ':'):

{
  "timestamp": "Oct 11 22:14:15",
  "deviceId": "server1",
  "appName": "sshd",
  "procId": "12345",
  "content": "Failed password for invalid user admin from 192.168.1.10 port 55279 ssh2"
}
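
The header-splitting logic can be sketched as follows. This is an assumption-laden illustration, hardcoded for the TIMESTAMP, DEVICE, APP, PROC_ID component list used above, not ZephFlow's implementation:

```python
# Sketch: split the BSD syslog header into the configured components, then
# treat everything after the message-body delimiter as content.
def parse_bsd_syslog(raw, body_delimiter=":"):
    tokens = raw.split(" ")
    fields = {"timestamp": " ".join(tokens[:3])}  # "MMM d HH:mm:ss" spans three tokens
    fields["deviceId"], fields["appName"], proc_id = tokens[3:6]
    # The delimiter terminates the header, so it may be attached to the last component
    fields["procId"] = proc_id.rstrip(body_delimiter)
    fields["content"] = " ".join(tokens[6:])
    return fields

event = parse_bsd_syslog(
    "Oct 11 22:14:15 server1 sshd 12345: "
    "Failed password for invalid user admin from 192.168.1.10 port 55279 ssh2"
)
```

With the sample input above, `event` matches the example output field-for-field.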

CEF Extraction

Common Event Format (CEF) is a logging and auditing file format developed by ArcSight, widely used in security information and event management (SIEM) systems.

The CEF extraction config doesn't require any additional parameters.

YAML Config

extractionConfig:
  type: "cef"

CEF Format Structure

CEF logs follow this structure:

CEF:Version|Device Vendor|Device Product|Device Version|Signature ID|Name|Severity|Extension

The header part (before the Extension) contains pipe-delimited fields, while the Extension part contains key-value pairs.

Example

When events are ingested as plain strings (e.g., via STRING_LINE encoding), the string content is automatically placed into a field called __raw__. This is why the config targets __raw__ and why the output retains it (since removeTargetField is not set).

Input:

{
  "__raw__": "CEF:0|Vendor|Product|1.0|100|Intrusion Detected|10|src=192.168.1.1 dst=10.0.0.1 spt=1234 dpt=80 act=blocked"
}

Output:

{
  "severity": 10,
  "dst": "10.0.0.1",
  "src": "192.168.1.1",
  "deviceVendor": "Vendor",
  "__raw__": "CEF:0|Vendor|Product|1.0|100|Intrusion Detected|10|src=192.168.1.1 dst=10.0.0.1 spt=1234 dpt=80 act=blocked",
  "dpt": "80",
  "deviceVersion": "1.0",
  "version": 0,
  "deviceEventClassId": "100",
  "act": "blocked",
  "spt": "1234",
  "name": "Intrusion Detected",
  "deviceProduct": "Product"
}
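
The header/extension split described above can be sketched in a few lines. This is only an illustration: it assumes no escaped pipes in the header and no spaces inside extension values, both of which full CEF permits via escaping:

```python
# Sketch of CEF parsing: seven pipe-delimited header fields after the "CEF:"
# prefix, followed by a key=value extension (not ZephFlow's implementation).
def parse_cef(raw):
    parts = raw.removeprefix("CEF:").split("|", 7)
    header_keys = ["version", "deviceVendor", "deviceProduct", "deviceVersion",
                   "deviceEventClassId", "name", "severity"]
    fields = dict(zip(header_keys, parts))
    fields["version"] = int(fields["version"])
    fields["severity"] = int(fields["severity"])
    for pair in parts[7].split(" "):
        key, _, value = pair.partition("=")
        fields[key] = value
    return fields

cef_fields = parse_cef(
    "CEF:0|Vendor|Product|1.0|100|Intrusion Detected|10|"
    "src=192.168.1.1 dst=10.0.0.1 spt=1234 dpt=80 act=blocked"
)
```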

Delimited Text Extraction

The Delimited Text extraction configuration parses structured text files where values are separated by a specific delimiter, such as CSV (comma-separated values), TSV (tab-separated values), or custom delimiters.

Configuration

| Parameter | Description |
| --- | --- |
| delimiter | The character(s) used to separate values (e.g., ",", "\t", "\|") |
| columns | Ordered list of column names corresponding to the delimited values |

YAML Config

extractionConfig:
  type: "delimited_text"
  delimiter: ","
  columns:
    - "timestamp"
    - "user_id"
    - "action"
    - "resource"
    - "status"
Features
  • Correctly handles quoted values containing delimiters
  • Supports escaped quotes within values
Example

Input:

{
  "__raw__": "2023-10-15T14:32:01Z,12345,LOGIN,/dashboard,SUCCESS"
}

Output (with columns: timestamp, user_id, action, resource, status):

{
  "timestamp": "2023-10-15T14:32:01Z",
  "user_id": "12345",
  "action": "LOGIN",
  "resource": "/dashboard",
  "status": "SUCCESS"
}
Handling Quoted Values

The parser correctly handles quoted values that contain the delimiter:

Input:

{
  "__raw__": "101,\"Smith, John\",john@example.com,ACTIVE"
}

Output (with columns: user_id, name, email, status):

{
  "user_id": "101",
  "name": "Smith, John",
  "email": "john@example.com",
  "status": "ACTIVE"
}
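
The quoting behavior is the same as standard CSV quoting, so a minimal sketch of the column mapping can lean on Python's csv module (this is an illustration of the semantics, not ZephFlow's code):

```python
import csv
import io

# Sketch: parse one delimited line into named columns. csv.reader handles
# quoted values containing the delimiter and escaped quotes.
def parse_delimited(raw, columns, delimiter=","):
    values = next(csv.reader(io.StringIO(raw), delimiter=delimiter))
    return dict(zip(columns, values))

row = parse_delimited('101,"Smith, John",john@example.com,ACTIVE',
                      ["user_id", "name", "email", "status"])
```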

JSON Extraction

The JSON extraction configuration parses a JSON string and stores the resulting structured object in a specified field.

Configuration

| Parameter | Description |
| --- | --- |
| outputFieldName | The field name to store the parsed JSON structure |

YAML Config

extractionConfig:
  type: "json"
  outputFieldName: "event_data"
Example

Input:

{
  "json_string": "{\"user\":\"alice\",\"action\":\"login\",\"ip\":\"192.168.1.100\"}"
}

Output (with outputFieldName: "event_data", removeTargetField: true):

{
  "event_data": {
    "user": "alice",
    "action": "login",
    "ip": "192.168.1.100"
  }
}
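
The semantics reduce to: parse the target field as JSON, store the result under outputFieldName, and optionally drop the original field. A sketch of that contract (not ZephFlow's code):

```python
import json

# Sketch of JSON extraction: json_string -> event_data, then remove the raw field.
def extract_json(event, target_field, output_field, remove_target=True):
    result = dict(event)  # leave the input event untouched
    result[output_field] = json.loads(result[target_field])
    if remove_target:
        del result[target_field]
    return result

parsed = extract_json(
    {"json_string": '{"user":"alice","action":"login","ip":"192.168.1.100"}'},
    target_field="json_string",
    output_field="event_data",
)
```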

Key-Value Pair Extraction

The Key-Value Pair extraction configuration parses strings containing key-value pairs with configurable separators.

Configuration

| Parameter | Description |
| --- | --- |
| pairSeparator | Character that separates key-value pairs (e.g., ',') |
| kvSeparator | Character that separates keys from values (e.g., '=') |

YAML Config

extractionConfig:
  type: "kv_pair"
  pairSeparator: " "
  kvSeparator: "="

Features
  • Handles quoted values containing separators
  • Supports escaped quotes within values
  • Flexible configuration for different key-value formats
  • Escape sequence support: The pairSeparator and kvSeparator fields support escape sequences including \t (tab), \n (newline), \r (carriage return), and \\ (literal backslash)
  • Duplicate key aggregation: When the same key appears multiple times, values are aggregated into an array. The first occurrence is stored as a string; subsequent occurrences cause the value to become an array of strings.
Duplicate Key Behavior

When a key appears more than once, the parser automatically aggregates the values:

Input:

{
  "metadata": "key1=v1,key1=v2,key2=v3"
}

With pairSeparator="," and kvSeparator="=":

Output:

{
  "key1": ["v1", "v2"],
  "key2": "v3"
}
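
The aggregation rule (first occurrence stays a string, later occurrences promote the value to a list) can be sketched like this; it is an illustration of the documented behavior, not ZephFlow's implementation, and it ignores quoting:

```python
# Sketch of key=value parsing with duplicate-key aggregation.
def parse_kv(raw, pair_sep=",", kv_sep="="):
    fields = {}
    for pair in raw.split(pair_sep):
        key, _, value = pair.partition(kv_sep)
        if key in fields:
            prev = fields[key]
            # Promote to a list on the second occurrence, append thereafter
            fields[key] = prev + [value] if isinstance(prev, list) else [prev, value]
        else:
            fields[key] = value
    return fields

kv = parse_kv("key1=v1,key1=v2,key2=v3")
```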
Example

Input:

{
  "metadata": "user=john status=active role=admin last_login=2023-10-15"
}

Output (with pairSeparator: " ", kvSeparator: "=", removeTargetField: true):

{
  "user": "john",
  "status": "active",
  "role": "admin",
  "last_login": "2023-10-15"
}
Handling Quoted Values

The parser correctly handles quoted values containing separators:

Input:

{
  "metadata": "name=\"Smith, John\" email=john@example.com dept=\"Sales, North\""
}

Output:

{
  "name": "Smith, John",
  "email": "john@example.com",
  "dept": "Sales, North"
}
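
For the space-separated case shown above, Python's shlex tokenizer mirrors the quoted-value behavior: it splits on whitespace while honoring double quotes. This sketch only covers pairSeparator=" " and does not generalize to other pair separators:

```python
import shlex

# Sketch: space-separated key=value pairs with double-quoted values.
def parse_quoted_kv(raw, kv_sep="="):
    fields = {}
    for pair in shlex.split(raw):  # quotes are honored, then stripped
        key, _, value = pair.partition(kv_sep)
        fields[key] = value
    return fields

kv = parse_quoted_kv('name="Smith, John" email=john@example.com dept="Sales, North"')
```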

Multi-Stage Parsing with DAG

For complex log formats that require multiple parsing stages, you can chain multiple parser nodes together in a DAG (Directed Acyclic Graph). Each parser node processes the output of the previous node, enabling sophisticated processing pipelines for nested or multi-layered log formats.

Key Benefits

  • Modularity: Each parsing stage is a separate, testable node
  • Flexibility: Easy to add, remove, or modify parsing stages
  • Conditional Processing: Use filter nodes to branch based on extracted fields
  • Reusability: Common parsing stages can be shared across different pipelines

Complete Example

For a comprehensive example of multi-stage parsing, see the Cisco ASA Log Processing Tutorial, which demonstrates:

  • Parsing syslog headers
  • Extracting application-specific metadata
  • Branching based on message types
  • Message-specific field extraction
  • Transforming to standardized formats (OCSF)

Best Practices

Selecting the Right Extraction Configuration

  1. Analyze your log format first to determine which extraction configuration best matches your needs:
  • Use Grok for most text-based logs with consistent formats
  • Use Syslog for standard system and network device logs
  • Use Windows Multiline for Windows Event logs
  • Use CEF for security and SIEM-related logs
  • Use Delimited Text for CSV, TSV, or other delimiter-separated logs
  • Use JSON for parsing embedded JSON strings
  • Use Key-Value Pair for logs with key=value formatted data
  2. Test with sample data to verify your configuration handles all variations in your log format

  3. Create targeted parsers rather than trying to parse everything with one complex configuration

  4. Use DAG composition for multi-stage parsing - chain parser nodes together rather than creating overly complex single-stage parsers

Common Pitfalls to Avoid

  1. Overly complex Grok patterns can be difficult to maintain - break them down into smaller, reusable patterns

  2. Missing components in Syslog configuration - ensure your component list matches the exact format of your logs

  3. Incorrect timestamp patterns - test thoroughly with various date formats that appear in your logs

  4. Performance considerations - very complex parsing on high-volume logs can impact performance; consider pre-filtering or using simpler patterns where possible

Java SDK Usage

Basic Usage

ZephFlow flow = ZephFlow.startFlow();
flow.parse(parserConfig);

Grok Parser

ParserConfigs.ParserConfig parserConfig = ParserConfigs.ParserConfig.builder()
    .targetField("message")
    .removeTargetField(true)
    .extractionConfig(new GrokExtractionConfig("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:content}"))
    .build();

ZephFlow flow = ZephFlow.startFlow();
flow.parse(parserConfig);

Apache Access Log Parser

ParserConfigs.ParserConfig apacheConfig = ParserConfigs.ParserConfig.builder()
    .targetField("__raw__")
    .extractionConfig(new GrokExtractionConfig(
        "%{IPORHOST:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \\[%{HTTPDATE:timestamp}\\] \"%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response_code} %{NUMBER:bytes}"
    ))
    .build();

ASA Log Grok Parser

GrokExtractionConfig grokConfig = GrokExtractionConfig.builder()
    .grokExpression("%ASA-%{INT:level}-%{INT:message_number}: %{GREEDYDATA:message_text}")
    .build();

Windows Multiline Parser

WindowsMultilineExtractionConfig windowsConfig = WindowsMultilineExtractionConfig.builder()
    .timestampLocationType(TimestampLocationType.FIRST_LINE)
    .config(Map.of("key", "value"))
    .build();

Using FROM_FIELD Timestamp Location

WindowsMultilineExtractionConfig windowsConfig = WindowsMultilineExtractionConfig.builder()
    .timestampLocationType(WindowsMultilineExtractionConfig.TimestampLocationType.FROM_FIELD)
    .config(Map.of("target_field", "event_time"))
    .build();

Syslog Parser

BSD Format

ParserConfigs.ParserConfig syslogConfig = ParserConfigs.ParserConfig.builder()
    .targetField("__raw__")
    .extractionConfig(SyslogExtractionConfig.builder()
        .timestampPattern("MMM d HH:mm:ss")
        .componentList(List.of(
            SyslogExtractionConfig.ComponentType.TIMESTAMP,
            SyslogExtractionConfig.ComponentType.DEVICE,
            SyslogExtractionConfig.ComponentType.APP,
            SyslogExtractionConfig.ComponentType.PROC_ID))
        .messageBodyDelimiter(':')
        .build())
    .build();

RFC5424 Format

ParserConfigs.ParserConfig syslog5424Config = ParserConfigs.ParserConfig.builder()
    .targetField("log_message")
    .extractionConfig(SyslogExtractionConfig.builder()
        .timestampPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
        .componentList(List.of(
            SyslogExtractionConfig.ComponentType.PRIORITY,
            SyslogExtractionConfig.ComponentType.VERSION,
            SyslogExtractionConfig.ComponentType.TIMESTAMP,
            SyslogExtractionConfig.ComponentType.DEVICE,
            SyslogExtractionConfig.ComponentType.APP,
            SyslogExtractionConfig.ComponentType.PROC_ID,
            SyslogExtractionConfig.ComponentType.MSG_ID,
            SyslogExtractionConfig.ComponentType.STRUCTURED_DATA))
        .build())
    .build();

CEF Parser

ParserConfigs.ParserConfig cefConfig = ParserConfigs.ParserConfig.builder()
    .targetField("__raw__")
    .extractionConfig(new CefExtractionConfig())
    .build();

Delimited Text (CSV) Parser

ParserConfigs.ParserConfig csvConfig = ParserConfigs.ParserConfig.builder()
    .targetField("__raw__")
    .extractionConfig(DelimitedTextExtractionConfig.builder()
        .delimiter(",")
        .columns(List.of("timestamp", "user_id", "action", "resource", "status"))
        .build())
    .build();

JSON Parser

ParserConfigs.ParserConfig jsonConfig = ParserConfigs.ParserConfig.builder()
    .targetField("json_string")
    .removeTargetField(true)
    .extractionConfig(JsonExtractionConfig.builder()
        .outputFieldName("event_data")
        .build())
    .build();

Key-Value Pair Parser

ParserConfigs.ParserConfig kvConfig = ParserConfigs.ParserConfig.builder()
    .targetField("metadata")
    .removeTargetField(true)
    .extractionConfig(KvPairExtractionConfig.builder()
        .pairSeparator(" ")
        .kvSeparator("=")
        .build())
    .build();

Multi-Stage Parsing Pipeline

ZephFlow flow = ZephFlow.startFlow();

// Stage 1: Parse syslog header
ZephFlow stage1 = flow.fileSource("logs.txt", EncodingType.STRING_LINE)
    .parse(ParserConfigs.ParserConfig.builder()
        .targetField("__raw__")
        .extractionConfig(SyslogExtractionConfig.builder()
            .timestampPattern("MMM dd yyyy HH:mm:ss")
            .componentList(List.of(
                SyslogExtractionConfig.ComponentType.TIMESTAMP,
                SyslogExtractionConfig.ComponentType.DEVICE,
                SyslogExtractionConfig.ComponentType.APP))
            .messageBodyDelimiter(':')
            .build())
        .build());

// Stage 2: Parse application-specific format
ZephFlow stage2 = stage1.parse(ParserConfigs.ParserConfig.builder()
    .targetField("content")
    .removeTargetField(true)
    .extractionConfig(GrokExtractionConfig.builder()
        .grokExpression("%{WORD:severity}-%{INT:code}: %{GREEDYDATA:message}")
        .build())
    .build());

// Stage 3: Branch and parse message-specific details
ZephFlow type1Flow = stage2
    .filter("$.code == '305011'")
    .parse(ParserConfigs.ParserConfig.builder()
        .targetField("message")
        .extractionConfig(GrokExtractionConfig.builder()
            .grokExpression("%{WORD:action} %{WORD:protocol} from %{IP:src_ip}/%{INT:src_port}")
            .build())
        .build());

ZephFlow type2Flow = stage2
    .filter("$.code == '106023'")
    .parse(ParserConfigs.ParserConfig.builder()
        .targetField("message")
        .extractionConfig(/* different pattern for this message type */)
        .build());

// Merge branches and output
ZephFlow output = ZephFlow.merge(type1Flow, type2Flow)
    .stdoutSink(EncodingType.JSON_OBJECT);