Anomaly detection is the identification of rare items, events, or observations that differ significantly from the majority of the data. From fraud detection and network intrusion to equipment-failure prediction and quality control, it is crucial in many real-world applications. This lesson covers statistical methods, machine learning approaches (Isolation Forest, One-Class SVM, Local Outlier Factor), deep learning techniques (autoencoders), and practical implementations for various types of anomalies.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.datasets import make_blobs, make_moons
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score, roc_curve
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
from sklearn.covariance import EllipticEnvelope
import warnings
# Global runtime configuration for the lesson script.
# NOTE: this silences every warning category for the rest of the process,
# including deprecation and convergence warnings raised by the libraries used.
warnings.filterwarnings('ignore')
# Set random seed
# Seeds NumPy's global RNG, so every later np.random.* call is reproducible
# as long as the call order does not change.
np.random.seed(42)
# Set style
# Matplotlib style sheet and seaborn colour palette applied to all later figures.
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
# Print the section banner followed by a plain-text overview of anomaly
# detection concepts. The text below is emitted verbatim at runtime.
print("="*60)
print("ANOMALY DETECTION FUNDAMENTALS")
print("="*60)
# Reference sheet: anomaly types, approaches, challenges, applications,
# evaluation metrics and key parameters.
anomaly_concepts = """
ANOMALY DETECTION KEY CONCEPTS:
1. TYPES OF ANOMALIES:
• Point Anomalies: Individual instances far from others
• Contextual Anomalies: Normal in different context
• Collective Anomalies: Collection of instances anomalous together
2. APPROACHES:
• Statistical Methods: Z-score, IQR, Gaussian distribution
• Distance-Based: KNN, LOF (Local Outlier Factor)
• Density-Based: DBSCAN, OPTICS
• Isolation-Based: Isolation Forest
• Model-Based: One-Class SVM, Autoencoders
3. CHALLENGES:
• Imbalanced data (anomalies are rare)
• High-dimensional data
• Evolving normal behavior
• Lack of labeled anomalies
• Interpretability
4. APPLICATIONS:
• Fraud Detection: Credit card, insurance fraud
• Network Security: Intrusion detection
• Manufacturing: Quality control, defect detection
• Healthcare: Disease outbreak, abnormal patient data
• Finance: Market manipulation, unusual trading
• IoT: Sensor malfunction, equipment failure
5. EVALUATION METRICS:
• Precision/Recall (when labels available)
• F1-Score
• ROC-AUC
• Precision@K (top K anomalies)
• Contamination rate
6. KEY PARAMETERS:
• Contamination: Expected proportion of anomalies
• Threshold: Decision boundary
• Number of neighbors (for local methods)
• Kernel parameters (for kernel methods)
"""
print(anomaly_concepts)
class StatisticalAnomalyDetector:
    """Statistical methods for anomaly detection.

    Bundles four classical tests (Z-score, IQR fences, Mahalanobis distance
    and Grubbs' test) plus a plotting demo. Every detector returns a
    ``(mask, diagnostic)`` pair in which ``True`` marks an anomaly.

    Attributes:
        methods: Empty dict created in ``__init__``; nothing in this class
            fills it.
        thresholds: Cut-offs used by the most recent call of each detector,
            keyed by ``'z_score'``, ``'iqr'``, ``'mahalanobis'`` and
            ``'grubbs'``.
    """

    def __init__(self):
        self.methods = {}
        self.thresholds = {}

    def z_score_detection(self, data, threshold=3):
        """Detect anomalies using Z-score.

        Args:
            data: 1-D array-like of numeric observations.
            threshold: Absolute Z-score above which a point is flagged.

        Returns:
            Tuple ``(anomalies, z_scores)``: the boolean mask and the
            absolute Z-score of every observation.
        """
        z_scores = np.abs(stats.zscore(data))
        anomalies = z_scores > threshold
        self.thresholds['z_score'] = threshold
        return anomalies, z_scores

    def iqr_detection(self, data, multiplier=1.5):
        """Detect anomalies using Interquartile Range (IQR).

        Args:
            data: 1-D NumPy array (or object supporting element-wise
                comparison) of numeric observations.
            multiplier: Number of IQRs beyond the quartiles at which the
                fences are placed.

        Returns:
            Tuple ``(anomalies, (lower_bound, upper_bound))``.
        """
        q1, q3 = np.percentile(data, [25, 75])
        spread = q3 - q1
        lower_bound = q1 - multiplier * spread
        upper_bound = q3 + multiplier * spread
        anomalies = (data < lower_bound) | (data > upper_bound)
        self.thresholds['iqr'] = {
            'lower': lower_bound,
            'upper': upper_bound
        }
        return anomalies, (lower_bound, upper_bound)

    def mahalanobis_distance(self, data, confidence=0.95):
        """Calculate Mahalanobis distance for multivariate anomaly detection.

        Args:
            data: Array-like of shape ``(n_samples, n_features)``. A 1-D
                input is treated as a single feature.
            confidence: Chi-square quantile used for the cut-off.

        Returns:
            Tuple ``(anomalies, distances)`` as NumPy arrays; ``distances``
            holds the (unsquared) Mahalanobis distance of every row to the
            sample mean.

        Raises:
            numpy.linalg.LinAlgError: If the sample covariance is singular.
        """
        points = np.asarray(data, dtype=float)
        if points.ndim == 1:
            points = points.reshape(-1, 1)

        center = points.mean(axis=0)
        # atleast_2d keeps the single-feature case invertible (np.cov
        # returns a 0-d array there).
        covariance = np.atleast_2d(np.cov(points, rowvar=False))
        inv_cov = np.linalg.inv(covariance)

        # Row-wise quadratic form diff^T * inv_cov * diff without a Python loop.
        centered = points - center
        squared = np.sum((centered @ inv_cov) * centered, axis=1)
        distances = np.sqrt(np.maximum(squared, 0.0))

        # For Gaussian data the SQUARED distance follows a chi-square
        # distribution, so the cut-off on the distance itself is the square
        # root of the chi-square quantile.
        threshold = np.sqrt(stats.chi2.ppf(confidence, df=points.shape[1]))
        anomalies = distances > threshold
        self.thresholds['mahalanobis'] = threshold
        return anomalies, distances

    def grubbs_test(self, data, alpha=0.05):
        """Grubbs test for outlier detection.

        The two-sided critical value is applied to every observation, not
        only to the most extreme one.

        Args:
            data: 1-D array-like of numeric observations.
            alpha: Significance level of the two-sided test.

        Returns:
            Tuple ``(anomalies, grubbs_stats)`` as NumPy arrays. With fewer
            than three observations or zero variance the test is undefined;
            nothing is flagged and the statistics are all zero.
        """
        values = np.asarray(data, dtype=float)
        n = values.size
        if n < 3:
            return np.zeros(values.shape, dtype=bool), np.zeros(values.shape)

        # Grubbs' statistic is defined with the sample standard deviation.
        std = values.std(ddof=1)
        if std == 0:
            return np.zeros(values.shape, dtype=bool), np.zeros(values.shape)

        grubbs_stats = np.abs(values - values.mean()) / std

        # Critical value from t-distribution
        t_critical = stats.t.ppf(1 - alpha / (2 * n), n - 2)
        grubbs_critical = ((n - 1) / np.sqrt(n)) * np.sqrt(
            t_critical**2 / (n - 2 + t_critical**2)
        )
        anomalies = grubbs_stats > grubbs_critical
        self.thresholds['grubbs'] = grubbs_critical
        return anomalies, grubbs_stats

    def demonstrate_statistical_methods(self):
        """Demonstrate various statistical anomaly detection methods.

        Builds a synthetic sample (1000 normal draws plus 50 uniform
        outliers), runs the Z-score, IQR and Grubbs detectors on it and shows
        a 2x2 comparison figure. Reseeds NumPy's global RNG with 42.

        Returns:
            Dict mapping ``'Z-Score'``, ``'IQR'`` and ``'Grubbs'`` to the
            tuple returned by the corresponding detector.
        """
        # Generate sample data with anomalies
        np.random.seed(42)
        normal_data = np.random.normal(100, 15, 1000)
        anomalies_data = np.random.uniform(150, 200, 50)
        n_true_anomalies = len(anomalies_data)
        data = np.concatenate([normal_data, anomalies_data])
        np.random.shuffle(data)

        # Apply different methods
        methods = {
            'Z-Score': self.z_score_detection(data, threshold=3),
            'IQR': self.iqr_detection(data, multiplier=1.5),
            'Grubbs': self.grubbs_test(data, alpha=0.05)
        }

        # Visualization
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # Original data distribution
        axes[0, 0].hist(data, bins=50, edgecolor='black', alpha=0.7)
        axes[0, 0].axvline(x=np.mean(normal_data), color='red',
                           linestyle='--', label='Normal Mean')
        axes[0, 0].set_xlabel('Value')
        axes[0, 0].set_ylabel('Frequency')
        axes[0, 0].set_title('Data Distribution')
        axes[0, 0].legend()

        # Z-Score method
        anomalies_z, _ = methods['Z-Score']
        data_mean, data_std = np.mean(data), np.std(data)
        axes[0, 1].scatter(range(len(data)), data, c=anomalies_z,
                           cmap='coolwarm', alpha=0.6, edgecolor='black',
                           linewidth=0.5)
        axes[0, 1].axhline(y=data_mean + 3 * data_std, color='red',
                           linestyle='--', alpha=0.5, label='±3σ')
        axes[0, 1].axhline(y=data_mean - 3 * data_std, color='red',
                           linestyle='--', alpha=0.5)
        axes[0, 1].set_xlabel('Index')
        axes[0, 1].set_ylabel('Value')
        axes[0, 1].set_title(f'Z-Score Method ({np.sum(anomalies_z)} anomalies)')
        axes[0, 1].legend()

        # IQR method
        anomalies_iqr, bounds = methods['IQR']
        axes[1, 0].boxplot(data, vert=False)
        axes[1, 0].scatter(data[anomalies_iqr], np.ones(np.sum(anomalies_iqr)),
                           color='red', s=50, label='Anomalies')
        axes[1, 0].axvline(x=bounds[0], color='red', linestyle='--',
                           alpha=0.5, label='IQR Bounds')
        axes[1, 0].axvline(x=bounds[1], color='red', linestyle='--', alpha=0.5)
        axes[1, 0].set_xlabel('Value')
        axes[1, 0].set_title(f'IQR Method ({np.sum(anomalies_iqr)} anomalies)')
        axes[1, 0].legend()

        # Comparison of methods
        method_order = ['Z-Score', 'IQR', 'Grubbs']
        axes[1, 1].bar(method_order,
                       [np.sum(methods[name][0]) for name in method_order],
                       color=['blue', 'green', 'orange'])
        axes[1, 1].set_ylabel('Number of Anomalies Detected')
        axes[1, 1].set_title('Method Comparison')
        # Reference line at the number of outliers that were injected above.
        axes[1, 1].axhline(y=n_true_anomalies, color='red', linestyle='--',
                           label=f'True Anomalies ({n_true_anomalies})')
        axes[1, 1].legend()

        plt.suptitle('Statistical Anomaly Detection Methods', fontsize=14, y=1.02)
        plt.tight_layout()
        plt.show()
        return methods
# Run the statistical demo, then report how many points each test flagged.
stat_detector = StatisticalAnomalyDetector()
print("\n" + "="*60)
print("STATISTICAL ANOMALY DETECTION")
print("="*60)
results = stat_detector.demonstrate_statistical_methods()

# Each result is a (mask, diagnostic) pair; only the mask is needed here.
print("\nDetection Results:")
for method_name in results:
    anomalies = results[method_name][0]
    print(f" {method_name}: {np.sum(anomalies)} anomalies detected")
    print(f" Detection rate: {np.sum(anomalies)/len(anomalies)*100:.2f}%")
class MLAnomalyDetectors:
    """Machine learning methods for anomaly detection.

    Wraps four scikit-learn detectors behind a common interface: every
    ``*_detection`` method returns ``(labels, scores)`` where ``labels`` uses
    1 for anomaly and 0 for normal.

    Attributes:
        models: Fitted estimators keyed by ``'isolation_forest'``,
            ``'one_class_svm'``, ``'lof'`` and ``'elliptic_envelope'``.
        results: Metrics from the most recent ``compare_ml_methods`` call.
    """

    def __init__(self):
        self.models = {}
        self.results = {}

    @staticmethod
    def _to_anomaly_labels(raw_predictions):
        """Map estimator output (+1 inlier / -1 outlier) to 0 / 1 labels."""
        return (np.asarray(raw_predictions) == -1).astype(int)

    @staticmethod
    def _classification_metrics(y_true, y_pred):
        """Return (precision, recall, f1) for binary labels, 1 = anomaly.

        Each ratio is 0.0 when its denominator is zero.
        """
        truth = np.asarray(y_true) == 1
        flagged = np.asarray(y_pred) == 1
        tp = np.sum(flagged & truth)
        fp = np.sum(flagged & ~truth)
        fn = np.sum(~flagged & truth)
        precision = float(tp / (tp + fp)) if (tp + fp) > 0 else 0.0
        recall = float(tp / (tp + fn)) if (tp + fn) > 0 else 0.0
        if precision + recall > 0:
            f1 = 2 * (precision * recall) / (precision + recall)
        else:
            f1 = 0.0
        return precision, recall, f1

    def generate_anomaly_dataset(self, n_samples=1000, contamination=0.1):
        """Generate dataset with anomalies.

        Normal points come from three Gaussian blobs; anomalies are drawn
        uniformly from the square [-6, 6] x [-6, 6] using NumPy's global RNG.

        Args:
            n_samples: Total number of points.
            contamination: Fraction of points that are anomalies.

        Returns:
            Tuple ``(X, y)``: shuffled 2-D features and labels (1 = anomaly).
        """
        n_normal = int(n_samples * (1 - contamination))
        n_anomalies = n_samples - n_normal

        # Normal data - multiple clusters
        X_normal, _ = make_blobs(n_samples=n_normal, centers=3,
                                 n_features=2, cluster_std=0.5, random_state=42)
        # Anomalies - uniformly distributed
        X_anomalies = np.random.uniform(low=-6, high=6, size=(n_anomalies, 2))

        X = np.vstack([X_normal, X_anomalies])
        y = np.hstack([np.zeros(n_normal), np.ones(n_anomalies)])

        # Shuffle features and labels with the same permutation.
        order = np.random.permutation(len(X))
        return X[order], y[order]

    def isolation_forest_detection(self, X, contamination=0.1):
        """Isolation Forest for anomaly detection.

        Returns:
            Tuple ``(labels, scores)``; ``scores`` is the estimator's
            ``score_samples`` output.
        """
        iso_forest = IsolationForest(
            contamination=contamination,
            random_state=42,
            n_estimators=100
        )
        labels = self._to_anomaly_labels(iso_forest.fit_predict(X))
        scores = iso_forest.score_samples(X)
        self.models['isolation_forest'] = iso_forest
        return labels, scores

    def one_class_svm_detection(self, X, nu=0.1):
        """One-Class SVM for anomaly detection.

        Args:
            X: Feature matrix.
            nu: Passed to ``OneClassSVM``; plays a role similar to
                contamination.

        Returns:
            Tuple ``(labels, scores)``; ``scores`` is the estimator's
            ``score_samples`` output.
        """
        oc_svm = OneClassSVM(
            nu=nu,  # Similar to contamination
            kernel='rbf',
            gamma='scale'
        )
        labels = self._to_anomaly_labels(oc_svm.fit_predict(X))
        scores = oc_svm.score_samples(X)
        self.models['one_class_svm'] = oc_svm
        return labels, scores

    def local_outlier_factor_detection(self, X, contamination=0.1):
        """Local Outlier Factor for anomaly detection.

        Returns:
            Tuple ``(labels, scores)``; ``scores`` is the fitted
            ``negative_outlier_factor_`` attribute.
        """
        lof = LocalOutlierFactor(
            n_neighbors=20,
            contamination=contamination,
            novelty=False  # For outlier detection
        )
        labels = self._to_anomaly_labels(lof.fit_predict(X))
        scores = lof.negative_outlier_factor_
        self.models['lof'] = lof
        return labels, scores

    def elliptic_envelope_detection(self, X, contamination=0.1):
        """Elliptic Envelope for anomaly detection.

        Returns:
            Tuple ``(labels, scores)``; ``scores`` is the estimator's
            ``score_samples`` output.
        """
        envelope = EllipticEnvelope(
            contamination=contamination,
            random_state=42
        )
        labels = self._to_anomaly_labels(envelope.fit_predict(X))
        scores = envelope.score_samples(X)
        self.models['elliptic_envelope'] = envelope
        return labels, scores

    def _plot_comparison(self, X, y_true, results):
        """Draw ground truth, each method's predictions and an F1 bar chart."""
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))

        # Original data
        axes[0, 0].scatter(X[:, 0], X[:, 1], c=y_true, cmap='coolwarm',
                           alpha=0.6, edgecolor='black', linewidth=0.5)
        axes[0, 0].set_title('True Anomalies')
        axes[0, 0].set_xlabel('Feature 1')
        axes[0, 0].set_ylabel('Feature 2')

        # Method predictions fill the next four panels.
        positions = [(0, 1), (0, 2), (1, 0), (1, 1)]
        for (name, result), (i, j) in zip(results.items(), positions):
            axes[i, j].scatter(X[:, 0], X[:, 1], c=result['predictions'],
                               cmap='coolwarm', alpha=0.6, edgecolor='black',
                               linewidth=0.5)
            axes[i, j].set_title(f'{name}\nF1: {result["f1_score"]:.3f}')
            axes[i, j].set_xlabel('Feature 1')
            axes[i, j].set_ylabel('Feature 2')

        # Performance comparison
        axes[1, 2].bar(range(len(results)),
                       [r['f1_score'] for r in results.values()],
                       color=['blue', 'green', 'orange', 'purple'])
        axes[1, 2].set_xticks(range(len(results)))
        axes[1, 2].set_xticklabels([name.split()[0] for name in results],
                                   rotation=45, ha='right')
        axes[1, 2].set_ylabel('F1-Score')
        axes[1, 2].set_title('Method Performance Comparison')
        axes[1, 2].grid(True, alpha=0.3, axis='y')

        plt.suptitle('Machine Learning Anomaly Detection Methods',
                     fontsize=14, y=1.02)
        plt.tight_layout()
        plt.show()

    def compare_ml_methods(self, n_samples=1000, contamination=0.1):
        """Compare different ML anomaly detection methods.

        Generates a synthetic dataset, standardizes it, runs all four
        detectors, computes precision / recall / F1 against the known labels
        and shows a comparison figure.

        Args:
            n_samples: Size of the synthetic dataset.
            contamination: Anomaly fraction used both to generate the data
                and to configure every detector.

        Returns:
            Dict mapping method name to a dict with keys ``'precision'``,
            ``'recall'``, ``'f1_score'``, ``'predictions'`` and ``'scores'``.
            The same dict is stored on ``self.results``.
        """
        X, y_true = self.generate_anomaly_dataset(n_samples=n_samples,
                                                  contamination=contamination)
        # Standardize features
        X_scaled = StandardScaler().fit_transform(X)

        methods = {
            'Isolation Forest': self.isolation_forest_detection(X_scaled, contamination),
            'One-Class SVM': self.one_class_svm_detection(X_scaled, contamination),
            'Local Outlier Factor': self.local_outlier_factor_detection(X_scaled, contamination),
            'Elliptic Envelope': self.elliptic_envelope_detection(X_scaled, contamination)
        }

        results = {}
        for name, (y_pred, scores) in methods.items():
            precision, recall, f1 = self._classification_metrics(y_true, y_pred)
            results[name] = {
                'precision': precision,
                'recall': recall,
                'f1_score': f1,
                'predictions': y_pred,
                'scores': scores
            }
        self.results = results

        # Plots use the unscaled coordinates so the axes match the raw data.
        self._plot_comparison(X, y_true, results)
        return results
