Skip to main content

🎯 DBSCAN: Density-Based Clustering

Introduction

DBSCAN (Density-Based Spatial Clustering of Applications with Noise) is a powerful clustering algorithm that groups together points that are closely packed together. Unlike K-means, it can find clusters of arbitrary shapes and automatically identifies outliers.

Core Concepts

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_moons, make_circles
from sklearn.metrics import silhouette_score
import warnings
warnings.filterwarnings('ignore')

print("="*60)
print("DBSCAN CLUSTERING")
print("="*60)

# Key parameters explained
dbscan_params = """
DBSCAN PARAMETERS:
1. eps (ε): Maximum distance between two samples for them to be neighbors
2. min_samples: Minimum points in a neighborhood to form a dense region

POINT CLASSIFICATIONS:
• Core Point: Has at least min_samples points within eps distance
• Border Point: Within eps of a core point but has fewer than min_samples neighbors
• Noise Point: Not a core point and not within eps of any core point
"""

print(dbscan_params)

Basic Implementation

# Generate sample data with non-convex shapes
X_moons, y_moons = make_moons(n_samples=300, noise=0.1, random_state=42)
X_circles, y_circles = make_circles(n_samples=300, noise=0.05, 
                                     factor=0.5, random_state=42)

# Standardize features (important for DBSCAN)
scaler = StandardScaler()
X_moons_scaled = scaler.fit_transform(X_moons)
X_circles_scaled = scaler.fit_transform(X_circles)

# Apply DBSCAN
dbscan_moons = DBSCAN(eps=0.3, min_samples=5)
clusters_moons = dbscan_moons.fit_predict(X_moons_scaled)

dbscan_circles = DBSCAN(eps=0.3, min_samples=5)
clusters_circles = dbscan_circles.fit_predict(X_circles_scaled)

# Visualize results
fig, axes = plt.subplots(2, 2, figsize=(12, 10))

# Moons dataset
axes[0, 0].scatter(X_moons[:, 0], X_moons[:, 1], c=y_moons, 
                   cmap='viridis', alpha=0.6)
axes[0, 0].set_title('Moons: Original Labels')
axes[0, 0].grid(True, alpha=0.3)

axes[0, 1].scatter(X_moons[:, 0], X_moons[:, 1], c=clusters_moons, 
                   cmap='viridis', alpha=0.6)
axes[0, 1].set_title(f'DBSCAN Results (eps=0.3, min_samples=5)')
axes[0, 1].grid(True, alpha=0.3)

# Circles dataset
axes[1, 0].scatter(X_circles[:, 0], X_circles[:, 1], c=y_circles, 
                   cmap='plasma', alpha=0.6)
axes[1, 0].set_title('Circles: Original Labels')
axes[1, 0].grid(True, alpha=0.3)

axes[1, 1].scatter(X_circles[:, 0], X_circles[:, 1], c=clusters_circles, 
                   cmap='plasma', alpha=0.6)
axes[1, 1].set_title(f'DBSCAN Results (eps=0.3, min_samples=5)')
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Analyze results
n_clusters_moons = len(set(clusters_moons)) - (1 if -1 in clusters_moons else 0)
n_noise_moons = list(clusters_moons).count(-1)

n_clusters_circles = len(set(clusters_circles)) - (1 if -1 in clusters_circles else 0)
n_noise_circles = list(clusters_circles).count(-1)

print(f"\nMoons Dataset:")
print(f"  Clusters found: {n_clusters_moons}")
print(f"  Noise points: {n_noise_moons}")

print(f"\nCircles Dataset:")
print(f"  Clusters found: {n_clusters_circles}")
print(f"  Noise points: {n_noise_circles}")

Parameter Tuning

class DBSCANTuner:
    def __init__(self, X):
        self.X = StandardScaler().fit_transform(X)
    
    def find_optimal_eps(self, min_samples=5):
        """Use k-distance graph to find optimal eps"""
        from sklearn.neighbors import NearestNeighbors
        
        # Calculate distances to k-th nearest neighbor
        nbrs = NearestNeighbors(n_neighbors=min_samples).fit(self.X)
        distances, indices = nbrs.kneighbors(self.X)
        
        # Sort distances
        k_distances = np.sort(distances[:, min_samples-1])
        
        # Plot k-distance graph
        plt.figure(figsize=(10, 6))
        plt.plot(k_distances)
        plt.ylabel(f'{min_samples}-NN Distance')
        plt.xlabel('Points sorted by distance')
        plt.title('K-distance Graph for Optimal Eps')
        plt.grid(True, alpha=0.3)
        
        # Find elbow point (simplified)
        # In practice, you might want more sophisticated elbow detection
        gradients = np.gradient(k_distances)
        elbow_idx = np.argmax(gradients)
        optimal_eps = k_distances[elbow_idx]
        
        plt.axhline(y=optimal_eps, color='r', linestyle='--', 
                   label=f'Suggested eps = {optimal_eps:.3f}')
        plt.legend()
        plt.show()
        
        return optimal_eps
    
    def grid_search(self, eps_range, min_samples_range):
        """Grid search for best parameters"""
        results = []
        
        for eps in eps_range:
            for min_samples in min_samples_range:
                dbscan = DBSCAN(eps=eps, min_samples=min_samples)
                clusters = dbscan.fit_predict(self.X)
                
                n_clusters = len(set(clusters)) - (1 if -1 in clusters else 0)
                n_noise = list(clusters).count(-1)
                
                # Only calculate silhouette if we have clusters
                if n_clusters > 1:
                    # Exclude noise points for silhouette calculation
                    mask = clusters != -1
                    if np.sum(mask) > 1:
                        score = silhouette_score(self.X[mask], clusters[mask])
                    else:
                        score = -1
                else:
                    score = -1
                
                results.append({
                    'eps': eps,
                    'min_samples': min_samples,
                    'n_clusters': n_clusters,
                    'n_noise': n_noise,
                    'silhouette': score
                })
        
        results_df = pd.DataFrame(results)
        
        # Find best parameters based on silhouette score
        best_idx = results_df['silhouette'].idxmax()
        best_params = results_df.iloc[best_idx]
        
        print("\nParameter Grid Search Results:")
        print("-" * 40)
        print(f"Best Parameters:")
        print(f"  eps: {best_params['eps']}")
        print(f"  min_samples: {best_params['min_samples']}")
        print(f"  Clusters: {best_params['n_clusters']:.0f}")
        print(f"  Noise points: {best_params['n_noise']:.0f}")
        print(f"  Silhouette Score: {best_params['silhouette']:.3f}")
        
        return results_df, best_params

# Example usage
X_test, _ = make_moons(n_samples=200, noise=0.1, random_state=42)
tuner = DBSCANTuner(X_test)

# Find optimal eps
optimal_eps = tuner.find_optimal_eps(min_samples=5)
print(f"\nSuggested eps: {optimal_eps:.3f}")

# Grid search
eps_range = np.arange(0.1, 0.6, 0.1)
min_samples_range = [3, 5, 7, 10]
results, best = tuner.grid_search(eps_range, min_samples_range)

Handling Different Data Patterns

# Create datasets with different characteristics
np.random.seed(42)

# 1. Varying density clusters
cluster1 = np.random.randn(100, 2) * 0.3 + [0, 0]
cluster2 = np.random.randn(100, 2) * 0.8 + [5, 5]
cluster3 = np.random.randn(50, 2) * 0.2 + [8, 2]
noise = np.random.uniform(-2, 10, (20, 2))

X_varying = np.vstack([cluster1, cluster2, cluster3, noise])

# 2. High-dimensional data
X_high_dim = np.random.randn(200, 10)  # 10 dimensions

# 3. Data with outliers
from sklearn.datasets import make_blobs
X_blobs, _ = make_blobs(n_samples=200, centers=3, n_features=2, 
                        cluster_std=0.5, random_state=42)
# Add outliers
outliers = np.random.uniform(X_blobs.min(), X_blobs.max(), (20, 2))
X_with_outliers = np.vstack([X_blobs, outliers])

# Compare DBSCAN performance
datasets = {
    'Varying Density': X_varying,
    'High Dimensional': X_high_dim[:, :2],  # Project to 2D for visualization
    'With Outliers': X_with_outliers
}

fig, axes = plt.subplots(1, 3, figsize=(15, 5))

for idx, (name, X) in enumerate(datasets.items()):
    # Standardize
    X_scaled = StandardScaler().fit_transform(X)
    
    # Apply DBSCAN with adaptive parameters
    if name == 'High Dimensional':
        dbscan = DBSCAN(eps=1.5, min_samples=10)
    elif name == 'Varying Density':
        dbscan = DBSCAN(eps=0.5, min_samples=5)
    else:
        dbscan = DBSCAN(eps=0.3, min_samples=5)
    
    clusters = dbscan.fit_predict(X_scaled)
    
    # Visualize
    axes[idx].scatter(X[:, 0], X[:, 1], c=clusters, cmap='viridis', 
                     alpha=0.6, edgecolors='black', linewidth=0.5)
    axes[idx].set_title(f'{name}')
    
    # Mark noise points
    noise_mask = clusters == -1
    if np.any(noise_mask):
        axes[idx].scatter(X[noise_mask, 0], X[noise_mask, 1], 
                         c='red', marker='x', s=50, label='Noise')
    
    n_clusters = len(set(clusters)) - (1 if -1 in clusters else 0)
    n_noise = list(clusters).count(-1)
    axes[idx].set_xlabel(f'Clusters: {n_clusters}, Noise: {n_noise}')
    axes[idx].grid(True, alpha=0.3)
    
    if np.any(noise_mask):
        axes[idx].legend()

plt.tight_layout()
plt.show()

Applications

class DBSCANApplications:
    
    def anomaly_detection(self):
        """Use DBSCAN for anomaly detection"""
        # Generate normal data with anomalies
        np.random.seed(42)
        normal_data = np.random.randn(200, 2)
        anomalies = np.random.uniform(-4, 4, (10, 2))
        X = np.vstack([normal_data, anomalies])
        
        # Apply DBSCAN
        dbscan = DBSCAN(eps=0.5, min_samples=5)
        clusters = dbscan.fit_predict(X)
        
        # Noise points are anomalies
        anomaly_mask = clusters == -1
        
        plt.figure(figsize=(10, 6))
        plt.scatter(X[~anomaly_mask, 0], X[~anomaly_mask, 1], 
                   c='blue', alpha=0.6, label='Normal')
        plt.scatter(X[anomaly_mask, 0], X[anomaly_mask, 1], 
                   c='red', marker='x', s=100, label='Anomalies')
        plt.title('Anomaly Detection with DBSCAN')
        plt.xlabel('Feature 1')
        plt.ylabel('Feature 2')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()
        
        print(f"Detected {np.sum(anomaly_mask)} anomalies out of {len(X)} points")
        return anomaly_mask
    
    def spatial_clustering(self):
        """Geographical/spatial data clustering"""
        # Simulate GPS coordinates (latitude, longitude)
        np.random.seed(42)
        
        # City centers
        cities = {
            'City A': [37.7749, -122.4194],
            'City B': [37.3382, -121.8863],
            'City C': [37.8716, -122.2727]
        }
        
        points = []
        true_labels = []
        
        for i, (city, center) in enumerate(cities.items()):
            # Generate points around each city
            n_points = np.random.poisson(50)
            city_points = np.random.randn(n_points, 2) * 0.05 + center
            points.extend(city_points)
            true_labels.extend([i] * n_points)
        
        X = np.array(points)
        
        # Apply DBSCAN with haversine metric for geographical data
        from sklearn.metrics.pairwise import haversine_distances
        
        # Convert to radians
        X_rad = np.radians(X)
        
        # DBSCAN with custom metric
        dbscan = DBSCAN(eps=0.01, min_samples=5, metric='haversine')
        clusters = dbscan.fit_predict(X_rad)
        
        # Visualize
        plt.figure(figsize=(10, 8))
        scatter = plt.scatter(X[:, 1], X[:, 0], c=clusters, 
                            cmap='viridis', alpha=0.6, s=30)
        plt.colorbar(scatter, label='Cluster')
        plt.xlabel('Longitude')
        plt.ylabel('Latitude')
        plt.title('Spatial Clustering with DBSCAN')
        plt.grid(True, alpha=0.3)
        
        # Mark cluster centers
        for cluster_id in set(clusters):
            if cluster_id != -1:
                cluster_points = X[clusters == cluster_id]
                center = cluster_points.mean(axis=0)
                plt.plot(center[1], center[0], 'r*', markersize=15)
        
        plt.show()
        
        n_clusters = len(set(clusters)) - (1 if -1 in clusters else 0)
        print(f"Found {n_clusters} spatial clusters")
        
        return clusters
    
    def customer_segmentation(self):
        """Customer behavior segmentation"""
        # Simulate customer data
        np.random.seed(42)
        
        # Features: [purchase_frequency, avg_purchase_value, recency_days]
        segments = {
            'High Value': {'mean': [30, 500, 5], 'std': [5, 100, 2]},
            'Regular': {'mean': [15, 150, 15], 'std': [3, 30, 5]},
            'Occasional': {'mean': [3, 50, 60], 'std': [1, 20, 20]}
        }
        
        data = []
        true_segments = []
        
        for segment_name, params in segments.items():
            n_customers = np.random.poisson(100)
            segment_data = np.random.randn(n_customers, 3) * params['std'] + params['mean']
            data.extend(segment_data)
            true_segments.extend([segment_name] * n_customers)
        
        X = np.array(data)
        
        # Standardize features (important for different scales)
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        
        # Apply DBSCAN
        dbscan = DBSCAN(eps=0.5, min_samples=10)
        clusters = dbscan.fit_predict(X_scaled)
        
        # Analyze segments
        results = pd.DataFrame(X, columns=['Frequency', 'Avg_Value', 'Recency'])
        results['Cluster'] = clusters
        
        print("\nCustomer Segments Found:")
        print("-" * 50)
        
        for cluster_id in sorted(set(clusters)):
            if cluster_id == -1:
                print(f"\nNoise/Outlier Customers: {sum(clusters == -1)}")
            else:
                segment = results[results['Cluster'] == cluster_id]
                print(f"\nSegment {cluster_id}:")
                print(f"  Size: {len(segment)} customers")
                print(f"  Avg Frequency: {segment['Frequency'].mean():.1f}")
                print(f"  Avg Purchase Value: ${segment['Avg_Value'].mean():.2f}")
                print(f"  Avg Recency: {segment['Recency'].mean():.1f} days")
        
        return results

