Skip to main content

Train/Test Splits

The Foundation of Model Evaluation! 🎯

Proper train/test splitting is the cornerstone of reliable machine learning. It's not just about randomly dividing data—it's about creating representative splits that prevent overfitting, avoid data leakage, and provide honest performance estimates. Master these techniques to build models that generalize well to real-world data.

Why Train/Test Splits Matter

graph TD
    A[Complete Dataset] --> B{Split Strategy}
    B --> C[Random Split]
    B --> D[Stratified Split]
    B --> E[Time Series Split]
    B --> F[Group Split]
    B --> G[Multi-level Split]
    C --> H["Training Set<br/>60-80%"]
    C --> I["Test Set<br/>20-40%"]
    G --> J["Training Set<br/>60%"]
    G --> K["Validation Set<br/>20%"]
    G --> L["Test Set<br/>20%"]
    H --> M[Model Training]
    M --> N[Predictions]
    I --> N
    N --> O[Evaluation]
    P[Data Leakage] -.->|Avoid| B
    Q[Class Imbalance] -.->|Consider| D
    R[Temporal Order] -.->|Maintain| E

Basic Train/Test Split

Simple Random Split

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification, make_regression
import matplotlib.pyplot as plt
import seaborn as sns

# Set style: matplotlib style sheet plus the seaborn "husl" colour palette
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

# Generate sample data: 1000 samples, 20 features (15 informative, 5 redundant),
# 2 classes; random_state fixes the generated values
X, y = make_classification(n_samples=1000, n_features=20, n_informative=15,
                          n_redundant=5, n_classes=2, random_state=42)

# Basic train/test split: X and y are split together so rows stay aligned
X_train, X_test, y_train, y_test = train_test_split(
    X, y, 
    test_size=0.2,      # 20% for testing
    random_state=42     # For reproducibility
)

# Report absolute sizes and each subset's share of the full dataset
print(f"Original dataset size: {X.shape[0]} samples")
print(f"Training set size: {X_train.shape[0]} samples ({X_train.shape[0]/X.shape[0]:.1%})")
print(f"Test set size: {X_test.shape[0]} samples ({X_test.shape[0]/X.shape[0]:.1%})")

# Verify no overlap.
# Positions inside X_train and X_test both start at 0, so comparing
# range(len(X_train)) with range(len(X_test)) would always report an overlap.
# Instead, split the ORIGINAL row indices with the same settings as the split
# above (same number of rows, test_size and random_state) and compare those.
train_indices, test_indices = train_test_split(
    np.arange(len(X)), test_size=0.2, random_state=42
)
train_indices = set(train_indices)
test_indices = set(test_indices)
print(f"Overlap between train and test: {len(train_indices.intersection(test_indices))} samples")

# Different split ratios: tabulate the train/test sizes each test fraction yields
split_ratios = [0.1, 0.2, 0.25, 0.3, 0.4]
results = []

for ratio in split_ratios:
    # Only the feature partitions are needed to measure sizes
    train_part, test_part, *_ = train_test_split(
        X, y, test_size=ratio, random_state=42
    )
    n_train = len(train_part)
    n_test = len(test_part)
    row = {
        'Test Ratio': ratio,
        'Train Size': n_train,
        'Test Size': n_test,
        'Train %': format(n_train / len(X), ".1%"),
        'Test %': format(n_test / len(X), ".1%")
    }
    results.append(row)

results_df = pd.DataFrame(results)
print("\nDifferent Split Ratios:")
print(results_df.to_string(index=False))

# Visualize split distribution
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# Distribution of target variable.
# value_counts() orders its result by frequency, not by class label, so the
# counts are re-indexed by label to guarantee that bar k really is class k.
class_labels = [0, 1]
tick_names = [f'Class {label}' for label in class_labels]
train_counts = pd.Series(y_train).value_counts().reindex(class_labels, fill_value=0)
test_counts = pd.Series(y_test).value_counts().reindex(class_labels, fill_value=0)

axes[0].bar(tick_names, train_counts.values, alpha=0.7, label='Train')
axes[0].bar(tick_names, test_counts.values, alpha=0.7, label='Test')
axes[0].set_title('Class Distribution: Train vs Test')
axes[0].set_ylabel('Count')
axes[0].legend()

# Feature distribution comparison: density=True normalises both histograms so
# the differently sized train and test sets can be compared on one scale
feature_idx = 0  # First feature
axes[1].hist(X_train[:, feature_idx], bins=30, alpha=0.5, label='Train', density=True)
axes[1].hist(X_test[:, feature_idx], bins=30, alpha=0.5, label='Test', density=True)
axes[1].set_title(f'Feature {feature_idx} Distribution')
axes[1].set_xlabel('Value')
axes[1].set_ylabel('Density')
axes[1].legend()

plt.tight_layout()
plt.show()

Stratified Splitting

Maintaining Class Balance

# Stratified split for classification problems

from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd

# Create imbalanced dataset (90% class 0, 10% class 1)
np.random.seed(42)
n_samples = 1000  # equals n_class_0 + n_class_1; not referenced again in this snippet
n_class_0 = 900  # Majority class
n_class_1 = 100  # Minority class

