Threat Hunting ICS Communication With Zeek

The Evolving ICS Threat Landscape

Industrial Control Systems (ICS) face increasingly sophisticated cyber threats, with recent CISA alerts indicating a 300% increase in targeted attacks against critical infrastructure. Understanding these threats is crucial for developing effective monitoring strategies using Zeek's capabilities.

Current ICS Threat Actors

  • Nation-State Groups: Targeting critical infrastructure for strategic advantages
  • Cybercriminal Organizations: Focusing on ransomware and extortion
  • Hacktivists: Pursuing politically motivated disruptions
  • Insider Threats: Exploiting privileged access to systems

Common ICS Attack Vectors

According to Dragos's Year in Review, threat actors frequently exploit these vectors:

Network-Based Attacks

  • Protocol manipulation and abuse
  • Man-in-the-middle attacks on industrial protocols
  • Unauthorized command injection
  • Process variable manipulation
  • Denial of Service (DoS) on critical systems

Process Manipulation Techniques


# Common Attack Patterns
1. Command Injection:
   - Modbus Function Code Manipulation
   - DNP3 Control Operation Abuse
   - Unauthorized Write Commands

2. Process Tampering:
   - Setpoint Modifications
   - Sensor Value Manipulation
   - Control Loop Interference

3. Protocol Abuse:
   - Invalid Packet Sequences
   - Protocol Stack Violations
   - Timing Attacks

Zeek's ICS Monitoring Capabilities

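Zeek provides visibility into ICS networks through protocol analyzers and structured logs. Modbus and DNP3 analyzers ship with Zeek itself; the other protocols listed here typically require add-on packages such as CISA's ICSNPP parsers: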

Supported Industrial Protocols


# Primary ICS Protocols Monitored by Zeek
Protocol   | Key Capabilities
-----------|-----------------
Modbus     | Function codes, register access, command validation
DNP3       | Control operations, data acquisition, authentication
BACnet     | Building automation, device discovery, service requests
S7comm     | PLC communications, program downloads, diagnostic data
EtherNet/IP| CIP services, implicit/explicit messaging
OPC UA     | Information modeling, service monitoring

ICS Threat Hunting Objectives

Primary Detection Goals

  • Process Manipulation: Identify unauthorized changes to industrial processes
  • Command & Control: Detect malicious control operations
  • Reconnaissance: Monitor for system enumeration attempts
  • Protocol Abuse: Identify violations of protocol specifications

Key Monitoring Metrics


# Essential ICS Monitoring Parameters
Category           | Metrics to Monitor
-------------------|-------------------
Command Patterns   | Frequency, timing, source
Process Variables  | Rate of change, value ranges
Protocol Behavior  | Compliance, sequence validity
Network Traffic    | Flow characteristics, peer relationships
Authentication     | Access patterns, credential usage

Threat Intelligence Integration

Effective ICS threat hunting requires integration with current threat intelligence:

Intelligence Sources

  • CISA Advisories: Latest vulnerability and threat information
  • Vendor Bulletins: Product-specific security updates
  • Industry ISACs: Sector-specific threat intelligence
  • Security Research: New attack techniques and vulnerabilities

Threat Indicators


# Key Indicator Categories
Type                    | Examples
------------------------|----------
Network Indicators      | IP addresses, domains, protocols
Behavioral Indicators   | Command sequences, timing patterns
Process Indicators      | Value ranges, state transitions
System Indicators       | Configuration changes, authentication attempts

Baseline Establishment

Establishing normal operational baselines is crucial for effective threat hunting:

Baseline Components

  • Normal protocol usage patterns
  • Typical command sequences
  • Expected process variable ranges
  • Standard communication paths
  • Regular maintenance windows
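The baseline components above can be reduced to sets of observed values that later traffic is compared against. The following is a minimal Python sketch, assuming Zeek is writing JSON logs and that each record carries `id.orig_h`, `id.resp_h`, and `func` fields; the function names `build_baseline` and `find_novel` and the sample records are hypothetical.

```python
import json

def _triple(rec):
    # One communication path: who talked to whom, using which function.
    return (rec["id.orig_h"], rec["id.resp_h"], rec["func"])

def build_baseline(lines):
    """Collect the triples seen during a known-good observation period."""
    return {_triple(json.loads(line)) for line in lines}

def find_novel(lines, baseline):
    """Return records whose triple never appeared in the baseline."""
    records = (json.loads(line) for line in lines)
    return [rec for rec in records if _triple(rec) not in baseline]

# Illustrative records, not real traffic.
training = [
    '{"id.orig_h": "10.0.0.5", "id.resp_h": "10.0.0.50", "func": "READ_HOLDING_REGISTERS"}',
]
live = training + [
    '{"id.orig_h": "10.0.0.99", "id.resp_h": "10.0.0.50", "func": "WRITE_SINGLE_REGISTER"}',
]

baseline = build_baseline(training)
for rec in find_novel(live, baseline):
    print("new communication path:", rec["id.orig_h"], "->", rec["id.resp_h"], rec["func"])
```

A set of exact triples is deliberately strict: anything unseen during training is reported, so the training period should cover maintenance windows as well as normal operation.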

Core ICS Protocol Log Analysis

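Understanding the structure and content of ICS protocol logs is fundamental for effective threat hunting. According to Zeek's protocol documentation, each log type captures specific aspects of industrial communications that are crucial for security monitoring. The field listings below are illustrative: the exact field names depend on the Zeek version and on which analyzer packages are installed, so check them against the logs your sensors actually produce.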

Modbus Protocol Logging

modbus.log Structure


# Key Fields in modbus.log
{
    "ts": "timestamp",
    "uid": "unique_identifier",
    "id.orig_h": "source_ip",
    "id.orig_p": "source_port",
    "id.resp_h": "destination_ip",
    "id.resp_p": "destination_port",
    "unit_id": "device_identifier",
    "func": "function_code",
    "exception": "exception_code",
    "start_reg": "starting_register",
    "num_reg": "register_count",
    "values": "register_values"
}

Critical Modbus Indicators

  • Function Codes:
    • 01/02: Read Coils/Discrete Inputs
    • 03/04: Read Holding/Input Registers
    • 05/06: Write Single Coil/Register
    • 15/16: Write Multiple Coils/Registers
  • Anomaly Indicators:
    • Unauthorized write operations
    • Invalid register access
    • Excessive polling
    • Out-of-sequence operations
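As a starting point for the "unauthorized write operations" indicator, write requests can be filtered by source address. This is a minimal sketch, assuming JSON logs in which `func` holds a function name rather than a number; the name-to-code mapping, the allowlist, and the sample records are assumptions for illustration.

```python
import json

# Write function codes from the list above, keyed by the names assumed to
# appear in the "func" field. Verify the spelling against your own logs.
WRITE_FUNCS = {
    "WRITE_SINGLE_COIL": 5,
    "WRITE_SINGLE_REGISTER": 6,
    "WRITE_MULTIPLE_COILS": 15,
    "WRITE_MULTIPLE_REGISTERS": 16,
}

def unauthorized_writes(lines, allowed_writers):
    """Return (source, destination, function) for writes from unlisted sources."""
    hits = []
    for line in lines:
        rec = json.loads(line)
        if rec.get("func") in WRITE_FUNCS and rec["id.orig_h"] not in allowed_writers:
            hits.append((rec["id.orig_h"], rec["id.resp_h"], rec["func"]))
    return hits

# Hypothetical engineering workstation that is allowed to write.
ALLOWED = {"10.0.0.5"}
sample = [
    '{"id.orig_h": "10.0.0.5", "id.resp_h": "10.0.0.50", "func": "WRITE_SINGLE_REGISTER"}',
    '{"id.orig_h": "10.0.0.99", "id.resp_h": "10.0.0.50", "func": "WRITE_MULTIPLE_COILS"}',
    '{"id.orig_h": "10.0.0.99", "id.resp_h": "10.0.0.50", "func": "READ_COILS"}',
]
for src, dst, func in unauthorized_writes(sample, ALLOWED):
    print(f"unexpected write: {src} -> {dst} ({func})")
```

Reads from the unlisted host are ignored here on purpose; excessive polling is a separate indicator that needs a rate, not an allowlist.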

DNP3 Protocol Logging

dnp3.log Structure


# Key Fields in dnp3.log
{
    "ts": "timestamp",
    "uid": "unique_identifier",
    "id.orig_h": "source_ip",
    "id.resp_h": "destination_ip",
    "fc_request": "function_code_request",
    "fc_reply": "function_code_reply",
    "iin": "internal_indication_flags",
    "object_type": "data_type",
    "range_low": "start_index",
    "range_high": "end_index"
}

DNP3 Security Indicators


# Common DNP3 Attack Patterns
Function Code | Description           | Suspicious Activity
--------------|-----------------------|--------------------
0x01          | Read                  | Excessive polling
0x02          | Write                 | Unauthorized writes
0x05          | Direct Operate        | Command injection
0x0D          | Cold Restart          | DoS attempt
0x0E          | Warm Restart          | System disruption

BACnet Protocol Logging

bacnet.log Structure


# Key Fields in bacnet.log
{
    "ts": "timestamp",
    "uid": "unique_identifier",
    "service": "service_type",
    "object_type": "device_object_type",
    "instance_number": "object_instance",
    "property": "property_identifier",
    "value": "property_value",
    "result": "service_result"
}

BACnet Monitoring Focus

  • Critical Services:
    • WriteProperty
    • WritePropertyMultiple
    • DeviceCommunicationControl
    • ReinitializeDevice
  • Security Events:
    • Unauthorized property writes
    • Device control attempts
    • Configuration changes
    • Authentication failures

S7comm Protocol Logging

s7comm.log Structure


# Key Fields in s7comm.log
{
    "ts": "timestamp",
    "uid": "unique_identifier",
    "rosctr": "remote_operation_control",
    "parameter": "parameter_code",
    "parameter_value": "parameter_data",
    "data_type": "data_type_code",
    "bytes": "data_length",
    "response": "response_code"
}

S7comm Security Analysis


# Critical Operation Types
Operation    | Risk Level | Detection Focus
-------------|------------|----------------
READ_VAR     | Medium     | Data exfiltration
WRITE_VAR    | High       | Process manipulation
START/STOP   | Critical   | System disruption
DOWNLOAD     | Critical   | Program modification
UPLOAD       | High      | Logic theft

Advanced Log Analysis Techniques

Cross-Protocol Correlation


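# Example correlation pattern
# The two streams must be fed elsewhere with SumStats::observe().
@load base/frameworks/sumstats
@load base/frameworks/notice

redef enum Notice::Type += { MultiProtocol_Activity };

event zeek_init()
{
    local r1 = SumStats::Reducer($stream="modbus_ops",
                                $apply=set(SumStats::UNIQUE));
    local r2 = SumStats::Reducer($stream="dnp3_ops",
                                $apply=set(SumStats::UNIQUE));

    SumStats::create([
        $name="protocol_correlation",
        $epoch=10min,
        $reducers=set(r1, r2),
        $threshold=10.0,
        $threshold_val(key: SumStats::Key, result: SumStats::Result) =
            {
            # A key may have observations on only one of the two streams.
            local total = 0.0;
            if ( "modbus_ops" in result )
                total = total + (result["modbus_ops"]$unique + 0.0);
            if ( "dnp3_ops" in result )
                total = total + (result["dnp3_ops"]$unique + 0.0);
            return total;
            },
        $threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
            {
            NOTICE([$note=MultiProtocol_Activity,
                   $msg="Correlated protocol activity detected"]);
            }
    ]);
}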

Pattern Recognition

  • Command sequence analysis
  • Timing pattern detection
  • Value range monitoring
  • Protocol state tracking
  • Error response analysis

Log Retention and Analysis

Implement effective log management strategies:

  • Maintain logs for at least 90 days
  • Implement log rotation policies
  • Enable compressed storage for historical analysis
  • Implement secure log transmission

Advanced ICS Log Analysis Framework

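Detecting advanced threats in ICS environments calls for log analysis that goes beyond matching single events. According to recent CISA advisories, threat actors are using increasingly sophisticated techniques against industrial systems. The scripts in this section raise notices such as ICS::Suspicious_Command_Sequence; each of these notice types must be declared (with redef enum Notice::Type inside an ICS module) before the scripts will load.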

Command and Control Pattern Analysis

Detecting Abnormal Command Sequences


# Implementation of command sequence analysis
module CommandSequenceAnalysis;

export {
    type CommandSequence: record {
        commands: vector of string;
        timestamps: vector of time;
        source_ips: set[addr];
    };

    global device_sequences: table[addr] of CommandSequence &create_expire=1hr;
    
    # Define known legitimate command sequences
    const legitimate_sequences: set[string] = {
        "read-read-write",
        "read-write-read",
        "configure-write-read"
    } &redef;
}

function analyze_command_sequence(device: addr, command: string, source: addr)
{
    if (device !in device_sequences) {
        device_sequences[device] = CommandSequence(
            $commands=vector(),
            $timestamps=vector(),
            $source_ips=set()
        );
    }

    local seq = device_sequences[device];
    seq$commands[|seq$commands|] = command;
    seq$timestamps[|seq$timestamps|] = current_time();
    add seq$source_ips[source];

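    local n = |seq$commands|;
    if (n >= 3) {
        # Compare only the three most recent commands; joining the whole
        # history would never match the three-step entries defined above.
        local sequence_str = fmt("%s-%s-%s", seq$commands[n - 3],
                                 seq$commands[n - 2], seq$commands[n - 1]);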
        
        if (sequence_str !in legitimate_sequences) {
            NOTICE([
                $note=ICS::Suspicious_Command_Sequence,
                $msg=fmt("Suspicious command sequence detected: %s from %s to %s",
                        sequence_str, source, device)
            ]);
        }
    }
}
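The sequence check is easiest to reason about as a fixed-size sliding window over each device's most recent commands. The sketch below shows that idea in Python for offline experimentation; it is an illustration under assumed names (`check_command`, `history`), not a port of the Zeek function, and it reuses the three example sequences from the script.

```python
from collections import defaultdict, deque

LEGITIMATE = {"read-read-write", "read-write-read", "configure-write-read"}
WINDOW = 3

# One bounded window per device; older commands fall off automatically.
history = defaultdict(lambda: deque(maxlen=WINDOW))

def check_command(device, command):
    """Return the suspicious sequence, or None if the window is short or allowed."""
    window = history[device]
    window.append(command)
    if len(window) < WINDOW:
        return None
    sequence = "-".join(window)
    return None if sequence in LEGITIMATE else sequence

# Hypothetical command stream for one PLC.
for cmd in ["read", "read", "write", "write"]:
    flagged = check_command("10.0.0.50", cmd)
    if flagged:
        print("suspicious sequence:", flagged)
```

Bounding the window also bounds memory per device, which matters when the same logic runs against months of logs.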

Protocol State Analysis

State Transition Monitoring


# Monitor protocol state transitions
module ProtocolStateAnalysis;

export {
    type ProtocolState: record {
        current_state: string;
        last_transition: time;
        transition_count: count;
        invalid_transitions: count;
    };

    global device_states: table[addr] of ProtocolState &create_expire=2hrs;

    # Define valid state transitions
    const valid_transitions: table[string] of set[string] = {
        ["IDLE"] = set("CONNECTING", "READING"),
        ["CONNECTING"] = set("CONNECTED", "IDLE"),
        ["CONNECTED"] = set("READING", "WRITING", "IDLE"),
        ["READING"] = set("CONNECTED", "WRITING", "IDLE"),
        ["WRITING"] = set("CONNECTED", "READING", "IDLE")
    } &redef;
}

function check_state_transition(device: addr, new_state: string)
{
    if (device !in device_states) {
        device_states[device] = ProtocolState($current_state="IDLE",
                                            $last_transition=current_time(),
                                            $transition_count=0,
                                            $invalid_transitions=0);
    }

    local state = device_states[device];
    
    if (state$current_state !in valid_transitions || 
        new_state !in valid_transitions[state$current_state]) {
        state$invalid_transitions += 1;
        
        if (state$invalid_transitions >= 3) {
            NOTICE([
                $note=ICS::Invalid_State_Transition,
                $msg=fmt("Multiple invalid state transitions detected for %s", 
                        device)
            ]);
        }
    }

    state$current_state = new_state;
    state$last_transition = current_time();
    state$transition_count += 1;
}

Temporal Pattern Analysis

Activity Timing Analysis


# Analyze temporal patterns in ICS activities
module TemporalAnalysis;

export {
    type TimeProfile: record {
        hourly_patterns: vector of count;
        last_update: time;
        anomaly_count: count;
    };

    global device_timing: table[addr] of TimeProfile &create_expire=7days;
}

function analyze_temporal_pattern(device: addr, activity_time: time)
{
    if (device !in device_timing) {
        device_timing[device] = TimeProfile(
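            # 24 counters, one per hour of the day
            $hourly_patterns=vector(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),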
            $last_update=current_time(),
            $anomaly_count=0
        );
    }

    local profile = device_timing[device];
    local current_hour = strftime("%H", activity_time);
    local hour_index = to_count(current_hour);

    profile$hourly_patterns[hour_index] += 1;

    # Check for activity during unusual hours
    if (hour_index < 6 || hour_index > 18) {
        profile$anomaly_count += 1;
        
        if (profile$anomaly_count >= 5) {
            NOTICE([
                $note=ICS::Unusual_Timing_Pattern,
                $msg=fmt("Unusual activity timing detected for device %s", device)
            ]);
        }
    }
}

Value Range Analysis

Process Variable Monitoring


# Monitor process variable changes
module ValueAnalysis;

export {
    type ValueProfile: record {
        min_value: double;
        max_value: double;
        last_value: double;
        rapid_changes: count;
    };

    global value_profiles: table[string] of ValueProfile &create_expire=1day;
}

function analyze_value_change(tag_name: string, new_value: double)
{
    if (tag_name !in value_profiles) {
        value_profiles[tag_name] = ValueProfile($min_value=new_value,
                                              $max_value=new_value,
                                              $last_value=new_value,
                                              $rapid_changes=0);
        return;
    }

    local profile = value_profiles[tag_name];
    local value_change = |new_value - profile$last_value|;
    local value_range = profile$max_value - profile$min_value;

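    # Check for rapid value changes. Skip the check until a non-zero range
    # has been observed; otherwise the second sample always counts as rapid.
    if (value_range > 0.0 && value_change > (value_range * 0.2)) {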
        profile$rapid_changes += 1;
        
        if (profile$rapid_changes >= 3) {
            NOTICE([
                $note=ICS::Rapid_Value_Change,
                $msg=fmt("Rapid value changes detected for %s", tag_name)
            ]);
        }
    }

    profile$last_value = new_value;
    profile$min_value = new_value < profile$min_value ? new_value : profile$min_value;
    profile$max_value = new_value > profile$max_value ? new_value : profile$max_value;
}
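The rate-of-change rule compares each jump with a fraction of the range observed so far. A small Python sketch makes the arithmetic easy to try with sample readings; the class name, the 20% default, and the readings are illustrative assumptions, and the zero-range guard is an addition for the first samples.

```python
class ValueProfile:
    """Tracks min, max, and last value for one process tag."""

    def __init__(self, first_value):
        self.min = self.max = self.last = first_value
        self.rapid_changes = 0

    def update(self, value, fraction=0.2):
        """Return True when the jump exceeds `fraction` of the range seen so far."""
        span = self.max - self.min
        rapid = span > 0 and abs(value - self.last) > span * fraction
        if rapid:
            self.rapid_changes += 1
        self.last = value
        self.min = min(self.min, value)
        self.max = max(self.max, value)
        return rapid

# Hypothetical tank-level readings: stable, then a sudden jump.
profile = ValueProfile(50.0)
for reading in (60.0, 59.0, 90.0):
    if profile.update(reading):
        print("rapid change to", reading)
```

With these readings the range is 10 after the second sample, so a jump of 31 units is well above the 2-unit threshold, while the 1-unit change before it is not.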

Protocol Violation Detection

Protocol Conformance Checking

  • Message format validation
  • Field value range checking
  • Protocol sequence verification
  • Timing requirement validation
  • Authentication verification

According to CISA's ICS Incident Response Playbook, proper documentation and evidence handling are crucial for successful incident resolution.

Initial Investigation Framework

Evidence Collection System


# Implement evidence collection framework
module IncidentEvidence;

export {
    type EvidenceItem: record {
        timestamp: time;
        source: string;
        # "type" is a reserved word in Zeek, so the field is evidence_type.
        evidence_type: string;
        data: string;
        hash: string;
        metadata: table[string] of string;
    };

    global evidence_collection: table[string] of vector of EvidenceItem;
}

function collect_evidence(incident_id: string, evidence: EvidenceItem)
{
    # Generate evidence hash
    evidence$hash = md5_hash(cat(
        strftime("%Y-%m-%d %H:%M:%S", evidence$timestamp),
        evidence$source,
        evidence$data
    ));

    if (incident_id !in evidence_collection)
        evidence_collection[incident_id] = vector();

    evidence_collection[incident_id][|evidence_collection[incident_id]|] = evidence;

    # Log evidence collection
    NOTICE([
        $note=ICS::Evidence_Collected,
        $msg=fmt("Evidence collected for incident %s: %s",
                incident_id, evidence$evidence_type)
    ]);
}

Root Cause Analysis

Causal Chain Analysis


# Implement root cause analysis system
module RootCauseAnalysis;

export {
    type CausalEvent: record {
        timestamp: time;
        event_type: string;
        description: string;
        severity: count;
        related_events: set[string];
    };

    global causal_chains: table[string] of vector of CausalEvent;
}

function analyze_causal_chain(incident_id: string, ev: CausalEvent)
{
    if (incident_id !in causal_chains)
        causal_chains[incident_id] = vector();

    # "event" is a reserved word in Zeek, so the parameter is named ev.
    causal_chains[incident_id][|causal_chains[incident_id]|] = ev;

    # Analyze event relationships
    if (|causal_chains[incident_id]| >= 3) {
        local chain = "";
        for (idx in causal_chains[incident_id])
            chain = string_cat(chain, 
                             causal_chains[incident_id][idx]$event_type,
                             " → ");

        NOTICE([
            $note=ICS::Causal_Chain_Identified,
            $msg=fmt("Causal chain identified for incident %s: %s",
                    incident_id, chain)
        ]);
    }
}

Incident Documentation

Documentation Management


# Implement incident documentation system
module IncidentDocumentation;

export {
    type IncidentReport: record {
        incident_id: string;
        timestamp: time;
        classification: string;
        description: string;
        affected_systems: set[addr];
        timeline: vector of string;
        evidence_refs: set[string];
        mitigation_steps: vector of string;
    };

    global incident_reports: table[string] of IncidentReport;
}

function document_incident(report: IncidentReport)
{
    # Validate required fields
    if (|report$affected_systems| == 0 ||
        |report$timeline| == 0 ||
        |report$mitigation_steps| == 0) {
        NOTICE([
            $note=ICS::Incomplete_Documentation,
            $msg=fmt("Incident report %s missing required information",
                    report$incident_id)
        ]);
        return;
    }

    incident_reports[report$incident_id] = report;

    # Generate summary
    local summary = fmt("Incident %s documented:\n", report$incident_id);
    # cat() accepts mixed types; string_cat() only takes strings.
    summary = cat(summary,
                  "Classification: ", report$classification, "\n",
                  "Affected Systems: ", |report$affected_systems|, "\n",
                  "Timeline Events: ", |report$timeline|, "\n",
                  "Mitigation Steps: ", |report$mitigation_steps|);

    NOTICE([
        $note=ICS::Incident_Documented,
        $msg=summary
    ]);
}

Lessons Learned Framework

Improvement Tracking System


# Implement lessons learned tracking
module LessonsLearned;

export {
    type LessonLearned: record {
        incident_id: string;
        category: string;
        description: string;
        recommendations: vector of string;
        implementation_status: string;
        priority: count;
    };

    global lessons_database: vector of LessonLearned;

    # schedule can only dispatch events, so the follow-up check is an event
    # and is declared here before its first use.
    global check_lesson_implementation: event(lesson: LessonLearned);
}

function track_lesson(lesson: LessonLearned)
{
    lessons_database[|lessons_database|] = lesson;

    # Prioritize implementations
    if (lesson$priority >= 4) {
        NOTICE([
            $note=ICS::High_Priority_Lesson,
            $msg=fmt("High-priority lesson identified from incident %s: %s",
                    lesson$incident_id, lesson$description)
        ]);
    }

    # Track implementation status
    if (lesson$implementation_status == "pending" && 
        lesson$priority >= 3) {
        schedule 7days { check_lesson_implementation(lesson) };
    }
}

event check_lesson_implementation(lesson: LessonLearned)
{
    if (lesson$implementation_status == "pending") {
        NOTICE([
            $note=ICS::Lesson_Implementation_Overdue,
            $msg=fmt("Implementation of lesson from incident %s is overdue",
                    lesson$incident_id)
        ]);
    }
}

Continuous Improvement Process

Metrics and Analysis

Process Enhancement Steps


# Key Improvement Areas
1. Detection Capabilities
   - Rule effectiveness
   - Coverage gaps
   - False positive reduction

2. Response Procedures
   - Communication efficiency
   - Team coordination
   - Tool integration

3. Documentation Quality
   - Completeness
   - Accuracy
   - Accessibility

4. Training Requirements
   - Skill gaps
   - Knowledge transfer
   - Procedure updates

ICS Threat Hunting Troubleshooting Framework

Effective troubleshooting in ICS environments requires systematic approaches to common challenges. According to SANS ICS Security Research, proper handling of technical challenges is crucial for maintaining reliable threat detection.

Encrypted Traffic Analysis

TLS/SSL Traffic Handler


# Implement encrypted traffic analysis
module EncryptedTrafficAnalysis;

export {
    type EncryptedSession: record {
        start_time: time;
        cipher_suite: string;
        certificate_info: string;
        ja3_fingerprint: string;
        bytes_in: count;
        bytes_out: count;
    };

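    global encrypted_sessions: table[conn_id] of EncryptedSession;

    # Allowlist consulted below; populate it per site. The ja3 field itself is
    # only available when a JA3 package extends SSL::Info.
    global known_good_ja3_hashes: set[string] &redef;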
}

function analyze_encrypted_traffic(c: connection, ssl: SSL::Info)
{
    if (c$id !in encrypted_sessions) {
        encrypted_sessions[c$id] = EncryptedSession(
            $start_time=network_time(),
            $cipher_suite=ssl$cipher,
            $certificate_info=ssl$cert_subject,
            $ja3_fingerprint=ssl$ja3,
            $bytes_in=0,
            $bytes_out=0
        );
    }

    # Update byte counts
    encrypted_sessions[c$id]$bytes_in += c$orig$size;
    encrypted_sessions[c$id]$bytes_out += c$resp$size;

    # Analyze suspicious patterns
    if (ssl$ja3 !in known_good_ja3_hashes) {
        NOTICE([
            $note=ICS::Suspicious_SSL_Fingerprint,
            $msg=fmt("Unknown JA3 fingerprint detected: %s", ssl$ja3),
            $conn=c
        ]);
    }
}

Proprietary Protocol Handler

Protocol Variation Management


# Handle proprietary protocol variations
module ProprietaryProtocolHandler;

export {
    type ProtocolVariant: record {
        vendor: string;
        version: string;
        parsing_rules: vector of pattern;
        field_mappings: table[string] of string;
        known_deviations: set[string];
    };

    global protocol_variants: table[string] of ProtocolVariant;
}

function handle_proprietary_protocol(data: string, protocol_id: string): bool
{
    if (protocol_id !in protocol_variants)
        return F;

    local variant = protocol_variants[protocol_id];
    
    # Try each parsing rule
    for (idx in variant$parsing_rules) {
        if (variant$parsing_rules[idx] in data) {
            process_variant_data(data, variant);
            return T;
        }
    }

    # Log parsing failure
    NOTICE([
        $note=ICS::Protocol_Parse_Failure,
        $msg=fmt("Failed to parse proprietary protocol: %s", protocol_id)
    ]);
    
    return F;
}

function process_variant_data(data: string, variant: ProtocolVariant)
{
    # Apply vendor-specific field mappings
    local fields: table[string] of string;
    
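    for (field_name, field_pattern in variant$field_mappings) {
        # string_to_pattern() compiles the mapping at runtime (F = do not
        # escape, treat the text as a regex); find_all_ordered() returns a
        # vector, so the first match can be indexed.
        local matches = find_all_ordered(data, string_to_pattern(field_pattern, F));
        if (|matches| > 0)
            fields[field_name] = matches[0];
    }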

    analyze_fields(fields, variant);
}

High-Volume Data Management

Data Volume Optimization


# Implement data volume management
module DataVolumeManager;

export {
    type VolumeMetrics: record {
        current_rate: double;
        peak_rate: double;
        total_processed: count;
        sampling_active: bool;
    };

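    # Records are not initialized implicitly, so give every field a value.
    global volume_metrics = VolumeMetrics($current_rate=0.0,
                                          $peak_rate=0.0,
                                          $total_processed=0,
                                          $sampling_active=F);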
    const RATE_THRESHOLD = 10000.0; # Events per second
}

function manage_data_volume(current_rate: double)
{
    volume_metrics$current_rate = current_rate;
    
    if (current_rate > volume_metrics$peak_rate)
        volume_metrics$peak_rate = current_rate;

    # Implement adaptive sampling
    if (current_rate > RATE_THRESHOLD && !volume_metrics$sampling_active) {
        activate_sampling();
        NOTICE([
            $note=ICS::High_Volume_Sampling,
            $msg=fmt("Activated sampling due to high data rate: %.2f eps", 
                    current_rate)
        ]);
    }
}

function activate_sampling()
{
    volume_metrics$sampling_active = T;
    
    # Calculate sampling rate based on current load
    local sampling_rate = RATE_THRESHOLD / volume_metrics$current_rate;
    sampling_rate = sampling_rate < 0.1 ? 0.1 : sampling_rate;

    set_sampling_rate(sampling_rate);
}
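The sampling calculation is plain arithmetic: keep the fraction of events that brings the load back to the threshold, but never sample below 10%. A short Python sketch under the same constants shows the resulting rates; the function name `sampling_rate` is hypothetical, and returning 1.0 below the threshold is an assumption about the intended behavior.

```python
RATE_THRESHOLD = 10_000.0   # events per second, as in the script above
MIN_SAMPLING_RATE = 0.1     # floor applied by the script

def sampling_rate(current_rate):
    """Fraction of events to keep at the given load."""
    if current_rate <= RATE_THRESHOLD:
        return 1.0  # assumed: no sampling while under the threshold
    return max(MIN_SAMPLING_RATE, RATE_THRESHOLD / current_rate)

for rate in (8_000.0, 25_000.0, 200_000.0):
    print(f"{rate:>9.0f} eps -> keep {sampling_rate(rate):.0%} of events")
```

At 25,000 events per second the rate is 10,000 / 25,000, or 40%. At 200,000 the raw ratio would be 5%, so the floor holds it at 10%, which means the effective load can still exceed the threshold at extreme rates.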

False Positive Reduction

Alert Validation System


# Implement false positive reduction
module AlertValidator;

export {
    type AlertValidation: record {
        confidence_score: double;
        context_matches: count;
        historical_matches: count;
        validation_rules: set[string];
    };

    global alert_validations: table[string] of AlertValidation;
}

function validate_alert(alert_id: string, context: table[string] of string): bool
{
    if (alert_id !in alert_validations)
        return F;

    local validation = alert_validations[alert_id];
    local score = 0.0;

    # Check context matches
    for (rule in validation$validation_rules) {
        if (rule in context) {
            validation$context_matches += 1;
            score += 0.2;
        }
    }

    # Check historical patterns
    if (validation$historical_matches > 0)
        score += 0.3;

    # Update confidence score
    validation$confidence_score = score;

    return score >= 0.7;
}
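The scoring rule above adds 0.2 per matching context rule and 0.3 for any historical match, then compares the total with 0.7. The Python sketch below restates it in integer tenths so the comparison does not depend on floating-point rounding; the function names and the rule names in the example are hypothetical.

```python
def confidence_tenths(context, validation_rules, historical_matches):
    """Score in tenths: 2 per context rule present, plus 3 for any history."""
    tenths = 2 * sum(1 for rule in validation_rules if rule in context)
    if historical_matches > 0:
        tenths += 3
    return tenths

def is_validated(context, validation_rules, historical_matches, cutoff_tenths=7):
    return confidence_tenths(context, validation_rules, historical_matches) >= cutoff_tenths

# Hypothetical validation rules and alert context.
rules = {"known_engineer", "change_ticket", "maintenance_window"}
context = {"known_engineer": "jdoe", "change_ticket": "CHG-1"}

print(is_validated(context, rules, historical_matches=1))
print(is_validated(context, rules, historical_matches=0))
```

Working through the numbers: with a historical match, two context rules are enough (4 + 3 = 7 tenths). Without one, even three matching rules reach only 6 tenths, so at least four are needed.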

Protocol Parsing Error Handling

Error Recovery System


# Implement protocol parsing error recovery
module ParsingErrorHandler;

export {
    type ParsingError: record {
        error_type: string;
        error_count: count;
        last_occurrence: time;
        recovery_attempts: count;
    };

    global parsing_errors: table[string] of ParsingError;
}

function handle_parsing_error(protocol: string, error: string): bool
{
    local error_id = cat(protocol, "-", error);
    
    if (error_id !in parsing_errors) {
        parsing_errors[error_id] = ParsingError(
            $error_type=error,
            $error_count=0,
            $last_occurrence=network_time(),
            $recovery_attempts=0
        );
    }

    local err = parsing_errors[error_id];
    err$error_count += 1;
    err$last_occurrence = network_time();

    # Implement recovery strategy
    if (err$recovery_attempts < 3) {
        err$recovery_attempts += 1;
        return attempt_recovery(protocol, error);
    }

    NOTICE([
        $note=ICS::Parsing_Error_Threshold,
        $msg=fmt("Multiple parsing errors for %s: %s", protocol, error)
    ]);
    
    return F;
}

Enhanced ICS Log Correlation Framework

Effective threat detection in ICS environments requires comprehensive correlation of multiple data sources. According to SANS ICS Security Research, integrated analysis can significantly improve threat detection accuracy.

Network Flow Analysis Integration

Flow Correlation Engine


# Implement network flow correlation
module FlowCorrelation;

export {
    type FlowProfile: record {
        start_time: time;
        last_seen: time;
        bytes_in: count;
        bytes_out: count;
        conn_state: string;
        protocols: set[string];
    };

    global device_flows: table[addr] of FlowProfile &create_expire=1day;
}

event connection_state_remove(c: connection)
{
    local device = c$id$resp_h;
    
    if (device !in device_flows) {
        device_flows[device] = FlowProfile(
            $start_time=c$start_time,
            $last_seen=c$start_time,
            $bytes_in=0,
            $bytes_out=0,
            $conn_state=c$conn$conn_state,
            $protocols=set()
        );
    }

    device_flows[device]$bytes_in += c$orig$size;
    device_flows[device]$bytes_out += c$resp$size;
    device_flows[device]$last_seen = network_time();
    # c$service holds the application protocols identified on this connection.
    for (svc in c$service)
        add device_flows[device]$protocols[svc];

    # Analyze flow patterns
    if (|device_flows[device]$protocols| > 2) {
        NOTICE([
            $note=ICS::Unusual_Protocol_Mix,
            $msg=fmt("Device %s using multiple protocols: %s",
                    device,
                    join_string_set(device_flows[device]$protocols, ", "))
        ]);
    }
}

Asset Inventory Correlation

Asset Context Integration


# Asset context correlation system
module AssetCorrelation;

export {
    type AssetProfile: record {
        asset_type: string;
        authorized_protocols: set[string];
        authorized_peers: set[addr];
        operational_hours: set[count];
        criticality: string;
    };

    global asset_inventory: table[addr] of AssetProfile;
}

function load_asset_inventory()
{
    # Example asset profile
    asset_inventory[192.168.1.100] = AssetProfile(
        $asset_type="PLC",
        $authorized_protocols=set("MODBUS", "S7COMM"),
        $authorized_peers=set(192.168.1.10, 192.168.1.20),
        $operational_hours=set(8, 9, 10, 11, 12, 13, 14, 15, 16),
        $criticality="HIGH"
    );
}

function check_asset_compliance(device: addr, protocol: string, 
                              peer: addr, hour: count)
{
    if (device in asset_inventory) {
        local profile = asset_inventory[device];
        
        if (protocol !in profile$authorized_protocols ||
            peer !in profile$authorized_peers ||
            hour !in profile$operational_hours) {
            
            NOTICE([
                $note=ICS::Asset_Policy_Violation,
                $msg=fmt("Policy violation for %s asset: %s",
                        profile$asset_type, device)
            ]);
        }
    }
}
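The compliance check combines three independent conditions, and an analyst usually wants to know which one failed. The following Python sketch mirrors the example PLC profile and returns the failing conditions by name; the class layout and the `policy_violations` helper are assumptions for illustration.

```python
from dataclasses import dataclass

@dataclass
class AssetProfile:
    asset_type: str
    authorized_protocols: frozenset
    authorized_peers: frozenset
    operational_hours: frozenset

def policy_violations(profile, protocol, peer, hour):
    """List every rule the observation breaks, so the alert can say why."""
    reasons = []
    if protocol not in profile.authorized_protocols:
        reasons.append("protocol")
    if peer not in profile.authorized_peers:
        reasons.append("peer")
    if hour not in profile.operational_hours:
        reasons.append("hour")
    return reasons

# Same values as the example asset profile above (hours 8 through 16).
plc = AssetProfile(
    asset_type="PLC",
    authorized_protocols=frozenset({"MODBUS", "S7COMM"}),
    authorized_peers=frozenset({"192.168.1.10", "192.168.1.20"}),
    operational_hours=frozenset(range(8, 17)),
)

print(policy_violations(plc, "MODBUS", "192.168.1.10", 10))
print(policy_violations(plc, "DNP3", "192.168.1.77", 2))
```

Returning the reasons instead of a single boolean lets the resulting alert distinguish an after-hours session from an unknown peer, which typically have very different triage paths.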

Behavioral Baseline Integration

Behavioral Pattern Analysis


# Behavioral baseline monitoring system
module BehaviorAnalysis;

export {
    type BehaviorProfile: record {
        command_frequencies: table[string] of count;
        value_ranges: table[string] of interval;
        timing_patterns: vector of count;
        peer_relationships: set[addr];
    };

    global behavior_baselines: table[addr] of BehaviorProfile;
}

function analyze_behavior(device: addr, command: string, 
                        value: double, peer: addr)
{
    if (device !in behavior_baselines) {
        behavior_baselines[device] = BehaviorProfile(
            $command_frequencies=table(),
            $value_ranges=table(),
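            # 24 counters, one per hour of the day
            $timing_patterns=vector(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),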
            $peer_relationships=set()
        );
    }

    local profile = behavior_baselines[device];
    local current_hour = strftime("%H", current_time());

    # Update behavior profile
    if (command !in profile$command_frequencies)
        profile$command_frequencies[command] = 0;
    profile$command_frequencies[command] += 1;

    # Check for behavioral anomalies
    if (profile$command_frequencies[command] > 100) {
        NOTICE([
            $note=ICS::Command_Frequency_Anomaly,
            $msg=fmt("Unusual command frequency for device %s: %s",
                    device, command)
        ]);
    }
}

Alert Correlation Engine

Multi-Source Alert Correlation


# Alert correlation system
module AlertCorrelation;

export {
    type AlertContext: record {
        timestamp: time;
        alert_type: string;
        severity: string;
        source_data: string;
        related_assets: set[addr];
    };

    global alert_queue: vector of AlertContext;
    global correlation_window = 5min &redef;
}

function correlate_alerts()
{
    # Avoid shadowing the built-in current_time() with a local of the same name.
    local now = current_time();
    local window_start = now - correlation_window;
    # Empty containers need an explicit type.
    local relevant_alerts: vector of AlertContext = vector();

    # Gather relevant alerts within time window
    for (idx in alert_queue) {
        if (alert_queue[idx]$timestamp >= window_start)
            relevant_alerts[|relevant_alerts|] = alert_queue[idx];
    }

    # Look for alert patterns
    if (|relevant_alerts| >= 3) {
        local asset_involvement: set[addr] = set();
        # Iterating a vector yields indices, so index back into it.
        for (i in relevant_alerts)
            for (asset in relevant_alerts[i]$related_assets)
                add asset_involvement[asset];

        if (|asset_involvement| >= 2) {
            NOTICE([
                $note=ICS::Correlated_Attack_Pattern,
                $msg="Multiple alerts across different assets detected"
            ]);
        }
    }
}

Context Enhancement Strategies

Data Enrichment Sources

Integration Points

Sources

Advanced ICS Threat Hunting Framework

Effective threat hunting in ICS networks depends on repeatable query patterns and well-defined detection methods. CISA advisories report that advanced persistent threat (APT) actors increasingly target ICS environments, so the patterns below cover command injection, statistical and behavioral anomalies, and cross-protocol activity.

Pattern-Based Detection Queries

Command Injection Detection


# Implement command injection detection
module CommandInjection;

export {
    type CommandPattern: record {
        regex: pattern;
        description: string;
        severity: count;
    };

    global suspicious_patterns: table[string] of CommandPattern = {
        ["write_multiple"] = CommandPattern($regex=/write.*multiple.*registers/,
                                          $description="Multiple register write",
                                          $severity=4),
        ["force_command"] = CommandPattern($regex=/force.*coil/,
                                         $description="Forced coil operation",
                                         $severity=5)
    } &redef;
}

function analyze_command(command: string, source: addr)
{
    for (pattern_name in suspicious_patterns) {
        if (suspicious_patterns[pattern_name]$regex in command) {
            NOTICE([
                $note=ICS::Suspicious_Command_Pattern,
                $msg=fmt("Detected %s from %s: %s",
                        suspicious_patterns[pattern_name]$description,
                        source,
                        command)
            ]);
        }
    }
}
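The function above is never called in the snippet, so it helps to see one way it could be driven. The following is a minimal sketch, not the article's own wiring: it assumes the handler is appended to the same CommandInjection script and feeds analyze_command from Zeek's built-in modbus_message event. Zeek's Modbus::function_codes table reports names in upper case (for example WRITE_MULTIPLE_REGISTERS), so the sketch lowercases the name before matching against the lowercase patterns.

```zeek
@load base/protocols/modbus

module CommandInjection;

# Hypothetical wiring (an assumption, not part of the original script).
event modbus_message(c: connection, headers: ModbusHeaders, is_orig: bool)
    {
    # Only inspect requests sent by the connection originator.
    if ( is_orig )
        {
        # Map the numeric function code to its name; unknown codes fall
        # back to the table's default string.
        local name = Modbus::function_codes[headers$function_code];

        # Lowercase so the name can match the lowercase regexes above.
        analyze_command(to_lower(name), c$id$orig_h);
        }
    }
```

Because the pattern table is declared with &redef, site-specific patterns can be added from a local policy script without editing this module.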

Anomaly Detection Framework

Statistical Anomaly Detection


# Implement statistical anomaly detection
module AnomalyDetection;

export {
    type StatisticalProfile: record {
        mean: double;
        std_dev: double;
        sample_count: count;
        values: vector of double;
    };

    global value_profiles: table[string] of StatisticalProfile;
}

function update_statistics(tag: string, value: double)
{
    if (tag !in value_profiles) {
        value_profiles[tag] = StatisticalProfile($mean=0.0,
                                               $std_dev=0.0,
                                               $sample_count=0,
                                               $values=vector());
    }

    local profile = value_profiles[tag];

    # Check the new value against the existing baseline before folding it
    # in. Testing after the update lets an outlier inflate the very
    # statistics it is compared against, which hides it in small samples.
    if (profile$sample_count > 1 &&
        |value - profile$mean| > (3 * profile$std_dev)) {
        NOTICE([
            $note=ICS::Statistical_Anomaly,
            $msg=fmt("Value anomaly detected for %s: %f", tag, value)
        ]);
    }

    profile$values[|profile$values|] = value;
    profile$sample_count += 1;

    # Update mean and standard deviation
    if (profile$sample_count > 1) {
        local sum = 0.0;
        local sq_sum = 0.0;

        for (idx in profile$values) {
            sum += profile$values[idx];
            sq_sum += profile$values[idx] * profile$values[idx];
        }

        profile$mean = sum / profile$sample_count;

        # Rounding can push the variance slightly below zero; clamp it so
        # sqrt() never receives a negative argument.
        local variance = (sq_sum / profile$sample_count) -
                         (profile$mean * profile$mean);
        profile$std_dev = variance > 0.0 ? sqrt(variance) : 0.0;
    }
}
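To show where the tag and value arguments might come from, here is a hedged sketch that profiles the values written to individual Modbus holding registers. It assumes the handler is appended to the same AnomalyDetection script; the tag format is an illustrative choice and not something the article defines.

```zeek
@load base/protocols/modbus

module AnomalyDetection;

# Hypothetical wiring (an assumption, not part of the original script).
event modbus_write_single_register_request(c: connection, headers: ModbusHeaders,
                                           address: count, value: count)
    {
    # Assumed tag format: "<server IP>/reg<register address>".
    local tag = fmt("%s/reg%d", c$id$resp_h, address);

    # Multiplying by 1.0 promotes the count to the double that
    # update_statistics() expects.
    update_statistics(tag, 1.0 * value);
    }
```

Keying the profile on the server address plus the register keeps unrelated process variables from sharing one baseline.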

Protocol-Specific Hunt Patterns

Modbus Hunt Queries


# Implement Modbus-specific hunting patterns
module ModbusHunting;

export {
    type ModbusOperation: record {
        function_code: count;
        register_start: count;
        register_count: count;
        timestamp: time;
    };

    global modbus_operations: table[addr] of vector of ModbusOperation;
}

function hunt_modbus_patterns(src: addr, operation: ModbusOperation)
{
    if (src !in modbus_operations)
        modbus_operations[src] = vector();

    modbus_operations[src][|modbus_operations[src]|] = operation;

    # Hunt for register scanning
    if (|modbus_operations[src]| >= 10) {
        local sequential_count = 0;
        local last_reg = 0;

        for (idx in modbus_operations[src]) {
            if (idx > 0) {
                if (modbus_operations[src][idx]$register_start == 
                    last_reg + modbus_operations[src][idx-1]$register_count)
                    sequential_count += 1;
            }
            last_reg = modbus_operations[src][idx]$register_start;
        }

        if (sequential_count >= 8) {
            NOTICE([
                $note=ICS::Modbus_Register_Scan,
                $msg=fmt("Possible Modbus register scanning from %s", src)
            ]);
        }
    }
}
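One possible way to feed the scan detector is from Zeek's holding-register read requests, which already carry the starting address and quantity that ModbusOperation needs. This is a sketch under the assumption that the handler lives in the same ModbusHunting script; other read events could be wired the same way.

```zeek
@load base/protocols/modbus

module ModbusHunting;

# Hypothetical wiring (an assumption, not part of the original script).
event modbus_read_holding_registers_request(c: connection, headers: ModbusHeaders,
                                            start_address: count, quantity: count)
    {
    # Record the read against the client that issued it.
    hunt_modbus_patterns(c$id$orig_h,
                         ModbusOperation($function_code=headers$function_code,
                                         $register_start=start_address,
                                         $register_count=quantity,
                                         $timestamp=network_time()));
    }
```

Note that the operation history per source grows without bound in the function above, so long-running deployments would need some form of expiry.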

Advanced Correlation Queries

Multi-Protocol Attack Detection


# Implement multi-protocol attack detection
module ProtocolCorrelation;

export {
    type ProtocolEvent: record {
        protocol: string;
        event_type: string;
        timestamp: time;
        details: string;
    };

    global protocol_sequences: table[addr] of vector of ProtocolEvent;
}

# The parameter is named "ev" because "event" is a reserved word in Zeek.
function analyze_protocol_sequence(src: addr, ev: ProtocolEvent)
{
    if (src !in protocol_sequences)
        protocol_sequences[src] = vector();

    protocol_sequences[src][|protocol_sequences[src]|] = ev;

    local n = |protocol_sequences[src]|;
    if (n >= 3) {
        # Collect the protocols seen in the three most recent events. A set
        # is used because "in" on a vector tests indices, not values.
        local last_three: set[string] = set();
        local i = n - 3;
        while (i < n) {
            add last_three[protocol_sequences[src][i]$protocol];
            ++i;
        }

        # Hunt for protocol abuse patterns
        if ("MODBUS" in last_three && "S7COMM" in last_three) {
            NOTICE([
                $note=ICS::Mixed_Protocol_Attack,
                $msg=fmt("Suspicious multi-protocol sequence from %s", src)
            ]);
        }
    }
}

Behavioral Analysis Queries

Command Frequency Analysis


# Implement command frequency analysis
module BehaviorAnalysis;

export {
    type CommandMetrics: record {
        hourly_counts: vector of count;
        total_count: count;
        last_update: time;
    };

    global command_patterns: table[addr, string] of CommandMetrics;
}

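function analyze_command_frequency(src: addr, cmd: string)
{
    if ([src, cmd] !in command_patterns) {
        # Zeek has no sized-vector constructor, so fill the 24 hourly
        # buckets explicitly.
        local buckets: vector of count = vector();
        local h = 0;
        while (h < 24) {
            buckets[h] = 0;
            ++h;
        }

        command_patterns[src, cmd] = CommandMetrics($hourly_counts=buckets,
                                                  $total_count=0,
                                                  $last_update=current_time());
    }

    local metrics = command_patterns[src, cmd];
    local current_hour = to_count(strftime("%H", current_time()));

    metrics$hourly_counts[current_hour] += 1;
    metrics$total_count += 1;
    metrics$last_update = current_time();

    # Check for unusual frequency: flag an hour that holds more than three
    # times its even share (1/24) of all observations. Multiplying both
    # sides by 24 avoids integer division truncating the average to zero.
    # The 100-sample floor is an illustrative value to tune per environment.
    if (metrics$total_count >= 100 &&
        metrics$hourly_counts[current_hour] * 24 > metrics$total_count * 3) {
        NOTICE([
            $note=ICS::Command_Frequency_Anomaly,
            $msg=fmt("Unusual command frequency detected for %s: %s", src, cmd)
        ]);
    }
}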
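The NOTICE calls in this section reference six notice types in an ICS namespace that the snippets themselves never declare. If they are not already defined elsewhere in your scripts, a declaration along the following lines would be needed before any of the hunting modules load. This is a sketch: the ICS module name is inferred from the ICS:: prefix used above, and declaring the same names twice will cause a load error.

```zeek
@load base/frameworks/notice

module ICS;

export {
    # Notice types referenced by the hunting functions in this section.
    redef enum Notice::Type += {
        Command_Frequency_Anomaly,
        Correlated_Attack_Pattern,
        Suspicious_Command_Pattern,
        Statistical_Anomaly,
        Modbus_Register_Scan,
        Mixed_Protocol_Attack
    };
}
```

With the types declared, the resulting alerts appear in notice.log under names such as ICS::Modbus_Register_Scan.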