# Run applications
apps = DBSCANApplications()

print("\n" + "="*60)
print("DBSCAN APPLICATIONS")
print("="*60)

print("\n1. Anomaly Detection:")
print("-" * 30)
anomalies = apps.anomaly_detection()

print("\n2. Spatial Clustering:")
print("-" * 30)
spatial_clusters = apps.spatial_clustering()

print("\n3. Customer Segmentation:")
print("-" * 30)
customer_segments = apps.customer_segmentation()

DBSCAN vs Other Algorithms

# Compare clustering algorithms
from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.mixture import GaussianMixture
import time

# Generate complex dataset
X_complex, y_complex = make_moons(n_samples=300, noise=0.1, random_state=42)
X_complex = StandardScaler().fit_transform(X_complex)

# Define algorithms
algorithms = {
    'DBSCAN': DBSCAN(eps=0.3, min_samples=5),
    'K-Means': KMeans(n_clusters=2, random_state=42),
    'Hierarchical': AgglomerativeClustering(n_clusters=2),
    'GMM': GaussianMixture(n_components=2, random_state=42)
}

# Compare performance
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes = axes.ravel()

results_comparison = []

for idx, (name, algorithm) in enumerate(algorithms.items()):
    start_time = time.time()
    
    if name == 'GMM':
        algorithm.fit(X_complex)
        clusters = algorithm.predict(X_complex)
    else:
        clusters = algorithm.fit_predict(X_complex)
    
    elapsed_time = time.time() - start_time
    
    # Calculate metrics
    n_clusters = len(set(clusters)) - (1 if -1 in clusters else 0)
    
    # Silhouette score (exclude noise for DBSCAN)
    if -1 in clusters:
        mask = clusters != -1
        if np.sum(mask) > 1:
            score = silhouette_score(X_complex[mask], clusters[mask])
        else:
            score = -1
    else:
        score = silhouette_score(X_complex, clusters)
    
    results_comparison.append({
        'Algorithm': name,
        'Clusters': n_clusters,
        'Silhouette': score,
        'Time (s)': elapsed_time
    })
    
    # Visualize
    axes[idx].scatter(X_complex[:, 0], X_complex[:, 1], 
                     c=clusters, cmap='viridis', alpha=0.6)
    axes[idx].set_title(f'{name}')
    axes[idx].set_xlabel(f'Clusters: {n_clusters}, Score: {score:.3f}')
    axes[idx].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Display comparison table
comparison_df = pd.DataFrame(results_comparison)
print("\nAlgorithm Comparison:")
print(comparison_df.to_string(index=False))

# Advantages and disadvantages
print("\n" + "="*60)
print("DBSCAN CHARACTERISTICS")
print("="*60)

characteristics = """
ADVANTAGES:
✓ No need to specify number of clusters
✓ Can find arbitrarily shaped clusters
✓ Robust to outliers (identifies them as noise)
✓ Only two parameters (eps and min_samples)
✓ Deterministic (same results each run)

DISADVANTAGES:
✗ Sensitive to parameter selection
✗ Struggles with clusters of varying densities
✗ Performance degrades in high dimensions
✗ Not suitable for datasets with large differences in densities
✗ Border points can be assigned to multiple clusters

WHEN TO USE DBSCAN:
• Non-convex cluster shapes expected
• Presence of noise and outliers
• Unknown number of clusters
• Spatial/geographical data
• Anomaly detection tasks
"""

print(characteristics)

Advanced Techniques

class AdvancedDBSCAN:
    
    def adaptive_dbscan(self, X, k=5):
        """DBSCAN with adaptive eps based on local density"""
        from sklearn.neighbors import NearestNeighbors
        
        # Calculate k-nearest neighbor distances for each point
        nbrs = NearestNeighbors(n_neighbors=k).fit(X)
        distances, indices = nbrs.kneighbors(X)
        
        # Use median of k-th neighbor distance as adaptive eps
        eps_values = distances[:, k-1]
        adaptive_eps = np.median(eps_values)
        
        print(f"Adaptive eps calculated: {adaptive_eps:.3f}")
        
        # Apply DBSCAN with adaptive eps
        dbscan = DBSCAN(eps=adaptive_eps, min_samples=k)
        clusters = dbscan.fit_predict(X)
        
        return clusters, adaptive_eps
    
    def hierarchical_dbscan(self, X):
        """Use HDBSCAN for hierarchical density-based clustering"""
        try:
            import hdbscan
            
            # Apply HDBSCAN
            clusterer = hdbscan.HDBSCAN(min_cluster_size=5)
            clusters = clusterer.fit_predict(X)
            
            # Get cluster persistence (stability)
            persistence = clusterer.cluster_persistence_
            
            print("HDBSCAN Results:")
            print(f"Number of clusters: {len(set(clusters)) - 1}")
            print(f"Cluster persistence: {persistence}")
            
            return clusters
        except ImportError:
            print("HDBSCAN not installed. Using standard DBSCAN.")
            dbscan = DBSCAN(eps=0.3, min_samples=5)
            return dbscan.fit_predict(X)
    
    def incremental_dbscan(self, X_initial, X_new):
        """Incremental DBSCAN for streaming data"""
        # Fit initial model
        dbscan = DBSCAN(eps=0.3, min_samples=5)
        initial_clusters = dbscan.fit_predict(X_initial)
        
        # For new points, find their cluster assignments
        from sklearn.neighbors import NearestNeighbors
        
        nbrs = NearestNeighbors(radius=0.3).fit(X_initial)
        distances, indices = nbrs.radius_neighbors(X_new)
        
        new_clusters = np.full(len(X_new), -1)  # Initialize as noise
        
        for i, neighbors in enumerate(indices):
            if len(neighbors) >= 5:
                # Assign to majority cluster of neighbors
                neighbor_clusters = initial_clusters[neighbors]
                neighbor_clusters = neighbor_clusters[neighbor_clusters != -1]
                if len(neighbor_clusters) > 0:
                    unique, counts = np.unique(neighbor_clusters, return_counts=True)
                    new_clusters[i] = unique[np.argmax(counts)]
        
        return new_clusters
    
    def multi_density_clustering(self, X):
        """Handle varying density clusters using OPTICS"""
        from sklearn.cluster import OPTICS
        
        # OPTICS can handle varying densities better than DBSCAN
        optics = OPTICS(min_samples=5, xi=0.05, min_cluster_size=0.1)
        clusters = optics.fit_predict(X)
        
        # Get reachability plot
        reachability = optics.reachability_[optics.ordering_]
        
        plt.figure(figsize=(12, 5))
        
        plt.subplot(1, 2, 1)
        plt.scatter(X[:, 0], X[:, 1], c=clusters, cmap='viridis', alpha=0.6)
        plt.title('OPTICS Clustering (Varying Densities)')
        plt.xlabel('Feature 1')
        plt.ylabel('Feature 2')
        plt.grid(True, alpha=0.3)
        
        plt.subplot(1, 2, 2)
        plt.plot(reachability)
        plt.ylabel('Reachability Distance')
        plt.xlabel('Order of Points')
        plt.title('Reachability Plot')
        plt.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        return clusters

