Cross-validation
Robust Model Evaluation!
Cross-validation is the gold standard for assessing model performance. By training and testing on multiple data splits, it provides a more reliable estimate of how your model will perform on unseen data. From simple k-fold to sophisticated nested cross-validation, these techniques ensure your models generalize well beyond the training data.
Understanding Cross-validation
graph LR
A[Dataset] --> B[Cross-validation]
B --> C[K-Fold CV]
B --> D[Stratified K-Fold]
B --> E[Leave-One-Out]
B --> F[Leave-P-Out]
B --> G[Time Series CV]
B --> H[Group K-Fold]
C --> I[Fold 1: Train on 2-5, Test on 1]
C --> J[Fold 2: Train on 1,3-5, Test on 2]
C --> K[Fold 3: Train on 1-2,4-5, Test on 3]
C --> L[Fold 4: Train on 1-3,5, Test on 4]
C --> M[Fold 5: Train on 1-4, Test on 5]
I --> N1[Score 1]
J --> N2[Score 2]
K --> N3[Score 3]
L --> N4[Score 4]
M --> N5[Score 5]
N1 --> O[Mean Score ± Std]
N2 --> O
N3 --> O
N4 --> O
N5 --> O
K-Fold Cross-validation
Standard K-Fold Implementation
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold, cross_val_score, cross_validate
from sklearn.datasets import make_classification, make_regression
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import matplotlib.pyplot as plt
import seaborn as sns
# Plot styling: apply the matplotlib 'seaborn-v0_8-darkgrid' style and the
# seaborn "husl" palette to the figures created after this point.
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
# Synthetic datasets (fixed random_state so they are reproducible):
# X_class/y_class - binary classification, 1000 samples x 20 features
# X_reg/y_reg     - regression, 1000 samples x 20 features, noise=10
X_class, y_class = make_classification(n_samples=1000, n_features=20,
n_informative=15, n_redundant=5,
n_classes=2, random_state=42)
X_reg, y_reg = make_regression(n_samples=1000, n_features=20,
n_informative=15, noise=10, random_state=42)
# Basic K-Fold Cross-validation
def demonstrate_kfold(X, y, n_splits=5):
    """Print how a shuffled K-Fold splitter partitions ``X``.

    For every fold the train/validation sizes and their share of the data are
    collected and printed as a table; for the first fold the first ten
    train and validation indices are printed as well.

    Args:
        X: Sized collection of samples; only its length and row positions
            are used.
        y: Targets. Accepted for symmetry with other helpers; not used.
        n_splits: Number of folds.

    Returns:
        The ``KFold`` splitter that was used (shuffle=True, random_state=42).
    """
    splitter = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    total = len(X)

    print(f"K-Fold Cross-validation with {n_splits} folds")
    print("=" * 50)

    rows = []
    for number, (train_idx, val_idx) in enumerate(splitter.split(X), start=1):
        n_train, n_val = len(train_idx), len(val_idx)
        rows.append({
            'Fold': number,
            'Train Size': n_train,
            'Validation Size': n_val,
            'Train %': f"{n_train/total:.1%}",
            'Val %': f"{n_val/total:.1%}",
        })
        if number != 1:
            continue
        # Only the first fold gets an index preview.
        print(f"Fold {number} indices:")
        print(f" Train (first 10): {train_idx[:10]}")
        print(f" Val (first 10): {val_idx[:10]}")

    print("\nFold Statistics:")
    print(pd.DataFrame(rows).to_string(index=False))
    return splitter
# Run the fold-composition demo on the classification data.
kf = demonstrate_kfold(X_class, y_class)
# Quick evaluation: cross_val_score returns one accuracy value per fold (cv=5).
model = LogisticRegression(random_state=42, max_iter=1000)
scores = cross_val_score(model, X_class, y_class, cv=5, scoring='accuracy')
print(f"\nCross-validation scores: {scores}")
# The "+/-" figure is two standard deviations of the per-fold scores.
print(f"Mean accuracy: {scores.mean():.3f} (+/- {scores.std() * 2:.3f})")
# Visualize K-Fold splits
def visualize_kfold_splits(X, n_splits=5):
    """Draw a heatmap of which samples each K-Fold split trains and validates on.

    Args:
        X: Sized collection of samples; only its length is used.
        n_splits: Number of folds (one heatmap row per fold).
    """
    folds = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    fig, ax = plt.subplots(figsize=(12, 6))

    # One row per fold, one column per sample:
    # 0 = not used, 1 = training sample, 2 = validation sample.
    membership = np.zeros((n_splits, len(X)))
    for row, (train_idx, test_idx) in enumerate(folds.split(X)):
        membership[row, train_idx] = 1
        membership[row, test_idx] = 2

    image = ax.imshow(membership, aspect='auto', cmap=plt.cm.coolwarm,
                      vmin=0, vmax=2)

    fold_positions = range(n_splits)
    ax.set_yticks(fold_positions)
    ax.set_yticklabels([f'Fold {pos+1}' for pos in fold_positions])
    ax.set_xlabel('Sample Index')
    ax.set_ylabel('Fold')
    ax.set_title(f'{n_splits}-Fold Cross-validation Data Split')

    # Colorbar doubles as the legend for the three cell values.
    colorbar = plt.colorbar(image, ax=ax, ticks=[0, 1, 2])
    colorbar.set_label('Dataset')
    colorbar.ax.set_yticklabels(['Not Used', 'Training', 'Validation'])

    plt.tight_layout()
    plt.show()
