Gaussian Mixture Models (GMM) represent one of the most elegant approaches to clustering, treating data as arising from a mixture of Gaussian distributions. Unlike hard clustering methods like K-means, GMM provides soft assignments - probabilistic membership to clusters. This makes GMM particularly powerful for capturing uncertainty and handling overlapping clusters.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
import seaborn as sns
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_blobs, make_moons, make_circles
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import silhouette_score, adjusted_rand_score
from scipy import stats
from scipy.stats import multivariate_normal
import warnings
warnings.filterwarnings('ignore')
# Set style for better visualizations
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
print("="*60)
print("GAUSSIAN MIXTURE MODELS FUNDAMENTALS")
print("="*60)
# Core concepts explanation
gmm_concepts = """
GMM KEY CONCEPTS:
1. MIXTURE MODEL:
• Data = weighted sum of K Gaussian distributions
• P(x) = Σ πₖ × N(x|μₖ, Σₖ)
• πₖ = mixing coefficients (weights)
2. PARAMETERS:
• μₖ: Mean of component k
• Σₖ: Covariance matrix of component k
• πₖ: Weight of component k (Σπₖ = 1)
3. EXPECTATION-MAXIMIZATION (EM):
• E-step: Calculate probability of each point belonging to each cluster
• M-step: Update parameters to maximize likelihood
• Iterate until convergence
4. COVARIANCE TYPES:
• 'full': Each component has its own covariance matrix
• 'tied': All components share the same covariance
• 'diag': Diagonal covariance (features are independent)
• 'spherical': Single variance per component (like K-means)
5. ADVANTAGES:
• Soft clustering (probabilistic assignments)
• Can model elliptical clusters
• Provides uncertainty estimates
• Solid statistical foundation
"""
print(gmm_concepts)
class GMMVisualizer:
"""Visualize GMM clustering and components"""
def __init__(self):
self.models = {}
self.results = {}
def draw_ellipse(self, position, covariance, ax, **kwargs):
"""Draw an ellipse with given position and covariance"""
angle = np.degrees(np.arctan2(*covariance[:, 0][::-1]))
width, height = 2 * np.sqrt(np.linalg.eigvals(covariance))
ellipse = Ellipse(position, width, height, angle=angle, **kwargs)
ax.add_patch(ellipse)
def visualize_gmm_process(self, X, n_components=3, n_iterations=10):
"""Visualize EM algorithm iterations"""
# Initialize GMM
gmm = GaussianMixture(n_components=n_components,
max_iter=1, warm_start=True,
random_state=42)
fig, axes = plt.subplots(2, 5, figsize=(20, 8))
axes = axes.ravel()
for i in range(min(n_iterations, 10)):
# Perform one iteration
gmm.fit(X)
# Predict labels
labels = gmm.predict(X)
# Plot clusters
axes[i].scatter(X[:, 0], X[:, 1], c=labels,
cmap='viridis', alpha=0.5, s=20)
# Draw Gaussian ellipses
for j in range(n_components):
self.draw_ellipse(gmm.means_[j], gmm.covariances_[j], axes[i],
alpha=0.2, facecolor=f'C{j}',
edgecolor=f'C{j}', linewidth=2)
axes[i].scatter(gmm.means_[j][0], gmm.means_[j][1],
c='red', s=100, marker='x', linewidths=2)
axes[i].set_title(f'Iteration {i+1}')
axes[i].grid(True, alpha=0.3)
axes[i].set_xlim(X[:, 0].min()-1, X[:, 0].max()+1)
axes[i].set_ylim(X[:, 1].min()-1, X[:, 1].max()+1)
plt.suptitle('GMM EM Algorithm Progression', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
return gmm
def compare_covariance_types(self, X, n_components=3):
"""Compare different covariance matrix types"""
covariance_types = ['full', 'tied', 'diag', 'spherical']
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
for idx, cov_type in enumerate(covariance_types):
# Fit GMM
gmm = GaussianMixture(n_components=n_components,
covariance_type=cov_type,
random_state=42)
gmm.fit(X)
labels = gmm.predict(X)
# Store model
self.models[cov_type] = gmm
# Plot clusters
axes[0, idx].scatter(X[:, 0], X[:, 1], c=labels,
cmap='viridis', alpha=0.5, s=20)
# Draw covariance ellipses
for j in range(n_components):
if cov_type == 'full':
cov = gmm.covariances_[j]
elif cov_type == 'tied':
cov = gmm.covariances_
elif cov_type == 'diag':
cov = np.diag(gmm.covariances_[j])
else: # spherical
cov = gmm.covariances_[j] * np.eye(2)
self.draw_ellipse(gmm.means_[j], cov, axes[0, idx],
alpha=0.2, facecolor=f'C{j}',
edgecolor=f'C{j}', linewidth=2)
axes[0, idx].scatter(gmm.means_[j][0], gmm.means_[j][1],
c='red', s=100, marker='x', linewidths=2)
axes[0, idx].set_title(f'{cov_type.capitalize()} Covariance')
axes[0, idx].grid(True, alpha=0.3)
# Calculate metrics
aic = gmm.aic(X)
bic = gmm.bic(X)
log_likelihood = gmm.score(X)
if len(np.unique(labels)) > 1:
silhouette = silhouette_score(X, labels)
else:
silhouette = -1
# Display metrics
metrics_text = f"AIC: {aic:.1f}\nBIC: {bic:.1f}\n"
metrics_text += f"Log-Likelihood: {log_likelihood:.2f}\n"
metrics_text += f"Silhouette: {silhouette:.3f}"
axes[1, idx].text(0.1, 0.5, metrics_text, fontsize=10,
transform=axes[1, idx].transAxes)
axes[1, idx].axis('off')
plt.suptitle('GMM Covariance Types Comparison', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
return self.models
def probability_contours(self, X, gmm):
"""Visualize probability density contours"""
# Create mesh
x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
xx, yy = np.meshgrid(np.linspace(x_min, x_max, 100),
np.linspace(y_min, y_max, 100))
# Calculate probability density
XX = np.array([xx.ravel(), yy.ravel()]).T
Z = -gmm.score_samples(XX)
Z = Z.reshape(xx.shape)
# Plot
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# Contour plot
labels = gmm.predict(X)
contour = axes[0].contour(xx, yy, Z, levels=10, linewidths=1)
axes[0].clabel(contour, inline=True, fontsize=8)
axes[0].scatter(X[:, 0], X[:, 1], c=labels,
cmap='viridis', alpha=0.6, s=20)
axes[0].set_xlabel('Feature 1')
axes[0].set_ylabel('Feature 2')
axes[0].set_title('Probability Density Contours')
axes[0].grid(True, alpha=0.3)
# Filled contour
contourf = axes[1].contourf(xx, yy, Z, levels=20, cmap='viridis', alpha=0.7)
axes[1].scatter(X[:, 0], X[:, 1], c='white',
edgecolors='black', alpha=0.8, s=20)
axes[1].set_xlabel('Feature 1')
axes[1].set_ylabel('Feature 2')
axes[1].set_title('Probability Density Heatmap')
plt.colorbar(contourf, ax=axes[1])
axes[1].grid(True, alpha=0.3)
plt.suptitle('GMM Probability Density Visualization', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
# Generate sample data
np.random.seed(42)
# Create datasets
X_blobs, y_blobs = make_blobs(n_samples=300, centers=3, n_features=2,
cluster_std=0.5, random_state=42)
X_elongated = np.dot(X_blobs, [[0.6, -0.7], [-0.4, 0.8]]) # Create elongated clusters
# Standardize
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_elongated)
# Initialize visualizer
viz = GMMVisualizer()
print("\n" + "="*60)
print("VISUALIZING GMM CLUSTERING")
print("="*60)
# Visualize EM iterations
print("\nVisualizing EM algorithm iterations...")
final_gmm = viz.visualize_gmm_process(X_scaled, n_components=3)
# Compare covariance types
print("\nComparing covariance types...")
models = viz.compare_covariance_types(X_scaled, n_components=3)
# Probability contours
print("\nGenerating probability density visualization...")
best_gmm = models['full']
viz.probability_contours(X_scaled, best_gmm)
class GMMModelSelection:
"""Model selection techniques for GMM"""
def __init__(self):
self.selection_results = {}
def select_n_components(self, X, n_range=range(2, 10)):
"""Select optimal number of components using information criteria"""
aic_scores = []
bic_scores = []
silhouette_scores = []
for n in n_range:
gmm = GaussianMixture(n_components=n, random_state=42)
gmm.fit(X)
# Information criteria
aic_scores.append(gmm.aic(X))
bic_scores.append(gmm.bic(X))
# Silhouette score
labels = gmm.predict(X)
if len(np.unique(labels)) > 1:
silhouette = silhouette_score(X, labels)
else:
silhouette = -1
silhouette_scores.append(silhouette)
# Visualization
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# AIC
axes[0].plot(n_range, aic_scores, 'o-', linewidth=2, markersize=8)
axes[0].set_xlabel('Number of Components')
axes[0].set_ylabel('AIC')
axes[0].set_title('Akaike Information Criterion')
axes[0].grid(True, alpha=0.3)
best_aic_idx = np.argmin(aic_scores)
axes[0].axvline(x=n_range[best_aic_idx], color='r', linestyle='--',
label=f'Best: {n_range[best_aic_idx]}')
axes[0].legend()
# BIC
axes[1].plot(n_range, bic_scores, 'o-', linewidth=2, markersize=8)
axes[1].set_xlabel('Number of Components')
axes[1].set_ylabel('BIC')
axes[1].set_title('Bayesian Information Criterion')
axes[1].grid(True, alpha=0.3)
best_bic_idx = np.argmin(bic_scores)
axes[1].axvline(x=n_range[best_bic_idx], color='r', linestyle='--',
label=f'Best: {n_range[best_bic_idx]}')
axes[1].legend()
# Silhouette
axes[2].plot(n_range, silhouette_scores, 'o-', linewidth=2, markersize=8)
axes[2].set_xlabel('Number of Components')
axes[2].set_ylabel('Silhouette Score')
axes[2].set_title('Silhouette Analysis')
axes[2].grid(True, alpha=0.3)
best_sil_idx = np.argmax(silhouette_scores)
axes[2].axvline(x=n_range[best_sil_idx], color='r', linestyle='--',
label=f'Best: {n_range[best_sil_idx]}')
axes[2].legend()
plt.suptitle('GMM Model Selection', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
# Store results
self.selection_results = {
'aic': {'scores': aic_scores, 'best_n': n_range[best_aic_idx]},
'bic': {'scores': bic_scores, 'best_n': n_range[best_bic_idx]},
'silhouette': {'scores': silhouette_scores, 'best_n': n_range[best_sil_idx]}
}
print(f"\nOptimal number of components:")
print(f" AIC suggests: {n_range[best_aic_idx]}")
print(f" BIC suggests: {n_range[best_bic_idx]}")
print(f" Silhouette suggests: {n_range[best_sil_idx]}")
return self.selection_results
def cross_validation_gmm(self, X, param_grid=None):
"""Cross-validation for GMM hyperparameters"""
if param_grid is None:
param_grid = {
'n_components': [2, 3, 4, 5],
'covariance_type': ['full', 'tied', 'diag', 'spherical']
}
# Custom scorer for GMM (using BIC)
def gmm_bic_scorer(estimator, X):
return -estimator.bic(X) # Negative because GridSearchCV maximizes
# Grid search
gmm = GaussianMixture(random_state=42)
grid_search = GridSearchCV(gmm, param_grid,
scoring=gmm_bic_scorer, cv=3)
grid_search.fit(X)
# Results
results_df = pd.DataFrame(grid_search.cv_results_)
# Visualization
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# Heatmap of results
pivot_table = results_df.pivot_table(
values='mean_test_score',
index='param_covariance_type',
columns='param_n_components'
)
sns.heatmap(-pivot_table, annot=True, fmt='.0f',
cmap='viridis', ax=axes[0])
axes[0].set_title('BIC Scores (lower is better)')
axes[0].set_xlabel('Number of Components')
axes[0].set_ylabel('Covariance Type')
# Best model visualization
best_gmm = grid_search.best_estimator_
labels = best_gmm.predict(X)
axes[1].scatter(X[:, 0], X[:, 1], c=labels,
cmap='viridis', alpha=0.6, s=30)
axes[1].set_title(f'Best Model: {grid_search.best_params_}')
axes[1].grid(True, alpha=0.3)
plt.suptitle('GMM Cross-Validation Results', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
print(f"\nBest parameters: {grid_search.best_params_}")
print(f"Best BIC score: {-grid_search.best_score_:.2f}")
return grid_search
def stability_analysis(self, X, n_components=3, n_runs=20):
"""Analyze GMM stability across multiple runs"""
labels_collection = []
log_likelihoods = []
for i in range(n_runs):
gmm = GaussianMixture(n_components=n_components,
random_state=i)
gmm.fit(X)
labels = gmm.predict(X)
labels_collection.append(labels)
log_likelihoods.append(gmm.score(X))
# Calculate pairwise ARI
ari_matrix = np.zeros((n_runs, n_runs))
for i in range(n_runs):
for j in range(n_runs):
ari_matrix[i, j] = adjusted_rand_score(
labels_collection[i], labels_collection[j]
)
# Visualization
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# ARI heatmap
sns.heatmap(ari_matrix, vmin=0, vmax=1, cmap='RdYlGn',
ax=axes[0], cbar_kws={'label': 'ARI'})
axes[0].set_title('Clustering Stability (ARI between runs)')
axes[0].set_xlabel('Run')
axes[0].set_ylabel('Run')
# Log-likelihood distribution
axes[1].hist(log_likelihoods, bins=15, edgecolor='black', alpha=0.7)
axes[1].axvline(x=np.mean(log_likelihoods), color='r',
linestyle='--', label=f'Mean: {np.mean(log_likelihoods):.2f}')
axes[1].set_xlabel('Log-Likelihood')
axes[1].set_ylabel('Frequency')
axes[1].set_title('Log-Likelihood Distribution')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.suptitle('GMM Stability Analysis', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
mean_ari = np.mean(ari_matrix[np.triu_indices_from(ari_matrix, k=1)])
print(f"\nMean ARI across runs: {mean_ari:.3f}")
print(f"Std of log-likelihoods: {np.std(log_likelihoods):.3f}")
return ari_matrix, log_likelihoods
# Model selection
selector = GMMModelSelection()
print("\n" + "="*60)
print("GMM MODEL SELECTION")
print("="*60)
# Select number of components
print("\nSelecting optimal number of components...")
selection_results = selector.select_n_components(X_scaled)
# Cross-validation
print("\nPerforming cross-validation...")
grid_search = selector.cross_validation_gmm(X_scaled)
# Stability analysis
print("\nAnalyzing model stability...")
ari_matrix, log_likes = selector.stability_analysis(X_scaled, n_components=3)
class AdvancedGMM:
"""Advanced GMM techniques and applications"""
def __init__(self):
self.models = {}
def bayesian_gmm(self, X, n_components_max=10):
"""Bayesian GMM with automatic component selection"""
from sklearn.mixture import BayesianGaussianMixture
# Fit Bayesian GMM
bgmm = BayesianGaussianMixture(
n_components=n_components_max,
weight_concentration_prior_type='dirichlet_process',
weight_concentration_prior=0.01,
random_state=42
)
bgmm.fit(X)
# Get effective number of components
weights = bgmm.weights_
effective_components = np.sum(weights > 0.01)
# Predict labels
labels = bgmm.predict(X)
unique_labels = np.unique(labels)
# Visualization
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# Clustering result
axes[0].scatter(X[:, 0], X[:, 1], c=labels,
cmap='viridis', alpha=0.6, s=30)
axes[0].set_title(f'Bayesian GMM ({len(unique_labels)} active components)')
axes[0].grid(True, alpha=0.3)
# Component weights
axes[1].bar(range(n_components_max), weights, color='skyblue',
edgecolor='black')
axes[1].axhline(y=0.01, color='r', linestyle='--',
label='Threshold (0.01)')
axes[1].set_xlabel('Component')
axes[1].set_ylabel('Weight')
axes[1].set_title('Component Weights')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
# Uncertainty visualization
proba = bgmm.predict_proba(X)
uncertainty = 1 - np.max(proba, axis=1)
scatter = axes[2].scatter(X[:, 0], X[:, 1], c=uncertainty,
cmap='RdYlBu_r', alpha=0.6, s=30)
axes[2].set_title('Prediction Uncertainty')
plt.colorbar(scatter, ax=axes[2])
axes[2].grid(True, alpha=0.3)
plt.suptitle('Bayesian Gaussian Mixture Model', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
print(f"\nBayesian GMM Results:")
print(f" Max components: {n_components_max}")
print(f" Effective components: {effective_components}")
print(f" Active components: {len(unique_labels)}")
return bgmm
def online_gmm(self, X_initial, X_stream, batch_size=50):
"""Online/incremental GMM for streaming data"""
# Initialize with initial data
gmm = GaussianMixture(n_components=3, warm_start=True,
random_state=42)
gmm.fit(X_initial)
# Process stream
n_batches = len(X_stream) // batch_size
means_history = [gmm.means_.copy()]
for i in range(n_batches):
# Get batch
start_idx = i * batch_size
end_idx = start_idx + batch_size
batch = X_stream[start_idx:end_idx]
# Update model (simplified online update)
# In practice, you'd use more sophisticated online EM
combined = np.vstack([X_initial, batch])
gmm.fit(combined)
X_initial = combined[-200:] # Keep last 200 points
means_history.append(gmm.means_.copy())
# Visualize evolution
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# Initial state
labels_init = gmm.predict(X_stream[:100])
axes[0].scatter(X_stream[:100, 0], X_stream[:100, 1],
c=labels_init, cmap='viridis', alpha=0.6, s=20)
axes[0].set_title('Initial Clustering')
axes[0].grid(True, alpha=0.3)
# Final state
labels_final = gmm.predict(X_stream)
axes[1].scatter(X_stream[:, 0], X_stream[:, 1],
c=labels_final, cmap='viridis', alpha=0.6, s=20)
axes[1].set_title('Final Clustering')
axes[1].grid(True, alpha=0.3)
# Means trajectory
means_history = np.array(means_history)
for j in range(3):
axes[2].plot(means_history[:, j, 0], means_history[:, j, 1],
'o-', label=f'Component {j+1}', alpha=0.7)
axes[2].set_xlabel('Feature 1')
axes[2].set_ylabel('Feature 2')
axes[2].set_title('Component Means Evolution')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
plt.suptitle('Online GMM Learning', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
return gmm, means_history
def gmm_sampling(self, gmm, n_samples=500):
"""Generate new samples from fitted GMM"""
# Generate samples
samples, labels = gmm.sample(n_samples)
# Original data for comparison
X_original = X_scaled[:300]
labels_original = gmm.predict(X_original)
# Visualization
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# Original data
axes[0].scatter(X_original[:, 0], X_original[:, 1],
c=labels_original, cmap='viridis', alpha=0.6, s=30)
axes[0].set_title('Original Data')
axes[0].set_xlim(-3, 3)
axes[0].set_ylim(-3, 3)
axes[0].grid(True, alpha=0.3)
# Generated samples
axes[1].scatter(samples[:, 0], samples[:, 1],
c=labels, cmap='viridis', alpha=0.6, s=30)
axes[1].set_title('Generated Samples')
axes[1].set_xlim(-3, 3)
axes[1].set_ylim(-3, 3)
axes[1].grid(True, alpha=0.3)
# Combined
axes[2].scatter(X_original[:, 0], X_original[:, 1],
c='blue', alpha=0.3, s=20, label='Original')
axes[2].scatter(samples[:, 0], samples[:, 1],
c='red', alpha=0.3, s=20, label='Generated')
axes[2].set_title('Original vs Generated')
axes[2].set_xlim(-3, 3)
axes[2].set_ylim(-3, 3)
axes[2].legend()
axes[2].grid(True, alpha=0.3)
plt.suptitle('GMM Sampling', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
# Statistical comparison
print("\nStatistical Comparison:")
print(f"Original mean: {X_original.mean(axis=0)}")
print(f"Generated mean: {samples.mean(axis=0)}")
print(f"Original std: {X_original.std(axis=0)}")
print(f"Generated std: {samples.std(axis=0)}")
return samples, labels
def gmm_outlier_detection(self, X, contamination=0.1):
"""Use GMM for outlier detection"""
# Fit GMM
gmm = GaussianMixture(n_components=3, random_state=42)
gmm.fit(X)
# Calculate log probabilities
log_probs = gmm.score_samples(X)
# Determine threshold
threshold = np.percentile(log_probs, contamination * 100)
# Identify outliers
outliers = log_probs < threshold
# Visualization
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# Log probability distribution
axes[0].hist(log_probs, bins=30, edgecolor='black', alpha=0.7)
axes[0].axvline(x=threshold, color='r', linestyle='--',
label=f'Threshold ({contamination*100}%)')
axes[0].set_xlabel('Log Probability')
axes[0].set_ylabel('Frequency')
axes[0].set_title('Log Probability Distribution')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# Outlier detection
axes[1].scatter(X[~outliers, 0], X[~outliers, 1],
c='blue', alpha=0.6, s=20, label='Inliers')
axes[1].scatter(X[outliers, 0], X[outliers, 1],
c='red', marker='x', s=50, label='Outliers')
axes[1].set_title('Outlier Detection')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
# Probability heatmap
x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
xx, yy = np.meshgrid(np.linspace(x_min, x_max, 100),
np.linspace(y_min, y_max, 100))
Z = gmm.score_samples(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
contourf = axes[2].contourf(xx, yy, Z, levels=20, cmap='viridis')
axes[2].scatter(X[outliers, 0], X[outliers, 1],
c='red', marker='x', s=50)
axes[2].set_title('Probability Landscape')
plt.colorbar(contourf, ax=axes[2])
axes[2].grid(True, alpha=0.3)
plt.suptitle('GMM for Outlier Detection', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
print(f"\nOutlier Detection Results:")
print(f" Number of outliers: {np.sum(outliers)}")
print(f" Percentage: {np.sum(outliers)/len(X)*100:.1f}%")
print(f" Threshold: {threshold:.3f}")
return outliers, log_probs
# Advanced techniques
advanced = AdvancedGMM()
print("\n" + "="*60)
print("ADVANCED GMM TECHNIQUES")
print("="*60)
# Bayesian GMM
print("\nDemonstrating Bayesian GMM...")
bgmm = advanced.bayesian_gmm(X_scaled)
# Online GMM
print("\nDemonstrating online GMM...")
X_stream = np.random.randn(500, 2) * 0.5 + [1, 1]
gmm_online, means_hist = advanced.online_gmm(X_scaled[:100], X_stream)
# GMM sampling
print("\nGenerating new samples from GMM...")
samples, sample_labels = advanced.gmm_sampling(best_gmm)
# Outlier detection
print("\nUsing GMM for outlier detection...")
# Add some outliers
X_with_outliers = np.vstack([X_scaled,
np.random.uniform(-4, 4, (30, 2))])
outliers, log_probs = advanced.gmm_outlier_detection(X_with_outliers)
class GMMApplications:
"""Real-world applications of GMM"""
def __init__(self):
self.results = {}
def image_segmentation(self):
"""Image segmentation using GMM"""
# Create synthetic image data
np.random.seed(42)
# Generate image with 3 regions
image = np.zeros((100, 100, 3))
# Region 1: Dark (background)
image[:40, :] = np.random.normal(0.2, 0.05, (40, 100, 3))
# Region 2: Medium (object 1)
image[40:70, 20:80] = np.random.normal(0.5, 0.05, (30, 60, 3))
# Region 3: Bright (object 2)
image[70:, 30:70] = np.random.normal(0.8, 0.05, (30, 40, 3))
# Reshape for GMM
pixels = image.reshape((-1, 3))
# Fit GMM
gmm = GaussianMixture(n_components=3, random_state=42)
gmm.fit(pixels)
labels = gmm.predict(pixels)
# Reshape back to image
segmented = labels.reshape((100, 100))
# Visualization
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
# Original image channels
for i in range(3):
axes[0, i].imshow(image[:, :, i], cmap='gray')
axes[0, i].set_title(f'Channel {i+1}')
axes[0, i].axis('off')
# Segmented image
axes[1, 0].imshow(segmented, cmap='viridis')
axes[1, 0].set_title('Segmented Image')
axes[1, 0].axis('off')
# Probability maps for each component
proba = gmm.predict_proba(pixels)
for i in range(min(2, 3)):
prob_map = proba[:, i].reshape((100, 100))
im = axes[1, i+1].imshow(prob_map, cmap='hot')
axes[1, i+1].set_title(f'Component {i+1} Probability')
axes[1, i+1].axis('off')
plt.colorbar(im, ax=axes[1, i+1], fraction=0.046)
plt.suptitle('Image Segmentation with GMM', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
print(f"\nImage Segmentation Results:")
for i in range(3):
pixel_count = np.sum(labels == i)
print(f" Region {i+1}: {pixel_count} pixels ({pixel_count/len(pixels)*100:.1f}%)")
return segmented, gmm
def speaker_recognition(self):
"""Speaker recognition using GMM on audio features"""
np.random.seed(42)
# Simulate MFCC features for 3 speakers
n_frames = 200
n_features = 13
speakers_data = []
true_labels = []
# Speaker 1: Lower pitch
speaker1 = np.random.multivariate_normal(
np.array([5, 3, 2] + [0]*10),
np.eye(n_features) * 0.5,
n_frames
)
speakers_data.append(speaker1)
true_labels.extend([0] * n_frames)
# Speaker 2: Medium pitch
speaker2 = np.random.multivariate_normal(
np.array([7, 5, 4] + [1]*10),
np.eye(n_features) * 0.7,
n_frames
)
speakers_data.append(speaker2)
true_labels.extend([1] * n_frames)
# Speaker 3: Higher pitch
speaker3 = np.random.multivariate_normal(
np.array([10, 7, 6] + [2]*10),
np.eye(n_features) * 0.6,
n_frames
)
speakers_data.append(speaker3)
true_labels.extend([2] * n_frames)
X_audio = np.vstack(speakers_data)
# Train individual GMMs for each speaker
speaker_models = []
for i, data in enumerate(speakers_data):
gmm = GaussianMixture(n_components=2, random_state=42)
gmm.fit(data)
speaker_models.append(gmm)
# Test recognition
test_idx = np.random.choice(len(X_audio), 100, replace=False)
X_test = X_audio[test_idx]
y_test = np.array(true_labels)[test_idx]
# Predict speaker
predictions = []
for sample in X_test:
scores = [model.score_samples([sample])[0]
for model in speaker_models]
predictions.append(np.argmax(scores))
predictions = np.array(predictions)
# Calculate accuracy
accuracy = np.mean(predictions == y_test)
# Visualization
from sklearn.decomposition import PCA
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_audio)
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# True speaker distribution
axes[0].scatter(X_pca[:, 0], X_pca[:, 1],
c=true_labels, cmap='viridis', alpha=0.5, s=10)
axes[0].set_title('True Speaker Distribution')
axes[0].set_xlabel('PC1')
axes[0].set_ylabel('PC2')
axes[0].grid(True, alpha=0.3)
# Test samples
test_pca = pca.transform(X_test)
axes[1].scatter(test_pca[:, 0], test_pca[:, 1],
c=y_test, cmap='viridis', alpha=0.5, s=30)
axes[1].set_title('Test Samples')
axes[1].set_xlabel('PC1')
axes[1].set_ylabel('PC2')
axes[1].grid(True, alpha=0.3)
# Confusion matrix
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(y_test, predictions)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[2])
axes[2].set_title(f'Confusion Matrix (Acc: {accuracy:.2%})')
axes[2].set_xlabel('Predicted Speaker')
axes[2].set_ylabel('True Speaker')
plt.suptitle('Speaker Recognition with GMM', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
print(f"\nSpeaker Recognition Results:")
print(f" Accuracy: {accuracy:.2%}")
print(f" Speakers identified: {len(speaker_models)}")
return speaker_models, accuracy
def financial_regime_detection(self):
"""Detect market regimes using GMM"""
np.random.seed(42)
# Simulate financial data with different regimes
n_days = 1000
returns = []
volatility = []
regimes_true = []
# Bull market (high returns, low volatility)
bull_days = 400
returns.extend(np.random.normal(0.001, 0.01, bull_days))
volatility.extend(np.abs(np.random.normal(0.01, 0.003, bull_days)))
regimes_true.extend([0] * bull_days)
# Bear market (negative returns, high volatility)
bear_days = 300
returns.extend(np.random.normal(-0.0005, 0.02, bear_days))
volatility.extend(np.abs(np.random.normal(0.025, 0.005, bear_days)))
regimes_true.extend([1] * bear_days)
# Sideways market (neutral returns, medium volatility)
sideways_days = 300
returns.extend(np.random.normal(0, 0.012, sideways_days))
volatility.extend(np.abs(np.random.normal(0.015, 0.004, sideways_days)))
regimes_true.extend([2] * sideways_days)
# Combine features
X_financial = np.column_stack([returns, volatility])
# Fit GMM
gmm = GaussianMixture(n_components=3, random_state=42)
gmm.fit(X_financial)
predicted_regimes = gmm.predict(X_financial)
# Calculate regime probabilities
regime_probs = gmm.predict_proba(X_financial)
# Visualization
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
# Scatter plot of regimes
scatter = axes[0, 0].scatter(returns, volatility,
c=predicted_regimes, cmap='viridis',
alpha=0.5, s=10)
axes[0, 0].set_xlabel('Returns')
axes[0, 0].set_ylabel('Volatility')
axes[0, 0].set_title('Market Regimes (GMM)')
axes[0, 0].grid(True, alpha=0.3)
plt.colorbar(scatter, ax=axes[0, 0])
# Time series of returns with regimes
time = np.arange(n_days)
axes[0, 1].scatter(time, returns, c=predicted_regimes,
cmap='viridis', alpha=0.5, s=5)
axes[0, 1].set_xlabel('Time (days)')
axes[0, 1].set_ylabel('Returns')
axes[0, 1].set_title('Returns with Regime Coloring')
axes[0, 1].grid(True, alpha=0.3)
# Cumulative returns by regime
cumulative_returns = np.cumsum(returns)
axes[0, 2].plot(time, cumulative_returns, 'b-', alpha=0.7)
axes[0, 2].set_xlabel('Time (days)')
axes[0, 2].set_ylabel('Cumulative Returns')
axes[0, 2].set_title('Cumulative Performance')
axes[0, 2].grid(True, alpha=0.3)
# Regime probabilities over time
for i in range(3):
axes[1, 0].plot(time[:200], regime_probs[:200, i],
label=f'Regime {i+1}', alpha=0.7)
axes[1, 0].set_xlabel('Time (first 200 days)')
axes[1, 0].set_ylabel('Probability')
axes[1, 0].set_title('Regime Probabilities')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# Regime statistics
regime_stats = pd.DataFrame(X_financial, columns=['Returns', 'Volatility'])
regime_stats['Regime'] = predicted_regimes
stats = regime_stats.groupby('Regime').agg(['mean', 'std'])
axes[1, 1].axis('tight')
axes[1, 1].axis('off')
table = axes[1, 1].table(cellText=stats.round(4).values,
rowLabels=[f'Regime {i+1}' for i in range(3)],
colLabels=stats.columns,
cellLoc='center',
loc='center')
table.auto_set_font_size(False)
table.set_fontsize(9)
axes[1, 1].set_title('Regime Statistics')
# Transition matrix
transitions = np.zeros((3, 3))
for i in range(len(predicted_regimes)-1):
transitions[predicted_regimes[i], predicted_regimes[i+1]] += 1
transitions = transitions / transitions.sum(axis=1, keepdims=True)
sns.heatmap(transitions, annot=True, fmt='.2f',
cmap='YlOrRd', ax=axes[1, 2])
axes[1, 2].set_title('Regime Transition Probabilities')
axes[1, 2].set_xlabel('To Regime')
axes[1, 2].set_ylabel('From Regime')
plt.suptitle('Financial Market Regime Detection', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
print(f"\nRegime Detection Results:")
for i in range(3):
regime_days = np.sum(predicted_regimes == i)
print(f" Regime {i+1}: {regime_days} days ({regime_days/n_days*100:.1f}%)")
return predicted_regimes, regime_probs
# Applications
apps = GMMApplications()
print("\n" + "="*60)
print("GMM REAL-WORLD APPLICATIONS")
print("="*60)
# Image segmentation
print("\n1. Image Segmentation:")
print("-" * 30)
segmented_img, seg_gmm = apps.image_segmentation()
# Speaker recognition
print("\n2. Speaker Recognition:")
print("-" * 30)
speaker_models, speaker_acc = apps.speaker_recognition()
# Financial regime detection
print("\n3. Financial Market Regime Detection:")
print("-" * 30)
market_regimes, regime_probabilities = apps.financial_regime_detection()
print("\n" + "="*60)
print("GMM BEST PRACTICES")
print("="*60)
best_practices = """
KEY GUIDELINES:
1. DATA PREPROCESSING:
• Always standardize/normalize features
• Check for multicollinearity
• Consider PCA for high-dimensional data
• Handle outliers appropriately
2. MODEL SELECTION:
• Use AIC for predictive models
• Use BIC for explanatory models (penalizes complexity more)
• Cross-validate for stability
• Consider Bayesian GMM for automatic selection
3. COVARIANCE TYPE SELECTION:
• 'full': Most flexible, best for complex data
• 'tied': When components have similar shape
• 'diag': Assumes feature independence
• 'spherical': Similar to K-means
4. INITIALIZATION:
• Use 'kmeans' for stable initialization
• Try multiple random starts
• Consider warm_start for incremental learning
5. CONVERGENCE:
• Monitor log-likelihood
• Check convergence tolerance
• Increase max_iter if needed
• Watch for local optima
6. ADVANTAGES:
✓ Soft clustering (probabilistic)
✓ Captures cluster shape and orientation
✓ Provides uncertainty estimates
✓ Solid statistical foundation
✓ Can generate new samples
7. DISADVANTAGES:
✗ Assumes Gaussian distributions
✗ Sensitive to initialization
✗ Can converge to local optima
✗ Computationally expensive for large datasets
✗ Struggles with non-convex clusters
"""
print(best_practices)
# Comparison with other methods
comparison_data = {
'Method': ['GMM', 'K-means', 'DBSCAN', 'Hierarchical'],
'Type': ['Soft', 'Hard', 'Hard', 'Hard'],
'Cluster Shape': ['Elliptical', 'Spherical', 'Arbitrary', 'Any'],
'Noise Handling': ['Moderate', 'Poor', 'Excellent', 'Poor'],
'Scalability': ['O(nkd²)', 'O(nkd)', 'O(n log n)', 'O(n²)'],
'Parameters': ['k, cov_type', 'k', 'eps, min_pts', 'linkage'],
'Probabilistic': ['Yes', 'No', 'No', 'No']
}
comparison_df = pd.DataFrame(comparison_data)
print("\nClustering Methods Comparison:")
print("="*60)
print(comparison_df.to_string(index=False))
# Implementation checklist
checklist = """
IMPLEMENTATION CHECKLIST:
□ Data cleaned and preprocessed
□ Features standardized/normalized
□ Choose number of components (use AIC/BIC)
□ Select covariance type
□ Set initialization method
□ Configure convergence parameters
□ Fit model with multiple random starts
□ Validate clustering stability
□ Check component weights
□ Analyze uncertainty in predictions
□ Interpret cluster characteristics
"""
print(checklist)
# When to use GMM
use_cases = """
WHEN TO USE GMM:
✓ Need probability estimates for cluster membership
✓ Clusters have elliptical shapes
✓ Want to generate new samples
✓ Dealing with overlapping clusters
✓ Need a generative model
✓ Anomaly detection with probability threshold
✓ When theoretical foundation is important
WHEN NOT TO USE GMM:
✗ Data clearly non-Gaussian
✗ Very high dimensional data
✗ Need deterministic results
✗ Clusters have arbitrary shapes
✗ Large datasets (computational cost)
✗ Presence of many outliers
"""
print(use_cases)
Implement semi-supervised learning with GMM:
Create an ensemble of GMMs:
Implement GMM for time-varying data: