DBSCAN (Density-Based Spatial Clustering of Applications with Noise) is a powerful clustering algorithm that groups together points that are closely packed together. Unlike K-means, it can find clusters of arbitrary shapes and automatically identifies outliers.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_moons, make_circles
from sklearn.metrics import silhouette_score
import warnings
warnings.filterwarnings('ignore')
print("="*60)
print("DBSCAN CLUSTERING")
print("="*60)
# Key parameters explained
dbscan_params = """
DBSCAN PARAMETERS:
1. eps (ε): Maximum distance between two samples for them to be neighbors
2. min_samples: Minimum points in a neighborhood to form a dense region
POINT CLASSIFICATIONS:
• Core Point: Has at least min_samples points within eps distance
• Border Point: Within eps of a core point but has fewer than min_samples neighbors
• Noise Point: Not a core point and not within eps of any core point
"""
print(dbscan_params)
# Generate sample data with non-convex shapes
X_moons, y_moons = make_moons(n_samples=300, noise=0.1, random_state=42)
X_circles, y_circles = make_circles(n_samples=300, noise=0.05,
factor=0.5, random_state=42)
# Standardize features (important for DBSCAN)
scaler = StandardScaler()
X_moons_scaled = scaler.fit_transform(X_moons)
X_circles_scaled = scaler.fit_transform(X_circles)
# Apply DBSCAN
dbscan_moons = DBSCAN(eps=0.3, min_samples=5)
clusters_moons = dbscan_moons.fit_predict(X_moons_scaled)
dbscan_circles = DBSCAN(eps=0.3, min_samples=5)
clusters_circles = dbscan_circles.fit_predict(X_circles_scaled)
# Visualize results
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# Moons dataset
axes[0, 0].scatter(X_moons[:, 0], X_moons[:, 1], c=y_moons,
cmap='viridis', alpha=0.6)
axes[0, 0].set_title('Moons: Original Labels')
axes[0, 0].grid(True, alpha=0.3)
axes[0, 1].scatter(X_moons[:, 0], X_moons[:, 1], c=clusters_moons,
cmap='viridis', alpha=0.6)
axes[0, 1].set_title(f'DBSCAN Results (eps=0.3, min_samples=5)')
axes[0, 1].grid(True, alpha=0.3)
# Circles dataset
axes[1, 0].scatter(X_circles[:, 0], X_circles[:, 1], c=y_circles,
cmap='plasma', alpha=0.6)
axes[1, 0].set_title('Circles: Original Labels')
axes[1, 0].grid(True, alpha=0.3)
axes[1, 1].scatter(X_circles[:, 0], X_circles[:, 1], c=clusters_circles,
cmap='plasma', alpha=0.6)
axes[1, 1].set_title(f'DBSCAN Results (eps=0.3, min_samples=5)')
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Analyze results
n_clusters_moons = len(set(clusters_moons)) - (1 if -1 in clusters_moons else 0)
n_noise_moons = list(clusters_moons).count(-1)
n_clusters_circles = len(set(clusters_circles)) - (1 if -1 in clusters_circles else 0)
n_noise_circles = list(clusters_circles).count(-1)
print(f"\nMoons Dataset:")
print(f" Clusters found: {n_clusters_moons}")
print(f" Noise points: {n_noise_moons}")
print(f"\nCircles Dataset:")
print(f" Clusters found: {n_clusters_circles}")
print(f" Noise points: {n_noise_circles}")
class DBSCANTuner:
def __init__(self, X):
self.X = StandardScaler().fit_transform(X)
def find_optimal_eps(self, min_samples=5):
"""Use k-distance graph to find optimal eps"""
from sklearn.neighbors import NearestNeighbors
# Calculate distances to k-th nearest neighbor
nbrs = NearestNeighbors(n_neighbors=min_samples).fit(self.X)
distances, indices = nbrs.kneighbors(self.X)
# Sort distances
k_distances = np.sort(distances[:, min_samples-1])
# Plot k-distance graph
plt.figure(figsize=(10, 6))
plt.plot(k_distances)
plt.ylabel(f'{min_samples}-NN Distance')
plt.xlabel('Points sorted by distance')
plt.title('K-distance Graph for Optimal Eps')
plt.grid(True, alpha=0.3)
# Find elbow point (simplified)
# In practice, you might want more sophisticated elbow detection
gradients = np.gradient(k_distances)
elbow_idx = np.argmax(gradients)
optimal_eps = k_distances[elbow_idx]
plt.axhline(y=optimal_eps, color='r', linestyle='--',
label=f'Suggested eps = {optimal_eps:.3f}')
plt.legend()
plt.show()
return optimal_eps
def grid_search(self, eps_range, min_samples_range):
"""Grid search for best parameters"""
results = []
for eps in eps_range:
for min_samples in min_samples_range:
dbscan = DBSCAN(eps=eps, min_samples=min_samples)
clusters = dbscan.fit_predict(self.X)
n_clusters = len(set(clusters)) - (1 if -1 in clusters else 0)
n_noise = list(clusters).count(-1)
# Only calculate silhouette if we have clusters
if n_clusters > 1:
# Exclude noise points for silhouette calculation
mask = clusters != -1
if np.sum(mask) > 1:
score = silhouette_score(self.X[mask], clusters[mask])
else:
score = -1
else:
score = -1
results.append({
'eps': eps,
'min_samples': min_samples,
'n_clusters': n_clusters,
'n_noise': n_noise,
'silhouette': score
})
results_df = pd.DataFrame(results)
# Find best parameters based on silhouette score
best_idx = results_df['silhouette'].idxmax()
best_params = results_df.iloc[best_idx]
print("\nParameter Grid Search Results:")
print("-" * 40)
print(f"Best Parameters:")
print(f" eps: {best_params['eps']}")
print(f" min_samples: {best_params['min_samples']}")
print(f" Clusters: {best_params['n_clusters']:.0f}")
print(f" Noise points: {best_params['n_noise']:.0f}")
print(f" Silhouette Score: {best_params['silhouette']:.3f}")
return results_df, best_params
# Example usage
X_test, _ = make_moons(n_samples=200, noise=0.1, random_state=42)
tuner = DBSCANTuner(X_test)
# Find optimal eps
optimal_eps = tuner.find_optimal_eps(min_samples=5)
print(f"\nSuggested eps: {optimal_eps:.3f}")
# Grid search
eps_range = np.arange(0.1, 0.6, 0.1)
min_samples_range = [3, 5, 7, 10]
results, best = tuner.grid_search(eps_range, min_samples_range)
# Create datasets with different characteristics
np.random.seed(42)
# 1. Varying density clusters
cluster1 = np.random.randn(100, 2) * 0.3 + [0, 0]
cluster2 = np.random.randn(100, 2) * 0.8 + [5, 5]
cluster3 = np.random.randn(50, 2) * 0.2 + [8, 2]
noise = np.random.uniform(-2, 10, (20, 2))
X_varying = np.vstack([cluster1, cluster2, cluster3, noise])
# 2. High-dimensional data
X_high_dim = np.random.randn(200, 10) # 10 dimensions
# 3. Data with outliers
from sklearn.datasets import make_blobs
X_blobs, _ = make_blobs(n_samples=200, centers=3, n_features=2,
cluster_std=0.5, random_state=42)
# Add outliers
outliers = np.random.uniform(X_blobs.min(), X_blobs.max(), (20, 2))
X_with_outliers = np.vstack([X_blobs, outliers])
# Compare DBSCAN performance
datasets = {
'Varying Density': X_varying,
'High Dimensional': X_high_dim[:, :2], # Project to 2D for visualization
'With Outliers': X_with_outliers
}
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for idx, (name, X) in enumerate(datasets.items()):
# Standardize
X_scaled = StandardScaler().fit_transform(X)
# Apply DBSCAN with adaptive parameters
if name == 'High Dimensional':
dbscan = DBSCAN(eps=1.5, min_samples=10)
elif name == 'Varying Density':
dbscan = DBSCAN(eps=0.5, min_samples=5)
else:
dbscan = DBSCAN(eps=0.3, min_samples=5)
clusters = dbscan.fit_predict(X_scaled)
# Visualize
axes[idx].scatter(X[:, 0], X[:, 1], c=clusters, cmap='viridis',
alpha=0.6, edgecolors='black', linewidth=0.5)
axes[idx].set_title(f'{name}')
# Mark noise points
noise_mask = clusters == -1
if np.any(noise_mask):
axes[idx].scatter(X[noise_mask, 0], X[noise_mask, 1],
c='red', marker='x', s=50, label='Noise')
n_clusters = len(set(clusters)) - (1 if -1 in clusters else 0)
n_noise = list(clusters).count(-1)
axes[idx].set_xlabel(f'Clusters: {n_clusters}, Noise: {n_noise}')
axes[idx].grid(True, alpha=0.3)
if np.any(noise_mask):
axes[idx].legend()
plt.tight_layout()
plt.show()
class DBSCANApplications:
def anomaly_detection(self):
"""Use DBSCAN for anomaly detection"""
# Generate normal data with anomalies
np.random.seed(42)
normal_data = np.random.randn(200, 2)
anomalies = np.random.uniform(-4, 4, (10, 2))
X = np.vstack([normal_data, anomalies])
# Apply DBSCAN
dbscan = DBSCAN(eps=0.5, min_samples=5)
clusters = dbscan.fit_predict(X)
# Noise points are anomalies
anomaly_mask = clusters == -1
plt.figure(figsize=(10, 6))
plt.scatter(X[~anomaly_mask, 0], X[~anomaly_mask, 1],
c='blue', alpha=0.6, label='Normal')
plt.scatter(X[anomaly_mask, 0], X[anomaly_mask, 1],
c='red', marker='x', s=100, label='Anomalies')
plt.title('Anomaly Detection with DBSCAN')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
print(f"Detected {np.sum(anomaly_mask)} anomalies out of {len(X)} points")
return anomaly_mask
def spatial_clustering(self):
"""Geographical/spatial data clustering"""
# Simulate GPS coordinates (latitude, longitude)
np.random.seed(42)
# City centers
cities = {
'City A': [37.7749, -122.4194],
'City B': [37.3382, -121.8863],
'City C': [37.8716, -122.2727]
}
points = []
true_labels = []
for i, (city, center) in enumerate(cities.items()):
# Generate points around each city
n_points = np.random.poisson(50)
city_points = np.random.randn(n_points, 2) * 0.05 + center
points.extend(city_points)
true_labels.extend([i] * n_points)
X = np.array(points)
# Apply DBSCAN with haversine metric for geographical data
from sklearn.metrics.pairwise import haversine_distances
# Convert to radians
X_rad = np.radians(X)
# DBSCAN with custom metric
dbscan = DBSCAN(eps=0.01, min_samples=5, metric='haversine')
clusters = dbscan.fit_predict(X_rad)
# Visualize
plt.figure(figsize=(10, 8))
scatter = plt.scatter(X[:, 1], X[:, 0], c=clusters,
cmap='viridis', alpha=0.6, s=30)
plt.colorbar(scatter, label='Cluster')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('Spatial Clustering with DBSCAN')
plt.grid(True, alpha=0.3)
# Mark cluster centers
for cluster_id in set(clusters):
if cluster_id != -1:
cluster_points = X[clusters == cluster_id]
center = cluster_points.mean(axis=0)
plt.plot(center[1], center[0], 'r*', markersize=15)
plt.show()
n_clusters = len(set(clusters)) - (1 if -1 in clusters else 0)
print(f"Found {n_clusters} spatial clusters")
return clusters
def customer_segmentation(self):
"""Customer behavior segmentation"""
# Simulate customer data
np.random.seed(42)
# Features: [purchase_frequency, avg_purchase_value, recency_days]
segments = {
'High Value': {'mean': [30, 500, 5], 'std': [5, 100, 2]},
'Regular': {'mean': [15, 150, 15], 'std': [3, 30, 5]},
'Occasional': {'mean': [3, 50, 60], 'std': [1, 20, 20]}
}
data = []
true_segments = []
for segment_name, params in segments.items():
n_customers = np.random.poisson(100)
segment_data = np.random.randn(n_customers, 3) * params['std'] + params['mean']
data.extend(segment_data)
true_segments.extend([segment_name] * n_customers)
X = np.array(data)
# Standardize features (important for different scales)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Apply DBSCAN
dbscan = DBSCAN(eps=0.5, min_samples=10)
clusters = dbscan.fit_predict(X_scaled)
# Analyze segments
results = pd.DataFrame(X, columns=['Frequency', 'Avg_Value', 'Recency'])
results['Cluster'] = clusters
print("\nCustomer Segments Found:")
print("-" * 50)
for cluster_id in sorted(set(clusters)):
if cluster_id == -1:
print(f"\nNoise/Outlier Customers: {sum(clusters == -1)}")
else:
segment = results[results['Cluster'] == cluster_id]
print(f"\nSegment {cluster_id}:")
print(f" Size: {len(segment)} customers")
print(f" Avg Frequency: {segment['Frequency'].mean():.1f}")
print(f" Avg Purchase Value: ${segment['Avg_Value'].mean():.2f}")
print(f" Avg Recency: {segment['Recency'].mean():.1f} days")
return results
# Run applications
apps = DBSCANApplications()
print("\n" + "="*60)
print("DBSCAN APPLICATIONS")
print("="*60)
print("\n1. Anomaly Detection:")
print("-" * 30)
anomalies = apps.anomaly_detection()
print("\n2. Spatial Clustering:")
print("-" * 30)
spatial_clusters = apps.spatial_clustering()
print("\n3. Customer Segmentation:")
print("-" * 30)
customer_segments = apps.customer_segmentation()
# Compare clustering algorithms
from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.mixture import GaussianMixture
import time
# Generate complex dataset
X_complex, y_complex = make_moons(n_samples=300, noise=0.1, random_state=42)
X_complex = StandardScaler().fit_transform(X_complex)
# Define algorithms
algorithms = {
'DBSCAN': DBSCAN(eps=0.3, min_samples=5),
'K-Means': KMeans(n_clusters=2, random_state=42),
'Hierarchical': AgglomerativeClustering(n_clusters=2),
'GMM': GaussianMixture(n_components=2, random_state=42)
}
# Compare performance
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes = axes.ravel()
results_comparison = []
for idx, (name, algorithm) in enumerate(algorithms.items()):
start_time = time.time()
if name == 'GMM':
algorithm.fit(X_complex)
clusters = algorithm.predict(X_complex)
else:
clusters = algorithm.fit_predict(X_complex)
elapsed_time = time.time() - start_time
# Calculate metrics
n_clusters = len(set(clusters)) - (1 if -1 in clusters else 0)
# Silhouette score (exclude noise for DBSCAN)
if -1 in clusters:
mask = clusters != -1
if np.sum(mask) > 1:
score = silhouette_score(X_complex[mask], clusters[mask])
else:
score = -1
else:
score = silhouette_score(X_complex, clusters)
results_comparison.append({
'Algorithm': name,
'Clusters': n_clusters,
'Silhouette': score,
'Time (s)': elapsed_time
})
# Visualize
axes[idx].scatter(X_complex[:, 0], X_complex[:, 1],
c=clusters, cmap='viridis', alpha=0.6)
axes[idx].set_title(f'{name}')
axes[idx].set_xlabel(f'Clusters: {n_clusters}, Score: {score:.3f}')
axes[idx].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Display comparison table
comparison_df = pd.DataFrame(results_comparison)
print("\nAlgorithm Comparison:")
print(comparison_df.to_string(index=False))
# Advantages and disadvantages
print("\n" + "="*60)
print("DBSCAN CHARACTERISTICS")
print("="*60)
characteristics = """
ADVANTAGES:
✓ No need to specify number of clusters
✓ Can find arbitrarily shaped clusters
✓ Robust to outliers (identifies them as noise)
✓ Only two parameters (eps and min_samples)
✓ Deterministic (same results each run)
DISADVANTAGES:
✗ Sensitive to parameter selection
✗ Struggles with clusters of varying densities
✗ Performance degrades in high dimensions
✗ Not suitable for datasets with large differences in densities
✗ Border points can be assigned to multiple clusters
WHEN TO USE DBSCAN:
• Non-convex cluster shapes expected
• Presence of noise and outliers
• Unknown number of clusters
• Spatial/geographical data
• Anomaly detection tasks
"""
print(characteristics)
class AdvancedDBSCAN:
def adaptive_dbscan(self, X, k=5):
"""DBSCAN with adaptive eps based on local density"""
from sklearn.neighbors import NearestNeighbors
# Calculate k-nearest neighbor distances for each point
nbrs = NearestNeighbors(n_neighbors=k).fit(X)
distances, indices = nbrs.kneighbors(X)
# Use median of k-th neighbor distance as adaptive eps
eps_values = distances[:, k-1]
adaptive_eps = np.median(eps_values)
print(f"Adaptive eps calculated: {adaptive_eps:.3f}")
# Apply DBSCAN with adaptive eps
dbscan = DBSCAN(eps=adaptive_eps, min_samples=k)
clusters = dbscan.fit_predict(X)
return clusters, adaptive_eps
def hierarchical_dbscan(self, X):
"""Use HDBSCAN for hierarchical density-based clustering"""
try:
import hdbscan
# Apply HDBSCAN
clusterer = hdbscan.HDBSCAN(min_cluster_size=5)
clusters = clusterer.fit_predict(X)
# Get cluster persistence (stability)
persistence = clusterer.cluster_persistence_
print("HDBSCAN Results:")
print(f"Number of clusters: {len(set(clusters)) - 1}")
print(f"Cluster persistence: {persistence}")
return clusters
except ImportError:
print("HDBSCAN not installed. Using standard DBSCAN.")
dbscan = DBSCAN(eps=0.3, min_samples=5)
return dbscan.fit_predict(X)
def incremental_dbscan(self, X_initial, X_new):
"""Incremental DBSCAN for streaming data"""
# Fit initial model
dbscan = DBSCAN(eps=0.3, min_samples=5)
initial_clusters = dbscan.fit_predict(X_initial)
# For new points, find their cluster assignments
from sklearn.neighbors import NearestNeighbors
nbrs = NearestNeighbors(radius=0.3).fit(X_initial)
distances, indices = nbrs.radius_neighbors(X_new)
new_clusters = np.full(len(X_new), -1) # Initialize as noise
for i, neighbors in enumerate(indices):
if len(neighbors) >= 5:
# Assign to majority cluster of neighbors
neighbor_clusters = initial_clusters[neighbors]
neighbor_clusters = neighbor_clusters[neighbor_clusters != -1]
if len(neighbor_clusters) > 0:
unique, counts = np.unique(neighbor_clusters, return_counts=True)
new_clusters[i] = unique[np.argmax(counts)]
return new_clusters
def multi_density_clustering(self, X):
"""Handle varying density clusters using OPTICS"""
from sklearn.cluster import OPTICS
# OPTICS can handle varying densities better than DBSCAN
optics = OPTICS(min_samples=5, xi=0.05, min_cluster_size=0.1)
clusters = optics.fit_predict(X)
# Get reachability plot
reachability = optics.reachability_[optics.ordering_]
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.scatter(X[:, 0], X[:, 1], c=clusters, cmap='viridis', alpha=0.6)
plt.title('OPTICS Clustering (Varying Densities)')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.grid(True, alpha=0.3)
plt.subplot(1, 2, 2)
plt.plot(reachability)
plt.ylabel('Reachability Distance')
plt.xlabel('Order of Points')
plt.title('Reachability Plot')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return clusters
# Demonstrate advanced techniques
X_test, _ = make_moons(n_samples=300, noise=0.1, random_state=42)
X_test = StandardScaler().fit_transform(X_test)
advanced = AdvancedDBSCAN()
print("\n" + "="*60)
print("ADVANCED DBSCAN TECHNIQUES")
print("="*60)
print("\n1. Adaptive DBSCAN:")
print("-" * 30)
adaptive_clusters, adaptive_eps = advanced.adaptive_dbscan(X_test)
print("\n2. Hierarchical DBSCAN:")
print("-" * 30)
hierarchical_clusters = advanced.hierarchical_dbscan(X_test)
print("\n3. Multi-Density Clustering (OPTICS):")
print("-" * 30)
optics_clusters = advanced.multi_density_clustering(X_test)
print("\n" + "="*60)
print("DBSCAN BEST PRACTICES")
print("="*60)
best_practices = """
KEY GUIDELINES:
1. DATA PREPROCESSING:
• ALWAYS standardize/normalize features
• Remove or impute missing values
• Consider dimensionality reduction for high-D data
2. PARAMETER SELECTION:
• eps: Use k-distance graph or domain knowledge
• min_samples: Usually 2*dimensions or at least 3-5
• Start with min_samples = 4 for 2D data
3. EVALUATION:
• Check number of noise points (shouldn't be too high)
• Validate with domain knowledge
• Use silhouette score for compact clusters
• Consider stability across parameter variations
4. OPTIMIZATION:
• Use spatial indexing for large datasets
• Consider OPTICS for varying densities
• Use HDBSCAN for hierarchical approach
5. COMMON PITFALLS:
⚠ Forgetting to scale features
⚠ Using same eps for varying density data
⚠ Ignoring curse of dimensionality
⚠ Not handling categorical variables properly
"""
print(best_practices)
# Implementation checklist
checklist = """
IMPLEMENTATION CHECKLIST:
□ Data cleaned and preprocessed
□ Features standardized/normalized
□ Optimal eps determined (k-distance graph)
□ min_samples selected based on dimensionality
□ Results validated with domain knowledge
□ Noise points analyzed
□ Comparison with other algorithms done
□ Performance metrics calculated
"""
print(checklist)
Create a dataset with clusters of varying densities and implement a solution:
Build a streaming anomaly detection system:
Use DBSCAN for image segmentation:
# Complete example: Network Intrusion Detection
class NetworkIntrusionDetector:
def __init__(self):
self.scaler = StandardScaler()
self.dbscan = None
def prepare_data(self, n_samples=1000):
"""Simulate network traffic data"""
np.random.seed(42)
# Normal traffic (majority)
normal_traffic = {
'packet_size': np.random.normal(1500, 200, n_samples),
'connection_duration': np.random.exponential(10, n_samples),
'packets_per_second': np.random.poisson(100, n_samples),
'unique_destinations': np.random.poisson(5, n_samples)
}
# Attack patterns (minority)
n_attacks = int(n_samples * 0.05) # 5% attacks
# DDoS attack pattern
ddos = {
'packet_size': np.random.normal(64, 10, n_attacks//2),
'connection_duration': np.random.exponential(0.1, n_attacks//2),
'packets_per_second': np.random.poisson(10000, n_attacks//2),
'unique_destinations': np.ones(n_attacks//2)
}
# Port scan pattern
port_scan = {
'packet_size': np.random.normal(40, 5, n_attacks//2),
'connection_duration': np.random.exponential(0.01, n_attacks//2),
'packets_per_second': np.random.poisson(500, n_attacks//2),
'unique_destinations': np.random.poisson(100, n_attacks//2)
}
# Combine data
features = []
labels = []
for key in normal_traffic.keys():
normal_traffic[key] = list(normal_traffic[key])
ddos[key] = list(ddos[key])
port_scan[key] = list(port_scan[key])
# Add normal traffic
for i in range(n_samples):
features.append([
normal_traffic['packet_size'][i],
normal_traffic['connection_duration'][i],
normal_traffic['packets_per_second'][i],
normal_traffic['unique_destinations'][i]
])
labels.append('normal')
# Add attacks
for i in range(n_attacks//2):
features.append([
ddos['packet_size'][i],
ddos['connection_duration'][i],
ddos['packets_per_second'][i],
ddos['unique_destinations'][i]
])
labels.append('ddos')
features.append([
port_scan['packet_size'][i],
port_scan['connection_duration'][i],
port_scan['packets_per_second'][i],
port_scan['unique_destinations'][i]
])
labels.append('port_scan')
return np.array(features), np.array(labels)
def train(self, X_train):
"""Train DBSCAN on normal traffic"""
X_scaled = self.scaler.fit_transform(X_train)
# Find optimal parameters
from sklearn.neighbors import NearestNeighbors
nbrs = NearestNeighbors(n_neighbors=5).fit(X_scaled)
distances, _ = nbrs.kneighbors(X_scaled)
# Use 95th percentile distance as eps
eps = np.percentile(distances[:, 4], 95)
self.dbscan = DBSCAN(eps=eps, min_samples=5)
self.dbscan.fit(X_scaled)
print(f"Model trained with eps={eps:.3f}")
def detect_intrusions(self, X_test, y_test):
"""Detect network intrusions"""
X_scaled = self.scaler.transform(X_test)
# Predict clusters
clusters = self.dbscan.fit_predict(X_scaled)
# Noise points are potential intrusions
intrusions = clusters == -1
# Evaluate detection
from sklearn.metrics import classification_report, confusion_matrix
# Convert labels to binary (normal vs attack)
y_binary = (y_test != 'normal').astype(int)
print("\nIntrusion Detection Results:")
print("-" * 40)
print(classification_report(y_binary, intrusions,
target_names=['Normal', 'Attack']))
# Confusion matrix
cm = confusion_matrix(y_binary, intrusions)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
# Confusion matrix
im = ax1.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
ax1.figure.colorbar(im, ax=ax1)
ax1.set(xticks=np.arange(cm.shape[1]),
yticks=np.arange(cm.shape[0]),
xticklabels=['Normal', 'Attack'],
yticklabels=['Normal', 'Attack'],
title='Confusion Matrix',
ylabel='True label',
xlabel='Predicted label')
# Annotate cells
for i in range(cm.shape[0]):
for j in range(cm.shape[1]):
ax1.text(j, i, format(cm[i, j], 'd'),
ha="center", va="center",
color="white" if cm[i, j] > cm.max() / 2. else "black")
# Scatter plot of detections
ax2.scatter(X_test[~intrusions, 0], X_test[~intrusions, 1],
c='blue', alpha=0.5, label='Normal', s=20)
ax2.scatter(X_test[intrusions, 0], X_test[intrusions, 1],
c='red', alpha=0.7, label='Intrusion', s=40, marker='^')
ax2.set_xlabel('Packet Size')
ax2.set_ylabel('Connection Duration')
ax2.set_title('Network Traffic Analysis')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return intrusions
# Run intrusion detection example
detector = NetworkIntrusionDetector()
# Prepare data
X, y = detector.prepare_data(1000)
# Split data
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# Train on normal traffic only
normal_mask = y_train == 'normal'
detector.train(X_train[normal_mask])
# Detect intrusions
print("\n" + "="*60)
print("NETWORK INTRUSION DETECTION SYSTEM")
print("="*60)
intrusions = detector.detect_intrusions(X_test, y_test)