# Two 2-D Gaussian blobs: the minority class is shifted by +3 on both axes
X = np.vstack([
    np.random.randn(n_class_0, 2),
    np.random.randn(n_class_1, 2) + 3
])
# Labels follow the row order of X: all class-0 rows first, then class-1 rows
y = np.array([0] * n_class_0 + [1] * n_class_1)

print("Original class distribution:")
print(pd.Series(y).value_counts(normalize=True))

# Compare random vs stratified split
fig, axes = plt.subplots(2, 3, figsize=(15, 10))

# Bar geometry and class order are shared by every panel, so they are defined
# once here rather than inside the first plotting loop.
class_labels = [0, 1]
x_pos = np.arange(len(class_labels))
width = 0.35

# Row 0: plain random splits (stratify=None).
# Row 1: stratified splits (stratify=y keeps the class ratio in both subsets).
split_modes = [('Random', None), ('Stratified', y)]

for row, (mode_name, stratify_labels) in enumerate(split_modes):
    for i in range(3):
        # A different seed per column shows how much the class ratio varies
        _, _, y_train_part, y_test_part = train_test_split(
            X, y, test_size=0.2, random_state=i, stratify=stratify_labels
        )

        # Re-index by label so bar k is always class k, whatever the frequencies
        train_dist = (pd.Series(y_train_part).value_counts(normalize=True)
                      .reindex(class_labels, fill_value=0))
        test_dist = (pd.Series(y_test_part).value_counts(normalize=True)
                     .reindex(class_labels, fill_value=0))

        ax = axes[row, i]
        ax.bar(x_pos - width/2, train_dist.values, width, label='Train', alpha=0.7)
        ax.bar(x_pos + width/2, test_dist.values, width, label='Test', alpha=0.7)
        ax.set_title(f'{mode_name} Split {i+1}')
        ax.set_xlabel('Class')
        ax.set_ylabel('Proportion')
        ax.set_xticks(x_pos)
        ax.set_xticklabels(['Class 0', 'Class 1'])
        ax.legend()
        ax.set_ylim([0, 1])

plt.suptitle('Random vs Stratified Splitting on Imbalanced Data', fontsize=14)
plt.tight_layout()
plt.show()

# Stratified split for regression (using bins)
def stratified_split_regression(X, y, test_size=0.2, n_bins=5, random_state=42):
    """
    Split regression data while stratifying on a binned copy of the target.

    The continuous target ``y`` is cut into ``n_bins`` quantile bins and the
    bin ids are handed to ``train_test_split`` as stratification labels.

    Returns:
        (X_train, X_test, y_train, y_test) with the original, un-binned targets.
    """
    # Quantile bin id per sample; duplicate bin edges are dropped
    strata = pd.qcut(y, q=n_bins, labels=False, duplicates='drop')

    parts = train_test_split(
        X, y, test_size=test_size, stratify=strata, random_state=random_state
    )
    features_train, features_test, target_train, target_test = parts

    return features_train, features_test, target_train, target_test

# Example with regression data: 1000 samples, 10 features, noise=10, fixed seed
X_reg, y_reg = make_regression(n_samples=1000, n_features=10, noise=10, random_state=42)

# Apply stratified split for regression (uses the helper's default n_bins and random_state)
X_train_reg, X_test_reg, y_train_reg, y_test_reg = stratified_split_regression(
    X_reg, y_reg, test_size=0.2
)

# Verify distribution similarity
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# Overlaid, density-normalised histograms of the train and test targets
axes[0].hist(y_train_reg, bins=30, alpha=0.5, label='Train', density=True)
axes[0].hist(y_test_reg, bins=30, alpha=0.5, label='Test', density=True)
axes[0].set_title('Target Distribution after Stratified Split (Regression)')
axes[0].set_xlabel('Target Value')
axes[0].set_ylabel('Density')
axes[0].legend()

# Q-Q plot of the test-set target against a normal distribution.
# NOTE: this checks how normal the test targets look; it does not compare the
# train and test distributions with each other.
from scipy import stats
stats.probplot(y_test_reg, dist="norm", plot=axes[1])
axes[1].set_title('Q-Q Plot of Test Set Target')

plt.tight_layout()
plt.show()

Time Series Splits

Temporal Data Splitting

# Time series splitting strategies

from sklearn.model_selection import TimeSeriesSplit
import pandas as pd
import numpy as np

# Generate time series data: 365 daily observations starting 2020-01-01
np.random.seed(42)
dates = pd.date_range('2020-01-01', periods=365, freq='D')
trend = np.linspace(100, 200, 365)  # linear trend from 100 to 200
seasonal = 10 * np.sin(np.arange(365) * 2 * np.pi / 30)  # Monthly seasonality (30-day period, amplitude 10)
noise = np.random.normal(0, 5, 365)  # Gaussian noise, mean 0, std 5
values = trend + seasonal + noise

# feature1 / feature2 are independent standard-normal columns
df = pd.DataFrame({
    'date': dates,
    'value': values,
    'feature1': np.random.randn(365),
    'feature2': np.random.randn(365)
})

print(f"Time series data from {df['date'].min()} to {df['date'].max()}")

# Method 1: Simple time-based split
def time_based_split(df, train_ratio=0.8):
    """Split rows by position: the leading share is train, the remainder is test.

    The cut point is ``int(len(df) * train_ratio)``. No sorting or shuffling is
    performed, so the rows keep whatever order ``df`` already has.

    Returns:
        (train, test) as two positional slices of ``df``.
    """
    cutoff = int(len(df) * train_ratio)
    return df.iloc[:cutoff], df.iloc[cutoff:]