# Run the ML detector comparison and print precision / recall / F1 per method.
ml_detector = MLAnomalyDetectors()
print("\n" + "="*60)
print("MACHINE LEARNING ANOMALY DETECTION")
print("="*60)
ml_results = ml_detector.compare_ml_methods()

# Print performance metrics
print("\nPerformance Metrics:")
for method in ml_results:
    metrics = ml_results[method]
    print(f"\n{method}:")
    print(f" Precision: {metrics['precision']:.3f}")
    print(f" Recall: {metrics['recall']:.3f}")
    print(f" F1-Score: {metrics['f1_score']:.3f}")
class TimeSeriesAnomalyDetector:
    """Anomaly detection for time series data.

    Provides a synthetic series generator, two detectors (rolling
    mean / standard-deviation band and seasonal-decomposition residuals) and
    a plotting demo that scores both against the injected anomalies.

    Attributes:
        models: Empty dict created in ``__init__``; nothing here fills it.
        anomalies: Empty dict created in ``__init__``; nothing here fills it.
    """

    def __init__(self):
        self.models = {}
        self.anomalies = {}

    def generate_time_series_with_anomalies(self, n_points=1000):
        """Generate time series with various types of anomalies.

        The base signal is trend + seasonality (period 50) + Gaussian noise,
        drawn from NumPy's global RNG. Three kinds of anomalies are injected
        at fixed positions; positions beyond ``n_points`` are skipped.

        Args:
            n_points: Length of the series.

        Returns:
            Tuple ``(t, signal, anomaly_indices)`` where ``anomaly_indices``
            is a list of the integer positions that were altered.
        """
        t = np.arange(n_points)
        trend = t * 0.01
        seasonal = 10 * np.sin(2 * np.pi * t / 50)
        noise = np.random.normal(0, 1, n_points)
        signal = trend + seasonal + noise + 50

        anomaly_indices = []

        # Point anomalies (spikes) with a random sign.
        for idx in (200, 450, 700):
            if idx < n_points:
                signal[idx] += np.random.choice([-1, 1]) * 25
                anomaly_indices.append(idx)

        # Contextual anomalies (seasonal disruption): rebuild the stretch
        # without the seasonal component.
        flat = np.arange(min(550, n_points), min(570, n_points))
        signal[flat] = 50 + trend[flat] + noise[flat]
        anomaly_indices.extend(flat.tolist())

        # Collective anomalies (level shift)
        shifted = np.arange(min(800, n_points), min(850, n_points))
        signal[shifted] += 15
        anomaly_indices.extend(shifted.tolist())

        return t, signal, anomaly_indices

    def moving_average_detection(self, signal, window_size=50, n_sigmas=3):
        """Detect anomalies using moving average and standard deviation.

        Args:
            signal: 1-D array-like series.
            window_size: Length of the centered rolling window.
            n_sigmas: Half-width of the acceptance band in rolling standard
                deviations.

        Returns:
            Tuple ``(anomalies, moving_avg, upper_bound, lower_bound)`` of
            pandas Series. Positions where the window is incomplete have NaN
            bounds and are never flagged.
        """
        series = pd.Series(signal)
        window = series.rolling(window=window_size, center=True)
        moving_avg = window.mean()
        moving_std = window.std()

        upper_bound = moving_avg + (n_sigmas * moving_std)
        lower_bound = moving_avg - (n_sigmas * moving_std)

        anomalies = (series > upper_bound) | (series < lower_bound)
        return anomalies, moving_avg, upper_bound, lower_bound

    @staticmethod
    def _iqr_outlier_mask(residuals, iqr_multiplier=2.5):
        """Flag residuals outside the IQR fences; NaN entries stay False.

        The returned mask has the same length as ``residuals``, so each flag
        stays aligned with the time index it was computed for.
        """
        values = np.asarray(residuals, dtype=float)
        mask = np.zeros(values.shape[0], dtype=bool)
        valid = ~np.isnan(values)
        if not valid.any():
            return mask
        finite = values[valid]
        q1, q3 = np.percentile(finite, [25, 75])
        spread = q3 - q1
        lower_bound = q1 - iqr_multiplier * spread
        upper_bound = q3 + iqr_multiplier * spread
        mask[valid] = (finite < lower_bound) | (finite > upper_bound)
        return mask

    def seasonal_decomposition_detection(self, signal, period=50, iqr_multiplier=2.5):
        """Detect anomalies using seasonal decomposition.

        Decomposes the series additively and flags residuals outside
        ``Q1 - k*IQR`` / ``Q3 + k*IQR``.

        Args:
            signal: 1-D array-like series.
            period: Seasonal period passed to ``seasonal_decompose``.
            iqr_multiplier: Fence width ``k`` in IQR units.

        Returns:
            Tuple ``(anomalies, decomposition)``: a boolean NumPy mask with
            the same length as ``signal`` and the statsmodels result object.
        """
        from statsmodels.tsa.seasonal import seasonal_decompose

        decomposition = seasonal_decompose(signal, model='additive', period=period)

        # The residual is NaN at BOTH ends of the series, so flags are written
        # through the validity mask instead of being packed at the tail,
        # which would shift every detection.
        anomalies = self._iqr_outlier_mask(decomposition.resid, iqr_multiplier)
        return anomalies, decomposition

    def demonstrate_time_series_detection(self):
        """Demonstrate time series anomaly detection methods.

        Generates a synthetic series, runs both detectors, shows a
        three-panel figure and scores each detector against the injected
        anomalies.

        Returns:
            Dict mapping ``'Moving Average'`` and ``'Seasonal Decomposition'``
            to a dict with keys ``'precision'``, ``'recall'`` and ``'f1'``.
        """
        # Generate data
        t, signal, true_anomalies = self.generate_time_series_with_anomalies()

        # Apply detection methods
        ma_anomalies, ma_avg, ma_upper, ma_lower = self.moving_average_detection(signal)
        sd_anomalies, decomposition = self.seasonal_decomposition_detection(signal)

        # Visualization
        fig, axes = plt.subplots(3, 1, figsize=(15, 10), sharex=True)

        # Original signal with true anomalies
        axes[0].plot(t, signal, label='Signal', alpha=0.7)
        axes[0].scatter(true_anomalies, signal[true_anomalies],
                        color='red', s=50, zorder=5, label='True Anomalies')
        axes[0].set_ylabel('Value')
        axes[0].set_title('Original Time Series with Anomalies')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)

        # Moving average detection
        axes[1].plot(t, signal, alpha=0.5, label='Signal')
        axes[1].plot(t, ma_avg, color='green', label='Moving Average')
        axes[1].fill_between(t, ma_lower, ma_upper, alpha=0.2, color='green')
        ma_detected = np.where(ma_anomalies)[0]
        axes[1].scatter(ma_detected, signal[ma_detected],
                        color='red', s=30, label='Detected Anomalies')
        axes[1].set_ylabel('Value')
        axes[1].set_title(f'Moving Average Detection ({np.sum(ma_anomalies)} anomalies)')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)

        # Seasonal decomposition detection
        axes[2].plot(t, signal, alpha=0.5, label='Signal')
        sd_detected = np.where(sd_anomalies)[0]
        axes[2].scatter(sd_detected, signal[sd_detected],
                        color='orange', s=30, label='Detected Anomalies')
        axes[2].set_xlabel('Time')
        axes[2].set_ylabel('Value')
        axes[2].set_title(f'Seasonal Decomposition Detection ({np.sum(sd_anomalies)} anomalies)')
        axes[2].legend()
        axes[2].grid(True, alpha=0.3)

        plt.suptitle('Time Series Anomaly Detection Methods', fontsize=14, y=1.002)
        plt.tight_layout()
        plt.show()

        # Calculate detection accuracy
        true_anomaly_mask = np.zeros(len(signal), dtype=bool)
        true_anomaly_mask[true_anomalies] = True
        methods_accuracy = {
            'Moving Average': self.calculate_detection_accuracy(ma_anomalies, true_anomaly_mask),
            'Seasonal Decomposition': self.calculate_detection_accuracy(sd_anomalies, true_anomaly_mask)
        }
        return methods_accuracy

    def calculate_detection_accuracy(self, predicted, actual):
        """Calculate detection accuracy metrics.

        Args:
            predicted: Boolean array-like of detected anomalies.
            actual: Boolean array-like of true anomalies, same length.

        Returns:
            Dict with keys ``'precision'``, ``'recall'`` and ``'f1'``; each
            value is 0 when its denominator is zero.
        """
        # Coerce to plain boolean arrays so Series and ndarray inputs behave
        # the same under & and ~.
        predicted = np.asarray(predicted, dtype=bool)
        actual = np.asarray(actual, dtype=bool)

        tp = np.sum(predicted & actual)
        fp = np.sum(predicted & ~actual)
        fn = np.sum(~predicted & actual)

        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        return {'precision': precision, 'recall': recall, 'f1': f1}
