Threat Hunting ICS Communication With Zeek

The Evolving ICS Threat Landscape
Industrial Control Systems (ICS) face increasingly sophisticated cyber threats, with recent CISA alerts indicating a 300% increase in targeted attacks against critical infrastructure. Understanding these threats is crucial for developing effective monitoring strategies using Zeek's capabilities.
Current ICS Threat Actors
- Nation-State Groups: Targeting critical infrastructure for strategic advantages
- Cybercriminal Organizations: Focusing on ransomware and extortion
- Hacktivists: Pursuing politically motivated disruptions
- Insider Threats: Exploiting privileged access to systems
Common ICS Attack Vectors
According to Dragos's Year in Review, threat actors frequently exploit these vectors:
Network-Based Attacks
- Protocol manipulation and abuse
- Man-in-the-middle attacks on industrial protocols
- Unauthorized command injection
- Process variable manipulation
- Denial of Service (DoS) on critical systems
Process Manipulation Techniques
# Common Attack Patterns
1. Command Injection:
- Modbus Function Code Manipulation
- DNP3 Control Operation Abuse
- Unauthorized Write Commands
2. Process Tampering:
- Setpoint Modifications
- Sensor Value Manipulation
- Control Loop Interference
3. Protocol Abuse:
- Invalid Packet Sequences
- Protocol Stack Violations
- Timing Attacks
Zeek's ICS Monitoring Capabilities
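Zeek provides visibility into ICS networks through protocol analyzers and per-protocol logs. Modbus and DNP3 analyzers ship with Zeek itself; parsing for BACnet, S7comm, EtherNet/IP, and OPC UA typically comes from add-on packages such as CISA's ICSNPP parsers, so confirm which analyzers are installed before relying on a given log: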
Supported Industrial Protocols
# Primary ICS Protocols Monitored by Zeek
Protocol | Key Capabilities
-----------|-----------------
Modbus | Function codes, register access, command validation
DNP3 | Control operations, data acquisition, authentication
BACnet | Building automation, device discovery, service requests
S7comm | PLC communications, program downloads, diagnostic data
EtherNet/IP| CIP services, implicit/explicit messaging
OPC UA | Information modeling, service monitoring
ICS Threat Hunting Objectives
Primary Detection Goals
- Process Manipulation: Identify unauthorized changes to industrial processes
- Command & Control: Detect malicious control operations
- Reconnaissance: Monitor for system enumeration attempts
- Protocol Abuse: Identify violations of protocol specifications
Key Monitoring Metrics
# Essential ICS Monitoring Parameters
Category | Metrics to Monitor
-------------------|-------------------
Command Patterns | Frequency, timing, source
Process Variables | Rate of change, value ranges
Protocol Behavior | Compliance, sequence validity
Network Traffic | Flow characteristics, peer relationships
Authentication | Access patterns, credential usage
Threat Intelligence Integration
Effective ICS threat hunting requires integration with current threat intelligence:
Intelligence Sources
- CISA Advisories: Latest vulnerability and threat information
- Vendor Bulletins: Product-specific security updates
- Industry ISACs: Sector-specific threat intelligence
- Security Research: New attack techniques and vulnerabilities
Threat Indicators
# Key Indicator Categories
Type | Examples
------------------------|----------
Network Indicators | IP addresses, domains, protocols
Behavioral Indicators | Command sequences, timing patterns
Process Indicators | Value ranges, state transitions
System Indicators | Configuration changes, authentication attempts
Baseline Establishment
Establishing normal operational baselines is crucial for effective threat hunting:
Baseline Components
- Normal protocol usage patterns
- Typical command sequences
- Expected process variable ranges
- Standard communication paths
- Regular maintenance windows
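As a minimal sketch of how the baseline components above can be put to work, the following Python snippet records which (client, server, function) combinations appear during a known-good period and reports anything new afterwards. It assumes Zeek logs have already been parsed into dictionaries; the addresses and sample records are invented for illustration.

```python
def build_baseline(records):
    """Collect the (client, server, function) tuples seen during a known-good period."""
    return {(r["id.orig_h"], r["id.resp_h"], r["func"]) for r in records}


def new_behavior(baseline, records):
    """Return records whose (client, server, function) tuple is absent from the baseline."""
    return [
        r for r in records
        if (r["id.orig_h"], r["id.resp_h"], r["func"]) not in baseline
    ]


# Hypothetical training data: one HMI polling one PLC.
training = [
    {"id.orig_h": "10.0.0.5", "id.resp_h": "10.0.1.20", "func": "READ_HOLDING_REGISTERS"},
    {"id.orig_h": "10.0.0.5", "id.resp_h": "10.0.1.20", "func": "READ_COILS"},
]
# Hypothetical live data: a new client issuing a write.
live = [
    {"id.orig_h": "10.0.0.5", "id.resp_h": "10.0.1.20", "func": "READ_COILS"},
    {"id.orig_h": "10.0.0.99", "id.resp_h": "10.0.1.20", "func": "WRITE_SINGLE_REGISTER"},
]

baseline = build_baseline(training)
deviations = new_behavior(baseline, live)
for d in deviations:
    print("new behavior:", d["id.orig_h"], "->", d["id.resp_h"], d["func"])
```

A set of tuples is deliberately simple; it captures communication paths and protocol usage but not timing or value ranges, which need their own profiles.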
Sources
- CISA Alert: Increased ICS Threats (December 2023)
- Dragos Year in Review Report
- SANS ICS Security Report
Core ICS Protocol Log Analysis
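Understanding the structure and content of ICS protocol logs is fundamental for effective threat hunting. According to Zeek's protocol documentation, each log type captures specific aspects of industrial communications that are crucial for security monitoring. The field listings below are illustrative: the exact field names depend on the Zeek version and on which add-on parsers are loaded, so check them against the header of your own logs.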
Modbus Protocol Logging
modbus.log Structure
# Key Fields in modbus.log
{
"ts": "timestamp",
"uid": "unique_identifier",
"id.orig_h": "source_ip",
"id.orig_p": "source_port",
"id.resp_h": "destination_ip",
"id.resp_p": "destination_port",
"unit_id": "device_identifier",
"func": "function_code",
"exception": "exception_code",
"start_reg": "starting_register",
"num_reg": "register_count",
"values": "register_values"
}
Critical Modbus Indicators
- Function Codes:
- 01/02: Read Coils/Discrete Inputs
- 03/04: Read Holding/Input Registers
- 05/06: Write Single Coil/Register
- 15/16: Write Multiple Coils/Registers
- Anomaly Indicators:
- Unauthorized write operations
- Invalid register access
- Excessive polling
- Out-of-sequence operations
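The first anomaly indicator above, unauthorized write operations, can be hunted offline with a few lines of Python. This sketch assumes Zeek is writing JSON logs (for example with `redef LogAscii::use_json = T;`) and that `func` carries Zeek's function names such as `WRITE_SINGLE_REGISTER`. The allowlist of writers and the sample lines are hypothetical.

```python
import json

# Modbus function names that change device state.
WRITE_FUNCS = {
    "WRITE_SINGLE_COIL",
    "WRITE_SINGLE_REGISTER",
    "WRITE_MULTIPLE_COILS",
    "WRITE_MULTIPLE_REGISTERS",
}

# Hypothetical allowlist: hosts that are expected to issue writes.
AUTHORIZED_WRITERS = {"10.0.0.5"}


def unauthorized_writes(lines):
    """Return parsed modbus.log records that are writes from non-allowlisted hosts."""
    hits = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        rec = json.loads(line)
        if rec.get("func") in WRITE_FUNCS and rec.get("id.orig_h") not in AUTHORIZED_WRITERS:
            hits.append(rec)
    return hits


# Invented sample lines standing in for a JSON-formatted modbus.log.
sample = [
    '{"ts": 1700000000.0, "id.orig_h": "10.0.0.5", "id.resp_h": "10.0.1.20", "func": "WRITE_SINGLE_REGISTER"}',
    '{"ts": 1700000001.0, "id.orig_h": "10.0.0.77", "id.resp_h": "10.0.1.20", "func": "READ_COILS"}',
    '{"ts": 1700000002.0, "id.orig_h": "10.0.0.77", "id.resp_h": "10.0.1.20", "func": "WRITE_MULTIPLE_REGISTERS"}',
]

hits = unauthorized_writes(sample)
for h in hits:
    print(h["id.orig_h"], "->", h["id.resp_h"], h["func"])
```

In practice the same function can be fed an open file handle, since iterating a file yields one line at a time.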
DNP3 Protocol Logging
dnp3.log Structure
# Key Fields in dnp3.log
{
"ts": "timestamp",
"uid": "unique_identifier",
"id.orig_h": "source_ip",
"id.resp_h": "destination_ip",
"fc_request": "function_code_request",
"fc_reply": "function_code_reply",
"iin": "internal_indication_flags",
"object_type": "data_type",
"range_low": "start_index",
"range_high": "end_index"
}
DNP3 Security Indicators
# Common DNP3 Attack Patterns
Function Code | Description | Suspicious Activity
-------------|----------------------|--------------------
0x01 | Read | Excessive polling
0x02 | Write | Unauthorized writes
0x05 | Direct Operate | Command injection
0x0D | Cold Restart | DoS attempt
0x0E | Warm Restart | System disruption
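To turn the table above into a quick hunt, the sketch below counts high-risk DNP3 requests per source host. It assumes dnp3.log records have been parsed into dictionaries and that `fc_request` holds symbolic names such as `DIRECT_OPERATE` and `COLD_RESTART`; confirm the exact spelling against your own logs. The sample records are invented.

```python
from collections import Counter

# Function names treated as high risk in this sketch (an assumed selection).
HIGH_RISK = {"WRITE", "DIRECT_OPERATE", "COLD_RESTART", "WARM_RESTART"}


def high_risk_by_source(records):
    """Count high-risk DNP3 requests per (source host, function name)."""
    counts = Counter()
    for r in records:
        fc = r.get("fc_request")
        if fc in HIGH_RISK:
            counts[(r["id.orig_h"], fc)] += 1
    return counts


records = [
    {"id.orig_h": "10.0.0.5", "fc_request": "READ"},
    {"id.orig_h": "10.0.0.77", "fc_request": "COLD_RESTART"},
    {"id.orig_h": "10.0.0.77", "fc_request": "COLD_RESTART"},
    {"id.orig_h": "10.0.0.5", "fc_request": "DIRECT_OPERATE"},
]

risk_counts = high_risk_by_source(records)
for (host, fc), n in risk_counts.most_common():
    print(host, fc, n)
```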
BACnet Protocol Logging
bacnet.log Structure
# Key Fields in bacnet.log
{
"ts": "timestamp",
"uid": "unique_identifier",
"service": "service_type",
"object_type": "device_object_type",
"instance_number": "object_instance",
"property": "property_identifier",
"value": "property_value",
"result": "service_result"
}
BACnet Monitoring Focus
- Critical Services:
- WriteProperty
- WritePropertyMultiple
- DeviceCommunicationControl
- ReinitializeDevice
- Security Events:
- Unauthorized property writes
- Device control attempts
- Configuration changes
- Authentication failures
S7comm Protocol Logging
s7comm.log Structure
# Key Fields in s7comm.log
{
"ts": "timestamp",
"uid": "unique_identifier",
"rosctr": "remote_operation_control",
"parameter": "parameter_code",
"parameter_value": "parameter_data",
"data_type": "data_type_code",
"bytes": "data_length",
"response": "response_code"
}
S7comm Security Analysis
# Critical Operation Types
Operation | Risk Level | Detection Focus
-------------|------------|----------------
READ_VAR | Medium | Data exfiltration
WRITE_VAR | High | Process manipulation
START/STOP | Critical | System disruption
DOWNLOAD | Critical | Program modification
UPLOAD | High | Logic theft
Advanced Log Analysis Techniques
Cross-Protocol Correlation
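# Example correlation pattern
@load base/frameworks/sumstats
@load base/frameworks/notice

redef enum Notice::Type += { MultiProtocol_Activity };

# The two streams must be fed from protocol event handlers with
# SumStats::observe(); without observations nothing is ever counted.
event zeek_init()
    {
    local r1 = SumStats::Reducer($stream="modbus_ops",
                                 $apply=set(SumStats::UNIQUE));
    local r2 = SumStats::Reducer($stream="dnp3_ops",
                                 $apply=set(SumStats::UNIQUE));
    SumStats::create([
        $name="protocol_correlation",
        $epoch=10min,
        $reducers=set(r1, r2),
        $threshold=10.0,
        $threshold_val(key: SumStats::Key, result: SumStats::Result) =
            {
            # A stream with no observations for this key is absent from the result.
            local total = 0.0;
            if ( "modbus_ops" in result )
                total = total + result["modbus_ops"]$unique;
            if ( "dnp3_ops" in result )
                total = total + result["dnp3_ops"]$unique;
            return total;
            },
        $threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
            {
            NOTICE([$note=MultiProtocol_Activity,
                    $msg="Correlated protocol activity detected"]);
            }
    ]);
    }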
Pattern Recognition
- Command sequence analysis
- Timing pattern detection
- Value range monitoring
- Protocol state tracking
- Error response analysis
Log Retention and Analysis
Implement effective log management strategies:
- Maintain logs for at least 90 days
- Implement log rotation policies
- Enable compressed storage for historical analysis
- Implement secure log transmission
Advanced ICS Log Analysis Framework
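Implementing sophisticated log analysis techniques is crucial for detecting advanced threats in ICS environments. According to recent CISA advisories, threat actors are employing increasingly sophisticated techniques to target industrial systems. The scripts in this guide raise notices under an ICS:: namespace; they assume those notice types have already been declared with redef enum Notice::Type inside a module named ICS, and they will not load until that is done.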
Command and Control Pattern Analysis
Detecting Abnormal Command Sequences
# Implementation of command sequence analysis
module CommandSequenceAnalysis;
export {
type CommandSequence: record {
commands: vector of string;
timestamps: vector of time;
source_ips: set[addr];
};
global device_sequences: table[addr] of CommandSequence &create_expire=1hr;
# Define known legitimate command sequences
const legitimate_sequences: set[string] = {
"read-read-write",
"read-write-read",
"configure-write-read"
} &redef;
}
function analyze_command_sequence(device: addr, command: string, source: addr)
{
if (device !in device_sequences) {
device_sequences[device] = CommandSequence(
$commands=vector(),
$timestamps=vector(),
$source_ips=set()
);
}
local seq = device_sequences[device];
seq$commands[|seq$commands|] = command;
seq$timestamps[|seq$timestamps|] = current_time();
add seq$source_ips[source];
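local n = |seq$commands|;
if (n >= 3) {
# Compare only the three most recent commands. Joining the whole history
# could never match a three-command entry once a fourth command arrives.
local sequence_str = fmt("%s-%s-%s", seq$commands[n - 3],
seq$commands[n - 2], seq$commands[n - 1]);
if (sequence_str !in legitimate_sequences) {
NOTICE([
$note=ICS::Suspicious_Command_Sequence,
$msg=fmt("Suspicious command sequence detected: %s from %s to %s",
sequence_str, source, device)
]);
}
}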
}
Protocol State Analysis
State Transition Monitoring
# Monitor protocol state transitions
module ProtocolStateAnalysis;
export {
type ProtocolState: record {
current_state: string;
last_transition: time;
transition_count: count;
invalid_transitions: count;
};
global device_states: table[addr] of ProtocolState &create_expire=2hrs;
# Define valid state transitions
const valid_transitions: table[string] of set[string] = {
["IDLE"] = set("CONNECTING", "READING"),
["CONNECTING"] = set("CONNECTED", "IDLE"),
["CONNECTED"] = set("READING", "WRITING", "IDLE"),
["READING"] = set("CONNECTED", "WRITING", "IDLE"),
["WRITING"] = set("CONNECTED", "READING", "IDLE")
} &redef;
}
function check_state_transition(device: addr, new_state: string)
{
if (device !in device_states) {
device_states[device] = ProtocolState($current_state="IDLE",
$last_transition=current_time(),
$transition_count=0,
$invalid_transitions=0);
}
local state = device_states[device];
if (state$current_state !in valid_transitions ||
new_state !in valid_transitions[state$current_state]) {
state$invalid_transitions += 1;
if (state$invalid_transitions >= 3) {
NOTICE([
$note=ICS::Invalid_State_Transition,
$msg=fmt("Multiple invalid state transitions detected for %s",
device)
]);
}
}
state$current_state = new_state;
state$last_transition = current_time();
state$transition_count += 1;
}
Temporal Pattern Analysis
Activity Timing Analysis
# Analyze temporal patterns in ICS activities
module TemporalAnalysis;
export {
type TimeProfile: record {
hourly_patterns: vector of count;
last_update: time;
anomaly_count: count;
};
global device_timing: table[addr] of TimeProfile &create_expire=7days;
}
function analyze_temporal_pattern(device: addr, activity_time: time)
{
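if (device !in device_timing) {
# Zeek has no "vector(24) of 0" constructor, so build the 24 hourly slots explicitly.
local hourly: vector of count = vector();
local h = 0;
while (h < 24) {
hourly[|hourly|] = 0;
++h;
}
device_timing[device] = TimeProfile(
$hourly_patterns=hourly,
$last_update=current_time(),
$anomaly_count=0
);
}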
local profile = device_timing[device];
local current_hour = strftime("%H", activity_time);
local hour_index = to_count(current_hour);
profile$hourly_patterns[hour_index] += 1;
# Check for activity during unusual hours
if (hour_index < 6 || hour_index > 18) {
profile$anomaly_count += 1;
if (profile$anomaly_count >= 5) {
NOTICE([
$note=ICS::Unusual_Timing_Pattern,
$msg=fmt("Unusual activity timing detected for device %s", device)
]);
}
}
}
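The same hourly profile can be built offline from log timestamps. The sketch below is an illustration under stated assumptions: timestamps are epoch seconds as found in the `ts` field, hours are computed in UTC (Zeek's `strftime` uses the sensor's local time, so adjust if those differ), and the 06:00 to 18:00 working window simply mirrors the script above.

```python
from datetime import datetime, timezone


def hourly_profile(timestamps):
    """Return a 24-slot list counting events per hour of day (UTC)."""
    counts = [0] * 24
    for ts in timestamps:
        counts[datetime.fromtimestamp(ts, tz=timezone.utc).hour] += 1
    return counts


def off_hours_count(counts, start=6, end=18):
    """Sum events that fall before `start` or after `end` (hours, inclusive window)."""
    return sum(c for hour, c in enumerate(counts) if hour < start or hour > end)


# Invented events at 03:00, 10:00, 10:30 and 23:00 UTC.
events = [
    datetime(2024, 1, 1, 3, 0, tzinfo=timezone.utc).timestamp(),
    datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc).timestamp(),
    datetime(2024, 1, 1, 10, 30, tzinfo=timezone.utc).timestamp(),
    datetime(2024, 1, 1, 23, 0, tzinfo=timezone.utc).timestamp(),
]

profile = hourly_profile(events)
print("off-hours events:", off_hours_count(profile))
```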
Value Range Analysis
Process Variable Monitoring
# Monitor process variable changes
module ValueAnalysis;
export {
type ValueProfile: record {
min_value: double;
max_value: double;
last_value: double;
rapid_changes: count;
};
global value_profiles: table[string] of ValueProfile &create_expire=1day;
}
function analyze_value_change(tag_name: string, new_value: double)
{
if (tag_name !in value_profiles) {
value_profiles[tag_name] = ValueProfile($min_value=new_value,
$max_value=new_value,
$last_value=new_value,
$rapid_changes=0);
return;
}
local profile = value_profiles[tag_name];
local value_change = |new_value - profile$last_value|;
local value_range = profile$max_value - profile$min_value;
# Check for rapid value changes
if (value_change > (value_range * 0.2)) {
profile$rapid_changes += 1;
if (profile$rapid_changes >= 3) {
NOTICE([
$note=ICS::Rapid_Value_Change,
$msg=fmt("Rapid value changes detected for %s", tag_name)
]);
}
}
profile$last_value = new_value;
profile$min_value = new_value < profile$min_value ? new_value : profile$min_value;
profile$max_value = new_value > profile$max_value ? new_value : profile$max_value;
}
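A Python rendering of the same rate-of-change idea is sketched below. It differs from the script above in one labeled way: it skips the check while the observed range is still zero, since otherwise the second sample of any tag would count as a rapid change. The class name, the 20% fraction, and the sample values are illustrative.

```python
class ValueProfile:
    """Tracks the observed range of one process variable and counts rapid changes."""

    def __init__(self, first_value):
        self.min = first_value
        self.max = first_value
        self.last = first_value
        self.rapid_changes = 0

    def update(self, value, fraction=0.2):
        """Record a new sample; return True if it jumped more than `fraction` of the range."""
        span = self.max - self.min
        # Assumption: with no observed range yet, there is nothing to compare against.
        rapid = span > 0 and abs(value - self.last) > span * fraction
        if rapid:
            self.rapid_changes += 1
        self.last = value
        self.min = min(self.min, value)
        self.max = max(self.max, value)
        return rapid


tank_level = ValueProfile(50.0)
results = [tank_level.update(v) for v in (52.0, 52.2, 60.0)]
print(results, tank_level.rapid_changes)
```

Because the range is learned from the data itself, an attacker who drifts a value slowly widens the range and suppresses later alerts; a fixed engineering range per tag is a sturdier reference where one is available.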
Protocol Violation Detection
Protocol Conformance Checking
- Message format validation
- Field value range checking
- Protocol sequence verification
- Timing requirement validation
- Authentication verification
Sources
- Process historian data
- Asset management systems
- Configuration management databases
- Vulnerability scanners
- Threat intelligence feeds
- Real-time process data correlation
- Historical trend analysis
- Maintenance window validation
- Configuration change tracking
- Vulnerability context association
- Response time analysis
- Detection effectiveness metrics
- False positive rate tracking
- Resolution time monitoring
- Resource utilization assessment
According to CISA's ICS Incident Response Playbook, proper documentation and evidence handling are crucial for successful incident resolution.
Initial Investigation Framework
Evidence Collection System
# Implement evidence collection framework
module IncidentEvidence;
export {
type EvidenceItem: record {
timestamp: time;
source: string;
evidence_type: string; # "type" is a reserved word in Zeek and cannot be a field name
data: string;
hash: string &optional; # filled in by collect_evidence()
metadata: table[string] of string;
};
global evidence_collection: table[string] of vector of EvidenceItem;
}
function collect_evidence(incident_id: string, evidence: EvidenceItem)
{
# Generate evidence hash. SHA-256 is used because MD5 is too weak
# to support integrity claims about collected evidence.
evidence$hash = sha256_hash(cat(
strftime("%Y-%m-%d %H:%M:%S", evidence$timestamp),
evidence$source,
evidence$data
));
if (incident_id !in evidence_collection)
evidence_collection[incident_id] = vector();
evidence_collection[incident_id][|evidence_collection[incident_id]|] = evidence;
# Log evidence collection
NOTICE([
$note=ICS::Evidence_Collected,
$msg=fmt("Evidence collected for incident %s: %s",
incident_id, evidence$evidence_type)
]);
}
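For evidence handled outside Zeek, the hashing step can be sketched in Python as follows. This is an illustration, not a forensic standard: it uses SHA-256 and serializes the fields as canonical JSON before hashing, so that field boundaries are unambiguous (plain concatenation would give "ab" + "c" and "a" + "bc" the same digest). The function name and sample values are hypothetical.

```python
import hashlib
import json


def evidence_digest(timestamp, source, data):
    """Return a SHA-256 hex digest over a canonical JSON encoding of the evidence fields."""
    payload = json.dumps(
        {"ts": timestamp, "source": source, "data": data},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


d1 = evidence_digest("2024-01-01 10:00:00", "modbus.log", "WRITE_SINGLE_REGISTER 40001")
d2 = evidence_digest("2024-01-01 10:00:00", "modbus.log", "WRITE_SINGLE_REGISTER 40001")
d3 = evidence_digest("2024-01-01 10:00:00", "ab", "c")
d4 = evidence_digest("2024-01-01 10:00:00", "a", "bc")
print(d1)
```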
Root Cause Analysis
Causal Chain Analysis
# Implement root cause analysis system
module RootCauseAnalysis;
export {
type CausalEvent: record {
timestamp: time;
event_type: string;
description: string;
severity: count;
related_events: set[string];
};
global causal_chains: table[string] of vector of CausalEvent;
}
function analyze_causal_chain(incident_id: string, ev: CausalEvent)
{
# "event" is a reserved word in Zeek, so the parameter is named ev.
if (incident_id !in causal_chains)
causal_chains[incident_id] = vector();
causal_chains[incident_id][|causal_chains[incident_id]|] = ev;
# Analyze event relationships
if (|causal_chains[incident_id]| >= 3) {
local chain = "";
for (idx in causal_chains[incident_id]) {
# Put the arrow between entries only, so the chain has no trailing separator.
if (idx > 0)
chain = chain + " → ";
chain = chain + causal_chains[incident_id][idx]$event_type;
}
NOTICE([
$note=ICS::Causal_Chain_Identified,
$msg=fmt("Causal chain identified for incident %s: %s",
incident_id, chain)
]);
}
}
Incident Documentation
Documentation Management
# Implement incident documentation system
module IncidentDocumentation;
export {
type IncidentReport: record {
incident_id: string;
timestamp: time;
classification: string;
description: string;
affected_systems: set[addr];
timeline: vector of string;
evidence_refs: set[string];
mitigation_steps: vector of string;
};
global incident_reports: table[string] of IncidentReport;
}
function document_incident(report: IncidentReport)
{
# Validate required fields
if (|report$affected_systems| == 0 ||
|report$timeline| == 0 ||
|report$mitigation_steps| == 0) {
NOTICE([
$note=ICS::Incomplete_Documentation,
$msg=fmt("Incident report %s missing required information",
report$incident_id)
]);
return;
}
incident_reports[report$incident_id] = report;
# Generate summary
# fmt() handles the count values directly; string_cat() expects string arguments.
local summary = fmt("Incident %s documented:\nClassification: %s\nAffected Systems: %d\nTimeline Events: %d\nMitigation Steps: %d",
report$incident_id,
report$classification,
|report$affected_systems|,
|report$timeline|,
|report$mitigation_steps|);
NOTICE([
$note=ICS::Incident_Documented,
$msg=summary
]);
}
Lessons Learned Framework
Improvement Tracking System
# Implement lessons learned tracking
module LessonsLearned;
export {
type LessonLearned: record {
incident_id: string;
category: string;
description: string;
recommendations: vector of string;
implementation_status: string;
priority: count;
};
global lessons_database: vector of LessonLearned;
}
# Follow-up check, written as an event because "schedule" can only queue events.
# It is defined ahead of track_lesson() so the name is known where it is scheduled.
event check_lesson_implementation(lesson: LessonLearned)
{
if (lesson$implementation_status == "pending") {
NOTICE([
$note=ICS::Lesson_Implementation_Overdue,
$msg=fmt("Implementation of lesson from incident %s is overdue",
lesson$incident_id)
]);
}
}
function track_lesson(lesson: LessonLearned)
{
lessons_database[|lessons_database|] = lesson;
# Prioritize implementations
if (lesson$priority >= 4) {
NOTICE([
$note=ICS::High_Priority_Lesson,
$msg=fmt("High-priority lesson identified from incident %s: %s",
lesson$incident_id, lesson$description)
]);
}
# Track implementation status
if (lesson$implementation_status == "pending" &&
lesson$priority >= 3) {
schedule 7days { check_lesson_implementation(lesson) };
}
}
Continuous Improvement Process
Metrics and Analysis
Process Enhancement Steps
# Key Improvement Areas
1. Detection Capabilities
- Rule effectiveness
- Coverage gaps
- False positive reduction
2. Response Procedures
- Communication efficiency
- Team coordination
- Tool integration
3. Documentation Quality
- Completeness
- Accuracy
- Accessibility
4. Training Requirements
- Skill gaps
- Knowledge transfer
- Procedure updates
ICS Threat Hunting Troubleshooting Framework
Effective troubleshooting in ICS environments requires systematic approaches to common challenges. According to SANS ICS Security Research, proper handling of technical challenges is crucial for maintaining reliable threat detection.
Encrypted Traffic Analysis
TLS/SSL Traffic Handler
# Implement encrypted traffic analysis
module EncryptedTrafficAnalysis;
export {
type EncryptedSession: record {
start_time: time;
cipher_suite: string;
certificate_info: string;
ja3_fingerprint: string;
bytes_in: count;
bytes_out: count;
};
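global encrypted_sessions: table[conn_id] of EncryptedSession;
# JA3 hashes observed during baselining. Empty by default; populate via redef.
# The ssl$ja3 field used below assumes the JA3 package is loaded.
const known_good_ja3_hashes: set[string] = {} &redef;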
}
function analyze_encrypted_traffic(c: connection, ssl: SSL::Info)
{
if (c$id !in encrypted_sessions) {
encrypted_sessions[c$id] = EncryptedSession(
$start_time=network_time(),
$cipher_suite=ssl$cipher,
$certificate_info=ssl$cert_subject,
$ja3_fingerprint=ssl$ja3,
$bytes_in=0,
$bytes_out=0
);
}
# Update byte counts
encrypted_sessions[c$id]$bytes_in += c$orig$size;
encrypted_sessions[c$id]$bytes_out += c$resp$size;
# Analyze suspicious patterns
if (ssl$ja3 !in known_good_ja3_hashes) {
NOTICE([
$note=ICS::Suspicious_SSL_Fingerprint,
$msg=fmt("Unknown JA3 fingerprint detected: %s", ssl$ja3),
$conn=c
]);
}
}
Proprietary Protocol Handler
Protocol Variation Management
# Handle proprietary protocol variations
module ProprietaryProtocolHandler;
export {
type ProtocolVariant: record {
vendor: string;
version: string;
parsing_rules: vector of pattern;
field_mappings: table[string] of string;
known_deviations: set[string];
};
global protocol_variants: table[string] of ProtocolVariant;
}
function handle_proprietary_protocol(data: string, protocol_id: string): bool
{
if (protocol_id !in protocol_variants)
return F;
local variant = protocol_variants[protocol_id];
# Try each parsing rule
for (idx in variant$parsing_rules) {
if (variant$parsing_rules[idx] in data) {
process_variant_data(data, variant);
return T;
}
}
# Log parsing failure
NOTICE([
$note=ICS::Protocol_Parse_Failure,
$msg=fmt("Failed to parse proprietary protocol: %s", protocol_id)
]);
return F;
}
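function process_variant_data(data: string, variant: ProtocolVariant)
{
# Apply vendor-specific field mappings
local fields: table[string] of string;
for (field_name, field_pattern in variant$field_mappings) {
# string_to_pattern() builds a pattern at runtime. find_all() returns a
# set of matches, so take one element rather than indexing with [0].
local matches = find_all(data, string_to_pattern(field_pattern, F));
for (m in matches) {
fields[field_name] = m;
break;
}
}
# analyze_fields() is not shown in this guide and must be defined elsewhere.
analyze_fields(fields, variant);
}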
High-Volume Data Management
Data Volume Optimization
# Implement data volume management
module DataVolumeManager;
export {
type VolumeMetrics: record {
current_rate: double;
peak_rate: double;
total_processed: count;
sampling_active: bool;
};
global volume_metrics: VolumeMetrics;
const RATE_THRESHOLD = 10000.0; # Events per second
}
function manage_data_volume(current_rate: double)
{
volume_metrics$current_rate = current_rate;
if (current_rate > volume_metrics$peak_rate)
volume_metrics$peak_rate = current_rate;
# Implement adaptive sampling
if (current_rate > RATE_THRESHOLD && !volume_metrics$sampling_active) {
activate_sampling();
NOTICE([
$note=ICS::High_Volume_Sampling,
$msg=fmt("Activated sampling due to high data rate: %.2f eps",
current_rate)
]);
}
}
function activate_sampling()
{
volume_metrics$sampling_active = T;
# Calculate sampling rate based on current load
local sampling_rate = RATE_THRESHOLD / volume_metrics$current_rate;
sampling_rate = sampling_rate < 0.1 ? 0.1 : sampling_rate;
set_sampling_rate(sampling_rate);
}
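The sampling arithmetic above is easy to check by hand. The Python sketch below reproduces it: below the threshold everything is kept, above it the kept fraction is threshold divided by current rate, floored at 10%. The constant names follow the script; the example rates are invented.

```python
RATE_THRESHOLD = 10_000.0   # events per second, as in the script above
MIN_SAMPLING_RATE = 0.1     # never keep less than 10% of events


def sampling_rate(current_rate):
    """Return the fraction of events to keep for a given event rate."""
    if current_rate <= RATE_THRESHOLD:
        return 1.0
    return max(MIN_SAMPLING_RATE, RATE_THRESHOLD / current_rate)


for rate in (5_000.0, 40_000.0, 500_000.0):
    print(rate, sampling_rate(rate))
```

At 40,000 events per second the kept fraction is 10,000 / 40,000 = 0.25, which brings the processed rate back to the threshold. At 500,000 the raw ratio would be 0.02, so the floor of 0.1 applies and the processed rate stays above the threshold.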
False Positive Reduction
Alert Validation System
# Implement false positive reduction
module AlertValidator;
export {
type AlertValidation: record {
confidence_score: double;
context_matches: count;
historical_matches: count;
validation_rules: set[string];
};
global alert_validations: table[string] of AlertValidation;
}
function validate_alert(alert_id: string, context: table[string] of string): bool
{
if (alert_id !in alert_validations)
return F;
local validation = alert_validations[alert_id];
local score = 0.0;
# Check context matches
for (rule in validation$validation_rules) {
if (rule in context) {
validation$context_matches += 1;
score += 0.2;
}
}
# Check historical patterns
if (validation$historical_matches > 0)
score += 0.3;
# Update confidence score
validation$confidence_score = score;
return score >= 0.7;
}
Protocol Parsing Error Handling
Error Recovery System
# Implement protocol parsing error recovery
module ParsingErrorHandler;
export {
type ParsingError: record {
error_type: string;
error_count: count;
last_occurrence: time;
recovery_attempts: count;
};
global parsing_errors: table[string] of ParsingError;
}
function handle_parsing_error(protocol: string, error: string): bool
{
local error_id = cat(protocol, "-", error);
if (error_id !in parsing_errors) {
parsing_errors[error_id] = ParsingError(
$error_type=error,
$error_count=0,
$last_occurrence=network_time(),
$recovery_attempts=0
);
}
local err = parsing_errors[error_id];
err$error_count += 1;
err$last_occurrence = network_time();
# Implement recovery strategy
if (err$recovery_attempts < 3) {
err$recovery_attempts += 1;
return attempt_recovery(protocol, error);
}
NOTICE([
$note=ICS::Parsing_Error_Threshold,
$msg=fmt("Multiple parsing errors for %s: %s", protocol, error)
]);
return F;
}
CISA Joint Advisory on ICS Threats
Enhanced ICS Log Correlation Framework
Effective threat detection in ICS environments requires comprehensive correlation of multiple data sources. According to SANS ICS Security Research, integrated analysis can significantly improve threat detection accuracy.
Network Flow Analysis Integration
Flow Correlation Engine
# Implement network flow correlation
module FlowCorrelation;
export {
type FlowProfile: record {
start_time: time;
last_seen: time;
bytes_in: count;
bytes_out: count;
conn_state: string;
protocols: set[string];
};
global device_flows: table[addr] of FlowProfile &create_expire=1day;
}
event connection_state_remove(c: connection)
{
local device = c$id$resp_h;
if (device !in device_flows) {
device_flows[device] = FlowProfile(
$start_time=c$start_time,
$last_seen=c$start_time,
$bytes_in=0,
$bytes_out=0,
$conn_state=c$conn$conn_state,
$protocols=set()
);
}
device_flows[device]$bytes_in += c$orig$size;
device_flows[device]$bytes_out += c$resp$size;
device_flows[device]$last_seen = c$start_time;
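# The connection record has no $proto field. c$service holds the
# application protocols Zeek identified on this connection.
for (svc in c$service)
add device_flows[device]$protocols[svc];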
# Analyze flow patterns
if (|device_flows[device]$protocols| > 2) {
NOTICE([
$note=ICS::Unusual_Protocol_Mix,
$msg=fmt("Device %s using multiple protocols: %s",
device,
join_string_set(device_flows[device]$protocols, ", "))
]);
}
}
Asset Inventory Correlation
Asset Context Integration
# Asset context correlation system
module AssetCorrelation;
export {
type AssetProfile: record {
asset_type: string;
authorized_protocols: set[string];
authorized_peers: set[addr];
operational_hours: set[count];
criticality: string;
};
global asset_inventory: table[addr] of AssetProfile;
}
function load_asset_inventory()
{
# Example asset profile
asset_inventory[192.168.1.100] = AssetProfile(
$asset_type="PLC",
$authorized_protocols=set("MODBUS", "S7COMM"),
$authorized_peers=set(192.168.1.10, 192.168.1.20),
$operational_hours=set(8, 9, 10, 11, 12, 13, 14, 15, 16),
$criticality="HIGH"
);
}
function check_asset_compliance(device: addr, protocol: string,
peer: addr, hour: count)
{
if (device in asset_inventory) {
local profile = asset_inventory[device];
if (protocol !in profile$authorized_protocols ||
peer !in profile$authorized_peers ||
hour !in profile$operational_hours) {
NOTICE([
$note=ICS::Asset_Policy_Violation,
$msg=fmt("Policy violation for %s asset: %s",
profile$asset_type, device)
]);
}
}
}
Behavioral Baseline Integration
Behavioral Pattern Analysis
# Behavioral baseline monitoring system
module BehaviorAnalysis;
export {
type BehaviorProfile: record {
command_frequencies: table[string] of count;
value_ranges: table[string] of interval;
timing_patterns: vector of count;
peer_relationships: set[addr];
};
global behavior_baselines: table[addr] of BehaviorProfile;
}
function analyze_behavior(device: addr, command: string,
value: double, peer: addr)
{
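if (device !in behavior_baselines) {
# Zeek has no "vector(24) of 0" constructor, so build the 24 hourly slots explicitly.
local hourly: vector of count = vector();
local h = 0;
while (h < 24) {
hourly[|hourly|] = 0;
++h;
}
behavior_baselines[device] = BehaviorProfile(
$command_frequencies=table(),
$value_ranges=table(),
$timing_patterns=hourly,
$peer_relationships=set()
);
}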
local profile = behavior_baselines[device];
local current_hour = strftime("%H", current_time());
# Update behavior profile
if (command !in profile$command_frequencies)
profile$command_frequencies[command] = 0;
profile$command_frequencies[command] += 1;
# Check for behavioral anomalies
if (profile$command_frequencies[command] > 100) {
NOTICE([
$note=ICS::Command_Frequency_Anomaly,
$msg=fmt("Unusual command frequency for device %s: %s",
device, command)
]);
}
}
Alert Correlation Engine
Multi-Source Alert Correlation
# Alert correlation system
module AlertCorrelation;
export {
type AlertContext: record {
timestamp: time;
alert_type: string;
severity: string;
source_data: string;
related_assets: set[addr];
};
global alert_queue: vector of AlertContext;
global correlation_window = 5min &redef;
}
function correlate_alerts()
{
# Naming a local "current_time" would shadow the built-in function.
local now = current_time();
local window_start = now - correlation_window;
local relevant_alerts: vector of AlertContext = vector();
# Gather relevant alerts within time window
for (idx in alert_queue) {
if (alert_queue[idx]$timestamp >= window_start)
relevant_alerts[|relevant_alerts|] = alert_queue[idx];
}
# Look for alert patterns
if (|relevant_alerts| >= 3) {
local asset_involvement: set[addr] = set();
# Iterating a vector yields indices, so look each alert up by index.
for (i in relevant_alerts)
for (asset in relevant_alerts[i]$related_assets)
add asset_involvement[asset];
if (|asset_involvement| >= 2) {
NOTICE([
$note=ICS::Correlated_Attack_Pattern,
$msg="Multiple alerts across different assets detected"
]);
}
}
}
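The windowed correlation rule can also be expressed compactly in Python, which is handy for replaying exported alerts. This sketch assumes each alert is a dictionary with an epoch `timestamp` and a `related_assets` set; the thresholds mirror the script above and the sample alerts are invented.

```python
def correlate(alerts, now, window=300.0, min_alerts=3, min_assets=2):
    """Return True if enough recent alerts span enough distinct assets."""
    recent = [a for a in alerts if a["timestamp"] >= now - window]
    assets = set()
    for a in recent:
        assets |= a["related_assets"]
    return len(recent) >= min_alerts and len(assets) >= min_assets


now = 1_700_000_600.0
alerts = [
    {"timestamp": now - 1000, "related_assets": {"10.0.1.30"}},  # outside the window
    {"timestamp": now - 200, "related_assets": {"10.0.1.20"}},
    {"timestamp": now - 100, "related_assets": {"10.0.1.20"}},
    {"timestamp": now - 10, "related_assets": {"10.0.1.21"}},
]

correlated = correlate(alerts, now)
same_asset_only = correlate(alerts[:3], now)
print(correlated, same_asset_only)
```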
Context Enhancement Strategies
Data Enrichment Sources
Integration Points
Advanced ICS Threat Hunting Framework
Implementing sophisticated threat hunting techniques requires comprehensive query patterns and detection methodologies. According to CISA's latest advisory, advanced persistent threats are increasingly targeting ICS environments with sophisticated techniques.
Pattern-Based Detection Queries
Command Injection Detection
# Implement command injection detection
module CommandInjection;
export {
type CommandPattern: record {
regex: pattern;
description: string;
severity: count;
};
global suspicious_patterns: table[string] of CommandPattern = {
["write_multiple"] = CommandPattern($regex=/write.*multiple.*registers/,
$description="Multiple register write",
$severity=4),
["force_command"] = CommandPattern($regex=/force.*coil/,
$description="Forced coil operation",
$severity=5)
} &redef;
}
function analyze_command(command: string, source: addr)
{
for (pattern_name in suspicious_patterns) {
if (suspicious_patterns[pattern_name]$regex in command) {
NOTICE([
$note=ICS::Suspicious_Command_Pattern,
$msg=fmt("Detected %s from %s: %s",
suspicious_patterns[pattern_name]$description,
source,
command)
]);
}
}
}
Anomaly Detection Framework
Statistical Anomaly Detection
# Implement statistical anomaly detection
module AnomalyDetection;
export {
type StatisticalProfile: record {
mean: double;
std_dev: double;
sample_count: count;
values: vector of double;
};
global value_profiles: table[string] of StatisticalProfile;
}
function update_statistics(tag: string, value: double)
{
if (tag !in value_profiles) {
value_profiles[tag] = StatisticalProfile($mean=0.0,
$std_dev=0.0,
$sample_count=0,
$values=vector());
}
local profile = value_profiles[tag];
profile$values[|profile$values|] = value;
profile$sample_count += 1;
# Update mean and standard deviation
if (profile$sample_count > 1) {
local sum = 0.0;
local sq_sum = 0.0;
for (idx in profile$values) {
sum += profile$values[idx];
sq_sum += profile$values[idx] * profile$values[idx];
}
profile$mean = sum / profile$sample_count;
profile$std_dev = sqrt((sq_sum / profile$sample_count) -
(profile$mean * profile$mean));
# Check for anomalies
if (|value - profile$mean| > (3 * profile$std_dev)) {
NOTICE([
$note=ICS::Statistical_Anomaly,
$msg=fmt("Value anomaly detected for %s: %f", tag, value)
]);
}
}
}
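The script above stores every sample and recomputes the sums on each update, so memory and work grow with the stream. One alternative, swapped in here for illustration, is Welford's online algorithm, which keeps only a count, a running mean, and a running sum of squared deviations. The sketch also tests a value against the statistics before adding it, so an outlier cannot inflate its own threshold. The class name, the 3-sigma factor, and the minimum sample count are assumptions.

```python
import math


class RunningStats:
    """Online mean and variance using Welford's algorithm."""

    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0  # running sum of squared deviations from the mean

    def update(self, x):
        """Fold one sample into the running statistics."""
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)

    def std_dev(self):
        """Population standard deviation of the samples seen so far."""
        return math.sqrt(self.m2 / self.n) if self.n > 0 else 0.0

    def is_anomalous(self, x, k=3.0, min_samples=10):
        """Return True if x lies more than k standard deviations from the mean."""
        if self.n < min_samples:
            return False
        std = self.std_dev()
        return std > 0 and abs(x - self.mean) > k * std


stats = RunningStats()
for v in [10, 11, 9, 10, 11, 9, 10, 11, 9, 10, 11, 9]:
    stats.update(float(v))

print(stats.mean, stats.std_dev(), stats.is_anomalous(10.5), stats.is_anomalous(20.0))
```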
Protocol-Specific Hunt Patterns
Modbus Hunt Queries
# Implement Modbus-specific hunting patterns
module ModbusHunting;
export {
type ModbusOperation: record {
function_code: count;
register_start: count;
register_count: count;
timestamp: time;
};
global modbus_operations: table[addr] of vector of ModbusOperation;
}
function hunt_modbus_patterns(src: addr, operation: ModbusOperation)
{
if (src !in modbus_operations)
modbus_operations[src] = vector();
modbus_operations[src][|modbus_operations[src]|] = operation;
# Hunt for register scanning
if (|modbus_operations[src]| >= 10) {
local sequential_count = 0;
local last_reg = 0;
for (idx in modbus_operations[src]) {
if (idx > 0) {
if (modbus_operations[src][idx]$register_start ==
last_reg + modbus_operations[src][idx-1]$register_count)
sequential_count += 1;
}
last_reg = modbus_operations[src][idx]$register_start;
}
if (sequential_count >= 8) {
NOTICE([
$note=ICS::Modbus_Register_Scan,
$msg=fmt("Possible Modbus register scanning from %s", src)
]);
}
}
}
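The register-scan heuristic reduces to one question: how often does a request start exactly where the previous one ended? The Python sketch below states that directly. It takes a list of hypothetical (start register, register count) pairs in arrival order and uses the same thresholds as the script above.

```python
def sequential_reads(ops):
    """Count requests that begin exactly where the preceding request ended."""
    return sum(
        1 for prev, cur in zip(ops, ops[1:])
        if cur[0] == prev[0] + prev[1]
    )


def looks_like_scan(ops, min_ops=10, min_sequential=8):
    """Apply the scan heuristic: enough requests, most of them contiguous."""
    return len(ops) >= min_ops and sequential_reads(ops) >= min_sequential


# A client walking the register space in blocks of 10.
scan = [(i * 10, 10) for i in range(10)]
# A client polling the same four registers repeatedly.
polling = [(100, 4)] * 10

print(looks_like_scan(scan), looks_like_scan(polling))
```

Normal HMI polling usually revisits a fixed set of addresses, which is why it scores zero here, while a sweep scores nearly one contiguous step per request.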
Advanced Correlation Queries
Multi-Protocol Attack Detection
# Implement multi-protocol attack detection
module ProtocolCorrelation;
export {
type ProtocolEvent: record {
protocol: string;
event_type: string;
timestamp: time;
details: string;
};
global protocol_sequences: table[addr] of vector of ProtocolEvent;
}
function analyze_protocol_sequence(src: addr, ev: ProtocolEvent)
{
# "event" is a reserved word in Zeek, so the parameter is named ev.
if (src !in protocol_sequences)
protocol_sequences[src] = vector();
protocol_sequences[src][|protocol_sequences[src]|] = ev;
local n = |protocol_sequences[src]|;
if (n >= 3) {
# Collect the protocols of the three most recent events into a set, so that
# "in" tests membership by value (on a vector it would test the index).
local last_three: set[string] = set();
local i = n - 3;
while (i < n) {
add last_three[protocol_sequences[src][i]$protocol];
++i;
}
# Hunt for protocol abuse patterns
if ("MODBUS" in last_three && "S7COMM" in last_three) {
NOTICE([
$note=ICS::Mixed_Protocol_Attack,
$msg=fmt("Suspicious multi-protocol sequence from %s", src)
]);
}
}
}
Behavioral Analysis Queries
Command Frequency Analysis
# Implement command frequency analysis
module BehaviorAnalysis;
export {
type CommandMetrics: record {
hourly_counts: vector of count;
total_count: count;
last_update: time;
};
global command_patterns: table[addr, string] of CommandMetrics;
}
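function analyze_command_frequency(src: addr, cmd: string)
{
if ([src, cmd] !in command_patterns) {
# Zeek has no "vector(24) of 0" constructor, so build the 24 hourly slots explicitly.
local hourly: vector of count = vector();
local h = 0;
while (h < 24) {
hourly[|hourly|] = 0;
++h;
}
command_patterns[src, cmd] = CommandMetrics($hourly_counts=hourly,
$total_count=0,
$last_update=current_time());
}
local metrics = command_patterns[src, cmd];
local current_hour = to_count(strftime("%H", current_time()));
metrics$hourly_counts[current_hour] += 1;
metrics$total_count += 1;
# Check for unusual frequency: the current hour holds more than three times
# its even share (total / 24). Cross-multiplying avoids count division, which
# truncates to 0 for totals below 24 and would flag every command.
# The minimum of 100 samples is an assumed warm-up value.
if (metrics$total_count >= 100 &&
metrics$hourly_counts[current_hour] * 24 > metrics$total_count * 3) {
NOTICE([
$note=ICS::Command_Frequency_Anomaly,
$msg=fmt("Unusual command frequency detected for %s: %s", src, cmd)
]);
}
}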