# Plot the split layout for the first 100 samples only (presumably to keep the heatmap readable).
visualize_kfold_splits(X_class[:100], n_splits=5)
# Multiple metrics with cross_validate
def evaluate_with_multiple_metrics(X, y, model, cv=5):
    """Cross-validate ``model`` on several metrics, then print and plot a summary.

    Accuracy, weighted precision/recall/F1 and ROC AUC are computed with
    ``cross_validate`` (train scores included). A table of per-metric
    train/validation mean and standard deviation is printed, and two plots
    are shown: train vs. validation means per metric, and the per-fold
    validation accuracy with its mean and a one-standard-deviation band.

    Args:
        X: Feature matrix.
        y: Target vector.
        model: Estimator passed to ``cross_validate``.
        cv: Cross-validation specification forwarded to ``cross_validate``.

    Returns:
        The dictionary returned by ``cross_validate``.
    """
    scoring = {
        'accuracy': 'accuracy',
        'precision': 'precision_weighted',
        'recall': 'recall_weighted',
        'f1': 'f1_weighted',
        'roc_auc': 'roc_auc',
    }

    cv_results = cross_validate(model, X, y, cv=cv, scoring=scoring,
                                return_train_score=True, return_estimator=False)

    # Build the summary in one go from row records. Growing a DataFrame with
    # pd.concat inside the loop (starting from an empty frame) is quadratic,
    # relies on deprecated empty-entry concat behaviour and loses the float
    # dtype of the numeric columns.
    results_df = pd.DataFrame(
        [
            {
                'Metric': metric,
                'Train Mean': cv_results[f'train_{metric}'].mean(),
                'Train Std': cv_results[f'train_{metric}'].std(),
                'Val Mean': cv_results[f'test_{metric}'].mean(),
                'Val Std': cv_results[f'test_{metric}'].std(),
            }
            for metric in scoring
        ],
        columns=['Metric', 'Train Mean', 'Train Std', 'Val Mean', 'Val Std'],
    )

    print("\nCross-validation Results (Multiple Metrics):")
    print(results_df.to_string(index=False))

    fig, axes = plt.subplots(1, 2, figsize=(12, 5))

    # Left panel: grouped bars, train vs. validation mean per metric.
    x = np.arange(len(results_df))
    width = 0.35
    axes[0].bar(x - width/2, results_df['Train Mean'], width,
                label='Train', alpha=0.7)
    axes[0].bar(x + width/2, results_df['Val Mean'], width,
                label='Validation', alpha=0.7)
    axes[0].set_xlabel('Metric')
    axes[0].set_ylabel('Score')
    axes[0].set_title('Cross-validation: Train vs Validation Scores')
    axes[0].set_xticks(x)
    axes[0].set_xticklabels(results_df['Metric'], rotation=45)
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)

    # Right panel: validation accuracy per fold, mean line and +/- 1 std band.
    fold_scores = cv_results['test_accuracy']
    fold_numbers = range(1, len(fold_scores) + 1)
    mean_score = fold_scores.mean()
    std_score = fold_scores.std()
    axes[1].plot(fold_numbers, fold_scores, 'o-', markersize=8)
    axes[1].axhline(y=mean_score, color='r', linestyle='--',
                    label=f'Mean: {mean_score:.3f}')
    axes[1].fill_between(fold_numbers,
                         mean_score - std_score,
                         mean_score + std_score,
                         alpha=0.2, color='red')
    axes[1].set_xlabel('Fold')
    axes[1].set_ylabel('Accuracy')
    axes[1].set_title('Accuracy Across Folds')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()
    return cv_results
# Evaluate a fresh logistic-regression model on the classification data with all metrics.
model = LogisticRegression(random_state=42, max_iter=1000)
cv_results = evaluate_with_multiple_metrics(X_class, y_class, model)
Stratified Cross-validation
Maintaining Class Balance Across Folds
from sklearn.model_selection import StratifiedKFold, StratifiedShuffleSplit
import numpy as np
import pandas as pd
# Create an imbalanced dataset: 900 majority samples drawn from a standard
# normal and 100 minority samples drawn from the same distribution shifted by +2.
# The global NumPy seed is fixed first so the draws are reproducible.
np.random.seed(42)
X_imbalanced = np.vstack([
np.random.randn(900, 10), # Class 0 (majority)
np.random.randn(100, 10) + 2 # Class 1 (minority)
])
# Labels follow the stacking order above: first 900 are class 0, last 100 are class 1.
y_imbalanced = np.array([0] * 900 + [1] * 100)
print("Dataset Class Distribution:")
print(pd.Series(y_imbalanced).value_counts())
print(f"Class 1 ratio: {(y_imbalanced == 1).mean():.1%}")
# Compare KFold vs StratifiedKFold
def compare_fold_strategies(X, y, n_splits=5):
    """Plot per-fold validation class proportions for K-Fold vs. Stratified K-Fold.

    The figure has two rows (top: K-Fold, bottom: Stratified K-Fold) and one
    column per fold. Every panel also gets a dashed red line at the share of
    class 1 in the full target vector.

    Args:
        X: Feature matrix.
        y: NumPy array of class labels (indexed with fold index arrays).
        n_splits: Number of folds for both splitters.
    """
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    fig, axes = plt.subplots(2, n_splits, figsize=(15, 6))

    # (splitter, extra bar styling, y-label of the first column, add fold titles?)
    panels = (
        (kf, {}, 'K-Fold\nProportion', True),
        (skf, {'color': 'green'}, 'Stratified\nProportion', False),
    )
    for row, (splitter, bar_style, row_label, with_title) in enumerate(panels):
        for col, (_, val_idx) in enumerate(splitter.split(X, y)):
            panel = axes[row, col]
            proportions = (
                pd.Series(y[val_idx]).value_counts(normalize=True).sort_index()
            )
            panel.bar(proportions.index, proportions.values, alpha=0.7, **bar_style)
            if with_title:
                panel.set_title(f'Fold {col + 1}')
            panel.set_ylim([0, 1])
            panel.set_xlabel('Class')
            if col == 0:
                panel.set_ylabel(row_label)

    # Reference line: proportion of class 1 in the whole dataset.
    original_ratio = (y == 1).mean()
    for panel in axes.flatten():
        panel.axhline(y=original_ratio, color='red', linestyle='--',
                      alpha=0.5, label=f'Original: {original_ratio:.1%}')

    plt.suptitle('Class Distribution: K-Fold vs Stratified K-Fold', fontsize=14)
    plt.tight_layout()
    plt.show()
# Compare per-fold class proportions of both splitters on the imbalanced data.
compare_fold_strategies(X_imbalanced, y_imbalanced)
# Stratified Shuffle Split (for multiple random splits)
def demonstrate_stratified_shuffle_split(X, y, n_splits=5, test_size=0.2):
    """Print sizes and class-1 share of each StratifiedShuffleSplit split.

    Args:
        X: Feature matrix.
        y: NumPy array of class labels (indexed with split index arrays).
        n_splits: Number of random splits to draw.
        test_size: Validation share passed to ``StratifiedShuffleSplit``.

    Returns:
        The ``StratifiedShuffleSplit`` instance (random_state=42).
    """
    sss = StratifiedShuffleSplit(n_splits=n_splits, test_size=test_size,
                                 random_state=42)

    def describe(number, train_idx, val_idx):
        # Share of class 1 on each side of the split, formatted as a percentage.
        train_share = (y[train_idx] == 1).mean()
        val_share = (y[val_idx] == 1).mean()
        return {
            'Split': number,
            'Train Size': len(train_idx),
            'Val Size': len(val_idx),
            'Train Class 1': f"{train_share:.1%}",
            'Val Class 1': f"{val_share:.1%}",
        }

    rows = [
        describe(number, train_idx, val_idx)
        for number, (train_idx, val_idx) in enumerate(sss.split(X, y), start=1)
    ]

    print("\nStratified Shuffle Split Results:")
    print(pd.DataFrame(rows).to_string(index=False))
    return sss
# Show the stratified shuffle-split statistics for the imbalanced data.
sss = demonstrate_stratified_shuffle_split(X_imbalanced, y_imbalanced)

# Performance comparison on imbalanced data: every model is scored (F1) under
# both plain and stratified 5-fold cross-validation.
from sklearn.tree import DecisionTreeClassifier

