Train/Test Splits
The Foundation of Model Evaluation! 🎯
Proper train/test splitting is the cornerstone of reliable machine learning. It's not just about randomly dividing data; it's about creating representative splits that expose overfitting, avoid data leakage, and provide honest performance estimates. Master these techniques to build models that generalize well to real-world data.
Why Train/Test Splits Matter
graph TD
A[Complete Dataset] --> B{Split Strategy}
B --> C[Random Split]
B --> D[Stratified Split]
B --> E[Time Series Split]
B --> F[Group Split]
B --> G[Multi-level Split]
C --> H[Training Set<br/>60-80%]
C --> I[Test Set<br/>20-40%]
G --> J[Training Set<br/>60%]
G --> K[Validation Set<br/>20%]
G --> L[Test Set<br/>20%]
H --> M[Model Training]
M --> N[Predictions]
I --> N
N --> O[Evaluation]
P[Data Leakage] -.->|Avoid| B
Q[Class Imbalance] -.->|Consider| D
R[Temporal Order] -.->|Maintain| E
Basic Train/Test Split
Simple Random Split
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification, make_regression
import matplotlib.pyplot as plt
import seaborn as sns
# Set style
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
# Generate sample data
X, y = make_classification(n_samples=1000, n_features=20, n_informative=15,
n_redundant=5, n_classes=2, random_state=42)
# Basic train/test split
X_train, X_test, y_train, y_test = train_test_split(
X, y,
test_size=0.2, # 20% for testing
random_state=42 # For reproducibility
)
print(f"Original dataset size: {X.shape[0]} samples")
print(f"Training set size: {X_train.shape[0]} samples ({X_train.shape[0]/X.shape[0]:.1%})")
print(f"Test set size: {X_test.shape[0]} samples ({X_test.shape[0]/X.shape[0]:.1%})")
# Verify no overlap: split the row indices with the same settings as above
indices = np.arange(len(X))
train_indices, test_indices = train_test_split(indices, test_size=0.2, random_state=42)
print(f"Overlap between train and test: {len(set(train_indices) & set(test_indices))} samples")
# Different split ratios
split_ratios = [0.1, 0.2, 0.25, 0.3, 0.4]
results = []
for ratio in split_ratios:
    X_train_temp, X_test_temp, _, _ = train_test_split(
        X, y, test_size=ratio, random_state=42
    )
    results.append({
        'Test Ratio': ratio,
        'Train Size': len(X_train_temp),
        'Test Size': len(X_test_temp),
        'Train %': f"{len(X_train_temp)/len(X):.1%}",
        'Test %': f"{len(X_test_temp)/len(X):.1%}"
    })
results_df = pd.DataFrame(results)
print("\nDifferent Split Ratios:")
print(results_df.to_string(index=False))
# Visualize split distribution
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# Distribution of target variable (sort_index keeps counts in class order to match the bar labels)
train_counts = pd.Series(y_train).value_counts().sort_index()
test_counts = pd.Series(y_test).value_counts().sort_index()
axes[0].bar(['Class 0', 'Class 1'], train_counts.values, alpha=0.7, label='Train')
axes[0].bar(['Class 0', 'Class 1'], test_counts.values, alpha=0.7, label='Test')
axes[0].set_title('Class Distribution: Train vs Test')
axes[0].set_ylabel('Count')
axes[0].legend()
# Feature distribution comparison
feature_idx = 0 # First feature
axes[1].hist(X_train[:, feature_idx], bins=30, alpha=0.5, label='Train', density=True)
axes[1].hist(X_test[:, feature_idx], bins=30, alpha=0.5, label='Test', density=True)
axes[1].set_title(f'Feature {feature_idx} Distribution')
axes[1].set_xlabel('Value')
axes[1].set_ylabel('Density')
axes[1].legend()
plt.tight_layout()
plt.show()
Stratified Splitting
Maintaining Class Balance
# Stratified split for classification problems
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
# Create imbalanced dataset
np.random.seed(42)
n_samples = 1000
n_class_0 = 900 # Majority class
n_class_1 = 100 # Minority class
X = np.vstack([
np.random.randn(n_class_0, 2),
np.random.randn(n_class_1, 2) + 3
])
y = np.array([0] * n_class_0 + [1] * n_class_1)
print("Original class distribution:")
print(pd.Series(y).value_counts(normalize=True))
# Compare random vs stratified split
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
# Multiple random splits
for i in range(3):
    X_train_rand, X_test_rand, y_train_rand, y_test_rand = train_test_split(
        X, y, test_size=0.2, random_state=i  # Different seeds
    )
    # Plot class distributions
    train_dist = pd.Series(y_train_rand).value_counts(normalize=True)
    test_dist = pd.Series(y_test_rand).value_counts(normalize=True)
    x_pos = np.arange(2)
    width = 0.35
    axes[0, i].bar(x_pos - width/2, train_dist.values, width, label='Train', alpha=0.7)
    axes[0, i].bar(x_pos + width/2, test_dist.values, width, label='Test', alpha=0.7)
    axes[0, i].set_title(f'Random Split {i+1}')
    axes[0, i].set_xlabel('Class')
    axes[0, i].set_ylabel('Proportion')
    axes[0, i].set_xticks(x_pos)
    axes[0, i].set_xticklabels(['Class 0', 'Class 1'])
    axes[0, i].legend()
    axes[0, i].set_ylim([0, 1])
# Stratified splits
for i in range(3):
    X_train_strat, X_test_strat, y_train_strat, y_test_strat = train_test_split(
        X, y, test_size=0.2, random_state=i, stratify=y  # Stratified
    )
    # Plot class distributions
    train_dist = pd.Series(y_train_strat).value_counts(normalize=True)
    test_dist = pd.Series(y_test_strat).value_counts(normalize=True)
    axes[1, i].bar(x_pos - width/2, train_dist.values, width, label='Train', alpha=0.7)
    axes[1, i].bar(x_pos + width/2, test_dist.values, width, label='Test', alpha=0.7)
    axes[1, i].set_title(f'Stratified Split {i+1}')
    axes[1, i].set_xlabel('Class')
    axes[1, i].set_ylabel('Proportion')
    axes[1, i].set_xticks(x_pos)
    axes[1, i].set_xticklabels(['Class 0', 'Class 1'])
    axes[1, i].legend()
    axes[1, i].set_ylim([0, 1])
plt.suptitle('Random vs Stratified Splitting on Imbalanced Data', fontsize=14)
plt.tight_layout()
plt.show()
# Stratified split for regression (using bins)
def stratified_split_regression(X, y, test_size=0.2, n_bins=5, random_state=42):
    """
    Stratified split for regression by binning the target variable
    """
    # Create bins for stratification
    y_binned = pd.qcut(y, q=n_bins, labels=False, duplicates='drop')
    # Perform stratified split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, stratify=y_binned, random_state=random_state
    )
    return X_train, X_test, y_train, y_test
# Example with regression data
X_reg, y_reg = make_regression(n_samples=1000, n_features=10, noise=10, random_state=42)
# Apply stratified split for regression
X_train_reg, X_test_reg, y_train_reg, y_test_reg = stratified_split_regression(
X_reg, y_reg, test_size=0.2
)
# Verify distribution similarity
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
axes[0].hist(y_train_reg, bins=30, alpha=0.5, label='Train', density=True)
axes[0].hist(y_test_reg, bins=30, alpha=0.5, label='Test', density=True)
axes[0].set_title('Target Distribution after Stratified Split (Regression)')
axes[0].set_xlabel('Target Value')
axes[0].set_ylabel('Density')
axes[0].legend()
# Q-Q plot of the test-set target against a normal distribution
from scipy import stats
stats.probplot(y_test_reg, dist="norm", plot=axes[1])
axes[1].set_title('Q-Q Plot of Test Set Target')
plt.tight_layout()
plt.show()
Time Series Splits
Temporal Data Splitting
# Time series splitting strategies
from sklearn.model_selection import TimeSeriesSplit
import pandas as pd
import numpy as np
# Generate time series data
np.random.seed(42)
dates = pd.date_range('2020-01-01', periods=365, freq='D')
trend = np.linspace(100, 200, 365)
seasonal = 10 * np.sin(np.arange(365) * 2 * np.pi / 30) # Monthly seasonality
noise = np.random.normal(0, 5, 365)
values = trend + seasonal + noise
df = pd.DataFrame({
'date': dates,
'value': values,
'feature1': np.random.randn(365),
'feature2': np.random.randn(365)
})
print(f"Time series data from {df['date'].min()} to {df['date'].max()}")
# Method 1: Simple time-based split
def time_based_split(df, train_ratio=0.8):
    """Simple chronological split"""
    n_train = int(len(df) * train_ratio)
    train = df.iloc[:n_train]
    test = df.iloc[n_train:]
    return train, test
train_simple, test_simple = time_based_split(df, train_ratio=0.8)
print(f"\nSimple Time Split:")
print(f"Train: {train_simple['date'].min()} to {train_simple['date'].max()}")
print(f"Test: {test_simple['date'].min()} to {test_simple['date'].max()}")
# Method 2: TimeSeriesSplit for cross-validation
tscv = TimeSeriesSplit(n_splits=5)
fig, axes = plt.subplots(5, 1, figsize=(12, 10))
for i, (train_idx, test_idx) in enumerate(tscv.split(df)):
    train_dates = df.iloc[train_idx]['date']
    test_dates = df.iloc[test_idx]['date']
    axes[i].scatter(df.iloc[train_idx]['date'], df.iloc[train_idx]['value'],
                    alpha=0.5, label='Train', s=10)
    axes[i].scatter(df.iloc[test_idx]['date'], df.iloc[test_idx]['value'],
                    alpha=0.5, label='Test', s=10, color='red')
    axes[i].set_title(f'Split {i+1}')
    axes[i].set_ylabel('Value')
    axes[i].legend()
    # Add vertical line to show split point
    split_date = df.iloc[train_idx[-1]]['date']
    axes[i].axvline(x=split_date, color='black', linestyle='--', alpha=0.5)
axes[-1].set_xlabel('Date')
plt.suptitle('Time Series Cross-Validation Splits', fontsize=14)
plt.tight_layout()
plt.show()
# Method 3: Expanding window (for time series)
class ExpandingWindowSplit:
    """Expanding window for time series validation"""
    def __init__(self, initial_train_size, test_size, step_size=1):
        self.initial_train_size = initial_train_size
        self.test_size = test_size
        self.step_size = step_size
    def split(self, X):
        n = len(X)
        splits = []
        train_end = self.initial_train_size
        while train_end + self.test_size <= n:
            train_idx = list(range(train_end))
            test_idx = list(range(train_end, min(train_end + self.test_size, n)))
            splits.append((train_idx, test_idx))
            train_end += self.step_size
        return splits
# Method 4: Sliding window
class SlidingWindowSplit:
    """Fixed-size sliding window for time series"""
    def __init__(self, train_size, test_size, step_size=1):
        self.train_size = train_size
        self.test_size = test_size
        self.step_size = step_size
    def split(self, X):
        n = len(X)
        splits = []
        start = 0
        while start + self.train_size + self.test_size <= n:
            train_idx = list(range(start, start + self.train_size))
            test_idx = list(range(start + self.train_size,
                                  start + self.train_size + self.test_size))
            splits.append((train_idx, test_idx))
            start += self.step_size
        return splits
# Visualize different time series split strategies
fig, axes = plt.subplots(3, 1, figsize=(14, 10))
# Expanding window
expanding = ExpandingWindowSplit(initial_train_size=100, test_size=30, step_size=30)
splits_expanding = expanding.split(df)
for i, (train_idx, test_idx) in enumerate(splits_expanding[:5]):
    color = plt.cm.viridis(i / 5)
    axes[0].scatter(df.iloc[train_idx]['date'], [i] * len(train_idx),
                    alpha=0.7, s=5, color=color)
    axes[0].scatter(df.iloc[test_idx]['date'], [i] * len(test_idx),
                    alpha=0.7, s=5, color='red')
axes[0].set_title('Expanding Window Strategy')
axes[0].set_ylabel('Split Number')
axes[0].set_ylim([-0.5, 4.5])
# Sliding window
sliding = SlidingWindowSplit(train_size=100, test_size=30, step_size=30)
splits_sliding = sliding.split(df)
for i, (train_idx, test_idx) in enumerate(splits_sliding[:5]):
    color = plt.cm.viridis(i / 5)
    axes[1].scatter(df.iloc[train_idx]['date'], [i] * len(train_idx),
                    alpha=0.7, s=5, color=color)
    axes[1].scatter(df.iloc[test_idx]['date'], [i] * len(test_idx),
                    alpha=0.7, s=5, color='red')
axes[1].set_title('Sliding Window Strategy')
axes[1].set_ylabel('Split Number')
axes[1].set_ylim([-0.5, 4.5])
# TimeSeriesSplit (sklearn)
tscv = TimeSeriesSplit(n_splits=5)
for i, (train_idx, test_idx) in enumerate(tscv.split(df)):
    color = plt.cm.viridis(i / 5)
    axes[2].scatter(df.iloc[train_idx]['date'], [i] * len(train_idx),
                    alpha=0.7, s=5, color=color)
    axes[2].scatter(df.iloc[test_idx]['date'], [i] * len(test_idx),
                    alpha=0.7, s=5, color='red')
axes[2].set_title('TimeSeriesSplit (sklearn)')
axes[2].set_ylabel('Split Number')
axes[2].set_xlabel('Date')
axes[2].set_ylim([-0.5, 4.5])
plt.tight_layout()
plt.show()
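When features are built from lagged or rolling values, the rows just before a test window can share information with it. The following is a minimal sketch, assuming a buffer of 7 rows is appropriate (that number and the `test_size` of 30 are illustrative choices, not recommendations). It uses the `gap` parameter of scikit-learn's `TimeSeriesSplit` to leave unused rows between each training window and its test window.

```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

# Illustrative stand-in for 365 daily observations
n_rows = 365
X_ts = np.arange(n_rows).reshape(-1, 1)

# gap=7 leaves 7 rows unused between the end of training and the start of testing
tscv_gap = TimeSeriesSplit(n_splits=5, test_size=30, gap=7)

gap_folds = []
for train_idx, test_idx in tscv_gap.split(X_ts):
    gap_folds.append((train_idx, test_idx))
    print(f"train rows 0..{train_idx[-1]}, test rows {test_idx[0]}..{test_idx[-1]}")
```

The right gap depends on how far back the features look; a 7-day rolling mean, for instance, would suggest a gap of at least 7 rows.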
Advanced Splitting Strategies
Group-Based and Multi-Level Splits
# Advanced splitting strategies
from sklearn.model_selection import GroupShuffleSplit, GroupKFold, StratifiedGroupKFold
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
# Create grouped data (e.g., multiple samples per patient/user)
np.random.seed(42)
n_groups = 50
n_samples_per_group = 20
data = []
for group_id in range(n_groups):
    for sample in range(n_samples_per_group):
        data.append({
            'group_id': group_id,
            'feature1': np.random.randn(),
            'feature2': np.random.randn(),
            'target': np.random.choice([0, 1])
        })
df = pd.DataFrame(data)
X = df[['feature1', 'feature2']].values
y = df['target'].values
groups = df['group_id'].values
print(f"Dataset: {len(df)} samples from {df['group_id'].nunique()} groups")
print(f"Samples per group: {df['group_id'].value_counts().describe()}")
# Method 1: GroupShuffleSplit - Ensures groups don't overlap
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
for train_idx, test_idx in gss.split(X, y, groups):
    X_train, X_test = X[train_idx], X[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]
    groups_train, groups_test = groups[train_idx], groups[test_idx]
print(f"\nGroupShuffleSplit:")
print(f"Train: {len(X_train)} samples from {len(np.unique(groups_train))} groups")
print(f"Test: {len(X_test)} samples from {len(np.unique(groups_test))} groups")
print(f"Group overlap: {len(set(groups_train).intersection(set(groups_test)))} groups")
# Method 2: GroupKFold for cross-validation
gkf = GroupKFold(n_splits=5)
splits_info = []
for fold, (train_idx, test_idx) in enumerate(gkf.split(X, y, groups)):
    train_groups = np.unique(groups[train_idx])
    test_groups = np.unique(groups[test_idx])
    splits_info.append({
        'Fold': fold + 1,
        'Train Samples': len(train_idx),
        'Test Samples': len(test_idx),
        'Train Groups': len(train_groups),
        'Test Groups': len(test_groups),
        'Group Overlap': len(set(train_groups).intersection(set(test_groups)))
    })
splits_df = pd.DataFrame(splits_info)
print("\nGroupKFold Splits:")
print(splits_df.to_string(index=False))
# Method 3: Multi-level split (train/val/test)
def multi_level_split(X, y, train_ratio=0.6, val_ratio=0.2, test_ratio=0.2,
                      stratify=None, random_state=42):
    """
    Split data into train/validation/test sets
    """
    # Compare with a tolerance: a sum of floats may not equal 1.0 exactly
    assert np.isclose(train_ratio + val_ratio + test_ratio, 1.0), "Ratios must sum to 1"
    # First split: separate test set. Row indices are split alongside the data
    # so the stratification labels can be re-aligned for the second split.
    indices = np.arange(len(X))
    X_temp, X_test, y_temp, y_test, idx_temp, _ = train_test_split(
        X, y, indices,
        test_size=test_ratio,
        stratify=stratify,
        random_state=random_state
    )
    # Second split: separate train and validation
    val_ratio_adjusted = val_ratio / (train_ratio + val_ratio)
    # Stratification labels for the rows that remain after removing the test set
    stratify_temp = np.asarray(stratify)[idx_temp] if stratify is not None else None
    X_train, X_val, y_train, y_val = train_test_split(
        X_temp, y_temp,
        test_size=val_ratio_adjusted,
        stratify=stratify_temp,
        random_state=random_state
    )
    return X_train, X_val, X_test, y_train, y_val, y_test
# Apply multi-level split
# n_informative=5: the default of 2 is too few for 3 classes with 2 clusters each
X_multi, y_multi = make_classification(n_samples=1000, n_features=10, n_informative=5,
n_classes=3, random_state=42)
X_train, X_val, X_test, y_train, y_val, y_test = multi_level_split(
X_multi, y_multi,
train_ratio=0.6,
val_ratio=0.2,
test_ratio=0.2,
stratify=y_multi
)
print(f"\nMulti-level Split:")
print(f"Train: {len(X_train)} samples ({len(X_train)/len(X_multi):.1%})")
print(f"Validation: {len(X_val)} samples ({len(X_val)/len(X_multi):.1%})")
print(f"Test: {len(X_test)} samples ({len(X_test)/len(X_multi):.1%})")
# Visualize multi-level split
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
datasets = [
('Training Set', y_train),
('Validation Set', y_val),
('Test Set', y_test)
]
for ax, (name, y_data) in zip(axes, datasets):
    unique, counts = np.unique(y_data, return_counts=True)
    ax.bar(unique, counts, alpha=0.7)
    ax.set_title(f'{name}\n(n={len(y_data)})')
    ax.set_xlabel('Class')
    ax.set_ylabel('Count')
    ax.set_xticks(unique)
    # Add percentage labels
    for i, (u, c) in enumerate(zip(unique, counts)):
        ax.text(u, c, f'{c/len(y_data):.1%}',
                ha='center', va='bottom')
plt.suptitle('Multi-Level Split: Class Distribution', fontsize=14)
plt.tight_layout()
plt.show()
# Method 4: Nested splits for hyperparameter tuning
class NestedSplit:
    """
    Nested splitting for unbiased model evaluation
    Outer loop: Model evaluation
    Inner loop: Hyperparameter tuning
    """
    def __init__(self, outer_splits=5, inner_splits=3):
        self.outer_splits = outer_splits
        self.inner_splits = inner_splits
    def split(self, X, y):
        from sklearn.model_selection import KFold
        outer_cv = KFold(n_splits=self.outer_splits, shuffle=True, random_state=42)
        inner_cv = KFold(n_splits=self.inner_splits, shuffle=True, random_state=42)
        nested_scores = []
        for outer_fold, (train_val_idx, test_idx) in enumerate(outer_cv.split(X, y)):
            X_train_val, X_test = X[train_val_idx], X[test_idx]
            y_train_val, y_test = y[train_val_idx], y[test_idx]
            # Inner loop for hyperparameter tuning
            inner_scores = []
            for inner_fold, (train_idx, val_idx) in enumerate(inner_cv.split(X_train_val, y_train_val)):
                X_train = X_train_val[train_idx]
                X_val = X_train_val[val_idx]
                y_train = y_train_val[train_idx]
                y_val = y_train_val[val_idx]
                inner_scores.append({
                    'outer_fold': outer_fold,
                    'inner_fold': inner_fold,
                    'train_size': len(X_train),
                    'val_size': len(X_val),
                    'test_size': len(X_test)
                })
            nested_scores.extend(inner_scores)
        return pd.DataFrame(nested_scores)
# Example of nested splits
nested = NestedSplit(outer_splits=3, inner_splits=3)
nested_df = nested.split(X_multi, y_multi)
print("\nNested Cross-Validation Structure:")
print(nested_df.groupby('outer_fold').agg({
'inner_fold': 'count',
'train_size': 'mean',
'val_size': 'mean',
'test_size': 'mean'
}).round().astype(int))
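The `NestedSplit` class above only reports fold sizes. To actually tune and evaluate a model with the same structure, one common pattern is to wrap a `GridSearchCV` (inner loop) inside `cross_val_score` (outer loop). The sketch below assumes a logistic regression model and a small, illustrative grid over `C`; the dataset and all variable names are made up for the example.

```python
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV, KFold, cross_val_score

# Illustrative dataset (not the one used earlier in this section)
X_nested, y_nested = make_classification(
    n_samples=300, n_features=10, n_informative=5, random_state=0
)

inner_cv = KFold(n_splits=3, shuffle=True, random_state=0)  # hyperparameter tuning
outer_cv = KFold(n_splits=5, shuffle=True, random_state=1)  # performance estimate

# Inner loop: pick C using only the data of the current outer training fold
search = GridSearchCV(
    LogisticRegression(max_iter=1000),
    param_grid={"C": [0.01, 0.1, 1.0, 10.0]},
    cv=inner_cv,
)

# Outer loop: each outer test fold is never seen during tuning
nested_scores = cross_val_score(search, X_nested, y_nested, cv=outer_cv)
print(f"Nested CV accuracy: {nested_scores.mean():.3f} +/- {nested_scores.std():.3f}")
```

Because the search is refit inside every outer fold, the reported score reflects the whole tuning procedure rather than one fixed set of hyperparameters.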
Data Leakage Prevention
Avoiding Common Pitfalls
# Data leakage examples and prevention
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
import numpy as np
import pandas as pd
# Generate data
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, random_state=42)
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
print("Data Leakage Examples:\n")
# ❌ WRONG: Scaling before splitting (data leakage!)
def wrong_preprocessing():
    """Incorrect: Preprocessing before splitting causes leakage"""
    # Scale the entire dataset (WRONG!)
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)  # Fit on ALL data - leakage!
    # Then split
    X_train_wrong, X_test_wrong, y_train_wrong, y_test_wrong = train_test_split(
        X_scaled, y, test_size=0.2, random_state=42
    )
    # Train model
    model = LogisticRegression(random_state=42)
    model.fit(X_train_wrong, y_train_wrong)
    score = model.score(X_test_wrong, y_test_wrong)
    return score
# ✅ CORRECT: Scaling after splitting
def correct_preprocessing():
    """Correct: Preprocessing after splitting prevents leakage"""
    # Split first
    X_train_correct, X_test_correct, y_train_correct, y_test_correct = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    # Scale training data
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train_correct)  # Fit only on training
    X_test_scaled = scaler.transform(X_test_correct)  # Transform only on test
    # Train model
    model = LogisticRegression(random_state=42)
    model.fit(X_train_scaled, y_train_correct)
    score = model.score(X_test_scaled, y_test_correct)
    return score
wrong_score = wrong_preprocessing()
correct_score = correct_preprocessing()
print(f"❌ Wrong approach (with leakage): {wrong_score:.3f}")
print(f"✅ Correct approach (no leakage): {correct_score:.3f}")
print(f"Difference: {wrong_score - correct_score:.3f}")
# More leakage examples
class DataLeakageExamples:
    """Common data leakage scenarios and solutions"""
    @staticmethod
    def feature_selection_leakage():
        """Feature selection leakage example"""
        print("\n2. Feature Selection Leakage:")
        # ❌ WRONG: Select features on all data
        selector = SelectKBest(f_classif, k=10)
        X_selected_wrong = selector.fit_transform(X, y)  # Fit on all data!
        X_train_wrong, X_test_wrong, y_train_wrong, y_test_wrong = train_test_split(
            X_selected_wrong, y, test_size=0.2, random_state=42
        )
        model = LogisticRegression(random_state=42)
        model.fit(X_train_wrong, y_train_wrong)
        wrong_score = model.score(X_test_wrong, y_test_wrong)
        # ✅ CORRECT: Select features only on training data
        X_train_correct, X_test_correct, y_train_correct, y_test_correct = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        selector = SelectKBest(f_classif, k=10)
        X_train_selected = selector.fit_transform(X_train_correct, y_train_correct)
        X_test_selected = selector.transform(X_test_correct)  # Transform only
        model = LogisticRegression(random_state=42)
        model.fit(X_train_selected, y_train_correct)
        correct_score = model.score(X_test_selected, y_test_correct)
        print(f"❌ With leakage: {wrong_score:.3f}")
        print(f"✅ Without leakage: {correct_score:.3f}")
    @staticmethod
    def duplicate_samples_leakage():
        """Duplicate samples causing leakage"""
        print("\n3. Duplicate Samples Leakage:")
        # Create data with duplicates
        X_with_dups = np.vstack([X[:100], X[:100]])  # Duplicate first 100 samples
        y_with_dups = np.hstack([y[:100], y[:100]])
        # Random split might put duplicates in both train and test
        X_train_dups, X_test_dups, y_train_dups, y_test_dups = train_test_split(
            X_with_dups, y_with_dups, test_size=0.2, random_state=42
        )
        # Check for duplicates between train and test
        train_df = pd.DataFrame(X_train_dups)
        test_df = pd.DataFrame(X_test_dups)
        # Find duplicates
        merged = train_df.merge(test_df, how='inner',
                                left_on=list(range(20)), right_on=list(range(20)))
        print(f"Number of duplicate samples between train and test: {len(merged)}")
        print("Solution: Remove duplicates before splitting or use GroupKFold")
    @staticmethod
    def target_leakage():
        """Target leakage from feature engineering"""
        print("\n4. Target Leakage from Features:")
        # Create features that leak target information
        np.random.seed(42)
        X_leak = np.random.randn(1000, 5)
        y_leak = np.random.choice([0, 1], 1000)
        # Add a feature that's highly correlated with target (leakage!)
        X_leak = np.column_stack([X_leak, y_leak + np.random.normal(0, 0.1, 1000)])
        # This will show artificially high performance
        X_train_leak, X_test_leak, y_train_leak, y_test_leak = train_test_split(
            X_leak, y_leak, test_size=0.2, random_state=42
        )
        model = LogisticRegression(random_state=42)
        model.fit(X_train_leak, y_train_leak)
        leak_score = model.score(X_test_leak, y_test_leak)
        # Without leaky feature
        model_clean = LogisticRegression(random_state=42)
        model_clean.fit(X_train_leak[:, :5], y_train_leak)  # Exclude leaky feature
        clean_score = model_clean.score(X_test_leak[:, :5], y_test_leak)
        print(f"With target leakage: {leak_score:.3f}")
        print(f"Without target leakage: {clean_score:.3f}")
# Run leakage examples
examples = DataLeakageExamples()
examples.feature_selection_leakage()
examples.duplicate_samples_leakage()
examples.target_leakage()
# Best practice: Use pipelines to prevent leakage
print("\n✅ BEST PRACTICE: Use Pipelines")
# Create pipeline that ensures proper order of operations
pipeline = Pipeline([
('scaler', StandardScaler()),
('poly', PolynomialFeatures(degree=2, include_bias=False)),
('selector', SelectKBest(f_classif, k=10)),
('classifier', LogisticRegression(random_state=42))
])
# Pipeline ensures transformations are fit only on training data
X_train_pipe, X_test_pipe, y_train_pipe, y_test_pipe = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Fit pipeline (all preprocessing steps fit only on training data)
pipeline.fit(X_train_pipe, y_train_pipe)
# Score (preprocessing steps only transform test data)
pipeline_score = pipeline.score(X_test_pipe, y_test_pipe)
print(f"Pipeline score (no leakage): {pipeline_score:.3f}")
# Cross-validation also prevents leakage
cv_scores = cross_val_score(pipeline, X, y, cv=5)
print(f"Cross-validation scores: {cv_scores}")
print(f"Mean CV score: {cv_scores.mean():.3f} (+/- {cv_scores.std() * 2:.3f})")
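The duplicate-samples example above names two remedies: remove duplicates before splitting, or keep them and split by group. The sketch below shows both under illustrative assumptions: the data is synthetic, every row is duplicated exactly once, and the column names `f0`..`f4` are made up for the example. It uses `GroupShuffleSplit` for the single-split case; `GroupKFold` would play the same role in cross-validation.

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import GroupShuffleSplit, train_test_split

# Illustrative data: every row appears exactly twice
rng = np.random.default_rng(0)
base = rng.normal(size=(100, 5))
feature_cols = [f"f{i}" for i in range(5)]
frame = pd.DataFrame(np.vstack([base, base]), columns=feature_cols)
frame["target"] = np.tile(rng.integers(0, 2, size=100), 2)

# Option 1: drop exact duplicate rows, then split as usual
deduped = frame.drop_duplicates()
train_df, test_df = train_test_split(deduped, test_size=0.2, random_state=0)

# Option 2: keep the duplicates, but give identical rows one shared group id
group_ids = frame.groupby(feature_cols, sort=False).ngroup().to_numpy()
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=0)
train_idx, test_idx = next(
    gss.split(frame[feature_cols], frame["target"], groups=group_ids)
)
shared_groups = set(group_ids[train_idx]) & set(group_ids[test_idx])

print(f"Rows after de-duplication: {len(deduped)}")
print(f"Groups shared between train and test: {len(shared_groups)}")
```

Exact-match grouping only catches verbatim duplicates; near-duplicates (for example, rescaled or re-encoded copies) would need a domain-specific key.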
Validation Strategies
Choosing the Right Split Strategy
# Comprehensive validation strategy guide
class ValidationStrategyGuide:
    """Guide for choosing appropriate validation strategies"""
    def __init__(self):
        self.strategies = {
            'random_split': {
                'when_to_use': [
                    'Independent and identically distributed (i.i.d.) data',
                    'No temporal component',
                    'No group structure',
                    'Balanced classes'
                ],
                'avoid_when': [
                    'Time series data',
                    'Grouped/hierarchical data',
                    'Severe class imbalance'
                ],
                'example': 'General classification/regression problems'
            },
            'stratified_split': {
                'when_to_use': [
                    'Imbalanced classification',
                    'Small dataset with few samples per class',
                    'Multi-class classification',
                    'When class distribution matters'
                ],
                'avoid_when': [
                    'Regression (unless binned)',
                    'Time series data',
                    'Grouped data'
                ],
                'example': 'Medical diagnosis with rare diseases'
            },
            'time_series_split': {
                'when_to_use': [
                    'Temporal data',
                    'Stock prices, weather data',
                    'When future depends on past',
                    'Forecasting problems'
                ],
                'avoid_when': [
                    'No temporal component',
                    'Random order data'
                ],
                'example': 'Sales forecasting, stock prediction'
            },
            'group_split': {
                'when_to_use': [
                    'Multiple samples per entity',
                    'Patient/user-based data',
                    'Preventing information leakage between groups',
                    'Hierarchical data'
                ],
                'avoid_when': [
                    'Independent samples',
                    'No group structure'
                ],
                'example': 'Medical trials (multiple measurements per patient)'
            },
            'nested_cv': {
                'when_to_use': [
                    'Small datasets',
                    'Hyperparameter tuning + model evaluation',
                    'Avoiding overfitting in model selection',
                    'Publishing results'
                ],
                'avoid_when': [
                    'Large datasets (computationally expensive)',
                    'Simple models without hyperparameters'
                ],
                'example': 'Academic research, competitions'
            }
        }
    def recommend_strategy(self, data_type, n_samples, has_groups=False,
                           has_time=False, is_imbalanced=False):
        """Recommend validation strategy based on data characteristics"""
        if has_time:
            return 'time_series_split'
        elif has_groups:
            return 'group_split'
        elif is_imbalanced:
            return 'stratified_split'
        elif n_samples < 1000:
            return 'nested_cv'
        else:
            return 'random_split'
    def print_guide(self):
        """Print comprehensive guide"""
        for strategy, details in self.strategies.items():
            print(f"\n{'='*50}")
            print(f"Strategy: {strategy.replace('_', ' ').title()}")
            print(f"{'='*50}")
            print("\n✅ When to use:")
            for point in details['when_to_use']:
                print(f"  • {point}")
            print("\n❌ Avoid when:")
            for point in details['avoid_when']:
                print(f"  • {point}")
            print(f"\nExample: {details['example']}")
# Create and print guide
guide = ValidationStrategyGuide()
guide.print_guide()
# Interactive strategy selector
def select_validation_strategy(X, y, metadata=None):
    """
    Automatically select appropriate validation strategy
    """
    n_samples, n_features = X.shape
    y = np.asarray(y)
    # Treat integer targets with fewer than 20 distinct values as classification.
    # np.unique is used instead of np.bincount, which fails on float targets.
    classes, class_counts = np.unique(y, return_counts=True)
    is_classification = np.issubdtype(y.dtype, np.integer) and len(classes) < 20
    # Check for imbalance (only meaningful for classification targets)
    if is_classification and len(classes) > 1:
        imbalance_ratio = class_counts.max() / class_counts.min()
        is_imbalanced = imbalance_ratio > 3
    else:
        is_imbalanced = False
    # Check metadata for groups or time
    has_groups = metadata is not None and 'groups' in metadata
    has_time = metadata is not None and 'time' in metadata
    # Get recommendation
    strategy = guide.recommend_strategy(
        data_type='classification' if is_classification else 'regression',
        n_samples=n_samples,
        has_groups=has_groups,
        has_time=has_time,
        is_imbalanced=is_imbalanced
    )
    print(f"\nDataset Analysis:")
    print(f" Samples: {n_samples}")
    print(f" Features: {n_features}")
    print(f" Imbalanced: {is_imbalanced}")
    print(f" Has groups: {has_groups}")
    print(f" Has time: {has_time}")
    print(f"\nRecommended strategy: {strategy}")
    return strategy
# Test with different scenarios
scenarios = [
(np.random.randn(1000, 10), np.random.choice([0, 1], 1000), None),
(np.random.randn(1000, 10), np.array([0]*900 + [1]*100), None),
(np.random.randn(1000, 10), np.random.choice([0, 1], 1000), {'groups': True}),
(np.random.randn(1000, 10), np.random.randn(1000), {'time': True}),
]
for i, (X_scenario, y_scenario, meta) in enumerate(scenarios):
    print(f"\n{'='*50}")
    print(f"Scenario {i+1}:")
    strategy = select_validation_strategy(X_scenario, y_scenario, meta)
Practice Exercises
Exercise 1: Custom Split Strategy
Implement a custom splitting strategy that:
- Handles both stratification and groups
- Ensures temporal ordering if dates are provided
- Maintains class balance in all splits
- Provides visualization of the splits
- Calculates and reports distribution statistics
Exercise 2: Leakage Detection Tool
Build a tool that:
- Automatically detects potential data leakage
- Checks for duplicate samples
- Identifies suspiciously correlated features
- Validates preprocessing pipeline
- Generates a leakage report
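A minimal sketch of two of these checks follows. The helper `leakage_report` and its `corr_threshold` default of 0.95 are hypothetical names and values chosen for illustration; a real tool would need more checks (near-duplicates, preprocessing fitted on test data, temporal leakage).

```python
import numpy as np
import pandas as pd

def leakage_report(X_train, X_test, y_train, corr_threshold=0.95):
    """Hypothetical helper: two quick leakage checks, not an exhaustive audit."""
    train_df = pd.DataFrame(X_train)
    test_rows = np.asarray(X_test)
    # Check 1: test rows that also appear verbatim in the training set
    train_rows = set(map(tuple, train_df.to_numpy()))
    n_shared = sum(tuple(row) in train_rows for row in test_rows)
    # Check 2: features whose absolute correlation with the target is suspiciously high
    target = pd.Series(np.asarray(y_train), index=train_df.index)
    corr = train_df.corrwith(target).abs()
    suspicious = corr[corr > corr_threshold].index.tolist()
    return {"shared_rows": n_shared, "suspicious_features": suspicious}

# Demo data with two deliberate problems
rng = np.random.default_rng(0)
y_demo = rng.integers(0, 2, size=200)
X_demo = rng.normal(size=(200, 4))
X_demo[:, 3] = y_demo + rng.normal(0, 0.05, size=200)  # leaky column: target plus small noise
X_train_demo = X_demo[:160]
X_test_demo = np.vstack([X_demo[160:], X_demo[:5]])    # 5 training rows copied into test

report = leakage_report(X_train_demo, X_test_demo, y_demo[:160])
print(report)
```

A high correlation is only a signal to investigate: a legitimately strong predictor can also exceed the threshold.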
Exercise 3: Split Optimization
Create a framework that:
- Tests multiple split ratios
- Evaluates distribution similarity
- Recommends optimal split ratio
- Handles multiple data types
- Provides confidence intervals
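One way to begin is to measure how similar the train and test target distributions are at each candidate ratio. The sketch below assumes a regression target and uses the two-sample Kolmogorov-Smirnov test from SciPy (`scipy.stats.ks_2samp`) over 20 random seeds per ratio; the candidate ratios, the seed count, and the use of the median p-value as a summary are all illustrative choices.

```python
import numpy as np
from scipy.stats import ks_2samp
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split

X_opt, y_opt = make_regression(n_samples=500, n_features=5, noise=10, random_state=0)

ratio_report = []
for test_size in [0.1, 0.2, 0.3]:
    p_values = []
    for seed in range(20):
        _, _, y_tr, y_te = train_test_split(
            X_opt, y_opt, test_size=test_size, random_state=seed
        )
        # Small p-values suggest the train and test targets are distributed differently
        p_values.append(ks_2samp(y_tr, y_te).pvalue)
    ratio_report.append({
        "test_size": test_size,
        "n_test": len(y_te),
        "median_ks_pvalue": float(np.median(p_values)),
    })

for row in ratio_report:
    print(row)
```

Distribution similarity is only one criterion; the exercise also asks for confidence intervals, which could be estimated from the spread of model scores across the same seeds.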
Key Takeaways
- Proper splitting is crucial for reliable model evaluation
- Use stratification for imbalanced datasets
- Maintain temporal order for time series data
- Keep groups together to prevent leakage
- Always split before preprocessing to avoid data leakage
- Use cross-validation for robust evaluation
- Verify distribution similarity between splits
- Pipelines help prevent preprocessing leakage