In today’s threat landscape, where bots, vulnerability scanners, and distributed attacks are commonplace, analyzing CDN logs such as those from AWS CloudFront is essential for proactive threat detection. As a cybersecurity practitioner, I’ve developed a Python tool that applies unsupervised anomaly detection, via z-score outlier analysis, to identify anomalous behavior in these logs. By extracting per-IP features and applying statistical analysis, it flags suspicious IPs without requiring labeled data, making it well suited for integration into SIEM systems, incident response workflows, or automated alerting.
This tool is particularly useful for spotting patterns like repetitive endpoint hammering (e.g., brute-force attempts), session hijacking via cookie reuse, or low-diversity probes indicative of automated scripts. Below, I’ll outline the methodology, provide the script, and share a sample output with sanitized data.
Methodology
- Log Parsing and Preprocessing: Filter internal subnets, normalize timestamps, and construct URIs.
- Feature Engineering: Compute IP-level metrics such as request volume, URI diversity, error rates, and cookie sharing.
- Anomaly Detection: Log-transform features for normalization, then use z-scores to detect deviations (>3 standard deviations from mean behavior).
- Scoring and Ranking: Weighted sum of z-scores prioritizes high-risk behaviors (e.g., repetition weighted x3).
- Output: Feature hit counts, top URIs, all anomalous IPs, and details for top 10 (URIs/user-agents) for actionable insights.
The unsupervised nature relies on the assumption that normal traffic dominates the logs; outliers stand out. For enhanced accuracy, train on clean baselines to establish a normal pattern before analyzing suspect files (detailed below).
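As a minimal, self-contained sketch of the z-score idea before the full script: the toy request counts below are illustrative values, not real log data, but they show how a log transform plus a 3-sigma threshold isolates one bot-like IP among otherwise normal traffic.

```python
import numpy as np
from scipy import stats

# 19 typical per-IP request counts plus one bot-like spike (toy values)
request_counts = np.array([12, 9, 14, 11, 10, 13, 8, 12, 11, 9,
                           10, 13, 12, 11, 9, 14, 10, 12, 11, 950])
z = stats.zscore(np.log1p(request_counts))  # log1p tames heavy-tailed counts
outliers = np.where(np.abs(z) > 3)[0]
print(outliers)  # -> [19], the 950-request IP
```

Note that without the log transform, a single extreme value inflates the standard deviation enough to mask itself; the transform keeps the threshold meaningful on heavy-tailed count data.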
Enhancement: Learning Normal Traffic Patterns Before Analyzing Suspect Files
To improve detection and reduce false positives, incorporate a training phase using known-normal logs (e.g., three clean CloudFront files from periods without attacks). This phase learns a baseline of typical behavior (per-feature means and standard deviations) and scores deviations in the suspect file against it. The approach is especially effective for environments with consistent normal traffic, as anomalies (e.g., sudden bot spikes) become more pronounced against the baseline.
- Training Phase: Aggregate features from normal logs to compute baseline stats.
- Testing Phase: Apply baseline to suspect log for z-scores and scoring.
- Benefits: Better handles varying traffic; if normal logs show low repetition, bots hammering one URI score higher.
- Implementation Tip: Use chunked processing for large files; adjust thresholds based on validation.
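The train/test flow above can be sketched for a single feature as follows. Here `build_features` is a stand-in for the per-IP feature pipeline in the full script; it returns deterministic toy data (and the file names are placeholders) so the example runs on its own.

```python
import numpy as np
import pandas as pd

# Stand-in for the real parsing/feature pipeline: returns toy per-IP counts
def build_features(path):
    rng = np.random.default_rng(sum(map(ord, path)))  # deterministic toy data
    return pd.DataFrame({'request_count': rng.poisson(10, 200).astype(float)})

# Training phase: pool features from known-clean logs, learn baseline stats
normal = pd.concat([build_features(p) for p in ('day1.log', 'day2.log', 'day3.log')])
baseline_mean = np.log1p(normal['request_count']).mean()
baseline_std = np.log1p(normal['request_count']).std(ddof=0)

# Testing phase: score the suspect file against the learned baseline
suspect = build_features('suspect.log')
suspect.loc[len(suspect)] = [5000.0]  # simulate one bot-like IP
suspect['request_count_z'] = (np.log1p(suspect['request_count']) - baseline_mean) / baseline_std
flagged = suspect[suspect['request_count_z'] > 3]
```

The key difference from the single-file script is that the mean and standard deviation come from the clean logs, not the suspect file, so a suspect file dominated by bot traffic cannot drag the baseline toward itself.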
Python Script
Here’s the complete script (Python 3.9+ with numpy and scipy; pandas 2.2+ is needed for the include_groups argument). Customize paths and thresholds.
import pandas as pd
import numpy as np
from scipy import stats
import urllib.parse
import ipaddress
# Log file path
local_log_path = r'C:\logs\cloudfront-sample.log' # Replace with your path
# CloudFront log fields
field_names = [
    'date', 'time', 'x-edge-location', 'sc-bytes', 'c-ip', 'cs-method', 'cs(Host)', 'cs-uri-stem',
    'sc-status', 'cs(Referer)', 'cs(User-Agent)', 'cs-uri-query', 'cs(Cookie)', 'x-edge-result-type',
    'x-edge-request-id', 'x-host-header', 'cs-protocol', 'cs-bytes', 'time-taken', 'x-forwarded-for',
    'ssl-protocol', 'ssl-cipher', 'x-edge-response-result-type', 'cs-protocol-version', 'fle-status',
    'fle-encrypted-fields', 'c-port', 'time-to-first-byte', 'x-edge-detailed-result-type',
    'sc-content-type', 'sc-content-len', 'sc-range-start', 'sc-range-end'
]
# Parse log (skiprows=2 skips the #Version and #Fields header lines)
df = pd.read_csv(local_log_path, sep='\t', skiprows=2, names=field_names, on_bad_lines='skip')
# Exclude internal subnet
exclude_subnet = ipaddress.ip_network('192.168.1.0/24')
df = df[~df['c-ip'].apply(lambda ip: isinstance(ip, str) and ipaddress.ip_address(ip) in exclude_subnet)]  # isinstance guard skips NaN rows
# Numeric conversions and datetime
df['sc-status'] = pd.to_numeric(df['sc-status'], errors='coerce').fillna(0).astype(int)
df['time-taken'] = pd.to_numeric(df['time-taken'], errors='coerce').fillna(0).astype(float)
df['datetime'] = pd.to_datetime(df['date'] + ' ' + df['time'], errors='coerce')
# Decode user-agent and build URI
df['user_agent_decoded'] = df['cs(User-Agent)'].apply(lambda ua: urllib.parse.unquote(ua) if isinstance(ua, str) else ua)
df['full_uri'] = df['cs-uri-stem'] + df['cs-uri-query'].apply(lambda q: '' if pd.isna(q) or q == '-' else '?' + q)
# Cookie reuse detection
df['cs(Cookie)'] = df['cs(Cookie)'].replace('-', pd.NA)
cookie_groups = df.groupby('cs(Cookie)')['c-ip'].nunique().reset_index(name='shared_ip_count')
df = df.merge(cookie_groups, on='cs(Cookie)', how='left')
df['shared_ip_count'] = df['shared_ip_count'].fillna(1)
# Group by IP and compute features
ip_groups = df.groupby('c-ip')
features_dict = {
    'request_count': ip_groups.size(),
    'unique_uris': ip_groups['cs-uri-stem'].nunique(),
    'error_count': ip_groups['sc-status'].apply(lambda s: (s >= 400).sum()),
    'avg_time_taken': ip_groups['time-taken'].mean(),
    # Requests per minute over the IP's active window (0 for a single request)
    'request_rate': ip_groups.apply(
        lambda g: len(g) / ((g['datetime'].max() - g['datetime'].min()).total_seconds() / 60)
        if (g['datetime'].max() - g['datetime'].min()).total_seconds() > 0 else 0,
        include_groups=False),
    # Mean hits per distinct URI; high values suggest endpoint hammering
    'repetition_ratio': ip_groups.apply(
        lambda g: len(g) / g['full_uri'].nunique() if g['full_uri'].nunique() > 0 else 0,
        include_groups=False),
    'max_cookie_reuse': ip_groups['shared_ip_count'].max(),
    'low_uri_variety': (ip_groups['cs-uri-stem'].nunique() < 3).astype(int)
}
features = pd.DataFrame(features_dict).reset_index().fillna(0)
# Log-transform and z-scores
transformed_cols = ['request_count', 'unique_uris', 'error_count', 'avg_time_taken', 'request_rate', 'repetition_ratio', 'max_cookie_reuse']
transformed = features[transformed_cols].copy()
for col in ['request_count', 'request_rate', 'repetition_ratio', 'max_cookie_reuse']:
    transformed[col] = np.log1p(transformed[col])
z_scores = transformed.apply(stats.zscore).fillna(0)
z_scores.columns = [col + '_z' for col in transformed_cols]
features = pd.concat([features, z_scores], axis=1)
# Flag and score anomalies
flag_cols = [col + '_z' for col in transformed_cols]
features['is_anomalous'] = features[flag_cols].gt(3).any(axis=1) | (features['low_uri_variety'] == 1)
# Weighted score: first five z-scores x1, repetition x3, cookie reuse x2,
# plus a flat +2 when URI variety is low
score_sum = (np.abs(z_scores.iloc[:, :5]).sum(axis=1)
             + 3 * np.abs(z_scores['repetition_ratio_z'])
             + 2 * np.abs(z_scores['max_cookie_reuse_z'])
             + 2 * features['low_uri_variety'])
features['anomaly_score'] = score_sum
# Output
print("Number of IPs hitting each anomalous feature:")
feature_hits = {col.replace('_z', '') + '_anomalous': features[col].gt(3).sum() for col in flag_cols}
feature_hits['low_uri_variety_anomalous'] = features['low_uri_variety'].sum()
for feature, count in feature_hits.items():
    print(f"{feature}: {count}")
uri_counts = df['full_uri'].value_counts().head(10)
print("\nTop 10 URIs by total request count:")
print(uri_counts)
anomalous_ips_all = features[features['is_anomalous']].sort_values('anomaly_score', ascending=False)
if anomalous_ips_all.empty:
    print("\nNo anomalous IPs detected.")
else:
    print("\nAll anomalous IPs:")
    print(anomalous_ips_all[['c-ip', 'anomaly_score']])  # Simplified
    top_10_ips = anomalous_ips_all.head(10)['c-ip']
    print("\nURIs and User-Agents for top 10 anomalous IPs:")
    for ip in top_10_ips:
        ip_df = df[df['c-ip'] == ip]
        print(f"\nIP: {ip}")
        print("URIs:")
        print(ip_df['full_uri'].value_counts().head(10))
        print("User-Agents:")
        print(ip_df['user_agent_decoded'].value_counts().head(10))
Sample Output:
Number of IPs hitting each anomalous feature:
request_count_anomalous: 95
unique_uris_anomalous: 12
error_count_anomalous: 20
avg_time_taken_anomalous: 28
request_rate_anomalous: 45
repetition_ratio_anomalous: 80
max_cookie_reuse_anomalous: 5
low_uri_variety_anomalous: 2300
Top 10 URIs by total request count:
/app/login.jsf: 7500
/app/api/data.jsf: 150
/app/static/js/core.js?ln=lib&v=1.0: 100
/app/static/css/styles.css?ln=lib&v=1.0: 90
/app/static/js/plugins.js?ln=lib&v=1.0: 85
/app/api/fetch.jsf: 80
/app/static/images/logo.png: 70
/app/static/js/main.js?ln=lib&v=1.0: 65
/app/auth/session.jsf: 60
/app/static/fonts/icon.css?ln=lib&v=1.0: 55
All anomalous IPs:
c-ip anomaly_score
0 10.0.0.123 65.4321
1 10.0.0.124 35.9876
2 10.0.0.125 34.5678
3 10.0.0.126 32.1234
... ... ...
URIs and User-Agents for top 10 anomalous IPs:
IP: 10.0.0.123
URIs:
/app/login.jsf: 500
User-Agents:
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36: 500
IP: 10.0.0.124
URIs:
/app/login.jsf: 130
User-Agents:
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36: 130
...
Benefits for Cybersecurity
- Threat Identification: Detects bots hammering endpoints or reusing cookies.
- Unsupervised Detection: Z-scores highlight deviations without labeled training data, ideal for dynamic environments.
- Scalable & Actionable: Handles large logs; outputs support WAF rules or IR.
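As one way to make the output directly actionable, the scored features table can be exported as a one-IP-per-line blocklist that a WAF or firewall can ingest. The small frame below stands in for the `features` DataFrame produced by the script; the output path and the frame's values are examples.

```python
import pandas as pd

# Stand-in for the scored `features` DataFrame from the script above
features = pd.DataFrame({'c-ip': ['10.0.0.123', '10.0.0.5'],
                         'is_anomalous': [True, False],
                         'anomaly_score': [65.4, 1.2]})

# Keep only flagged IPs, highest score first, and write one IP per line
blocklist = (features[features['is_anomalous']]
             .sort_values('anomaly_score', ascending=False)['c-ip'])
blocklist.to_csv('blocklist.txt', index=False, header=False)
```

From there, the file can feed an AWS WAF IP set, a firewall deny list, or simply a ticket attachment for incident response.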