# Chronological 80/20 split of the time series frame
train_simple, test_simple = time_based_split(df, train_ratio=0.8)

# Print the date range covered by each part
print(f"\nSimple Time Split:")
print(f"Train: {train_simple['date'].min()} to {train_simple['date'].max()}")
print(f"Test: {test_simple['date'].min()} to {test_simple['date'].max()}")

# Method 2: TimeSeriesSplit for cross-validation
# One constant drives both the splitter and the subplot grid so the two cannot
# drift apart.
n_splits = 5
tscv = TimeSeriesSplit(n_splits=n_splits)

fig, axes = plt.subplots(n_splits, 1, figsize=(12, 10))

for i, (train_idx, test_idx) in enumerate(tscv.split(df)):
    # Slice each fold once and reuse the resulting frames for plotting
    train_fold = df.iloc[train_idx]
    test_fold = df.iloc[test_idx]

    axes[i].scatter(train_fold['date'], train_fold['value'],
                    alpha=0.5, label='Train', s=10)
    axes[i].scatter(test_fold['date'], test_fold['value'],
                    alpha=0.5, label='Test', s=10, color='red')
    axes[i].set_title(f'Split {i+1}')
    axes[i].set_ylabel('Value')
    axes[i].legend()

    # Add vertical line to show split point (date of the last training row)
    split_date = train_fold['date'].iloc[-1]
    axes[i].axvline(x=split_date, color='black', linestyle='--', alpha=0.5)

axes[-1].set_xlabel('Date')
plt.suptitle('Time Series Cross-Validation Splits', fontsize=14)
plt.tight_layout()
plt.show()

# Method 3: Expanding window (for time series)
class ExpandingWindowSplit:
    """Expanding window for time series validation.

    Every split trains on positions ``0 .. train_end - 1`` and tests on the
    next ``test_size`` positions. ``train_end`` starts at
    ``initial_train_size`` and grows by ``step_size`` per split, so the
    training window keeps expanding.

    Args:
        initial_train_size: Number of leading positions in the first training window.
        test_size: Number of positions in each test window.
        step_size: How far the end of the training window advances between
            splits. Must be at least 1.

    Raises:
        ValueError: If ``step_size`` is smaller than 1 (the window would never
            advance and ``split`` would not terminate).
    """

    def __init__(self, initial_train_size, test_size, step_size=1):
        if step_size < 1:
            raise ValueError("step_size must be at least 1")
        self.initial_train_size = initial_train_size
        self.test_size = test_size
        self.step_size = step_size

    def split(self, X):
        """Return a list of ``(train_idx, test_idx)`` pairs of positional index lists.

        Only ``len(X)`` is used. A split is produced only when a complete test
        window fits, so the result is empty when ``X`` is too short.
        """
        n = len(X)
        splits = []

        train_end = self.initial_train_size

        # The loop condition guarantees the test window ends within n
        while train_end + self.test_size <= n:
            test_end = train_end + self.test_size
            train_idx = list(range(train_end))
            test_idx = list(range(train_end, test_end))

            splits.append((train_idx, test_idx))
            train_end += self.step_size

        return splits

# Method 4: Sliding window
class SlidingWindowSplit:
    """Fixed-size sliding window for time series.

    Every split trains on ``train_size`` consecutive positions and tests on
    the ``test_size`` positions that follow. The whole window moves forward by
    ``step_size`` between splits, so the training set keeps a constant length.

    Args:
        train_size: Number of positions in each training window.
        test_size: Number of positions in each test window.
        step_size: How far the window start advances between splits. Must be
            at least 1.

    Raises:
        ValueError: If ``step_size`` is smaller than 1 (the window would never
            advance and ``split`` would not terminate).
    """

    def __init__(self, train_size, test_size, step_size=1):
        if step_size < 1:
            raise ValueError("step_size must be at least 1")
        self.train_size = train_size
        self.test_size = test_size
        self.step_size = step_size

    def split(self, X):
        """Return a list of ``(train_idx, test_idx)`` pairs of positional index lists.

        Only ``len(X)`` is used. A split is produced only when a complete
        train + test window fits, so the result is empty when ``X`` is too short.
        """
        n = len(X)
        splits = []

        start = 0

        while start + self.train_size + self.test_size <= n:
            train_end = start + self.train_size
            train_idx = list(range(start, train_end))
            test_idx = list(range(train_end, train_end + self.test_size))

            splits.append((train_idx, test_idx))
            start += self.step_size

        return splits

# Visualize different time series split strategies.
# One panel per strategy; within a panel each split is drawn as a horizontal
# band at y = split number, with training dates in a viridis colour and test
# dates in red.
fig, axes = plt.subplots(3, 1, figsize=(14, 10))

# Expanding window
expanding = ExpandingWindowSplit(initial_train_size=100, test_size=30, step_size=30)
splits_expanding = expanding.split(df)

# Only the first five splits are drawn, matching the y-limits set below
for i, (train_idx, test_idx) in enumerate(splits_expanding[:5]):
    color = plt.cm.viridis(i / 5)
    axes[0].scatter(df.iloc[train_idx]['date'], [i] * len(train_idx), 
                   alpha=0.7, s=5, color=color)
    axes[0].scatter(df.iloc[test_idx]['date'], [i] * len(test_idx), 
                   alpha=0.7, s=5, color='red')

axes[0].set_title('Expanding Window Strategy')
axes[0].set_ylabel('Split Number')
axes[0].set_ylim([-0.5, 4.5])

# Sliding window
sliding = SlidingWindowSplit(train_size=100, test_size=30, step_size=30)
splits_sliding = sliding.split(df)

# Only the first five splits are drawn, matching the y-limits set below
for i, (train_idx, test_idx) in enumerate(splits_sliding[:5]):
    color = plt.cm.viridis(i / 5)
    axes[1].scatter(df.iloc[train_idx]['date'], [i] * len(train_idx), 
                   alpha=0.7, s=5, color=color)
    axes[1].scatter(df.iloc[test_idx]['date'], [i] * len(test_idx), 
                   alpha=0.7, s=5, color='red')

axes[1].set_title('Sliding Window Strategy')
axes[1].set_ylabel('Split Number')
axes[1].set_ylim([-0.5, 4.5])

# TimeSeriesSplit (sklearn)
tscv = TimeSeriesSplit(n_splits=5)
for i, (train_idx, test_idx) in enumerate(tscv.split(df)):
    color = plt.cm.viridis(i / 5)
    axes[2].scatter(df.iloc[train_idx]['date'], [i] * len(train_idx), 
                   alpha=0.7, s=5, color=color)
    axes[2].scatter(df.iloc[test_idx]['date'], [i] * len(test_idx), 
                   alpha=0.7, s=5, color='red')

axes[2].set_title('TimeSeriesSplit (sklearn)')
axes[2].set_ylabel('Split Number')
axes[2].set_xlabel('Date')
axes[2].set_ylim([-0.5, 4.5])

plt.tight_layout()
plt.show()

Advanced Splitting Strategies

Group-Based and Multi-Level Splits

# Advanced splitting strategies

from sklearn.model_selection import GroupShuffleSplit, GroupKFold, StratifiedGroupKFold
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np

# Create grouped data (e.g., multiple samples per patient/user)
np.random.seed(42)
n_groups = 50
n_samples_per_group = 20

# One record per (group, repetition). The random draws happen in the same
# order as in a nested loop: feature1, feature2, then target for each record.
data = [
    {
        'group_id': group_id,
        'feature1': np.random.randn(),
        'feature2': np.random.randn(),
        'target': np.random.choice([0, 1])
    }
    for group_id in range(n_groups)
    for _ in range(n_samples_per_group)
]

df = pd.DataFrame(data)
feature_columns = ['feature1', 'feature2']
X = df[feature_columns].values
y = df['target'].values
groups = df['group_id'].values

print(f"Dataset: {len(df)} samples from {df['group_id'].nunique()} groups")
print(f"Samples per group: {df['group_id'].value_counts().describe()}")

# Method 1: GroupShuffleSplit - Ensures groups don't overlap
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)

# n_splits=1, so the splitter yields exactly one (train, test) index pair
train_idx, test_idx = next(gss.split(X, y, groups))
X_train, X_test = X[train_idx], X[test_idx]
y_train, y_test = y[train_idx], y[test_idx]
groups_train, groups_test = groups[train_idx], groups[test_idx]

# Sizes, number of distinct groups per side, and groups present on both sides
print(f"\nGroupShuffleSplit:")
print(f"Train: {len(X_train)} samples from {len(np.unique(groups_train))} groups")
print(f"Test: {len(X_test)} samples from {len(np.unique(groups_test))} groups")
print(f"Group overlap: {len(set(groups_train).intersection(set(groups_test)))} groups")

# Method 2: GroupKFold for cross-validation
gkf = GroupKFold(n_splits=5)

# Collect one summary row per fold: sample counts, group counts, and the
# number of groups that appear on both the train and the test side
splits_info = []
for fold, (train_idx, test_idx) in enumerate(gkf.split(X, y, groups)):
    train_groups = np.unique(groups[train_idx])
    test_groups = np.unique(groups[test_idx])
    
    splits_info.append({
        'Fold': fold + 1,
        'Train Samples': len(train_idx),
        'Test Samples': len(test_idx),
        'Train Groups': len(train_groups),
        'Test Groups': len(test_groups),
        'Group Overlap': len(set(train_groups).intersection(set(test_groups)))
    })

splits_df = pd.DataFrame(splits_info)
print("\nGroupKFold Splits:")
print(splits_df.to_string(index=False))