# Demonstrate advanced techniques
X_test, _ = make_moons(n_samples=300, noise=0.1, random_state=42)
X_test = StandardScaler().fit_transform(X_test)

advanced = AdvancedDBSCAN()

print("\n" + "="*60)
print("ADVANCED DBSCAN TECHNIQUES")
print("="*60)

print("\n1. Adaptive DBSCAN:")
print("-" * 30)
adaptive_clusters, adaptive_eps = advanced.adaptive_dbscan(X_test)

print("\n2. Hierarchical DBSCAN:")
print("-" * 30)
hierarchical_clusters = advanced.hierarchical_dbscan(X_test)

print("\n3. Multi-Density Clustering (OPTICS):")
print("-" * 30)
optics_clusters = advanced.multi_density_clustering(X_test)

Best Practices and Tips

print("\n" + "="*60)
print("DBSCAN BEST PRACTICES")
print("="*60)

best_practices = """
KEY GUIDELINES:

1. DATA PREPROCESSING:
   • ALWAYS standardize/normalize features
   • Remove or impute missing values
   • Consider dimensionality reduction for high-D data

2. PARAMETER SELECTION:
   • eps: Use k-distance graph or domain knowledge
   • min_samples: Usually 2*dimensions or at least 3-5
   • Start with min_samples = 4 for 2D data

3. EVALUATION:
   • Check number of noise points (shouldn't be too high)
   • Validate with domain knowledge
   • Use silhouette score for compact clusters
   • Consider stability across parameter variations

4. OPTIMIZATION:
   • Use spatial indexing for large datasets
   • Consider OPTICS for varying densities
   • Use HDBSCAN for hierarchical approach
   
5. COMMON PITFALLS:
   ⚠ Forgetting to scale features
   ⚠ Using same eps for varying density data
   ⚠ Ignoring curse of dimensionality
   ⚠ Not handling categorical variables properly
"""

print(best_practices)

# Implementation checklist
checklist = """
IMPLEMENTATION CHECKLIST:
□ Data cleaned and preprocessed
□ Features standardized/normalized
□ Optimal eps determined (k-distance graph)
□ min_samples selected based on dimensionality
□ Results validated with domain knowledge
□ Noise points analyzed
□ Comparison with other algorithms done
□ Performance metrics calculated
"""

print(checklist)

Practice Exercises

Exercise 1: Multi-Density DBSCAN

Create a dataset with clusters of varying densities and implement a solution:

  1. Generate 3 clusters with different densities
  2. Apply standard DBSCAN and observe limitations
  3. Implement adaptive DBSCAN with local density estimation
  4. Compare results with OPTICS algorithm

Exercise 2: Real-time Anomaly Detection

Build a streaming anomaly detection system:

  1. Simulate streaming data with periodic anomalies
  2. Implement incremental DBSCAN
  3. Update cluster assignments for new points
  4. Calculate detection accuracy and latency

Exercise 3: Image Segmentation

Use DBSCAN for image segmentation:

  1. Load an image and extract color features
  2. Apply DBSCAN in color space
  3. Segment image based on color similarity
  4. Compare with K-means segmentation

Real-World Application Example