# Run the time-series demo and print how each detector scored against the
# injected anomalies.
ts_detector = TimeSeriesAnomalyDetector()
print("\n" + "="*60)
print("TIME SERIES ANOMALY DETECTION")
print("="*60)
ts_results = ts_detector.demonstrate_time_series_detection()

print("\nTime Series Detection Accuracy:")
for method in ts_results:
    metrics = ts_results[method]
    print(f"\n{method}:")
    print(f" Precision: {metrics['precision']:.3f}")
    print(f" Recall: {metrics['recall']:.3f}")
    print(f" F1-Score: {metrics['f1']:.3f}")
# Closing reference material: three plain-text blocks printed verbatim
# (best practices, example applications, method selection guide).
print("\n" + "="*60)
print("ANOMALY DETECTION BEST PRACTICES")
print("="*60)
# Checklist covering data understanding, method choice, preprocessing,
# tuning, evaluation and production concerns.
best_practices = """
KEY GUIDELINES:
1. UNDERSTAND YOUR DATA:
• Distribution characteristics
• Normal behavior patterns
• Types of anomalies expected
• Temporal dependencies
• Feature correlations
2. CHOOSE RIGHT METHOD:
• Statistical: Simple, interpretable, univariate
• Isolation Forest: No assumptions, fast, good for mixed types
• One-Class SVM: Good boundaries, works in high dimensions
• LOF: Density-based, handles local anomalies
• Autoencoders: Complex patterns, high dimensions
3. PREPROCESSING:
• Handle missing values carefully
• Scale/normalize features
• Consider feature engineering
• Remove known outliers from training
• Address class imbalance
4. PARAMETER TUNING:
• Contamination: Estimate carefully
• Number of neighbors: Based on data density
• Threshold: Use validation data
• Ensemble multiple methods
5. EVALUATION:
• Use appropriate metrics (Precision@K for ranking)
• Consider business impact of FP vs FN
• Validate with domain experts
• Monitor performance over time
6. PRODUCTION CONSIDERATIONS:
• Real-time vs batch processing
• Model update frequency
• Handling concept drift
• Scalability requirements
• Interpretability needs
"""
print(best_practices)
# Application examples
# Five domains, each listed with a method, typical features and challenges.
applications = """
REAL-WORLD APPLICATIONS:
1. FRAUD DETECTION:
Method: Isolation Forest + Rules
Features: Transaction amount, frequency, location
Challenges: Evolving fraud patterns
2. NETWORK INTRUSION:
Method: One-Class SVM + Deep Learning
Features: Packet statistics, flow patterns
Challenges: High-dimensional, real-time
3. MANUFACTURING DEFECTS:
Method: Statistical Process Control + Autoencoders
Features: Sensor readings, quality metrics
Challenges: Multi-variate, temporal
4. HEALTH MONITORING:
Method: LSTM + Threshold-based
Features: Vital signs, lab results
Challenges: Patient-specific baselines
5. FINANCIAL MARKETS:
Method: GARCH + Isolation Forest
Features: Price, volume, volatility
Challenges: Non-stationary, regime changes
"""
print(applications)
# Method selection guide
# Two lookup tables: recommended methods by data type and by data volume.
selection_guide = """
METHOD SELECTION GUIDE:
Data Type | Recommended Methods
-----------------|--------------------
Low-dimensional | Statistical, LOF
High-dimensional | Isolation Forest, Autoencoders
Time series | ARIMA residuals, LSTM
Streaming | Incremental LOF, Online learning
Labeled data | Supervised classification
Mixed types | Isolation Forest, Ensemble
Volume | Recommended Methods
-----------------|--------------------
< 1K samples | Statistical, One-Class SVM
1K-100K | Isolation Forest, LOF
> 100K | Mini-batch, Sampling
Real-time | Lightweight models, Caching
"""
print(selection_guide)
Build a fraud detection system:
Detect anomalies in IoT sensor data:
Detect anomalies in system logs: