Skip to main content

Cisco ASA Log Processing with ZephFlow

This tutorial demonstrates how to build a data transformation pipeline using ZephFlow to process Cisco ASA firewall logs and convert them into a standardized format. The example code illustrates ZephFlow's powerful capabilities for log parsing, filtering, and transformation.

Introduction

Network security devices like Cisco ASA firewalls generate vast amounts of log data in proprietary formats that can be challenging to analyze. This example demonstrates how to use ZephFlow to:

  1. Parse Cisco ASA log messages
  2. Extract structured information from different message types
  3. Transform the data into a standardized format (OCSF - Open Cybersecurity Schema Framework)
  4. Output the normalized data for analysis or storage

The example handles multiple Cisco ASA message types (106023, 302013, 305011, etc.), each representing different network events like connection builds, teardowns, access denials, and more.

You can find the source code at https://github.com/fleaktech/zephflow-examples

Project Setup

Add ZephFlow to Your Project

First, set up your Java project with the ZephFlow sdk dependency:

implementation 'io.fleak.zephflow:sdk:0.2.0'

Understanding the Example

The example processes Cisco ASA logs through a series of transformation steps. Here's the high-level pipeline structure:

cisco-asa-to-ocsf-dag.png The pipeline follows this flow:

  1. Read log data from a file source
  2. Parse the syslog header (timestamp, device ID, etc.)
  3. Parse the ASA-specific header (message ID, severity, etc.)
  4. Branch into multiple processing paths based on message type
  5. For each message type:
  • Extract specific fields using Grok patterns
  • Transform data into a standardized format
  1. Output the transformed data as JSON

How the Pipeline Works

Let's examine the key components:

Source Configuration

The pipeline begins by setting up a file source that reads Cisco ASA log data:

ZephFlow flow = ZephFlow.startFlow();
var inputFlow = flow.fileSource(absolutePath, EncodingType.STRING_LINE);

The EncodingType.STRING_LINE tells the file source node to treat every text line as a separate event. Every string event is put into a reserved field called __raw__.

A log message Oct 10 2018 12:34:56 localhost CiscoASA[999]: %ASA-6-305011: Built dynamic TCP translation from inside:172.31.98.44/1772 to outside:100.66.98.44/8256 will become:

{
"__raw__": "Oct 10 2018 12:34:56 localhost CiscoASA[999]: %ASA-6-305011: Built dynamic TCP translation from inside:172.31.98.44/1772 to outside:100.66.98.44/8256"
}

Syslog Header Parsing

info

For more info about the parser node, check the Parser Node Reference.

The first stage parses the syslog header structure:

var syslogHeaderParsedFlow =
inputFlow.parse(
ParserConfigs.ParserConfig.builder()
.targetField(MiscUtils.FIELD_NAME_RAW) // field name: __raw__
.extractionConfig(
SyslogExtractionConfig.builder()
.componentList(
List.of(
SyslogExtractionConfig.ComponentType.TIMESTAMP,
SyslogExtractionConfig.ComponentType.DEVICE,
SyslogExtractionConfig.ComponentType.APP))
.messageBodyDelimiter(':')
.timestampPattern("MMM dd yyyy HH:mm:ss")
.build())
.build());

The above code attaches a Parser node with Syslog Extraction configuration to the DAG. This extracts the syslog headers:

  • Timestamp (e.g., "Oct 10 2023 12:34:56")
  • Device ID (e.g., "firewall1")
  • Application name (e.g., "CiscoASA[123]")

The above example log will become:

{
"__raw__": "Oct 10 2018 12:34:56 localhost CiscoASA[999]: %ASA-6-305011: Built dynamic TCP translation from inside:172.31.98.44/1772 to outside:100.66.98.44/8256",
"deviceId": "localhost",
"appName": "CiscoASA[999]",
"content": "%ASA-6-305011: Built dynamic TCP translation from inside:172.31.98.44/1772 to outside:100.66.98.44/8256",
"timestamp": "Oct 10 2018 12:34:56"
}

ASA Header Parsing

The second stage parses the Cisco ASA specific header. This time it's using the Grok extraction configuration:

var asaHeaderParsedFlow =
syslogHeaderParsedFlow.parse(
ParserConfigs.ParserConfig.builder()
.targetField(SyslogExtractionRule.LOG_CONTENT_KEY) // field name: content
.removeTargetField(true)
.extractionConfig(
GrokExtractionConfig.builder()
.grokExpression(
"%ASA-%{INT:level}-%{INT:message_number}: %{GREEDYDATA:message_text}")
.build())
.build());

This extracts:

  • Message severity level (e.g., "6")
  • Message number (e.g., "302013")
  • Message text (the remaining content)

This will transform the log into:

{
"message_text": "Built dynamic TCP translation from inside:172.31.98.44/1772 to outside:100.66.98.44/8256",
"level": "6",
"appName": "CiscoASA[999]",
"message_number": "305011",
"__raw__": "Oct 10 2018 12:34:56 localhost CiscoASA[999]: %ASA-6-305011: Built dynamic TCP translation from inside:172.31.98.44/1772 to outside:100.66.98.44/8256",
"deviceId": "localhost",
"timestamp": "Oct 10 2018 12:34:56"
}

Message-Type Specific Processing

Up to this point, we have extracted the message type (number) for each log. For each message type, the pipeline branches into dedicated processing paths. Here's an example for message 106023:

var msg305011Flow = asaHeaderParsedFlow
.filter("$.message_number=='305011'")
.parse(
ParserConfigs.ParserConfig.builder()
.targetField("message_text")
.extractionConfig(
GrokExtractionConfig.builder()
.grokExpression(
"%{WORD:action} %{WORD:translation_type} %{WORD:protocol} translation from %{WORD:source_interface}:%{IP:source_ip}/%{INT:source_port} to %{WORD:dest_interface}:%{IP:dest_ip}/%{INT:dest_port}")
.build())
.build())
.eval("dict(...detailed transformation...)");

Each branch:

  1. Filters for a specific message type
  2. Parses the message text with a message-specific Grok pattern and produces a fully parsed log entry:
{
"level": "6",
"dest_interface": "outside",
"appName": "CiscoASA[999]",
"message_number": "305011",
"__raw__": "Oct 10 2018 12:34:56 localhost CiscoASA[999]: %ASA-6-305011: Built dynamic TCP translation from inside:172.31.98.44/1772 to outside:100.66.98.44/8256",
"deviceId": "localhost",
"source_ip": "172.31.98.44",
"translation_type": "dynamic",
"message_text": "Built dynamic TCP translation from inside:172.31.98.44/1772 to outside:100.66.98.44/8256",
"protocol": "TCP",
"source_interface": "inside",
"source_port": "1772",
"dest_ip": "100.66.98.44",
"action": "Built",
"dest_port": "8256",
"timestamp": "Oct 10 2018 12:34:56"
}

  1. Transforms the parsed log using the Fleak Eval Expression Language (FEEL) into OCSF format:
{
"metadata": {
"log_level": "6",
"product": {
"name": "CiscoASA",
"vendor_name": "Cisco"
},
"logged_time": 1539174896000,
"version": "1.4.0",
"log_name": "ASA",
"event_code": "305011"
},
"category_uid": 4,
"status_code": "305011",
"src_endpoint": {
"interface_name": "inside",
"port": "1772",
"type_id": 1,
"ip": "172.31.98.44"
},
"message": "Built dynamic TCP translation from inside:172.31.98.44/1772 to outside:100.66.98.44/8256",
"url": {
"scheme": "tcp",
"port": 8256,
"hostname": "100.66.98.44"
},
"status_detail": "Built dynamic TCP translation from inside:172.31.98.44/1772 to outside:100.66.98.44/8256",
"proxy": {
"interface_name": "outside",
"type": "Server",
"port": 8256,
"type_id": 1,
"ip": "100.66.98.44"
},
"status_id": 1,
"connection_info": {
"boundary": "External",
"boundary_id": 3,
"protocol_num": 6,
"direction_id": 2,
"direction": "Outbound",
"protocol_name": "tcp"
},
"class_uid": 4001,
"activity_id": 1,
"time": 1539174896000,
"severity_id": 1,
"dst_endpoint": {
"interface_name": "outside",
"port": "8256",
"type_id": 1,
"ip": "100.66.98.44"
},
"type_uid": 4001001,
"status": "Success"
}

info

The complex eval expressions in this example were generated using the free Fleak OCSF mapping app, which helps create data transformations to the Open Cybersecurity Schema Framework.

Merging and Output

Finally, all the branches are merged and sent to a stdout sink:

var ocsfFlow =
ZephFlow.merge(
msg106023Flow,
msg113019Flow,
msg113039Flow,
// ... other flows ...
);
var outputFlow = ocsfFlow.stdoutSink(EncodingType.JSON_OBJECT);
outputFlow.execute("job_id","test_env","test_service");

Change the Source and Destination

ZephFlow supports interacting with other data source/destination systems. For example, you can read from a source Kafka topic and write to a destination topic:

// Read from Kafka
ZephFlow flow = ZephFlow.startFlow()
.kafkaSource(
"broker-address:9092", // Kafka broker address
"topic-name", // Topic to consume from
"consumer-group-id", // Consumer group ID
EncodingType.JSON_OBJECT, // Encoding type of messages
null // Additional Kafka properties (optional)
);


// Output to Kafka
var outputFlow = ocsfFlow.kafkaSink(
"broker-address:9092", // Kafka broker address
"output-topic", // Topic to publish to
null, // Partition key expression (optional)
EncodingType.JSON_OBJECT, // Encoding type for messages
null // Additional Kafka properties (optional)
);

Conclusion

This example demonstrates ZephFlow's powerful capabilities for log processing pipelines. By combining filtering, parsing, and transformation operations in a flexible DAG structure, you can build sophisticated data processing workflows.

The modular nature of ZephFlow allows you to:

  • Process multiple message types with specialized handling
  • Extract structured data from unstructured logs
  • Transform data into standardized formats
  • Output the results to various destinations

For more information on ZephFlow and its capabilities, refer to the ZephFlow documentation.