models = {
    'Logistic Regression': LogisticRegression(random_state=42, max_iter=1000),
    'Decision Tree': DecisionTreeClassifier(random_state=42, max_depth=5),
    'Random Forest': RandomForestClassifier(n_estimators=50, random_state=42),
}
cv_strategies = {
    'KFold': KFold(n_splits=5, shuffle=True, random_state=42),
    'StratifiedKFold': StratifiedKFold(n_splits=5, shuffle=True, random_state=42),
}

# Rows are appended model by model, strategy by strategy; the plotting code
# below relies on that ordering when it filters by strategy.
results = []
for model_name, model in models.items():
    for cv_name, cv in cv_strategies.items():
        scores = cross_val_score(model, X_imbalanced, y_imbalanced,
                                 cv=cv, scoring='f1')
        summary = {'Model': model_name, 'CV Strategy': cv_name}
        summary['Mean F1'] = scores.mean()
        summary['Std F1'] = scores.std()
        summary['Min F1'] = scores.min()
        summary['Max F1'] = scores.max()
        results.append(summary)

results_df = pd.DataFrame(results)
print("\nPerformance Comparison on Imbalanced Data:")
print(results_df.to_string(index=False))

# Grouped bar chart: one group per model, one bar per CV strategy,
# error bars show the standard deviation across folds.
fig, ax = plt.subplots(figsize=(10, 6))
models_list = results_df['Model'].unique()
x = np.arange(len(models_list))
width = 0.35
for i, cv_strategy in enumerate(cv_strategies):
    subset = results_df.loc[results_df['CV Strategy'] == cv_strategy]
    means = subset['Mean F1'].values
    stds = subset['Std F1'].values
    ax.bar(x + i * width, means, width, label=cv_strategy,
           yerr=stds, capsize=5, alpha=0.7)

ax.set_xlabel('Model')
ax.set_ylabel('F1 Score')
ax.set_title('Impact of CV Strategy on Imbalanced Data')
# Ticks sit midway between the two bars of each group.
ax.set_xticks(x + width / 2)
ax.set_xticklabels(models_list, rotation=45)
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
Leave-One-Out and Leave-P-Out
Exhaustive Cross-validation Methods
from sklearn.model_selection import LeaveOneOut, LeavePOut
from sklearn.datasets import load_iris
import time
# Load a small dataset for the LOO demonstration.
iris = load_iris()
# The iris samples are ordered by class (50 per class), so a leading slice
# such as [:30] would contain class 0 only and a classifier cannot be fitted
# on a single class. Taking every 5th row keeps the subset at 30 samples
# while drawing 10 from each of the three classes.
X_small = iris.data[::5]
y_small = iris.target[::5]
print(f"Dataset size: {len(X_small)} samples")
# Leave-One-Out Cross-validation
def demonstrate_loo(X, y):
    """Demonstrate Leave-One-Out cross-validation.

    Prints the number of splits, previews the first three splits, then
    evaluates a logistic-regression model with ``cross_val_score`` and
    prints the scores and the elapsed time.

    Args:
        X: Feature matrix.
        y: Target vector.

    Returns:
        Array of per-split accuracy scores.
    """
    loo = LeaveOneOut()
    print("\nLeave-One-Out Cross-validation")
    print(f"Number of splits: {loo.get_n_splits(X)}")
    print("=" * 50)

    # Preview only the first 3 splits; zip stops the generator early instead
    # of walking through every split just to skip it.
    for number, (train_idx, test_idx) in zip(range(1, 4), loo.split(X)):
        print(f"Split {number}:")
        print(f" Train size: {len(train_idx)}")
        print(f" Test size: {len(test_idx)}")
        print(f" Test index: {test_idx[0]}")

    model = LogisticRegression(random_state=42, max_iter=1000)
    # perf_counter is a monotonic, high-resolution clock meant for measuring
    # intervals; time.time() can jump when the system clock is adjusted.
    start_time = time.perf_counter()
    scores = cross_val_score(model, X, y, cv=loo, scoring='accuracy')
    elapsed = time.perf_counter() - start_time

    print("\nLOO Results:")
    print(f"Scores (first 10): {scores[:10]}")
    print(f"Mean accuracy: {scores.mean():.3f}")
    print(f"Std accuracy: {scores.std():.3f}")
    print(f"Time taken: {elapsed:.2f} seconds")
    return scores
# Run the LOO demo on the small iris subset; one model fit per sample.
loo_scores = demonstrate_loo(X_small, y_small)
# Leave-P-Out Cross-validation
def demonstrate_lpo(X, y, p=2, max_splits=100):
    """Demonstrate Leave-P-Out cross-validation.

    Prints the number of splits and previews the first three. The model is
    evaluated only when the number of splits does not exceed ``max_splits``,
    because Leave-P-Out fits one model per split.

    Args:
        X: Feature matrix.
        y: Target vector.
        p: Number of samples left out in each split.
        max_splits: Largest number of splits for which the model evaluation
            is still run.

    Returns:
        Array of per-split accuracy scores, or ``None`` when the evaluation
        was skipped because there were more than ``max_splits`` splits.
    """
    lpo = LeavePOut(p)
    n_splits = lpo.get_n_splits(X)
    print(f"\nLeave-{p}-Out Cross-validation")
    print(f"Number of splits: {n_splits}")
    print("=" * 50)

    too_many_splits = n_splits > max_splits
    if too_many_splits:
        print("Warning: Too many splits! Showing limited evaluation.")

    # The preview is cheap, so it is shown even when evaluation is skipped.
    for number, (train_idx, test_idx) in zip(range(1, 4), lpo.split(X)):
        print(f"Split {number}:")
        print(f" Train size: {len(train_idx)}")
        print(f" Test size: {len(test_idx)}")
        print(f" Test indices: {test_idx}")

    if too_many_splits:
        return None

    model = LogisticRegression(random_state=42, max_iter=1000)
    # perf_counter is the monotonic clock intended for interval timing.
    start_time = time.perf_counter()
    scores = cross_val_score(model, X, y, cv=lpo, scoring='accuracy')
    elapsed = time.perf_counter() - start_time

    print("\nLPO Results:")
    print(f"Mean accuracy: {scores.mean():.3f}")
    print(f"Std accuracy: {scores.std():.3f}")
    print(f"Time taken: {elapsed:.2f} seconds")
    return scores
# Demonstrate with p=2 on the first 20 samples.
# NOTE(review): 20 samples with p=2 give 190 splits, which is above the
# 100-split limit inside demonstrate_lpo, so this call returns None.
lpo_scores = demonstrate_lpo(X_small[:20], y_small[:20], p=2)
# Comparison of CV methods on computation time
def compare_cv_computational_cost(X, y):
    """Time 5-fold, 10-fold and Leave-One-Out CV for a logistic regression.

    For each strategy the elapsed time of ``cross_val_score`` and the mean
    and standard deviation of the accuracy scores are collected, printed as
    a table and shown as two bar charts (time and accuracy).

    Args:
        X: Feature matrix.
        y: Target vector.

    Returns:
        DataFrame with one row per CV method and the columns
        'Method', 'N Splits', 'Time (s)', 'Mean Score' and 'Std Score'.
    """
    cv_methods = {
        '5-Fold': KFold(n_splits=5, shuffle=True, random_state=42),
        '10-Fold': KFold(n_splits=10, shuffle=True, random_state=42),
        'LOO': LeaveOneOut(),
    }
    model = LogisticRegression(random_state=42, max_iter=1000)

    timing_results = []
    for cv_name, cv in cv_methods.items():
        # perf_counter is monotonic and high-resolution, unlike the
        # wall-clock time.time(), so short intervals are measured reliably.
        start_time = time.perf_counter()
        scores = cross_val_score(model, X, y, cv=cv, scoring='accuracy')
        elapsed = time.perf_counter() - start_time
        timing_results.append({
            'Method': cv_name,
            # Every splitter in cv_methods implements get_n_splits.
            'N Splits': cv.get_n_splits(X),
            'Time (s)': elapsed,
            'Mean Score': scores.mean(),
            'Std Score': scores.std(),
        })

    timing_df = pd.DataFrame(timing_results)
    print("\nComputational Cost Comparison:")
    print(timing_df.to_string(index=False))

    fig, axes = plt.subplots(1, 2, figsize=(12, 5))

    # Left: elapsed time per method.
    axes[0].bar(timing_df['Method'], timing_df['Time (s)'], alpha=0.7)
    axes[0].set_xlabel('CV Method')
    axes[0].set_ylabel('Time (seconds)')
    axes[0].set_title('Computational Time Comparison')
    axes[0].grid(True, alpha=0.3)

    # Right: mean accuracy per method, error bars = std across splits.
    axes[1].bar(timing_df['Method'], timing_df['Mean Score'],
                yerr=timing_df['Std Score'], capsize=5, alpha=0.7)
    axes[1].set_xlabel('CV Method')
    axes[1].set_ylabel('Accuracy')
    axes[1].set_title('Accuracy Comparison')
    axes[1].grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()
    return timing_df
# Compare run time and accuracy of the three CV methods on the small iris subset.
timing_comparison = compare_cv_computational_cost(X_small, y_small)
Time Series Cross-validation
Special Methods for Temporal Data
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_squared_error
import pandas as pd
import numpy as np
# Generate one year of synthetic daily data (seeded for reproducibility).
np.random.seed(42)
n_samples = 365
dates = pd.date_range('2023-01-01', periods=n_samples, freq='D')
# Target signal = linear trend + 30-day sinusoidal seasonality + Gaussian noise.
trend = np.linspace(100, 200, n_samples)
seasonal = 20 * np.sin(np.arange(n_samples) * 2 * np.pi / 30)
noise = np.random.normal(0, 10, n_samples)
y_ts = trend + seasonal + noise
# Create lagged features plus calendar and trend features.
# np.roll shifts circularly: the first k rows of lag_k hold values wrapped
# around from the END of the series (not NaN), i.e. future information.
X_ts = pd.DataFrame({
'lag_1': np.roll(y_ts, 1),
'lag_7': np.roll(y_ts, 7),
'lag_30': np.roll(y_ts, 30),
'day_of_week': dates.dayofweek,
'month': dates.month,
'trend': np.arange(n_samples)
})
# Drop the first 30 rows: they are the ones whose lag features contain
# wrapped-around values (30 is the largest lag). X_ts keeps its original
# row labels after slicing.
X_ts = X_ts[30:]
y_ts = y_ts[30:]
dates = dates[30:]
print(f"Time series data: {len(X_ts)} samples")
print(f"Date range: {dates[0]} to {dates[-1]}")
# Time Series Split
def demonstrate_time_series_cv(X, y, dates, n_splits=5):
    """Run TimeSeriesSplit with a linear regression and plot every fold.

    For each fold a ``LinearRegression`` is fitted on the training part and
    scored (MSE) on the test part. The left column of the figure shows
    train/test/predicted series, the right column the test residuals.
    A summary table of the folds is printed.

    Args:
        X: Feature DataFrame (rows are selected positionally with ``.iloc``).
        y: Target array, indexable with integer index arrays.
        dates: Date index aligned with ``X``/``y``, indexable with integer
            index arrays.
        n_splits: Number of time-series splits.

    Returns:
        Tuple ``(tscv, fold_df)``: the splitter and the per-fold summary
        DataFrame.
    """
    tscv = TimeSeriesSplit(n_splits=n_splits)
    fig, axes = plt.subplots(n_splits, 2, figsize=(14, n_splits * 3))

    summary = []
    for row, (train_idx, test_idx) in enumerate(tscv.split(X)):
        fold_no = row + 1
        series_ax, residual_ax = axes[row, 0], axes[row, 1]

        X_train, y_train, dates_train = X.iloc[train_idx], y[train_idx], dates[train_idx]
        X_test, y_test, dates_test = X.iloc[test_idx], y[test_idx], dates[test_idx]

        # Fit on the past, predict the block that follows it.
        regressor = LinearRegression()
        regressor.fit(X_train, y_train)
        y_pred = regressor.predict(X_test)
        mse = mean_squared_error(y_test, y_pred)

        summary.append({
            'Fold': fold_no,
            'Train Start': dates_train[0],
            'Train End': dates_train[-1],
            'Test Start': dates_test[0],
            'Test End': dates_test[-1],
            'Train Size': len(train_idx),
            'Test Size': len(test_idx),
            'MSE': mse,
        })

        # Left panel: observed train/test values and the test predictions.
        series_ax.plot(dates_train, y_train, label='Train', alpha=0.7)
        series_ax.plot(dates_test, y_test, label='Test Actual', alpha=0.7)
        series_ax.plot(dates_test, y_pred, label='Test Predicted',
                       linestyle='--', alpha=0.7)
        series_ax.set_title(f'Fold {fold_no} - MSE: {mse:.2f}')
        series_ax.legend()
        series_ax.grid(True, alpha=0.3)

        # Right panel: residuals on the test block around a zero line.
        residual_ax.scatter(dates_test, y_test - y_pred, alpha=0.5, s=10)
        residual_ax.axhline(y=0, color='red', linestyle='--', alpha=0.5)
        residual_ax.set_title(f'Fold {fold_no} Residuals')
        residual_ax.set_ylabel('Residual')
        residual_ax.grid(True, alpha=0.3)

    plt.suptitle('Time Series Cross-validation', fontsize=14, y=1.02)
    plt.tight_layout()
    plt.show()

    fold_df = pd.DataFrame(summary)
    print("\nTime Series CV Results:")
    print(fold_df.to_string(index=False))
    return tscv, fold_df
# Run the time-series CV demo on the synthetic daily series (default 5 splits).
tscv, fold_results = demonstrate_time_series_cv(X_ts, y_ts, dates)
# Custom expanding window cross-validation
class ExpandingWindowCV:
    """Expanding-window (walk-forward) splitter for time series.

    The training window starts at index 0 with ``initial_train_size``
    samples and grows by ``step_size`` per split; the test window is the
    next ``step_size`` samples (fewer for the last split). When
    ``max_train_size`` is set, the start of the training window is moved
    forward so that it never holds more than that many samples.

    Args:
        initial_train_size: Number of samples in the first training window.
        step_size: Samples added to the training window per split, and the
            size of each test window.
        max_train_size: Optional cap on the training window size; ``None``
            (or 0) means no cap.
    """

    def __init__(self, initial_train_size, step_size=1, max_train_size=None):
        self.initial_train_size = initial_train_size
        self.step_size = step_size
        self.max_train_size = max_train_size

    def _check_params(self):
        """Raise ValueError for parameters that cannot produce valid splits."""
        if self.initial_train_size < 1:
            raise ValueError(
                f"initial_train_size must be >= 1, got {self.initial_train_size}"
            )
        # A non-positive step would never advance the window (endless loop).
        if self.step_size < 1:
            raise ValueError(f"step_size must be >= 1, got {self.step_size}")

    def split(self, X, y=None, groups=None):
        """Yield ``(train_idx, test_idx)`` index arrays in temporal order.

        ``y`` and ``groups`` are ignored; they are accepted so the splitter
        can be called with the ``split(X, y, groups)`` convention.

        Raises:
            ValueError: If ``initial_train_size`` or ``step_size`` is < 1
                (raised when iteration starts).
        """
        self._check_params()
        n_samples = len(X)
        train_start = 0
        train_end = self.initial_train_size
        while train_end < n_samples:
            if self.max_train_size and train_end > self.max_train_size:
                train_start = train_end - self.max_train_size
            test_end = min(train_end + self.step_size, n_samples)
            yield np.arange(train_start, train_end), np.arange(train_end, test_end)
            train_end += self.step_size

    def get_n_splits(self, X=None, y=None, groups=None):
        """Return the number of splits ``split(X)`` yields.

        Raises:
            ValueError: If ``X`` is None (the count depends on its length)
                or the parameters are invalid.
        """
        if X is None:
            raise ValueError("X is required to compute the number of splits")
        self._check_params()
        remaining = len(X) - self.initial_train_size
        # Ceiling division without floats; no splits when nothing is left to test.
        return max(0, -(-remaining // self.step_size))
# Blocked time series cross-validation
class BlockedTimeSeriesCV:
    """Time-series splitter with equal test blocks and an optional gap.

    The data is divided into ``n_splits + 1`` blocks of
    ``len(X) // (n_splits + 1)`` samples. Split ``i`` tests on block
    ``i + 1`` and trains on everything from index 0 up to the start of that
    block, minus ``gap_size`` samples left out between train and test.
    Samples beyond the last full block are never used for testing.

    Args:
        n_splits: Number of test blocks.
        gap_size: Number of samples dropped between the end of the training
            window and the start of the test block.
    """

    def __init__(self, n_splits, gap_size=0):
        self.n_splits = n_splits
        self.gap_size = gap_size

    def split(self, X, y=None, groups=None):
        """Yield ``(train_idx, test_idx)`` index arrays in temporal order.

        ``y`` and ``groups`` are ignored; they are accepted so the splitter
        can be called with the ``split(X, y, groups)`` convention. A split
        whose training window is emptied by the gap is skipped.

        Raises:
            ValueError: If there are fewer than ``n_splits + 1`` samples, so
                that no test block can be formed (raised when iteration
                starts).
        """
        n_samples = len(X)
        test_size = n_samples // (self.n_splits + 1)
        if test_size == 0:
            raise ValueError(
                f"Cannot form {self.n_splits} splits from {n_samples} samples: "
                f"at least {self.n_splits + 1} samples are required"
            )
        gap = max(self.gap_size, 0)
        for i in range(self.n_splits):
            test_start = (i + 1) * test_size
            test_end = min(test_start + test_size, n_samples)
            # Clamp at 0 so an oversized gap yields an empty training window.
            train_end = max(test_start - gap, 0)
            train_idx = np.arange(0, train_end)
            test_idx = np.arange(test_start, test_end)
            if len(train_idx) > 0 and len(test_idx) > 0:
                yield train_idx, test_idx

    def get_n_splits(self, X=None, y=None, groups=None):
        """Return the number of splits.

        With ``X`` given, the splits actually produced are counted (some may
        be skipped because of the gap); without it, ``n_splits`` is returned.
        """
        if X is None:
            return self.n_splits
        return sum(1 for _ in self.split(X))
# Compare different time series CV strategies
def compare_ts_cv_strategies(X, y):
    """Plot the train/test windows of three time-series CV strategies.

    One subplot per strategy (TimeSeriesSplit, ExpandingWindowCV,
    BlockedTimeSeriesCV); every fold is a row of horizontal bars, blue for
    the training window and red for the test window.

    Args:
        X: Samples to split; only passed to each splitter's ``split``.
        y: Targets. Accepted for symmetry; not used.
    """
    strategies = {
        'TimeSeriesSplit': TimeSeriesSplit(n_splits=5),
        'ExpandingWindow': ExpandingWindowCV(initial_train_size=100, step_size=50),
        'BlockedTS': BlockedTimeSeriesCV(n_splits=5, gap_size=10),
    }
    n_panels = len(strategies)
    fig, axes = plt.subplots(n_panels, 1, figsize=(14, n_panels * 2))

    for panel, (name, splitter) in zip(axes, strategies.items()):
        for row, (train_idx, test_idx) in enumerate(splitter.split(X)):
            # Only the first fold carries legend labels, so each legend entry
            # appears once per panel.
            first = row == 0
            panel.barh(row, len(train_idx), left=train_idx[0],
                       color='blue', alpha=0.5, label='Train' if first else '')
            panel.barh(row, len(test_idx), left=test_idx[0],
                       color='red', alpha=0.5, label='Test' if first else '')
        panel.set_xlabel('Sample Index')
        panel.set_ylabel('Fold')
        panel.set_title(f'{name} Strategy')
        panel.legend()
        panel.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()
# Visualize the three time-series splitters on the synthetic daily series.
compare_ts_cv_strategies(X_ts, y_ts)
Nested Cross-validation
For Model Selection and Evaluation
from sklearn.model_selection import GridSearchCV, cross_val_score
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
# Generate data: a small binary problem (200 samples, 10 features) bound to
# the module-level names X and y, which the examples below use.
X, y = make_classification(n_samples=200, n_features=10, n_informative=5,
n_redundant=5, n_classes=2, random_state=42)
# Nested cross-validation implementation
def nested_cross_validation(X, y, model, param_grid, outer_cv=5, inner_cv=3):
    """Run nested cross-validation with a grid search in the inner loop.

    The outer K-Fold loop holds out a test fold for evaluation; on the
    remaining data an inner K-Fold ``GridSearchCV`` (accuracy) selects the
    hyperparameters, and the refitted best estimator is scored on the
    held-out fold. Progress and a final summary are printed.

    Args:
        X: Feature array (indexed with integer index arrays).
        y: Target array (indexed with integer index arrays).
        model: Estimator handed to ``GridSearchCV``.
        param_grid: Hyperparameter grid for ``GridSearchCV``.
        outer_cv: Number of outer (evaluation) folds.
        inner_cv: Number of inner (tuning) folds.

    Returns:
        Tuple ``(outer_scores, best_params_list)`` with one entry per outer
        fold.
    """
    outer_scores, best_params_list = [], []
    outer_kf = KFold(n_splits=outer_cv, shuffle=True, random_state=42)

    for fold, (train_val_idx, test_idx) in enumerate(outer_kf.split(X), start=1):
        print(f"Outer Fold {fold}/{outer_cv}")

        # Inner loop: tune hyperparameters on the train+validation part only.
        tuner = GridSearchCV(
            estimator=model,
            param_grid=param_grid,
            cv=KFold(n_splits=inner_cv, shuffle=True, random_state=42),
            scoring='accuracy',
            n_jobs=-1,
        )
        tuner.fit(X[train_val_idx], y[train_val_idx])

        # Outer loop: score the refitted best estimator on the untouched fold.
        test_score = tuner.best_estimator_.score(X[test_idx], y[test_idx])
        outer_scores.append(test_score)
        best_params_list.append(tuner.best_params_)

        print(f" Best params: {tuner.best_params_}")
        print(f" Inner CV score: {tuner.best_score_:.3f}")
        print(f" Outer test score: {test_score:.3f}")

    mean_score = np.mean(outer_scores)
    spread = np.std(outer_scores) * 2
    print("\n" + "="*50)
    print("Nested CV Results:")
    print(f"Outer scores: {outer_scores}")
    print(f"Mean: {mean_score:.3f} (+/- {spread:.3f})")
    return outer_scores, best_params_list
# Example with SVM: the grid covers 3 x 2 x 2 = 12 hyperparameter combinations.
svm_param_grid = {
'C': [0.1, 1, 10],
'kernel': ['linear', 'rbf'],
'gamma': ['scale', 'auto']
}
print("SVM Nested Cross-validation:")
# 5 outer folds for evaluation, 3 inner folds for the grid search.
svm_scores, svm_params = nested_cross_validation(
X, y,
SVC(random_state=42),
svm_param_grid,
outer_cv=5,
inner_cv=3
)
# Compare nested CV vs simple CV
def compare_nested_vs_simple_cv(X, y):
    """Compare grid-search CV scores with nested-CV scores for two models.

    For an SVM and a random forest, the best ``GridSearchCV`` score on the
    full data ("simple CV") is compared with the mean outer score of
    ``nested_cross_validation``. The results are printed as a table and
    shown as a grouped bar chart annotated with the difference.

    Args:
        X: Feature array.
        y: Target array.

    Returns:
        DataFrame with the columns 'Model', 'Simple CV', 'Nested CV' and
        'Difference' (simple minus nested).
    """
    models = {
        'SVM': (SVC(random_state=42), {
            'C': [0.1, 1, 10],
            'kernel': ['linear', 'rbf'],
        }),
        'Random Forest': (RandomForestClassifier(random_state=42), {
            'n_estimators': [10, 50, 100],
            'max_depth': [3, 5, None],
        }),
    }

    results = []
    for model_name, (estimator, param_grid) in models.items():
        print(f"\nEvaluating {model_name}...")

        # Simple CV: the same folds select the hyperparameters and report the
        # score, so the estimate is optimistically biased.
        grid_search = GridSearchCV(estimator, param_grid, cv=5, scoring='accuracy')
        grid_search.fit(X, y)
        simple_score = grid_search.best_score_

        # Nested CV: selection happens inside each outer training fold.
        nested_scores, _ = nested_cross_validation(
            X, y, estimator, param_grid, outer_cv=5, inner_cv=3
        )
        nested_score = np.mean(nested_scores)

        results.append({
            'Model': model_name,
            'Simple CV': simple_score,
            'Nested CV': nested_score,
            'Difference': simple_score - nested_score,
        })

    results_df = pd.DataFrame(results)
    print("\n" + "="*50)
    print("Comparison: Simple CV vs Nested CV")
    print(results_df.to_string(index=False))
    print("\nNote: Simple CV typically overestimates performance!")

    fig, ax = plt.subplots(figsize=(10, 6))
    x = np.arange(len(results_df))
    width = 0.35
    ax.bar(x - width/2, results_df['Simple CV'], width,
           label='Simple CV', alpha=0.7)
    ax.bar(x + width/2, results_df['Nested CV'], width,
           label='Nested CV', alpha=0.7)
    ax.set_xlabel('Model')
    ax.set_ylabel('Accuracy')
    ax.set_title('Simple CV vs Nested CV: Overfitting in Model Selection')
    ax.set_xticks(x)
    ax.set_xticklabels(results_df['Model'])
    ax.legend()
    ax.grid(True, alpha=0.3)

    # Annotate each group with the score difference just above its taller
    # bar. The columns are walked positionally instead of indexing the Series
    # with integer labels.
    columns = zip(results_df['Simple CV'], results_df['Nested CV'],
                  results_df['Difference'])
    for position, (simple, nested, diff) in enumerate(columns):
        ax.annotate(f'Δ={diff:.3f}',
                    xy=(position, max(simple, nested)),
                    xytext=(0, 5), textcoords='offset points',
                    ha='center', fontsize=9)

    plt.tight_layout()
    plt.show()
    return results_df
# Run the simple-vs-nested comparison on the 200-sample dataset.
comparison_results = compare_nested_vs_simple_cv(X, y)
Custom Cross-validation Strategies
Building Your Own CV Splitters
# Custom cross-validation implementations
from sklearn.model_selection import BaseCrossValidator
from sklearn.utils.validation import check_array
class MonteCarloCV(BaseCrossValidator):
    """Monte Carlo (repeated random sub-sampling) cross-validation.

    Each split draws a fresh random permutation of the sample indices and
    uses its first ``int(n_samples * test_size)`` entries as the test set
    and the remainder as the training set.

    Args:
        n_splits: Number of random splits to generate.
        test_size: Fraction of the samples placed in each test set.
        random_state: Seed for the ``numpy.random.RandomState`` created at
            the start of every ``split`` call.
    """

    def __init__(self, n_splits=10, test_size=0.2, random_state=None):
        self.n_splits = n_splits
        self.test_size = test_size
        self.random_state = random_state

    def split(self, X, y=None, groups=None):
        """Yield ``n_splits`` random ``(train_idx, test_idx)`` pairs.

        ``y`` and ``groups`` are ignored.
        """
        X = check_array(X)
        total = len(X)
        holdout = int(total * self.test_size)
        generator = np.random.RandomState(self.random_state)
        for _ in range(self.n_splits):
            shuffled = generator.permutation(total)
            # Head of the permutation is the test set, the tail the train set.
            test_part, train_part = shuffled[:holdout], shuffled[holdout:]
            yield train_part, test_part

    def get_n_splits(self, X=None, y=None, groups=None):
        """Return the configured number of splits."""
        return self.n_splits
# Test custom CV: 5 random 70/30 splits scored with a logistic regression.
mccv = MonteCarloCV(n_splits=5, test_size=0.3, random_state=42)
model = LogisticRegression(random_state=42, max_iter=1000)
scores = cross_val_score(model, X, y, cv=mccv, scoring='accuracy')
print(f"Monte Carlo CV scores: {scores}")
# The "+/-" figure is two standard deviations of the per-split scores.
print(f"Mean: {scores.mean():.3f} (+/- {scores.std() * 2:.3f})")
# Stratified Group K-Fold (custom implementation)
class CustomStratifiedGroupKFold:
    """K-Fold splitter that keeps groups intact and roughly balances classes.

    Every group is labelled with its majority class; the groups of each
    class are then dealt round-robin across the folds. A group therefore
    never appears on both sides of a split.

    Args:
        n_splits: Number of folds.
    """

    def __init__(self, n_splits=5):
        self.n_splits = n_splits

    def split(self, X, y, groups):
        """Yield ``(train_idx, test_idx)`` index arrays, one pair per fold.

        Args:
            X: Samples; not inspected, present for API symmetry.
            y: Class label per sample (array or plain sequence).
            groups: Group label per sample (array or plain sequence).

        Raises:
            ValueError: If ``y`` and ``groups`` differ in length.
        """
        # Convert up front so plain Python lists can be masked and compared.
        y = np.asarray(y)
        groups = np.asarray(groups)
        if len(y) != len(groups):
            raise ValueError(
                f"y and groups must have the same length, "
                f"got {len(y)} and {len(groups)}"
            )

        # Bucket the groups by their majority class.
        groups_per_class = {}
        for group in np.unique(groups):
            labels, counts = np.unique(y[groups == group], return_counts=True)
            majority_class = labels[np.argmax(counts)]
            groups_per_class.setdefault(majority_class, []).append(group)

        # Deal the groups of each class round-robin over the folds.
        fold_groups = [[] for _ in range(self.n_splits)]
        for class_groups in groups_per_class.values():
            for position, group in enumerate(class_groups):
                fold_groups[position % self.n_splits].append(group)

        # Every group sits in exactly one fold, so the training set is the
        # complement of the test set. A vectorised membership test replaces
        # the per-sample list lookups.
        for test_groups in fold_groups:
            in_test = np.isin(groups, np.asarray(test_groups, dtype=groups.dtype))
            yield np.flatnonzero(~in_test), np.flatnonzero(in_test)

    def get_n_splits(self, X=None, y=None, groups=None):
        """Return the configured number of folds."""
        return self.n_splits
# Visualization of different CV strategies
def visualize_cv_comparison():
    """Draw train/test heatmaps for five CV strategies on toy data.

    Uses 100 samples with an imbalanced label vector (70/30) and 20 groups
    of 5 consecutive samples. One subplot per strategy; each row is a fold,
    with cell value 1 for training samples and 2 for test samples.
    """
    # GroupKFold is not imported at file level, so bring it into scope here.
    from sklearn.model_selection import GroupKFold

    n_samples = 100
    n_features = 2

    # The feature values are irrelevant for the split layout; only the
    # number of rows, the labels and the groups matter.
    X = np.random.randn(n_samples, n_features)
    y = np.array([0] * 70 + [1] * 30)  # Imbalanced
    groups = np.repeat(np.arange(20), 5)  # 20 groups, 5 samples each

    cv_strategies = {
        'KFold': KFold(n_splits=5, shuffle=True, random_state=42),
        'StratifiedKFold': StratifiedKFold(n_splits=5, shuffle=True, random_state=42),
        'GroupKFold': GroupKFold(n_splits=5),
        'TimeSeriesSplit': TimeSeriesSplit(n_splits=5),
        'MonteCarloCV': MonteCarloCV(n_splits=5, test_size=0.2, random_state=42),
    }

    fig, axes = plt.subplots(len(cv_strategies), 1,
                             figsize=(14, 2 * len(cv_strategies)))

    for ax, (name, cv) in zip(axes, cv_strategies.items()):
        # Each splitter only receives the arguments it actually uses.
        if name == 'GroupKFold':
            splits = list(cv.split(X, y, groups))
        elif name == 'StratifiedKFold':
            splits = list(cv.split(X, y))
        else:
            splits = list(cv.split(X))

        n_splits = len(splits)
        split_matrix = np.zeros((n_splits, n_samples))
        for fold, (train_idx, test_idx) in enumerate(splits):
            split_matrix[fold, train_idx] = 1
            split_matrix[fold, test_idx] = 2

        ax.imshow(split_matrix, aspect='auto', cmap='coolwarm')
        ax.set_title(f'{name} Strategy')
        ax.set_xlabel('Sample Index')
        ax.set_ylabel('Fold')
        ax.set_yticks(range(n_splits))
        ax.set_yticklabels([f'Fold {i+1}' for i in range(n_splits)])

    plt.suptitle('Comparison of Cross-validation Strategies', fontsize=14, y=1.01)
    plt.tight_layout()
    plt.show()
# Render the side-by-side comparison of the five CV strategies.
visualize_cv_comparison()
Best Practices and Guidelines
# Cross-validation best practices
class CVBestPractices:
    """Guidelines for choosing and using cross-validation."""

    @staticmethod
    def choose_cv_strategy(data_type, n_samples, characteristics):
        """Recommend a CV strategy based on data characteristics.

        The first matching rule wins: 'time' -> 'groups' ->
        'model_selection' -> sample count (fewer than 100 samples counts as
        small) combined with 'imbalanced'.

        Args:
            data_type: Currently not used by the decision rules.
            n_samples: Number of samples in the dataset.
            characteristics: Container of characteristic names, tested with
                ``in`` ('time', 'groups', 'model_selection', 'imbalanced').

        Returns:
            Dict with the keys 'strategy' and 'reason'.
        """
        recommendations = {
            'small_balanced': {
                'strategy': 'LeaveOneOut or 10-Fold',
                'reason': 'Maximum use of limited data',
            },
            'small_imbalanced': {
                'strategy': 'StratifiedKFold',
                'reason': 'Maintains class distribution in small samples',
            },
            'large_balanced': {
                'strategy': '5-Fold or 10-Fold',
                'reason': 'Good balance of bias-variance and computation',
            },
            'large_imbalanced': {
                'strategy': 'StratifiedKFold',
                'reason': 'Maintains class distribution',
            },
            'time_series': {
                'strategy': 'TimeSeriesSplit',
                'reason': 'Respects temporal order',
            },
            'grouped': {
                'strategy': 'GroupKFold',
                'reason': 'Prevents data leakage between groups',
            },
            'model_selection': {
                'strategy': 'Nested CV',
                'reason': 'Unbiased performance estimation',
            },
        }

        if 'time' in characteristics:
            key = 'time_series'
        elif 'groups' in characteristics:
            key = 'grouped'
        elif 'model_selection' in characteristics:
            key = 'model_selection'
        else:
            size = 'small' if n_samples < 100 else 'large'
            balance = 'imbalanced' if 'imbalanced' in characteristics else 'balanced'
            key = f'{size}_{balance}'
        return recommendations[key]

    @staticmethod
    def common_mistakes():
        """Return a list of common cross-validation mistakes.

        Returns:
            List of dicts with the keys 'mistake', 'consequence' and
            'solution'.
        """
        return [
            {
                'mistake': 'Using test set for any decision making',
                'consequence': 'Overfitting to test set',
                'solution': 'Use validation set or cross-validation for model selection',
            },
            {
                'mistake': 'Not stratifying with imbalanced data',
                'consequence': 'Some folds may have no minority class samples',
                'solution': 'Always use StratifiedKFold for imbalanced data',
            },
            {
                'mistake': 'Data leakage in preprocessing',
                'consequence': 'Overoptimistic performance estimates',
                'solution': 'Always preprocess within CV folds using pipelines',
            },
            {
                'mistake': 'Using standard k-fold for time series',
                'consequence': 'Future data leaks into training',
                'solution': 'Use TimeSeriesSplit or custom time-aware CV',
            },
            {
                'mistake': 'Not considering computational cost',
                'consequence': 'Extremely long training times',
                'solution': 'Balance k with computational resources',
            },
        ]

    @staticmethod
    def cv_checklist():
        """Return the cross-validation checklist as a multi-line string.

        The bullet glyph in front of each item was mis-encoded in the
        original text; it is written here as a ballot box.
        """
        items = (
            "Choose appropriate CV strategy for your data",
            "Use stratification for imbalanced classification",
            "Respect temporal order for time series",
            "Keep groups together if data is grouped",
            "Use pipelines to prevent preprocessing leakage",
            "Set random_state for reproducibility",
            "Use nested CV for model selection + evaluation",
            "Consider computational cost vs statistical benefit",
            "Report mean and standard deviation of scores",
            "Visualize results across folds to spot issues",
        )
        lines = [f"☐ {number}. {text}" for number, text in enumerate(items, start=1)]
        return "\nCross-validation Checklist:\n" + "\n".join(lines) + "\n"
# Print best practices: every common mistake with its consequence and
# solution, followed by the checklist. The marker glyphs in front of
# "Mistake" and "Solution" were mis-encoded and are restored here.
practices = CVBestPractices()
print("Common Cross-validation Mistakes:")
print("="*50)
for mistake in practices.common_mistakes():
    print(f"\n❌ Mistake: {mistake['mistake']}")
    print(f" Consequence: {mistake['consequence']}")
    print(f" ✅ Solution: {mistake['solution']}")
print("\n" + practices.cv_checklist())
# Example: Complete CV workflow
def complete_cv_workflow(X, y):
    """Demonstrate a complete cross-validation workflow.

    Steps: describe the dataset, pick StratifiedKFold when the rarest class
    holds less than 20% of the samples (KFold otherwise), wrap scaling and a
    logistic regression in a pipeline, cross-validate on four metrics and
    print train/test statistics plus the train-test gap per metric.

    Args:
        X: Feature array with a ``shape`` attribute.
        y: Array of non-negative integer class labels (``np.bincount`` is
            applied to it).

    Returns:
        The dictionary returned by ``cross_validate``.
    """
    print("Complete Cross-validation Workflow")
    print("="*50)

    # 1. Analyze data
    n_samples, n_features = len(X), X.shape[1]
    n_classes = len(np.unique(y))
    class_balance = np.bincount(y) / len(y)
    print(f"Dataset: {n_samples} samples, {n_features} features, {n_classes} classes")
    print(f"Class distribution: {class_balance}")

    # 2. Choose CV strategy: stratify when the rarest class is below 20%.
    imbalanced = min(class_balance) < 0.2
    splitter_cls = StratifiedKFold if imbalanced else KFold
    cv = splitter_cls(n_splits=5, shuffle=True, random_state=42)
    cv_name = "StratifiedKFold" if imbalanced else "KFold"
    print(f"Selected CV: {cv_name}")

    # 3. Pipeline: the scaler is refitted inside every training fold, so no
    #    statistics leak from the validation fold.
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', LogisticRegression(random_state=42, max_iter=1000)),
    ])

    # 4. Cross-validate with multiple metrics
    scoring = ['accuracy', 'precision_weighted', 'recall_weighted', 'f1_weighted']
    cv_results = cross_validate(
        pipeline, X, y, cv=cv,
        scoring=scoring,
        return_train_score=True,
        n_jobs=-1,
    )

    # 5. Summarize: mean/std per side and the train-test gap.
    def summarize(metric):
        train_scores = cv_results[f'train_{metric}']
        test_scores = cv_results[f'test_{metric}']
        return {
            'train_mean': train_scores.mean(),
            'train_std': train_scores.std(),
            'test_mean': test_scores.mean(),
            'test_std': test_scores.std(),
            'overfit': train_scores.mean() - test_scores.mean(),
        }

    results_summary = {metric: summarize(metric) for metric in scoring}

    # 6. Display results
    print("\nResults Summary:")
    for metric, stats in results_summary.items():
        print(f"\n{metric.upper()}:")
        print(f" Train: {stats['train_mean']:.3f} ± {stats['train_std']:.3f}")
        print(f" Test: {stats['test_mean']:.3f} ± {stats['test_std']:.3f}")
        print(f" Overfit: {stats['overfit']:.3f}")
    return cv_results
# Run complete workflow on an imbalanced 3-class problem (70% / 20% / 10%).
# n_informative is set explicitly: make_classification requires
# n_classes * n_clusters_per_class <= 2**n_informative, and the defaults
# (n_informative=2, n_clusters_per_class=2) cannot accommodate 3 classes.
X_demo, y_demo = make_classification(n_samples=500, n_features=10,
                                     n_informative=5,
                                     n_classes=3, weights=[0.7, 0.2, 0.1],
                                     random_state=42)
cv_results_demo = complete_cv_workflow(X_demo, y_demo)
Practice Exercises
Exercise 1: Custom Time-Aware CV
Create a cross-validation strategy that:
- Handles time series with seasonal patterns
- Ensures test sets are always in the future
- Maintains minimum training size
- Allows for gaps between train and test
- Provides walk-forward analysis
Exercise 2: Adaptive CV Selection
Build a system that:
- Automatically analyzes dataset characteristics
- Recommends optimal CV strategy
- Detects potential issues (imbalance, groups, time)
- Runs appropriate cross-validation
- Generates comprehensive report
Exercise 3: CV Performance Analysis
Develop a framework that:
- Performs multiple CV strategies
- Compares computational cost
- Analyzes variance across folds
- Detects overfitting
- Visualizes results comprehensively
Key Takeaways
- Cross-validation provides robust performance estimates
- K-Fold is standard, but choose k based on dataset size
- Use StratifiedKFold for imbalanced classification
- TimeSeriesSplit preserves temporal order
- GroupKFold prevents leakage between groups
- LOO maximizes training data but is computationally expensive
- Nested CV prevents overfitting in model selection
- Always use pipelines to prevent preprocessing leakage
- Report mean and standard deviation of CV scores