# Method 3: Multi-level split (train/val/test)
def multi_level_split(X, y, train_ratio=0.6, val_ratio=0.2, test_ratio=0.2, 
                      stratify=None, random_state=42):
    """
    Split data into train/validation/test sets.

    Two chained ``train_test_split`` calls are used: the first holds out the
    test set, the second divides the remainder into train and validation.

    Args:
        X: Feature array.
        y: Target array aligned with ``X``.
        train_ratio, val_ratio, test_ratio: Shares of the full dataset for
            each subset; they must sum to 1.
        stratify: Optional array of labels, aligned with ``X``, to stratify
            both splits on. ``None`` disables stratification.
        random_state: Seed passed to both splits.

    Returns:
        (X_train, X_val, X_test, y_train, y_val, y_test)

    Raises:
        AssertionError: If the three ratios do not sum to 1.
    """
    import math

    # Compare with a tolerance: sums such as 0.7 + 0.2 + 0.1 can differ from
    # 1.0 by a floating-point rounding error.
    assert math.isclose(train_ratio + val_ratio + test_ratio, 1.0), "Ratios must sum to 1"

    # First split: separate test set
    if stratify is None:
        X_temp, X_test, y_temp, y_test = train_test_split(
            X, y,
            test_size=test_ratio,
            random_state=random_state
        )
        stratify_temp = None
    else:
        # Split the stratification labels together with X and y so that the
        # labels kept for the second split stay aligned with the shuffled
        # remaining rows (slicing the original array would not be aligned).
        X_temp, X_test, y_temp, y_test, stratify_temp, _ = train_test_split(
            X, y, stratify,
            test_size=test_ratio,
            stratify=stratify,
            random_state=random_state
        )

    # Second split: separate train and validation.
    # val_ratio is a share of the full dataset, so rescale it to a share of
    # what is left after the test set was removed.
    val_ratio_adjusted = val_ratio / (train_ratio + val_ratio)

    X_train, X_val, y_train, y_val = train_test_split(
        X_temp, y_temp,
        test_size=val_ratio_adjusted,
        stratify=stratify_temp,
        random_state=random_state
    )

    return X_train, X_val, X_test, y_train, y_val, y_test

# Apply multi-level split
# make_classification requires n_classes * n_clusters_per_class <= 2**n_informative.
# With 3 classes and the defaults (2 clusters per class, 2 informative features)
# that is 6 > 4 and the call raises ValueError, so n_informative is set explicitly.
X_multi, y_multi = make_classification(n_samples=1000, n_features=10,
                                       n_informative=5,
                                       n_classes=3, random_state=42)

X_train, X_val, X_test, y_train, y_val, y_test = multi_level_split(
    X_multi, y_multi, 
    train_ratio=0.6, 
    val_ratio=0.2, 
    test_ratio=0.2,
    stratify=y_multi
)

# Report each subset's size and its share of the full dataset
print(f"\nMulti-level Split:")
print(f"Train: {len(X_train)} samples ({len(X_train)/len(X_multi):.1%})")
print(f"Validation: {len(X_val)} samples ({len(X_val)/len(X_multi):.1%})")
print(f"Test: {len(X_test)} samples ({len(X_test)/len(X_multi):.1%})")

# Visualize multi-level split: one bar chart of class counts per subset
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

datasets = [
    ('Training Set', y_train),
    ('Validation Set', y_val),
    ('Test Set', y_test)
]

for ax, (name, y_data) in zip(axes, datasets):
    unique, counts = np.unique(y_data, return_counts=True)
    ax.bar(unique, counts, alpha=0.7)
    ax.set_title(f'{name}\n(n={len(y_data)})')
    ax.set_xlabel('Class')
    ax.set_ylabel('Count')
    ax.set_xticks(unique)

    # Add percentage labels just above each bar
    for u, c in zip(unique, counts):
        ax.text(u, c, f'{c/len(y_data):.1%}', ha='center', va='bottom')

plt.suptitle('Multi-Level Split: Class Distribution', fontsize=14)
plt.tight_layout()
plt.show()

# Method 4: Nested splits for hyperparameter tuning
class NestedSplit:
    """
    Nested splitting for unbiased model evaluation
    Outer loop: Model evaluation
    Inner loop: Hyperparameter tuning

    Args:
        outer_splits: Number of outer (evaluation) folds.
        inner_splits: Number of inner (tuning) folds inside each outer fold.
        random_state: Seed used for both shuffled KFold splitters.
    """

    def __init__(self, outer_splits=5, inner_splits=3, random_state=42):
        self.outer_splits = outer_splits
        self.inner_splits = inner_splits
        self.random_state = random_state

    def split(self, X, y):
        """Describe the nested fold structure for ``X`` / ``y``.

        ``X`` and ``y`` must support indexing with an integer index array
        (e.g. NumPy arrays). No model is trained; the method only records the
        size of each subset.

        Returns:
            DataFrame with one row per (outer fold, inner fold) and the
            columns outer_fold, inner_fold, train_size, val_size, test_size.
        """
        from sklearn.model_selection import KFold

        outer_cv = KFold(n_splits=self.outer_splits, shuffle=True,
                         random_state=self.random_state)
        inner_cv = KFold(n_splits=self.inner_splits, shuffle=True,
                         random_state=self.random_state)

        fold_rows = []

        for outer_fold, (train_val_idx, test_idx) in enumerate(outer_cv.split(X, y)):
            # Everything outside the outer test fold is available for tuning
            X_train_val = X[train_val_idx]
            y_train_val = y[train_val_idx]

            # Inner loop for hyperparameter tuning. Only the sizes are
            # recorded, so the index arrays are enough; a real run would fit
            # on X_train_val[train_idx] and validate on X_train_val[val_idx].
            inner_folds = inner_cv.split(X_train_val, y_train_val)
            for inner_fold, (train_idx, val_idx) in enumerate(inner_folds):
                fold_rows.append({
                    'outer_fold': outer_fold,
                    'inner_fold': inner_fold,
                    'train_size': len(train_idx),
                    'val_size': len(val_idx),
                    'test_size': len(test_idx)
                })

        return pd.DataFrame(fold_rows)

# Example of nested splits: 3 outer folds, each with 3 inner folds
nested = NestedSplit(outer_splits=3, inner_splits=3)
nested_df = nested.split(X_multi, y_multi)

# Per outer fold: number of inner folds and the mean subset sizes,
# rounded and cast to int for display
print("\nNested Cross-Validation Structure:")
print(nested_df.groupby('outer_fold').agg({
    'inner_fold': 'count',
    'train_size': 'mean',
    'val_size': 'mean',
    'test_size': 'mean'
}).round().astype(int))

Data Leakage Prevention

Avoiding Common Pitfalls

# Data leakage examples and prevention

from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
import numpy as np
import pandas as pd

# Generate data: 1000 samples, 20 features (10 informative, 10 redundant)
# NOTE(review): make_classification is not imported in this snippet's import
# list; it is imported in the first snippet of the page.
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
                          n_redundant=10, random_state=42)

# Split data (80% train / 20% test, fixed seed)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

print("Data Leakage Examples:\n")

# ❌ WRONG: Scaling before splitting (data leakage!)
def wrong_preprocessing():
    """Anti-pattern demo: scale the whole dataset first, then split and score.

    Reads the module-level ``X`` and ``y``. Returns ``model.score`` on the
    test part of the already-scaled data.
    """
    # The scaler is fitted on ALL rows, test rows included - leakage!
    X_scaled = StandardScaler().fit_transform(X)

    # The split happens only after the scaler has already seen every row
    train_features, test_features, train_labels, test_labels = train_test_split(
        X_scaled, y, test_size=0.2, random_state=42
    )

    # Train model and evaluate on the held-out part
    clf = LogisticRegression(random_state=42)
    clf.fit(train_features, train_labels)
    return clf.score(test_features, test_labels)

# ✅ CORRECT: Scaling after splitting
def correct_preprocessing():
    """Reference demo: split first, fit the scaler on the training part only.

    Reads the module-level ``X`` and ``y``. Returns ``model.score`` on the
    test part, which is transformed with the training-set scaler.
    """
    # Split first, so the test rows stay unseen during fitting
    train_features, test_features, train_labels, test_labels = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Fit the scaler on the training rows only, then reuse it for the test rows
    scaler = StandardScaler()
    train_scaled = scaler.fit_transform(train_features)
    test_scaled = scaler.transform(test_features)

    # Train model and evaluate on the held-out part
    clf = LogisticRegression(random_state=42)
    clf.fit(train_scaled, train_labels)
    return clf.score(test_scaled, test_labels)

# Run both variants and compare their test scores
wrong_score = wrong_preprocessing()
correct_score = correct_preprocessing()

# The status symbols were mis-encoded in the original strings; restored here
print(f"❌ Wrong approach (with leakage): {wrong_score:.3f}")
print(f"✅ Correct approach (no leakage): {correct_score:.3f}")
print(f"Difference: {wrong_score - correct_score:.3f}")

# More leakage examples
class DataLeakageExamples:
    """Common data leakage scenarios and solutions.

    Every method is a self-contained demo that prints its findings and
    returns nothing. The first two read the module-level ``X`` and ``y``.
    """

    @staticmethod
    def feature_selection_leakage():
        """Compare feature selection fitted on all data vs. on the training split only."""
        print("\n2. Feature Selection Leakage:")

        # ❌ WRONG: Select features on all data
        selector = SelectKBest(f_classif, k=10)
        X_selected_wrong = selector.fit_transform(X, y)  # Fit on all data!
        X_train_wrong, X_test_wrong, y_train_wrong, y_test_wrong = train_test_split(
            X_selected_wrong, y, test_size=0.2, random_state=42
        )

        model = LogisticRegression(random_state=42)
        model.fit(X_train_wrong, y_train_wrong)
        wrong_score = model.score(X_test_wrong, y_test_wrong)

        # ✅ CORRECT: Select features only on training data
        X_train_correct, X_test_correct, y_train_correct, y_test_correct = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        selector = SelectKBest(f_classif, k=10)
        X_train_selected = selector.fit_transform(X_train_correct, y_train_correct)
        X_test_selected = selector.transform(X_test_correct)  # Transform only

        model = LogisticRegression(random_state=42)
        model.fit(X_train_selected, y_train_correct)
        correct_score = model.score(X_test_selected, y_test_correct)

        print(f"❌ With leakage: {wrong_score:.3f}")
        print(f"✅ Without leakage: {correct_score:.3f}")

    @staticmethod
    def duplicate_samples_leakage():
        """Show that a random split can place copies of one sample on both sides."""
        print("\n3. Duplicate Samples Leakage:")

        # Create data with duplicates
        X_with_dups = np.vstack([X[:100], X[:100]])  # Duplicate first 100 samples
        y_with_dups = np.hstack([y[:100], y[:100]])

        # Random split might put duplicates in both train and test
        X_train_dups, X_test_dups, _, _ = train_test_split(
            X_with_dups, y_with_dups, test_size=0.2, random_state=42
        )

        # Check for duplicates between train and test
        train_df = pd.DataFrame(X_train_dups)
        test_df = pd.DataFrame(X_test_dups)

        # Find duplicates: an inner join on every feature column keeps exactly
        # the rows present on both sides. The key is taken from the frame
        # itself so the check works for any number of features.
        feature_columns = list(train_df.columns)
        merged = train_df.merge(test_df, how='inner', on=feature_columns)

        print(f"Number of duplicate samples between train and test: {len(merged)}")
        print("Solution: Remove duplicates before splitting or use GroupKFold")

    @staticmethod
    def target_leakage():
        """Show the score inflation caused by a feature derived from the target."""
        print("\n4. Target Leakage from Features:")

        # Create features that leak target information
        # (note: this re-seeds NumPy's global random generator)
        np.random.seed(42)
        X_leak = np.random.randn(1000, 5)
        y_leak = np.random.choice([0, 1], 1000)

        # Add a feature that's highly correlated with target (leakage!):
        # the target itself plus a little Gaussian noise, stored as column 5
        X_leak = np.column_stack([X_leak, y_leak + np.random.normal(0, 0.1, 1000)])

        # This will show artificially high performance
        X_train_leak, X_test_leak, y_train_leak, y_test_leak = train_test_split(
            X_leak, y_leak, test_size=0.2, random_state=42
        )

        model = LogisticRegression(random_state=42)
        model.fit(X_train_leak, y_train_leak)
        leak_score = model.score(X_test_leak, y_test_leak)

        # Without leaky feature
        model_clean = LogisticRegression(random_state=42)
        model_clean.fit(X_train_leak[:, :5], y_train_leak)  # Exclude leaky feature
        clean_score = model_clean.score(X_test_leak[:, :5], y_test_leak)

        print(f"With target leakage: {leak_score:.3f}")
        print(f"Without target leakage: {clean_score:.3f}")

# Run leakage examples (the methods are static; calling them through an
# instance works the same as calling them on the class)
examples = DataLeakageExamples()
examples.feature_selection_leakage()
examples.duplicate_samples_leakage()
examples.target_leakage()

# Best practice: Use pipelines to prevent leakage
# (the check-mark symbol was mis-encoded in the original string; restored here)
print("\n✅ BEST PRACTICE: Use Pipelines")

# Create pipeline that ensures proper order of operations:
# scale -> degree-2 polynomial features -> keep the 10 best features -> classify
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('poly', PolynomialFeatures(degree=2, include_bias=False)),
    ('selector', SelectKBest(f_classif, k=10)),
    ('classifier', LogisticRegression(random_state=42))
])

# Pipeline ensures transformations are fit only on training data
X_train_pipe, X_test_pipe, y_train_pipe, y_test_pipe = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Fit pipeline (all preprocessing steps fit only on training data)
pipeline.fit(X_train_pipe, y_train_pipe)

# Score (preprocessing steps only transform test data)
pipeline_score = pipeline.score(X_test_pipe, y_test_pipe)
print(f"Pipeline score (no leakage): {pipeline_score:.3f}")

# Cross-validation also prevents leakage: the whole pipeline is passed as the
# estimator, so it is re-fitted for every fold
cv_scores = cross_val_score(pipeline, X, y, cv=5)
print(f"Cross-validation scores: {cv_scores}")
print(f"Mean CV score: {cv_scores.mean():.3f} (+/- {cv_scores.std() * 2:.3f})")

Validation Strategies

Choosing the Right Split Strategy

# Comprehensive validation strategy guide

class ValidationStrategyGuide:
    """Guide for choosing appropriate validation strategies.

    ``self.strategies`` maps a strategy key to a dict with three entries:
    'when_to_use' (list of str), 'avoid_when' (list of str) and 'example' (str).
    """

    def __init__(self):
        self.strategies = {
            'random_split': {
                'when_to_use': [
                    'Independent and identically distributed (i.i.d.) data',
                    'No temporal component',
                    'No group structure',
                    'Balanced classes'
                ],
                'avoid_when': [
                    'Time series data',
                    'Grouped/hierarchical data',
                    'Severe class imbalance'
                ],
                'example': 'General classification/regression problems'
            },
            'stratified_split': {
                'when_to_use': [
                    'Imbalanced classification',
                    'Small dataset with few samples per class',
                    'Multi-class classification',
                    'When class distribution matters'
                ],
                'avoid_when': [
                    'Regression (unless binned)',
                    'Time series data',
                    'Grouped data'
                ],
                'example': 'Medical diagnosis with rare diseases'
            },
            'time_series_split': {
                'when_to_use': [
                    'Temporal data',
                    'Stock prices, weather data',
                    'When future depends on past',
                    'Forecasting problems'
                ],
                'avoid_when': [
                    'No temporal component',
                    'Random order data'
                ],
                'example': 'Sales forecasting, stock prediction'
            },
            'group_split': {
                'when_to_use': [
                    'Multiple samples per entity',
                    'Patient/user-based data',
                    'Preventing information leakage between groups',
                    'Hierarchical data'
                ],
                'avoid_when': [
                    'Independent samples',
                    'No group structure'
                ],
                'example': 'Medical trials (multiple measurements per patient)'
            },
            'nested_cv': {
                'when_to_use': [
                    'Small datasets',
                    'Hyperparameter tuning + model evaluation',
                    'Avoiding overfitting in model selection',
                    'Publishing results'
                ],
                'avoid_when': [
                    'Large datasets (computationally expensive)',
                    'Simple models without hyperparameters'
                ],
                'example': 'Academic research, competitions'
            }
        }

    def recommend_strategy(self, data_type, n_samples, has_groups=False, 
                          has_time=False, is_imbalanced=False):
        """Recommend validation strategy based on data characteristics.

        The first matching rule wins, in this order: time structure, group
        structure, class imbalance, small dataset (fewer than 1000 samples),
        otherwise a plain random split.

        Args:
            data_type: Accepted for interface compatibility; the current
                rules do not use it.
            n_samples: Number of samples in the dataset.
            has_groups: Whether samples belong to groups.
            has_time: Whether the data has a temporal component.
            is_imbalanced: Whether the classes are imbalanced.

        Returns:
            One of the keys of ``self.strategies``.
        """
        if has_time:
            return 'time_series_split'
        if has_groups:
            return 'group_split'
        if is_imbalanced:
            return 'stratified_split'
        if n_samples < 1000:
            return 'nested_cv'
        return 'random_split'

    def print_guide(self):
        """Print comprehensive guide: one section per strategy."""
        # The symbols below were mis-encoded in the original strings; restored here.
        for strategy, details in self.strategies.items():
            print(f"\n{'='*50}")
            print(f"Strategy: {strategy.replace('_', ' ').title()}")
            print(f"{'='*50}")

            print("\n✅ When to use:")
            for point in details['when_to_use']:
                print(f"  • {point}")

            print("\n❌ Avoid when:")
            for point in details['avoid_when']:
                print(f"  • {point}")

            print(f"\n📊 Example: {details['example']}")

# Create and print guide.
# `guide` stays at module level because select_validation_strategy below uses it.
guide = ValidationStrategyGuide()
guide.print_guide()

# Interactive strategy selector
def select_validation_strategy(X, y, metadata=None):
    """
    Automatically select appropriate validation strategy.

    Args:
        X: 2-D feature array; only its shape is used.
        y: Target values (class labels or continuous values).
        metadata: Optional dict; the presence of the keys 'groups' and 'time'
            marks the data as grouped / temporal.

    Returns:
        The strategy key chosen by the module-level ``guide``. A short dataset
        analysis is printed as a side effect.
    """
    n_samples, n_features = X.shape

    # Count distinct target values with np.unique. Unlike np.bincount it also
    # accepts float targets, negative labels and non-contiguous labels, and it
    # never reports a zero count (which would make the ratio below divide by zero).
    y = np.asarray(y)
    distinct_values, value_counts = np.unique(y, return_counts=True)
    n_distinct = len(distinct_values)

    # Check for imbalance. Floating-point targets with many distinct values
    # are treated as regression targets, where class imbalance is undefined.
    looks_continuous = np.issubdtype(y.dtype, np.floating) and n_distinct >= 20
    if n_distinct > 1 and not looks_continuous:  # Classification
        imbalance_ratio = value_counts.max() / value_counts.min()
        is_imbalanced = bool(imbalance_ratio > 3)
    else:
        is_imbalanced = False

    # Check metadata for groups or time
    has_groups = metadata is not None and 'groups' in metadata
    has_time = metadata is not None and 'time' in metadata

    # Get recommendation (fewer than 20 distinct target values counts as classification)
    strategy = guide.recommend_strategy(
        data_type='classification' if n_distinct < 20 else 'regression',
        n_samples=n_samples,
        has_groups=has_groups,
        has_time=has_time,
        is_imbalanced=is_imbalanced
    )

    print(f"\nDataset Analysis:")
    print(f"  Samples: {n_samples}")
    print(f"  Features: {n_features}")
    print(f"  Imbalanced: {is_imbalanced}")
    print(f"  Has groups: {has_groups}")
    print(f"  Has time: {has_time}")
    print(f"\nRecommended strategy: {strategy}")

    return strategy

# Test with different scenarios: each entry is (X, y, metadata)
scenarios = [
    (np.random.randn(1000, 10), np.random.choice([0, 1], 1000), None),              # random binary labels
    (np.random.randn(1000, 10), np.array([0]*900 + [1]*100), None),                 # 900 vs 100 labels
    (np.random.randn(1000, 10), np.random.choice([0, 1], 1000), {'groups': True}),  # metadata has 'groups'
    (np.random.randn(1000, 10), np.random.randn(1000), {'time': True}),             # float target, metadata has 'time'
]

# Print a banner per scenario, then the analysis and recommendation
for i, (X_scenario, y_scenario, meta) in enumerate(scenarios):
    print(f"\n{'='*50}")
    print(f"Scenario {i+1}:")
    strategy = select_validation_strategy(X_scenario, y_scenario, meta)

Practice Exercises

Exercise 1: Custom Split Strategy

Implement a custom splitting strategy that:

  1. Handles both stratification and groups
  2. Ensures temporal ordering if dates are provided
  3. Maintains class balance in all splits
  4. Provides visualization of the splits
  5. Calculates and reports distribution statistics

Exercise 2: Leakage Detection Tool

Build a tool that:

  1. Automatically detects potential data leakage
  2. Checks for duplicate samples
  3. Identifies suspiciously correlated features
  4. Validates preprocessing pipeline
  5. Generates a leakage report

Exercise 3: Split Optimization

Create a framework that:

  1. Tests multiple split ratios
  2. Evaluates distribution similarity
  3. Recommends optimal split ratio
  4. Handles multiple data types
  5. Provides confidence intervals

Key Takeaways

Further Resources