# Complete example: Network Intrusion Detection
class NetworkIntrusionDetector:
    def __init__(self):
        self.scaler = StandardScaler()
        self.dbscan = None
        
    def prepare_data(self, n_samples=1000):
        """Simulate network traffic data"""
        np.random.seed(42)
        
        # Normal traffic (majority)
        normal_traffic = {
            'packet_size': np.random.normal(1500, 200, n_samples),
            'connection_duration': np.random.exponential(10, n_samples),
            'packets_per_second': np.random.poisson(100, n_samples),
            'unique_destinations': np.random.poisson(5, n_samples)
        }
        
        # Attack patterns (minority)
        n_attacks = int(n_samples * 0.05)  # 5% attacks
        
        # DDoS attack pattern
        ddos = {
            'packet_size': np.random.normal(64, 10, n_attacks//2),
            'connection_duration': np.random.exponential(0.1, n_attacks//2),
            'packets_per_second': np.random.poisson(10000, n_attacks//2),
            'unique_destinations': np.ones(n_attacks//2)
        }
        
        # Port scan pattern
        port_scan = {
            'packet_size': np.random.normal(40, 5, n_attacks//2),
            'connection_duration': np.random.exponential(0.01, n_attacks//2),
            'packets_per_second': np.random.poisson(500, n_attacks//2),
            'unique_destinations': np.random.poisson(100, n_attacks//2)
        }
        
        # Combine data
        features = []
        labels = []
        
        for key in normal_traffic.keys():
            normal_traffic[key] = list(normal_traffic[key])
            ddos[key] = list(ddos[key])
            port_scan[key] = list(port_scan[key])
        
        # Add normal traffic
        for i in range(n_samples):
            features.append([
                normal_traffic['packet_size'][i],
                normal_traffic['connection_duration'][i],
                normal_traffic['packets_per_second'][i],
                normal_traffic['unique_destinations'][i]
            ])
            labels.append('normal')
        
        # Add attacks
        for i in range(n_attacks//2):
            features.append([
                ddos['packet_size'][i],
                ddos['connection_duration'][i],
                ddos['packets_per_second'][i],
                ddos['unique_destinations'][i]
            ])
            labels.append('ddos')
            
            features.append([
                port_scan['packet_size'][i],
                port_scan['connection_duration'][i],
                port_scan['packets_per_second'][i],
                port_scan['unique_destinations'][i]
            ])
            labels.append('port_scan')
        
        return np.array(features), np.array(labels)
    
    def train(self, X_train):
        """Train DBSCAN on normal traffic"""
        X_scaled = self.scaler.fit_transform(X_train)
        
        # Find optimal parameters
        from sklearn.neighbors import NearestNeighbors
        nbrs = NearestNeighbors(n_neighbors=5).fit(X_scaled)
        distances, _ = nbrs.kneighbors(X_scaled)
        
        # Use 95th percentile distance as eps
        eps = np.percentile(distances[:, 4], 95)
        
        self.dbscan = DBSCAN(eps=eps, min_samples=5)
        self.dbscan.fit(X_scaled)
        
        print(f"Model trained with eps={eps:.3f}")
        
    def detect_intrusions(self, X_test, y_test):
        """Detect network intrusions"""
        X_scaled = self.scaler.transform(X_test)
        
        # Predict clusters
        clusters = self.dbscan.fit_predict(X_scaled)
        
        # Noise points are potential intrusions
        intrusions = clusters == -1
        
        # Evaluate detection
        from sklearn.metrics import classification_report, confusion_matrix
        
        # Convert labels to binary (normal vs attack)
        y_binary = (y_test != 'normal').astype(int)
        
        print("\nIntrusion Detection Results:")
        print("-" * 40)
        print(classification_report(y_binary, intrusions, 
                                   target_names=['Normal', 'Attack']))
        
        # Confusion matrix
        cm = confusion_matrix(y_binary, intrusions)
        
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
        
        # Confusion matrix
        im = ax1.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
        ax1.figure.colorbar(im, ax=ax1)
        ax1.set(xticks=np.arange(cm.shape[1]),
               yticks=np.arange(cm.shape[0]),
               xticklabels=['Normal', 'Attack'],
               yticklabels=['Normal', 'Attack'],
               title='Confusion Matrix',
               ylabel='True label',
               xlabel='Predicted label')
        
        # Annotate cells
        for i in range(cm.shape[0]):
            for j in range(cm.shape[1]):
                ax1.text(j, i, format(cm[i, j], 'd'),
                        ha="center", va="center",
                        color="white" if cm[i, j] > cm.max() / 2. else "black")
        
        # Scatter plot of detections
        ax2.scatter(X_test[~intrusions, 0], X_test[~intrusions, 1], 
                   c='blue', alpha=0.5, label='Normal', s=20)
        ax2.scatter(X_test[intrusions, 0], X_test[intrusions, 1], 
                   c='red', alpha=0.7, label='Intrusion', s=40, marker='^')
        ax2.set_xlabel('Packet Size')
        ax2.set_ylabel('Connection Duration')
        ax2.set_title('Network Traffic Analysis')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        return intrusions

# Run intrusion detection example
detector = NetworkIntrusionDetector()

# Prepare data
X, y = detector.prepare_data(1000)

# Split data
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42
)

# Train on normal traffic only
normal_mask = y_train == 'normal'
detector.train(X_train[normal_mask])

# Detect intrusions
print("\n" + "="*60)
print("NETWORK INTRUSION DETECTION SYSTEM")
print("="*60)
intrusions = detector.detect_intrusions(X_test, y_test)

Summary and Key Takeaways

🎯 Key Points to Remember