Skip to main content

K-means Optimization & Evaluation

Related lessons: K-means Fundamentals | Optimization & Evaluation | Applications & Advanced Topics

Choosing Optimal K

Methods for Determining the Number of Clusters

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import make_blobs
from sklearn.cluster import KMeans, MiniBatchKMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score, silhouette_samples, calinski_harabasz_score
from sklearn.metrics import davies_bouldin_score, adjusted_rand_score
import warnings
warnings.filterwarnings('ignore')

# Set style
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

# Generate sample data
np.random.seed(42)
X, y_true = make_blobs(n_samples=500, centers=4, n_features=2,
                       center_box=(-10.0, 10.0), cluster_std=1.5,
                       random_state=42)

class OptimalKSelector:
    """Methods for selecting optimal number of clusters"""
    
    def __init__(self, X, k_range=range(2, 11)):
        self.X = X
        self.k_range = k_range
        self.results = {
            'k': [],
            'inertia': [],
            'silhouette': [],
            'calinski_harabasz': [],
            'davies_bouldin': [],
            'gap_statistic': [],
            'gap_std': []
        }
        
    def elbow_method(self):
        """Use elbow method to find optimal K"""
        for k in self.k_range:
            kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
            kmeans.fit(self.X)
            
            self.results['k'].append(k)
            self.results['inertia'].append(kmeans.inertia_)
            
            # Calculate metrics
            labels = kmeans.labels_
            self.results['silhouette'].append(silhouette_score(self.X, labels))
            self.results['calinski_harabasz'].append(calinski_harabasz_score(self.X, labels))
            self.results['davies_bouldin'].append(davies_bouldin_score(self.X, labels))
        
        # Calculate gap statistic
        self._calculate_gap_statistic()
        
        return self.results
    
    def _calculate_gap_statistic(self, n_references=10):
        """Calculate gap statistic for each K"""
        gap_stats = []
        gap_stds = []
        
        for k in self.k_range:
            # Fit to actual data
            kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
            kmeans.fit(self.X)
            actual_inertia = kmeans.inertia_
            
            # Generate reference datasets
            reference_inertias = []
            for _ in range(n_references):
                # Random data with same bounds as original
                random_data = np.random.uniform(
                    low=self.X.min(axis=0),
                    high=self.X.max(axis=0),
                    size=self.X.shape
                )
                kmeans_ref = KMeans(n_clusters=k, random_state=42, n_init=1)
                kmeans_ref.fit(random_data)
                reference_inertias.append(kmeans_ref.inertia_)
            
            # Calculate gap statistic
            gap = np.mean(np.log(reference_inertias)) - np.log(actual_inertia)
            gap_std = np.std(np.log(reference_inertias))
            
            gap_stats.append(gap)
            gap_stds.append(gap_std)
        
        self.results['gap_statistic'] = gap_stats
        self.results['gap_std'] = gap_stds
    
    def plot_elbow_curve(self):
        """Plot elbow curve and other metrics"""
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))
        
        # 1. Elbow curve
        axes[0, 0].plot(self.results['k'], self.results['inertia'], 'bo-', linewidth=2)
        axes[0, 0].set_xlabel('Number of Clusters (k)')
        axes[0, 0].set_ylabel('Within-cluster SSE (Inertia)')
        axes[0, 0].set_title('Elbow Method')
        axes[0, 0].grid(True, alpha=0.3)
        
        # Mark elbow point using kneedle algorithm
        elbow_k = self._find_elbow_point()
        axes[0, 0].axvline(x=elbow_k, color='r', linestyle='--', 
                          label=f'Elbow at k={elbow_k}')
        axes[0, 0].scatter(elbow_k, 
                          self.results['inertia'][elbow_k-self.k_range[0]], 
                          color='red', s=100, zorder=5)
        axes[0, 0].legend()
        
        # 2. Silhouette Score
        axes[0, 1].plot(self.results['k'], self.results['silhouette'], 'go-', linewidth=2)
        axes[0, 1].set_xlabel('Number of Clusters (k)')
        axes[0, 1].set_ylabel('Silhouette Score')
        axes[0, 1].set_title('Silhouette Score (Higher is Better)')
        axes[0, 1].grid(True, alpha=0.3)
        best_k_silhouette = self.results['k'][np.argmax(self.results['silhouette'])]
        axes[0, 1].axvline(x=best_k_silhouette, color='r', linestyle='--', 
                          label=f'Best k={best_k_silhouette}')
        axes[0, 1].legend()
        
        # 3. Calinski-Harabasz Score
        axes[0, 2].plot(self.results['k'], self.results['calinski_harabasz'], 
                       'mo-', linewidth=2)
        axes[0, 2].set_xlabel('Number of Clusters (k)')
        axes[0, 2].set_ylabel('Calinski-Harabasz Score')
        axes[0, 2].set_title('Calinski-Harabasz Score (Higher is Better)')
        axes[0, 2].grid(True, alpha=0.3)
        best_k_ch = self.results['k'][np.argmax(self.results['calinski_harabasz'])]
        axes[0, 2].axvline(x=best_k_ch, color='r', linestyle='--', 
                          label=f'Best k={best_k_ch}')
        axes[0, 2].legend()
        
        # 4. Davies-Bouldin Score
        axes[1, 0].plot(self.results['k'], self.results['davies_bouldin'], 
                       'co-', linewidth=2)
        axes[1, 0].set_xlabel('Number of Clusters (k)')
        axes[1, 0].set_ylabel('Davies-Bouldin Score')
        axes[1, 0].set_title('Davies-Bouldin Score (Lower is Better)')
        axes[1, 0].grid(True, alpha=0.3)
        best_k_db = self.results['k'][np.argmin(self.results['davies_bouldin'])]
        axes[1, 0].axvline(x=best_k_db, color='r', linestyle='--', 
                          label=f'Best k={best_k_db}')
        axes[1, 0].legend()
        
        # 5. Gap Statistic
        axes[1, 1].errorbar(self.results['k'], self.results['gap_statistic'], 
                           yerr=self.results['gap_std'], fmt='ro-', linewidth=2,
                           capsize=5, capthick=2)
        axes[1, 1].set_xlabel('Number of Clusters (k)')
        axes[1, 1].set_ylabel('Gap Statistic')
        axes[1, 1].set_title('Gap Statistic (Higher is Better)')
        axes[1, 1].grid(True, alpha=0.3)
        best_k_gap = self._find_optimal_gap()
        axes[1, 1].axvline(x=best_k_gap, color='r', linestyle='--', 
                          label=f'Best k={best_k_gap}')
        axes[1, 1].legend()
        
        # 6. Voting Summary
        votes = [elbow_k, best_k_silhouette, best_k_ch, best_k_db, best_k_gap]
        vote_counts = pd.Series(votes).value_counts()
        
        axes[1, 2].bar(vote_counts.index, vote_counts.values, alpha=0.7, 
                      color=['green' if k == vote_counts.index[0] else 'blue' 
                             for k in vote_counts.index])
        axes[1, 2].set_xlabel('Number of Clusters (k)')
        axes[1, 2].set_ylabel('Votes from Different Methods')
        axes[1, 2].set_title(f'Consensus: k={vote_counts.index[0]} '
                           f'({vote_counts.values[0]} votes)')
        axes[1, 2].grid(True, alpha=0.3)
        
        plt.suptitle('Optimal K Selection: Multiple Methods', fontsize=14)
        plt.tight_layout()
        plt.show()
        
        return {
            'elbow': elbow_k,
            'silhouette': best_k_silhouette,
            'calinski_harabasz': best_k_ch,
            'davies_bouldin': best_k_db,
            'gap_statistic': best_k_gap,
            'consensus': vote_counts.index[0]
        }
    
    def _find_elbow_point(self):
        """Find elbow point using perpendicular distance method"""
        k_values = self.results['k']
        inertias = self.results['inertia']
        
        # Calculate the line from first to last point
        p1 = np.array([k_values[0], inertias[0]])
        p2 = np.array([k_values[-1], inertias[-1]])
        
        # Calculate perpendicular distance for each point
        distances = []
        for i in range(len(k_values)):
            p = np.array([k_values[i], inertias[i]])
            # Distance from point to line
            distance = np.abs(np.cross(p2-p1, p1-p)) / np.linalg.norm(p2-p1)
            distances.append(distance)
        
        # Find elbow point (maximum distance)
        elbow_idx = np.argmax(distances)
        return k_values[elbow_idx]
    
    def _find_optimal_gap(self):
        """Find optimal K using gap statistic criterion"""
        gaps = self.results['gap_statistic']
        stds = self.results['gap_std']
        
        for i in range(len(gaps) - 1):
            if gaps[i] >= gaps[i + 1] - stds[i + 1]:
                return self.results['k'][i]
        
        return self.results['k'][-1]
    
    def silhouette_analysis(self, n_clusters):
        """Detailed silhouette analysis for specific k"""
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        labels = kmeans.fit_predict(self.X)
        
        # Calculate silhouette scores for each sample
        silhouette_vals = silhouette_samples(self.X, labels)
        
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
        
        # Silhouette plot
        y_lower = 10
        for i in range(n_clusters):
            cluster_silhouette_vals = silhouette_vals[labels == i]
            cluster_silhouette_vals.sort()
            
            size_cluster_i = cluster_silhouette_vals.shape[0]
            y_upper = y_lower + size_cluster_i
            
            color = plt.cm.nipy_spectral(float(i) / n_clusters)
            ax1.fill_betweenx(np.arange(y_lower, y_upper),
                             0, cluster_silhouette_vals,
                             facecolor=color, edgecolor=color, alpha=0.7)
            
            # Label clusters
            ax1.text(-0.05, y_lower + 0.5 * size_cluster_i, str(i))
            
            # Mark cluster size
            ax1.text(0.5, y_lower + 0.5 * size_cluster_i, 
                    f'n={size_cluster_i}',
                    fontsize=8, ha='center')
            
            y_lower = y_upper + 10
        
        ax1.set_xlabel("Silhouette Coefficient")
        ax1.set_ylabel("Cluster")
        ax1.set_xlim([-0.1, 1])
        
        # Add average silhouette score line
        silhouette_avg = silhouette_score(self.X, labels)
        ax1.axvline(x=silhouette_avg, color="red", linestyle="--",
                   label=f'Average: {silhouette_avg:.3f}')
        ax1.set_title(f"Silhouette Plot for {n_clusters} Clusters")
        ax1.legend()
        
        # Scatter plot with cluster visualization
        colors = plt.cm.nipy_spectral(labels.astype(float) / n_clusters)
        ax2.scatter(self.X[:, 0], self.X[:, 1], marker='o', s=50, 
                   c=colors, alpha=0.6, edgecolors='black', linewidth=0.5)
        
        # Plot cluster centers
        centers = kmeans.cluster_centers_
        ax2.scatter(centers[:, 0], centers[:, 1], marker='x',
                   c='red', s=200, linewidths=3, label='Centroids')
        
        # Draw circle around each cluster
        for i, center in enumerate(centers):
            cluster_points = self.X[labels == i]
            radius = np.max(np.linalg.norm(cluster_points - center, axis=1))
            circle = plt.Circle(center, radius, fill=False, 
                               edgecolor='red', linestyle='--', alpha=0.3)
            ax2.add_patch(circle)
        
        ax2.set_xlabel("Feature 1")
        ax2.set_ylabel("Feature 2")
        ax2.set_title(f"Clusters Visualization (k={n_clusters})")
        ax2.legend()
        
        plt.suptitle(f'Silhouette Analysis for k={n_clusters}', fontsize=14)
        plt.tight_layout()
        plt.show()
        
        return silhouette_avg

# Find optimal K
print("="*60)
print("OPTIMAL K SELECTION")
print("="*60)

selector = OptimalKSelector(X, k_range=range(2, 10))
selector.elbow_method()
best_k_dict = selector.plot_elbow_curve()

print("\nOptimal K by different methods:")
for method, k in best_k_dict.items():
    print(f"  {method:20s}: k = {k}")

print(f"\nConsensus recommendation: k = {best_k_dict['consensus']}")

# Detailed silhouette analysis for different K values
print("\n" + "="*60)
print("SILHOUETTE ANALYSIS")
print("="*60)

for k in [3, 4, 5]:
    print(f"\nAnalyzing k={k}:")
    avg_score = selector.silhouette_analysis(k)
    print(f"  Average silhouette score: {avg_score:.3f}")

Advanced K-means Variants

K-means++ and Mini-batch K-means

import time
from sklearn.cluster import MiniBatchKMeans

class KMeansVariants:
    """Compare different K-means variants"""
    
    def __init__(self, X, n_clusters=4):
        self.X = X
        self.n_clusters = n_clusters
        self.models = {}
        self.results = {}
        
    def compare_initialization_methods(self, n_trials=20):
        """Compare different initialization methods"""
        
        init_methods = ['random', 'k-means++']
        results = {method: {'inertias': [], 'times': [], 'iterations': []} 
                  for method in init_methods}
        
        for init_method in init_methods:
            for trial in range(n_trials):
                start_time = time.time()
                
                kmeans = KMeans(n_clusters=self.n_clusters, 
                              init=init_method,
                              n_init=1,  # Single run to see effect of initialization
                              max_iter=300,
                              random_state=trial)
                kmeans.fit(self.X)
                
                elapsed_time = time.time() - start_time
                
                results[init_method]['inertias'].append(kmeans.inertia_)
                results[init_method]['times'].append(elapsed_time)
                results[init_method]['iterations'].append(kmeans.n_iter_)
        
        # Visualize comparison
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        
        # 1. Inertia distribution
        bp1 = axes[0].boxplot([results['random']['inertias'], 
                               results['k-means++']['inertias']], 
                              labels=['Random', 'K-means++'],
                              patch_artist=True)
        axes[0].set_ylabel('Inertia')
        axes[0].set_title('Final Inertia Distribution')
        axes[0].grid(True, alpha=0.3)
        
        # Color boxes
        colors = ['lightblue', 'lightgreen']
        for patch, color in zip(bp1['boxes'], colors):
            patch.set_facecolor(color)
        
        # 2. Convergence speed
        bp2 = axes[1].boxplot([results['random']['iterations'], 
                               results['k-means++']['iterations']], 
                              labels=['Random', 'K-means++'],
                              patch_artist=True)
        axes[1].set_ylabel('Iterations to Converge')
        axes[1].set_title('Convergence Speed')
        axes[1].grid(True, alpha=0.3)
        
        for patch, color in zip(bp2['boxes'], colors):
            patch.set_facecolor(color)
        
        # 3. Runtime comparison
        bp3 = axes[2].boxplot([results['random']['times'], 
                               results['k-means++']['times']], 
                              labels=['Random', 'K-means++'],
                              patch_artist=True)
        axes[2].set_ylabel('Time (seconds)')
        axes[2].set_title('Runtime Distribution')
        axes[2].grid(True, alpha=0.3)
        
        for patch, color in zip(bp3['boxes'], colors):
            patch.set_facecolor(color)
        
        plt.suptitle(f'Initialization Method Comparison ({n_trials} trials)', fontsize=14)
        plt.tight_layout()
        plt.show()
        
        # Print statistics
        print("\nInitialization Method Comparison:")
        print("="*60)
        for method in init_methods:
            print(f"\n{method}:")
            print(f"  Inertia - Mean: {np.mean(results[method]['inertias']):.2f}, "
                  f"Std: {np.std(results[method]['inertias']):.2f}")
            print(f"  Iterations - Mean: {np.mean(results[method]['iterations']):.1f}, "
                  f"Std: {np.std(results[method]['iterations']):.1f}")
            print(f"  Time - Mean: {np.mean(results[method]['times'])*1000:.2f}ms, "
                  f"Std: {np.std(results[method]['times'])*1000:.2f}ms")
        
        return results
    
    def mini_batch_comparison(self, batch_sizes=[10, 50, 100, 200, 'auto']):
        """Compare Mini-batch K-means with different batch sizes"""
        
        # Standard K-means as baseline
        start_time = time.time()
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42, n_init=10)
        kmeans.fit(self.X)
        kmeans_time = time.time() - start_time
        kmeans_inertia = kmeans.inertia_
        
        # Mini-batch K-means with different batch sizes
        results = []
        
        for batch_size in batch_sizes:
            start_time = time.time()
            
            if batch_size == 'auto':
                mbkmeans = MiniBatchKMeans(n_clusters=self.n_clusters,
                                          random_state=42)
            else:
                mbkmeans = MiniBatchKMeans(n_clusters=self.n_clusters,
                                          batch_size=batch_size,
                                          random_state=42)
            mbkmeans.fit(self.X)
            mb_time = time.time() - start_time
            
            # Calculate metrics
            ari_score = adjusted_rand_score(kmeans.labels_, mbkmeans.labels_)
            
            results.append({
                'Method': f'MiniBatch-{batch_size}',
                'Batch Size': batch_size if batch_size != 'auto' else 'auto (100)',
                'Time (s)': mb_time,
                'Speedup': kmeans_time / mb_time,
                'Inertia': mbkmeans.inertia_,
                'Inertia Diff %': 100 * (mbkmeans.inertia_ - kmeans_inertia) / kmeans_inertia,
                'ARI vs Standard': ari_score
            })
        
        results_df = pd.DataFrame(results)
        
        print("\n" + "="*60)
        print("MINI-BATCH K-MEANS COMPARISON")
        print("="*60)
        print(f"\nStandard K-means baseline:")
        print(f"  Time: {kmeans_time:.4f}s")
        print(f"  Inertia: {kmeans_inertia:.2f}")
        print(f"\nMini-batch variants:")
        print(results_df.to_string(index=False))
        
        # Visualize comparison
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        
        # 1. Speedup vs Batch Size
        x_pos = np.arange(len(results_df))
        axes[0, 0].bar(x_pos, results_df['Speedup'], alpha=0.7, color='blue')
        axes[0, 0].set_xticks(x_pos)
        axes[0, 0].set_xticklabels(results_df['Method'], rotation=45)
        axes[0, 0].set_ylabel('Speedup Factor')
        axes[0, 0].set_title('Speedup vs Batch Size')
        axes[0, 0].axhline(y=1, color='r', linestyle='--', label='No speedup')
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)
        
        # 2. Quality Loss vs Batch Size
        axes[0, 1].bar(x_pos, results_df['Inertia Diff %'], alpha=0.7, 
                      color=['green' if x < 5 else 'orange' if x < 10 else 'red' 
                             for x in results_df['Inertia Diff %']])
        axes[0, 1].set_xticks(x_pos)
        axes[0, 1].set_xticklabels(results_df['Method'], rotation=45)
        axes[0, 1].set_ylabel('Inertia Difference (%)')
        axes[0, 1].set_title('Quality Loss vs Batch Size')
        axes[0, 1].axhline(y=0, color='g', linestyle='--', label='Same as standard')
        axes[0, 1].axhline(y=5, color='orange', linestyle='--', alpha=0.5, label='5% threshold')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)
        
        # 3. Agreement with Standard K-means
        axes[1, 0].bar(x_pos, results_df['ARI vs Standard'], alpha=0.7, color='purple')
        axes[1, 0].set_xticks(x_pos)
        axes[1, 0].set_xticklabels(results_df['Method'], rotation=45)
        axes[1, 0].set_ylabel('Adjusted Rand Index')
        axes[1, 0].set_title('Agreement with Standard K-means')
        axes[1, 0].set_ylim([0, 1])
        axes[1, 0].axhline(y=1, color='g', linestyle='--', label='Perfect agreement')
        axes[1, 0].legend()
        axes[1, 0].grid(True, alpha=0.3)
        
        # 4. Time-Quality Trade-off
        axes[1, 1].scatter(results_df['Time (s)'], results_df['Inertia'], 
                          s=100, alpha=0.7, c=range(len(results_df)), cmap='viridis')
        axes[1, 1].scatter(kmeans_time, kmeans_inertia, s=200, marker='*', 
                          color='red', label='Standard K-means', zorder=5)
        
        # Annotate points
        for i, row in results_df.iterrows():
            axes[1, 1].annotate(row['Method'], 
                               (row['Time (s)'], row['Inertia']),
                               fontsize=8, ha='right')
        
        axes[1, 1].set_xlabel('Time (seconds)')
        axes[1, 1].set_ylabel('Inertia')
        axes[1, 1].set_title('Time-Quality Trade-off')
        axes[1, 1].legend()
        axes[1, 1].grid(True, alpha=0.3)
        
        plt.suptitle('Mini-batch K-means Performance Analysis', fontsize=14)
        plt.tight_layout()
        plt.show()
        
        return results_df
    
    def scalability_test(self, sizes=[100, 500, 1000, 5000, 10000]):
        """Test scalability of different K-means variants"""
        
        results = []
        
        for n_samples in sizes:
            # Generate data
            X_test, _ = make_blobs(n_samples=n_samples, centers=10, 
                                  n_features=20, random_state=42)
            
            # Standard K-means
            start = time.time()
            km = KMeans(n_clusters=10, n_init=3, random_state=42)
            km.fit(X_test)
            km_time = time.time() - start
            
            # Mini-batch K-means
            start = time.time()
            mbkm = MiniBatchKMeans(n_clusters=10, random_state=42)
            mbkm.fit(X_test)
            mbkm_time = time.time() - start
            
            results.append({
                'n_samples': n_samples,
                'KMeans_time': km_time,
                'MiniBatch_time': mbkm_time,
                'speedup': km_time / mbkm_time
            })
        
        results_df = pd.DataFrame(results)
        
        # Plot scalability
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
        
        ax1.loglog(results_df['n_samples'], results_df['KMeans_time'], 
                  'o-', label='Standard K-means', linewidth=2, markersize=8)
        ax1.loglog(results_df['n_samples'], results_df['MiniBatch_time'], 
                  's-', label='Mini-batch K-means', linewidth=2, markersize=8)
        ax1.set_xlabel('Number of Samples')
        ax1.set_ylabel('Time (seconds)')
        ax1.set_title('Scalability Comparison')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        ax2.semilogx(results_df['n_samples'], results_df['speedup'], 
                    'o-', color='green', linewidth=2, markersize=8)
        ax2.set_xlabel('Number of Samples')
        ax2.set_ylabel('Speedup Factor')
        ax2.set_title('Mini-batch Speedup vs Dataset Size')
        ax2.grid(True, alpha=0.3)
        
        plt.suptitle('K-means Scalability Analysis', fontsize=14)
        plt.tight_layout()
        plt.show()
        
        print("\nScalability Test Results:")
        print(results_df.to_string(index=False))
        
        return results_df

