
Using Python and Machine Learning to Analyze Anomalous Traffic in AWS CloudFront Logs (Part 1)

As cybersecurity professionals, we constantly face the challenge of sifting through vast volumes of log data to identify potential threats such as bots, vulnerability scanners, or distributed attacks. Recently, I developed a Python-based tool to analyze AWS CloudFront access logs for anomalous traffic patterns. Leveraging unsupervised machine learning via z-score outlier detection, this script extracts behavioral features from logs to flag suspicious client IPs. It’s particularly useful for detecting repetitive exploits, session hijacking via cookie reuse, and low-variety probes—common indicators of automated threats. This approach is lightweight, scalable, and requires no labeled datasets, making it ideal for integration into SIEM workflows or threat hunting pipelines.

Tool Capabilities

  • Log Parsing and Preprocessing: Handles large log files, filters internal subnets, and normalizes data (e.g., timestamp creation, URI construction).
  • Feature Engineering: Computes IP-level metrics like request volume, URI diversity, error rates, and cookie sharing to capture attack signatures.
  • Anomaly Detection (ML Core): Applies log-transformation and z-scores to identify statistical outliers (deviations >3 std devs from mean behavior).
  • Threat Scoring and Ranking: Weighted scoring prioritizes high-repetition or low-variety IPs, with outputs for IP lists, feature hits, and top URIs.
  • Security Insights: Flags potential bots (e.g., hammering /login.jsf) and provides URI/user-agent details for WAF rule creation or incident response.
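
The detection core above reduces to a few lines: log-transform a heavy-tailed metric, z-score it, and flag large deviations. A minimal, self-contained sketch on synthetic per-IP request counts (the numbers are illustrative, not real log data):

```python
import numpy as np
from scipy import stats

# Synthetic per-IP request counts: 50 normal clients plus one heavy hitter
rng = np.random.default_rng(42)
normal_traffic = rng.integers(5, 30, size=50).astype(float)
counts = np.append(normal_traffic, 5000.0)

# log1p tames the heavy tail so a single extreme value doesn't dominate the scale
z = stats.zscore(np.log1p(counts))

# Flag anything more than 3 standard deviations from the mean
outliers = np.where(z > 3)[0]
print(outliers.tolist())  # -> [50], the index of the 5000-request IP
```

Note that scipy.stats.zscore uses the population standard deviation (ddof=0) by default; with very small samples a single outlier inflates the standard deviation enough that the >3 threshold can become unreachable, so the technique works best with a reasonable number of IPs per batch.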

Sample Python Script

Below is the core script (tested on Python 3.9+ with pandas, NumPy, and SciPy; the include_groups=False argument requires pandas 2.2 or later). For production use, add logging and integrate with AWS S3 for automated log pulls.

import pandas as pd
import numpy as np
from scipy import stats
import urllib.parse
import ipaddress

# Log file path
local_log_path = r'C:\logs\cloudfront-sample.log'  # Replace with your path

# CloudFront log fields
field_names = [
    'date', 'time', 'x-edge-location', 'sc-bytes', 'c-ip', 'cs-method', 'cs(Host)', 'cs-uri-stem', 
    'sc-status', 'cs(Referer)', 'cs(User-Agent)', 'cs-uri-query', 'cs(Cookie)', 'x-edge-result-type', 
    'x-edge-request-id', 'x-host-header', 'cs-protocol', 'cs-bytes', 'time-taken', 'x-forwarded-for', 
    'ssl-protocol', 'ssl-cipher', 'x-edge-response-result-type', 'cs-protocol-version', 'fle-status', 
    'fle-encrypted-fields', 'c-port', 'time-to-first-byte', 'x-edge-detailed-result-type', 
    'sc-content-type', 'sc-content-len', 'sc-range-start', 'sc-range-end'
]

# Parse log
df = pd.read_csv(local_log_path, sep='\t', skiprows=2, names=field_names, on_bad_lines='skip')

# Exclude internal subnet
exclude_subnet = ipaddress.ip_network('192.168.1.0/24')

def is_internal(ip):
    # NaN and malformed entries would crash ip_address(), so guard with try/except
    try:
        return ipaddress.ip_address(ip) in exclude_subnet
    except (ValueError, TypeError):
        return False

df = df[~df['c-ip'].apply(is_internal)]

# Numeric conversions and datetime
df['sc-status'] = pd.to_numeric(df['sc-status'], errors='coerce').fillna(0).astype(int)
df['time-taken'] = pd.to_numeric(df['time-taken'], errors='coerce').fillna(0).astype(float)
df['datetime'] = pd.to_datetime(df['date'] + ' ' + df['time'], errors='coerce')

# Decode user-agent and build URI
df['user_agent_decoded'] = df['cs(User-Agent)'].apply(lambda ua: urllib.parse.unquote(ua) if isinstance(ua, str) else ua)
df['full_uri'] = df['cs-uri-stem'].fillna('') + df['cs-uri-query'].apply(lambda q: '' if pd.isna(q) or q == '-' else '?' + q)

# Cookie reuse detection
df['cs(Cookie)'] = df['cs(Cookie)'].replace('-', pd.NA)
cookie_groups = df.groupby('cs(Cookie)')['c-ip'].nunique().reset_index(name='shared_ip_count')
df = df.merge(cookie_groups, on='cs(Cookie)', how='left')
df['shared_ip_count'] = df['shared_ip_count'].fillna(1)

# Group by IP and compute behavioral features
ip_groups = df.groupby('c-ip')

def request_rate(g):
    # Requests per minute over the IP's active window
    span_seconds = (g['datetime'].max() - g['datetime'].min()).total_seconds()
    return len(g) / (span_seconds / 60) if span_seconds > 0 else 0

def repetition_ratio(g):
    # Requests per distinct URI: high values indicate hammering a few endpoints
    n_unique = g['full_uri'].nunique()
    return len(g) / n_unique if n_unique > 0 else 0

features_dict = {
    'request_count': ip_groups.size(),
    'unique_uris': ip_groups['cs-uri-stem'].nunique(),
    'error_count': ip_groups['sc-status'].apply(lambda s: (s >= 400).sum()),
    'avg_time_taken': ip_groups['time-taken'].mean(),
    'request_rate': ip_groups.apply(request_rate, include_groups=False),
    'repetition_ratio': ip_groups.apply(repetition_ratio, include_groups=False),
    'max_cookie_reuse': ip_groups['shared_ip_count'].max(),
    'low_uri_variety': (ip_groups['cs-uri-stem'].nunique() < 3).astype(int)
}
features = pd.DataFrame(features_dict).reset_index().fillna(0)

# Log-transform and z-scores
transformed_cols = ['request_count', 'unique_uris', 'error_count', 'avg_time_taken', 'request_rate', 'repetition_ratio', 'max_cookie_reuse']
transformed = features[transformed_cols].copy()
for col in ['request_count', 'request_rate', 'repetition_ratio', 'max_cookie_reuse']:
    transformed[col] = np.log1p(transformed[col])
z_scores = transformed.apply(stats.zscore).fillna(0)
z_scores.columns = [col + '_z' for col in transformed_cols]
features = pd.concat([features, z_scores], axis=1)

# Flag and score anomalies
flag_cols = [col + '_z' for col in transformed_cols]
features['is_anomalous'] = features[flag_cols].gt(3).any(axis=1) | (features['low_uri_variety'] == 1)

# Weighted score: unit weight on the base metrics, extra weight on repetition,
# cookie reuse, and low URI variety (the strongest bot signals)
base_z_cols = [col + '_z' for col in ['request_count', 'unique_uris', 'error_count', 'avg_time_taken', 'request_rate']]
features['anomaly_score'] = (np.abs(z_scores[base_z_cols]).sum(axis=1)
                             + 3 * np.abs(z_scores['repetition_ratio_z'])
                             + 2 * np.abs(z_scores['max_cookie_reuse_z'])
                             + 2 * features['low_uri_variety'])

# Output
print("Number of IPs hitting each anomalous feature:")
feature_hits = {col.replace('_z', '') + '_anomalous': features[col].gt(3).sum() for col in flag_cols}
feature_hits['low_uri_variety_anomalous'] = features['low_uri_variety'].sum()
for feature, count in feature_hits.items():
    print(f"{feature}: {count}")

uri_counts = df['full_uri'].value_counts().head(10)
print("\nTop 10 URIs by total request count across all IPs:")
print(uri_counts)

anomalous_ips_all = features[features['is_anomalous']].sort_values('anomaly_score', ascending=False)
print("\nAll anomalous IPs:")
print(anomalous_ips_all[['c-ip', 'anomaly_score']])  # Simplified

# Top 10 URIs/user-agents
top_10_ips = anomalous_ips_all.head(10)['c-ip']
for ip in top_10_ips:
    ip_df = df[df['c-ip'] == ip]
    print(f"\nIP: {ip}")
    print("URIs:")
    print(ip_df['full_uri'].value_counts().head(10))
    print("User-Agents:")
    print(ip_df['user_agent_decoded'].value_counts().head(10))
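
For the S3 integration mentioned above: CloudFront delivers its standard logs to S3 as gzipped TSV objects with two comment lines. A minimal loading sketch, with the boto3 call left as a comment (the bucket and key names are placeholders, not real values) so the parsing helper stays testable offline:

```python
import gzip
import io
import pandas as pd

def load_cloudfront_log(raw_bytes: bytes, field_names: list) -> pd.DataFrame:
    """Gunzip a CloudFront log object and parse it into a DataFrame."""
    text = gzip.decompress(raw_bytes).decode('utf-8')
    # skiprows=2 drops the #Version and #Fields comment lines
    return pd.read_csv(io.StringIO(text), sep='\t', skiprows=2,
                       names=field_names, on_bad_lines='skip')

# With boto3 (hypothetical bucket/key, shown for orientation only):
# import boto3
# s3 = boto3.client('s3')
# obj = s3.get_object(Bucket='my-log-bucket', Key='cloudfront/some-object.gz')
# df = load_cloudfront_log(obj['Body'].read(), field_names)
```

From there, concatenate the per-object DataFrames with pd.concat before running the feature-engineering steps above.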

Sample Output (Fabricated Data)

Number of IPs hitting each anomalous feature:
request_count_anomalous: 95
unique_uris_anomalous: 12
error_count_anomalous: 20
avg_time_taken_anomalous: 28
request_rate_anomalous: 45
repetition_ratio_anomalous: 80
max_cookie_reuse_anomalous: 5
low_uri_variety_anomalous: 2300

Top 10 URIs by total request count across all IPs:
/app/login.jsf: 7500
/app/api/data.jsf: 150
/app/static/js/core.js?ln=lib&v=1.0: 100
/app/static/css/styles.css?ln=lib&v=1.0: 90
/app/static/js/plugins.js?ln=lib&v=1.0: 85
/app/api/fetch.jsf: 80
/app/static/images/logo.png: 70
/app/static/js/main.js?ln=lib&v=1.0: 65
/app/auth/session.jsf: 60
/app/static/fonts/icon.css?ln=lib&v=1.0: 55

All anomalous IPs:
         c-ip  anomaly_score
0  10.0.0.123      65.4321
1  10.0.0.124      35.9876
2  10.0.0.125      34.5678
3  10.0.0.126      32.1234
...        ...          ...

IP: 10.0.0.123
URIs:
/app/login.jsf: 500
User-Agents:
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36: 500

IP: 10.0.0.124
URIs:
/app/login.jsf: 130
User-Agents:
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36: 130

IP: 10.0.0.125
URIs:
/app/login.jsf: 125
User-Agents:
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36: 125

Why Use This in Cybersecurity?

  • Threat Detection: Spots bots hammering endpoints (e.g., /app/login.jsf with 7500 requests) or reusing cookies.
  • Unsupervised ML: Z-scores highlight deviations (e.g., high repetition = potential brute-force).
  • Scalable: Handles large logs; integrate with SIEM for real-time alerts.
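
To act on the results, the ranked IPs can be exported as a blocklist for a WAF IP set or firewall. A small helper sketch (the /32 CIDR formatting and function name are illustrative choices, not part of the script above):

```python
import pandas as pd

def export_blocklist(features: pd.DataFrame, path: str, top_n: int = 100) -> list:
    """Write the top-N anomalous IPs as /32 CIDRs, one per line."""
    top = (features[features['is_anomalous']]
           .sort_values('anomaly_score', ascending=False)
           .head(top_n))
    cidrs = [f"{ip}/32" for ip in top['c-ip']]
    with open(path, 'w') as f:
        f.write('\n'.join(cidrs))
    return cidrs
```

In practice, review the candidate IPs before blocking: a shared NAT gateway or corporate proxy can look like a single high-volume client.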