# Compare K-means variants
variants = KMeansVariants(X, n_clusters=4)

# Compare initialization methods
print("\n" + "="*60)
print("INITIALIZATION METHODS COMPARISON")
print("="*60)
init_results = variants.compare_initialization_methods(n_trials=30)

# For larger dataset, demonstrate Mini-batch K-means
X_large, _ = make_blobs(n_samples=5000, centers=10, n_features=20, random_state=42)
variants_large = KMeansVariants(X_large, n_clusters=10)

# Compare mini-batch variants
mb_results = variants_large.mini_batch_comparison(
    batch_sizes=[50, 100, 200, 500, 1000, 'auto']
)

# Test scalability
print("\n" + "="*60)
print("SCALABILITY ANALYSIS")
print("="*60)
scalability_results = variants_large.scalability_test()

Advanced Initialization Strategies

Beyond K-means++

class AdvancedInitialization:
    """Advanced initialization strategies for K-means"""
    
    def __init__(self, X, n_clusters):
        self.X = X
        self.n_clusters = n_clusters
        
    def furthest_point_initialization(self):
        """Initialize centroids using furthest point heuristic"""
        n_samples = len(self.X)
        centroids = []
        
        # Start with random point
        first_idx = np.random.randint(n_samples)
        centroids.append(self.X[first_idx])
        
        for _ in range(1, self.n_clusters):
            # Find point furthest from existing centroids
            distances = np.array([
                min([np.linalg.norm(x - c) for c in centroids]) 
                for x in self.X
            ])
            furthest_idx = np.argmax(distances)
            centroids.append(self.X[furthest_idx])
        
        return np.array(centroids)
    
    def density_based_initialization(self):
        """Initialize centroids based on local density"""
        from sklearn.neighbors import KernelDensity
        
        # Estimate density
        kde = KernelDensity(kernel='gaussian', bandwidth=1.0)
        kde.fit(self.X)
        densities = np.exp(kde.score_samples(self.X))
        
        centroids = []
        used_indices = set()
        
        for _ in range(self.n_clusters):
            # Find high-density point not too close to existing centroids
            for idx in np.argsort(densities)[::-1]:
                if idx in used_indices:
                    continue
                
                if not centroids:
                    centroids.append(self.X[idx])
                    used_indices.add(idx)
                    break
                else:
                    # Check distance to existing centroids
                    min_dist = min([np.linalg.norm(self.X[idx] - c) 
                                  for c in centroids])
                    if min_dist > np.mean([np.linalg.norm(c1 - c2) 
                                          for i, c1 in enumerate(centroids) 
                                          for c2 in centroids[i+1:]]) * 0.5:
                        centroids.append(self.X[idx])
                        used_indices.add(idx)
                        break
        
        return np.array(centroids)
    
    def compare_initializations(self, n_trials=10):
        """Compare different initialization strategies"""
        
        strategies = {
            'random': 'random',
            'k-means++': 'k-means++',
            'furthest': self.furthest_point_initialization,
            'density': self.density_based_initialization
        }
        
        results = {name: {'inertias': [], 'times': [], 'silhouettes': []} 
                  for name in strategies.keys()}
        
        for trial in range(n_trials):
            for name, strategy in strategies.items():
                start_time = time.time()
                
                if isinstance(strategy, str):
                    # sklearn built-in initialization
                    km = KMeans(n_clusters=self.n_clusters, init=strategy,
                               n_init=1, random_state=trial)
                else:
                    # Custom initialization
                    init_centroids = strategy()
                    km = KMeans(n_clusters=self.n_clusters, init=init_centroids,
                               n_init=1)
                
                km.fit(self.X)
                elapsed = time.time() - start_time
                
                results[name]['inertias'].append(km.inertia_)
                results[name]['times'].append(elapsed)
                results[name]['silhouettes'].append(
                    silhouette_score(self.X, km.labels_)
                )
        
        # Visualize results
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        
        # Prepare data for plotting
        methods = list(results.keys())
        inertias_data = [results[m]['inertias'] for m in methods]
        times_data = [results[m]['times'] for m in methods]
        silhouettes_data = [results[m]['silhouettes'] for m in methods]
        
        # Inertia comparison
        bp1 = axes[0].boxplot(inertias_data, labels=methods, patch_artist=True)
        axes[0].set_ylabel('Inertia')
        axes[0].set_title('Final Inertia by Initialization Method')
        axes[0].grid(True, alpha=0.3)
        
        # Time comparison
        bp2 = axes[1].boxplot(times_data, labels=methods, patch_artist=True)
        axes[1].set_ylabel('Time (seconds)')
        axes[1].set_title('Initialization Time')
        axes[1].grid(True, alpha=0.3)
        
        # Silhouette score comparison
        bp3 = axes[2].boxplot(silhouettes_data, labels=methods, patch_artist=True)
        axes[2].set_ylabel('Silhouette Score')
        axes[2].set_title('Clustering Quality')
        axes[2].grid(True, alpha=0.3)
        
        # Color boxes
        colors = ['lightblue', 'lightgreen', 'lightyellow', 'lightcoral']
        for bp in [bp1, bp2, bp3]:
            for patch, color in zip(bp['boxes'], colors):
                patch.set_facecolor(color)
        
        plt.suptitle('Initialization Strategies Comparison', fontsize=14)
        plt.tight_layout()
        plt.show()
        
        # Print statistics
        print("\nInitialization Strategies Statistics:")
        print("="*60)
        for method in methods:
            print(f"\n{method}:")
            print(f"  Inertia: {np.mean(results[method]['inertias']):.2f} "
                  f"(±{np.std(results[method]['inertias']):.2f})")
            print(f"  Silhouette: {np.mean(results[method]['silhouettes']):.3f} "
                  f"(±{np.std(results[method]['silhouettes']):.3f})")
            print(f"  Time: {np.mean(results[method]['times'])*1000:.2f}ms "
                  f"(±{np.std(results[method]['times'])*1000:.2f}ms)")

# Test advanced initialization strategies
print("\n" + "="*60)
print("ADVANCED INITIALIZATION STRATEGIES")
print("="*60)

adv_init = AdvancedInitialization(X, n_clusters=4)
adv_init.compare_initializations(n_trials=20)

Practice Exercises

Exercise 1: Automated K Selection

Implement a system that automatically determines the optimal number of clusters by:

  1. Running multiple metrics (elbow, silhouette, gap statistic)
  2. Using weighted voting based on metric reliability
  3. Providing confidence scores for the recommendation
  4. Generating a detailed report with visualizations

Exercise 2: Hybrid K-means

Create a hybrid K-means algorithm that:

  1. Starts with mini-batch K-means for speed
  2. Refines with standard K-means for accuracy
  3. Uses adaptive batch sizes based on data size
  4. Implements early stopping criteria

Key Takeaways