Feature Engineering
The Art and Science of Creating Features! 🎨
Feature engineering is often the difference between a mediocre model and a state-of-the-art solution. It's the process of transforming raw data into features that better represent the underlying problem, so that machine learning algorithms can pick up the signal with less data and simpler models. Master these techniques to extract maximum value from your data.
Feature Engineering Pipeline
graph TD
A[Raw Data] --> B[Feature Creation]
B --> C[Feature Transformation]
C --> D[Feature Selection]
D --> E[Feature Extraction]
E --> F[Final Feature Set]
B --> G[Domain Knowledge]
B --> H[Statistical Features]
B --> I[Interaction Features]
C --> J[Scaling/Normalization]
C --> K[Encoding]
C --> L[Binning]
D --> M[Filter Methods]
D --> N[Wrapper Methods]
D --> O[Embedded Methods]
E --> P[PCA]
E --> Q[LDA]
E --> R[Autoencoders]
F --> S[Model Training]
Feature Creation
Creating Features from Domain Knowledge
import numpy as np
import pandas as pd
from sklearn.preprocessing import PolynomialFeatures
from sklearn.datasets import make_classification, make_regression
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
# Set visualization style
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
# Example: E-commerce dataset
np.random.seed(42)
n_samples = 1000
# Create synthetic e-commerce data
df = pd.DataFrame({
'user_id': np.random.randint(1, 200, n_samples),
'product_price': np.random.uniform(10, 500, n_samples),
'quantity': np.random.randint(1, 10, n_samples),
'user_age': np.random.randint(18, 70, n_samples),
'user_member_days': np.random.randint(0, 2000, n_samples),
'hour_of_day': np.random.randint(0, 24, n_samples),
'day_of_week': np.random.randint(0, 7, n_samples),
'month': np.random.randint(1, 13, n_samples),
'page_views': np.random.randint(1, 50, n_samples),
'time_on_site': np.random.uniform(30, 1800, n_samples), # seconds
'previous_purchases': np.random.randint(0, 100, n_samples),
'cart_abandonment_rate': np.random.uniform(0, 1, n_samples),
'device_type': np.random.choice(['mobile', 'desktop', 'tablet'], n_samples),
'traffic_source': np.random.choice(['organic', 'paid', 'social', 'direct'], n_samples)
})
# Target: whether user made purchase
df['made_purchase'] = (
(df['time_on_site'] > 300) &
(df['page_views'] > 5) &
(np.random.random(n_samples) > 0.3)
).astype(int)
print("Original features:")
print(df.head())
print(f"\nDataset shape: {df.shape}")
# Feature Engineering Class
class FeatureEngineer:
"""Comprehensive feature engineering pipeline"""
def __init__(self, df):
self.df = df.copy()
self.new_features = []
def create_basic_features(self):
"""Create basic mathematical features"""
# Total spending
self.df['total_amount'] = self.df['product_price'] * self.df['quantity']
self.new_features.append('total_amount')
# Unit price under a descriptive name (product_price is already per item, so this duplicates it)
self.df['avg_price_per_item'] = self.df['product_price']
self.new_features.append('avg_price_per_item')
# User engagement score
self.df['engagement_score'] = (
self.df['page_views'] * 0.3 +
self.df['time_on_site'] / 60 * 0.7
)
self.new_features.append('engagement_score')
return self
def create_ratio_features(self):
"""Create ratio and percentage features"""
# Pages per minute
self.df['pages_per_minute'] = (
self.df['page_views'] / (self.df['time_on_site'] / 60 + 1)
)
self.new_features.append('pages_per_minute')
# Purchase frequency
self.df['purchase_frequency'] = (
self.df['previous_purchases'] / (self.df['user_member_days'] + 1)
)
self.new_features.append('purchase_frequency')
# Cart conversion potential
self.df['conversion_potential'] = 1 - self.df['cart_abandonment_rate']
self.new_features.append('conversion_potential')
return self
def create_temporal_features(self):
"""Create time-based features"""
# Is weekend
self.df['is_weekend'] = (self.df['day_of_week'] >= 5).astype(int)
self.new_features.append('is_weekend')
# Part of day
self.df['part_of_day'] = pd.cut(
self.df['hour_of_day'],
bins=[0, 6, 12, 18, 24],
right=False, # left-closed bins [0, 6), [6, 12), ... so hour 0 maps to 'night' instead of NaN
labels=['night', 'morning', 'afternoon', 'evening']
)
# Is business hours
self.df['is_business_hours'] = (
(self.df['hour_of_day'] >= 9) & (self.df['hour_of_day'] <= 17)
).astype(int)
self.new_features.append('is_business_hours')
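# Season (Northern Hemisphere, meteorological: Dec-Feb = winter, Mar-May = spring, ...)
self.df['season'] = pd.cut(
(self.df['month'] % 12) // 3, # Dec, Jan, Feb -> 0; Mar-May -> 1; Jun-Aug -> 2; Sep-Nov -> 3
bins=[-1, 0, 1, 2, 3],
labels=['winter', 'spring', 'summer', 'fall']
)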
return self
def create_user_segments(self):
"""Create user segmentation features"""
# User loyalty tier
self.df['loyalty_tier'] = pd.cut(
self.df['user_member_days'],
bins=[0, 30, 180, 365, float('inf')],
include_lowest=True, # keep users with 0 member days in the 'new' tier instead of NaN
labels=['new', 'regular', 'loyal', 'vip']
)
# Age group
self.df['age_group'] = pd.cut(
self.df['user_age'],
bins=[0, 25, 35, 50, 100],
labels=['gen_z', 'millennial', 'gen_x', 'boomer']
)
# Spending category
self.df['spending_category'] = pd.cut(
self.df['product_price'],
bins=[0, 50, 150, 300, float('inf')],
labels=['budget', 'medium', 'premium', 'luxury']
)
return self
def create_interaction_features(self):
"""Create feature interactions"""
# Price sensitivity (age vs price)
self.df['price_age_interaction'] = (
self.df['product_price'] / (self.df['user_age'] + 1)
)
self.new_features.append('price_age_interaction')
# Engagement relative to purchase history
self.df['engagement_efficiency'] = (
self.df['engagement_score'] / (self.df['previous_purchases'] + 1)
)
self.new_features.append('engagement_efficiency')
# Device-time interaction
self.df['mobile_evening'] = (
(self.df['device_type'] == 'mobile') &
(self.df['hour_of_day'] >= 18)
).astype(int)
self.new_features.append('mobile_evening')
return self
def create_statistical_features(self):
"""Create statistical aggregation features"""
# User statistics
user_stats = self.df.groupby('user_id').agg({
'product_price': ['mean', 'std', 'max'],
'quantity': ['mean', 'sum'],
'page_views': ['mean']
})
user_stats.columns = ['_'.join(col).strip() for col in user_stats.columns]
self.df = self.df.merge(user_stats, left_on='user_id', right_index=True, how='left')
# std is NaN for users with a single row; treat that as zero spread.
# mean/sum/max are always defined after the merge, so they need no fill.
for col in user_stats.columns:
if 'std' in col:
self.df[col] = self.df[col].fillna(0)
self.new_features.extend(user_stats.columns.tolist())
return self
def get_engineered_features(self):
"""Return DataFrame with all engineered features"""
return self.df
def get_feature_importance_hints(self):
"""Provide hints about potentially important features"""
hints = {
'High Importance Expected': [
'engagement_score',
'purchase_frequency',
'total_amount'
],
'Medium Importance Expected': [
'is_weekend',
'pages_per_minute',
'loyalty_tier'
],
'Interaction Effects': [
'price_age_interaction',
'mobile_evening'
]
}
return hints
# Apply feature engineering
fe = FeatureEngineer(df)
df_engineered = (fe.create_basic_features()
.create_ratio_features()
.create_temporal_features()
.create_user_segments()
.create_interaction_features()
.create_statistical_features()
.get_engineered_features())
print(f"\n\nEngineered dataset shape: {df_engineered.shape}")
print(f"New features created: {len(fe.new_features)}")
print(f"New feature names: {fe.new_features[:10]}...")
# Visualize feature distributions
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.ravel()
features_to_plot = ['engagement_score', 'purchase_frequency', 'pages_per_minute',
'conversion_potential', 'price_age_interaction', 'engagement_efficiency']
for ax, feature in zip(axes, features_to_plot):
if feature in df_engineered.columns:
df_engineered[feature].hist(bins=30, ax=ax, edgecolor='black', alpha=0.7)
ax.set_title(f'Distribution of {feature}')
ax.set_xlabel('Value')
ax.set_ylabel('Frequency')
plt.suptitle('Distributions of Engineered Features', fontsize=14)
plt.tight_layout()
plt.show()
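The per-user aggregates above are computed on the full dataset. When a model is evaluated on held-out rows, those statistics should usually be learned from the training rows only. The following is a minimal sketch of that idea on a small synthetic frame; the names `toy`, `add_user_mean`, and `user_price_mean` are illustrative assumptions, not part of the `FeatureEngineer` class.

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(42)
toy = pd.DataFrame({
    "user_id": rng.integers(1, 50, 400),
    "product_price": rng.uniform(10, 500, 400),
})
train, test = train_test_split(toy, test_size=0.25, random_state=42)

# Learn the per-user statistic from the training rows only
user_mean = train.groupby("user_id")["product_price"].mean()
global_mean = train["product_price"].mean()

def add_user_mean(frame):
    """Attach the training-set user mean; unseen users get the global training mean."""
    out = frame.copy()
    out["user_price_mean"] = out["user_id"].map(user_mean).fillna(global_mean)
    return out

train_fe = add_user_mean(train)
test_fe = add_user_mean(test)
print(test_fe.head())
```

The same pattern (learn on train, map onto test, define a fallback for unseen keys) applies to any group-level statistic.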
Polynomial and Interaction Features
Automatic Feature Generation
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.model_selection import train_test_split, cross_val_score
import numpy as np
# Generate sample data with non-linear relationship
np.random.seed(42)
X = np.random.uniform(-3, 3, (200, 2))
y = 2 * X[:, 0]**2 + 3 * X[:, 1] + X[:, 0] * X[:, 1] + np.random.normal(0, 1, 200)
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# Compare different polynomial degrees
degrees = [1, 2, 3, 4]
results = []
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes = axes.ravel()
for degree, ax in zip(degrees, axes):
# Create polynomial features
poly = PolynomialFeatures(degree=degree, include_bias=False)
X_train_poly = poly.fit_transform(X_train)
X_test_poly = poly.transform(X_test)
# Train model
model = Ridge(alpha=0.1) # Use Ridge to handle multicollinearity
model.fit(X_train_poly, y_train)
# Evaluate
train_score = model.score(X_train_poly, y_train)
test_score = model.score(X_test_poly, y_test)
results.append({
'Degree': degree,
'N Features': X_train_poly.shape[1],
'Train R²': train_score,
'Test R²': test_score,
'Overfit': train_score - test_score
})
# Get feature names
feature_names = poly.get_feature_names_out(['x0', 'x1'])
# Visualize predictions
y_pred = model.predict(X_test_poly)
ax.scatter(y_test, y_pred, alpha=0.5, s=20)
ax.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()],
'r--', lw=2)
ax.set_xlabel('Actual')
ax.set_ylabel('Predicted')
ax.set_title(f'Degree {degree}: {len(feature_names)} features\nTest R² = {test_score:.3f}')
ax.grid(True, alpha=0.3)
plt.suptitle('Polynomial Features: Impact of Degree on Model Performance', fontsize=14)
plt.tight_layout()
plt.show()
# Display results
results_df = pd.DataFrame(results)
print("\nPolynomial Features Comparison:")
print(results_df.to_string(index=False))
# Show example of generated features
poly_example = PolynomialFeatures(degree=2, include_bias=False)
X_example = np.array([[1, 2]])
X_poly = poly_example.fit_transform(X_example)
feature_names = poly_example.get_feature_names_out(['x1', 'x2'])
print(f"\nOriginal features: {X_example[0]}")
print(f"Polynomial features (degree=2): {X_poly[0]}")
print(f"Feature names: {feature_names}")
# Custom interaction features
class InteractionFeatures:
"""Create specific interaction features"""
def __init__(self, interaction_pairs=None):
self.interaction_pairs = interaction_pairs or []
def fit(self, X, feature_names=None):
"""Learn feature indices"""
self.n_features = X.shape[1]
self.feature_names = feature_names or [f'x{i}' for i in range(self.n_features)]
return self
def transform(self, X):
"""Create interaction features"""
X_new = X.copy()
# Add all pairwise interactions if not specified
if not self.interaction_pairs:
for i in range(self.n_features):
for j in range(i+1, self.n_features):
self.interaction_pairs.append((i, j))
# Create interactions
new_features = []
new_names = []
for i, j in self.interaction_pairs:
interaction = X[:, i] * X[:, j]
new_features.append(interaction.reshape(-1, 1))
new_names.append(f'{self.feature_names[i]}*{self.feature_names[j]}')
if new_features:
X_new = np.hstack([X_new] + new_features)
self.output_feature_names = self.feature_names + new_names
return X_new
def get_feature_names_out(self):
return self.output_feature_names
# Example of custom interactions
interaction_creator = InteractionFeatures(interaction_pairs=[(0, 1)])
interaction_creator.fit(X_train, feature_names=['feature1', 'feature2'])
X_train_interact = interaction_creator.transform(X_train)
print(f"\nCustom Interaction Features:")
print(f"Original shape: {X_train.shape}")
print(f"With interactions: {X_train_interact.shape}")
print(f"Feature names: {interaction_creator.get_feature_names_out()}")
Feature Transformation
Mathematical Transformations
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, FunctionTransformer
from scipy import stats
import numpy as np
import pandas as pd
# Generate skewed data
np.random.seed(42)
n_samples = 1000
# Different distributions
data = pd.DataFrame({
'normal': np.random.normal(100, 15, n_samples),
'exponential': np.random.exponential(2, n_samples),
'lognormal': np.random.lognormal(0, 1, n_samples),
'uniform': np.random.uniform(0, 100, n_samples),
'bimodal': np.concatenate([
np.random.normal(30, 5, n_samples//2),
np.random.normal(70, 5, n_samples//2)
])
})
# Add some extreme outliers
data.loc[0:5, 'exponential'] = 100
print("Original data statistics:")
print(data.describe())
# Transformation techniques
class FeatureTransformer:
"""Apply various transformations to features"""
def __init__(self, df):
self.df = df.copy()
self.transformations = {}
def apply_log_transform(self, columns):
"""Log transformation for right-skewed data"""
for col in columns:
# log1p computes log(1 + x), so zeros map to 0 (input must be > -1)
self.df[f'{col}_log'] = np.log1p(self.df[col])
self.transformations[f'{col}_log'] = 'log1p'
return self
def apply_sqrt_transform(self, columns):
"""Square root transformation"""
for col in columns:
self.df[f'{col}_sqrt'] = np.sqrt(np.abs(self.df[col]))
self.transformations[f'{col}_sqrt'] = 'sqrt'
return self
def apply_reciprocal_transform(self, columns):
"""Reciprocal transformation"""
for col in columns:
# Add small constant to avoid division by zero
self.df[f'{col}_reciprocal'] = 1 / (self.df[col] + 1e-8)
self.transformations[f'{col}_reciprocal'] = 'reciprocal'
return self
def apply_box_cox_transform(self, columns):
"""Box-Cox transformation"""
for col in columns:
# Box-Cox needs strictly positive input; shift a copy so the original column is left untouched
values = self.df[col]
min_val = values.min()
if min_val <= 0:
values = values - min_val + 1
transformed, lambda_param = stats.boxcox(values)
self.df[f'{col}_boxcox'] = transformed
self.transformations[f'{col}_boxcox'] = f'boxcox(λ={lambda_param:.2f})'
return self
def apply_yeo_johnson_transform(self, columns):
"""Yeo-Johnson transformation (handles negative values)"""
pt = PowerTransformer(method='yeo-johnson')
for col in columns:
transformed = pt.fit_transform(self.df[[col]]) # output is also standardized (PowerTransformer default)
self.df[f'{col}_yeojohnson'] = transformed.ravel() # flatten (n, 1) -> (n,)
self.transformations[f'{col}_yeojohnson'] = 'yeo-johnson'
return self
def apply_quantile_transform(self, columns, n_quantiles=100):
"""Quantile transformation to uniform or normal distribution"""
qt_uniform = QuantileTransformer(n_quantiles=n_quantiles, output_distribution='uniform')
qt_normal = QuantileTransformer(n_quantiles=n_quantiles, output_distribution='normal')
for col in columns:
self.df[f'{col}_quantile_uniform'] = qt_uniform.fit_transform(self.df[[col]]).ravel()
self.df[f'{col}_quantile_normal'] = qt_normal.fit_transform(self.df[[col]]).ravel()
self.transformations[f'{col}_quantile_uniform'] = 'quantile_uniform'
self.transformations[f'{col}_quantile_normal'] = 'quantile_normal'
return self
def get_transformed_df(self):
return self.df
# Apply transformations
transformer = FeatureTransformer(data)
transformed_df = (transformer
.apply_log_transform(['exponential', 'lognormal'])
.apply_sqrt_transform(['exponential'])
.apply_yeo_johnson_transform(['exponential', 'bimodal'])
.apply_quantile_transform(['bimodal'])
.get_transformed_df())
# Visualize transformations
fig, axes = plt.subplots(3, 3, figsize=(15, 12))
# Original exponential
axes[0, 0].hist(data['exponential'], bins=50, edgecolor='black', alpha=0.7)
axes[0, 0].set_title('Original Exponential')
axes[0, 0].set_ylabel('Frequency')
# Log transform
axes[0, 1].hist(transformed_df['exponential_log'], bins=50, edgecolor='black', alpha=0.7)
axes[0, 1].set_title('Log Transform')
# Square root transform
axes[0, 2].hist(transformed_df['exponential_sqrt'], bins=50, edgecolor='black', alpha=0.7)
axes[0, 2].set_title('Square Root Transform')
# Original bimodal
axes[1, 0].hist(data['bimodal'], bins=50, edgecolor='black', alpha=0.7)
axes[1, 0].set_title('Original Bimodal')
axes[1, 0].set_ylabel('Frequency')
# Yeo-Johnson transform
axes[1, 1].hist(transformed_df['bimodal_yeojohnson'], bins=50, edgecolor='black', alpha=0.7)
axes[1, 1].set_title('Yeo-Johnson Transform')
# Quantile transform (uniform)
axes[1, 2].hist(transformed_df['bimodal_quantile_uniform'], bins=50, edgecolor='black', alpha=0.7)
axes[1, 2].set_title('Quantile Transform (Uniform)')
# Q-Q plots to check normality
stats.probplot(data['exponential'], dist="norm", plot=axes[2, 0])
axes[2, 0].set_title('Original Exponential Q-Q')
stats.probplot(transformed_df['exponential_log'], dist="norm", plot=axes[2, 1])
axes[2, 1].set_title('Log Transform Q-Q')
stats.probplot(transformed_df['exponential_yeojohnson'], dist="norm", plot=axes[2, 2])
axes[2, 2].set_title('Yeo-Johnson Transform Q-Q')
plt.suptitle('Feature Transformation Effects', fontsize=14)
plt.tight_layout()
plt.show()
# Compare skewness before and after transformation
skewness_comparison = pd.DataFrame({
'Original': data.skew(),
'After_Transform': {
'exponential': transformed_df['exponential_yeojohnson'].skew() if 'exponential_yeojohnson' in transformed_df else None,
'lognormal': transformed_df['lognormal_log'].skew() if 'lognormal_log' in transformed_df else None,
'bimodal': transformed_df['bimodal_yeojohnson'].skew() if 'bimodal_yeojohnson' in transformed_df else None
}
})
print("\nSkewness Comparison:")
print(skewness_comparison.dropna())
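The `FeatureTransformer` class fits every transformation on the whole column. In a modeling workflow the transformation parameters are normally estimated on the training split and then reused on new data. A minimal sketch with `PowerTransformer`, using a synthetic lognormal sample as an assumption:

```python
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import PowerTransformer

rng = np.random.default_rng(0)
skewed = rng.lognormal(mean=0.0, sigma=1.0, size=(1000, 1))
x_train, x_test = train_test_split(skewed, test_size=0.3, random_state=0)

pt = PowerTransformer(method="yeo-johnson")  # standardize=True by default
x_train_t = pt.fit_transform(x_train)  # lambda is estimated on the training split
x_test_t = pt.transform(x_test)        # the same lambda is reused, not re-estimated
x_back = pt.inverse_transform(x_test_t)  # map back to the original scale

print("Estimated lambda:", pt.lambdas_)
print("Round trip matches:", np.allclose(x_back, x_test, rtol=1e-4))
```

`inverse_transform` is handy when the transformed column is a regression target and predictions need to be reported in original units.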
Feature Selection
Multiple Selection Methods
from sklearn.feature_selection import (
SelectKBest, SelectPercentile, f_classif, chi2, mutual_info_classif,
RFE, RFECV, SelectFromModel, VarianceThreshold
)
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LassoCV
import numpy as np
import pandas as pd
# Generate classification data with informative and noise features
X, y = make_classification(n_samples=500, n_features=30, n_informative=10,
n_redundant=10, n_repeated=0, n_classes=2,
random_state=42, shuffle=False)
# Add feature names
feature_names = [f'feature_{i}' for i in range(X.shape[1])]
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3,
random_state=42, stratify=y)
# Feature Selection Methods
class FeatureSelector:
"""Comprehensive feature selection"""
def __init__(self, X_train, y_train, X_test, y_test, feature_names):
self.X_train = X_train
self.y_train = y_train
self.X_test = X_test
self.y_test = y_test
self.feature_names = np.array(feature_names)
self.selection_results = {}
def variance_threshold(self, threshold=0.01):
"""Remove low variance features"""
selector = VarianceThreshold(threshold=threshold)
selector.fit(self.X_train)
selected_features = self.feature_names[selector.get_support()]
self.selection_results['Variance Threshold'] = {
'selected_features': selected_features,
'n_features': len(selected_features),
'method': f'variance > {threshold}'
}
return selector
def univariate_selection(self, k=10, score_func=f_classif):
"""Select k best features using univariate statistics"""
selector = SelectKBest(score_func=score_func, k=k)
selector.fit(self.X_train, self.y_train)
# Get scores
scores = selector.scores_
selected_features = self.feature_names[selector.get_support()]
# Create DataFrame with scores
scores_df = pd.DataFrame({
'Feature': self.feature_names,
'Score': scores
}).sort_values('Score', ascending=False)
self.selection_results['Univariate (f_classif)'] = {
'selected_features': selected_features,
'n_features': len(selected_features),
'scores_df': scores_df,
'method': score_func.__name__
}
return selector, scores_df
def mutual_information(self, k=10):
"""Select features using mutual information"""
selector = SelectKBest(score_func=mutual_info_classif, k=k)
selector.fit(self.X_train, self.y_train)
scores = selector.scores_
selected_features = self.feature_names[selector.get_support()]
scores_df = pd.DataFrame({
'Feature': self.feature_names,
'MI Score': scores
}).sort_values('MI Score', ascending=False)
self.selection_results['Mutual Information'] = {
'selected_features': selected_features,
'n_features': len(selected_features),
'scores_df': scores_df,
'method': 'mutual_info_classif'
}
return selector, scores_df
def recursive_feature_elimination(self, estimator=None, n_features=10):
"""RFE with cross-validation"""
if estimator is None:
estimator = RandomForestClassifier(n_estimators=100, random_state=42)
# RFE with cross-validation to find optimal number of features
rfecv = RFECV(estimator, step=1, cv=5, scoring='accuracy', n_jobs=-1)
rfecv.fit(self.X_train, self.y_train)
selected_features = self.feature_names[rfecv.support_]
self.selection_results['RFE-CV'] = {
'selected_features': selected_features,
'n_features': rfecv.n_features_,
'optimal_features': rfecv.n_features_,
'cv_scores': rfecv.cv_results_,
'ranking': rfecv.ranking_
}
return rfecv
def l1_based_selection(self):
"""L1-based feature selection using LassoCV (treats the 0/1 labels as a regression target; alpha is chosen by cross-validation)"""
lasso = LassoCV(cv=5, random_state=42)
lasso.fit(self.X_train, self.y_train)
selector = SelectFromModel(lasso, prefit=True)
selected_features = self.feature_names[selector.get_support()]
# Get coefficients
coef_df = pd.DataFrame({
'Feature': self.feature_names,
'Coefficient': np.abs(lasso.coef_)
}).sort_values('Coefficient', ascending=False)
self.selection_results['L1 (Lasso)'] = {
'selected_features': selected_features,
'n_features': len(selected_features),
'alpha': lasso.alpha_,
'coef_df': coef_df
}
return selector, coef_df
def tree_based_selection(self, estimator=None, threshold='mean'):
"""Tree-based feature selection"""
if estimator is None:
estimator = RandomForestClassifier(n_estimators=100, random_state=42)
estimator.fit(self.X_train, self.y_train)
selector = SelectFromModel(estimator, threshold=threshold, prefit=True)
selected_features = self.feature_names[selector.get_support()]
# Get feature importances
importance_df = pd.DataFrame({
'Feature': self.feature_names,
'Importance': estimator.feature_importances_
}).sort_values('Importance', ascending=False)
self.selection_results['Tree-based'] = {
'selected_features': selected_features,
'n_features': len(selected_features),
'importance_df': importance_df,
'threshold': threshold
}
return selector, importance_df
def compare_methods(self):
"""Compare all feature selection methods"""
comparison = []
for method, results in self.selection_results.items():
comparison.append({
'Method': method,
'N Features': results['n_features'],
'Features': ', '.join(results['selected_features'][:5]) + '...'
})
return pd.DataFrame(comparison)
# Apply feature selection methods
fs = FeatureSelector(X_train, y_train, X_test, y_test, feature_names)
# Apply different methods
fs.variance_threshold()
univariate_selector, univariate_scores = fs.univariate_selection(k=10)
mi_selector, mi_scores = fs.mutual_information(k=10)
rfecv = fs.recursive_feature_elimination()
lasso_selector, lasso_coef = fs.l1_based_selection()
tree_selector, tree_importance = fs.tree_based_selection()
# Compare methods
comparison_df = fs.compare_methods()
print("\nFeature Selection Methods Comparison:")
print(comparison_df.to_string(index=False))
# Visualize feature selection results
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
# Univariate scores
axes[0, 0].barh(univariate_scores.head(10)['Feature'],
univariate_scores.head(10)['Score'])
axes[0, 0].set_xlabel('F-statistic Score')
axes[0, 0].set_title('Univariate Feature Selection (Top 10)')
# Mutual information scores
axes[0, 1].barh(mi_scores.head(10)['Feature'],
mi_scores.head(10)['MI Score'])
axes[0, 1].set_xlabel('Mutual Information Score')
axes[0, 1].set_title('Mutual Information (Top 10)')
# RFE-CV scores
cv_scores = rfecv.cv_results_['mean_test_score']
axes[0, 2].plot(range(1, len(cv_scores) + 1), cv_scores, 'o-')
axes[0, 2].axvline(x=rfecv.n_features_, color='r', linestyle='--',
label=f'Optimal: {rfecv.n_features_}')
axes[0, 2].set_xlabel('Number of Features')
axes[0, 2].set_ylabel('CV Score')
axes[0, 2].set_title('RFE Cross-validation Scores')
axes[0, 2].legend()
axes[0, 2].grid(True, alpha=0.3)
# Lasso coefficients
axes[1, 0].barh(lasso_coef.head(10)['Feature'],
lasso_coef.head(10)['Coefficient'])
axes[1, 0].set_xlabel('Lasso Coefficient (Abs)')
axes[1, 0].set_title('L1 Feature Selection (Top 10)')
# Tree-based importances
axes[1, 1].barh(tree_importance.head(10)['Feature'],
tree_importance.head(10)['Importance'])
axes[1, 1].set_xlabel('Feature Importance')
axes[1, 1].set_title('Tree-based Selection (Top 10)')
# Feature overlap heatmap
methods = ['Univariate', 'MI', 'RFE-CV', 'Lasso', 'Tree']
selected_sets = [
set(fs.selection_results['Univariate (f_classif)']['selected_features']),
set(fs.selection_results['Mutual Information']['selected_features']),
set(fs.selection_results['RFE-CV']['selected_features']),
set(fs.selection_results['L1 (Lasso)']['selected_features']),
set(fs.selection_results['Tree-based']['selected_features'])
]
overlap_matrix = np.zeros((len(methods), len(methods)))
for i in range(len(methods)):
for j in range(len(methods)):
if i == j:
overlap_matrix[i, j] = len(selected_sets[i])
else:
overlap_matrix[i, j] = len(selected_sets[i].intersection(selected_sets[j]))
im = axes[1, 2].imshow(overlap_matrix, cmap='YlOrRd')
axes[1, 2].set_xticks(range(len(methods)))
axes[1, 2].set_yticks(range(len(methods)))
axes[1, 2].set_xticklabels(methods, rotation=45)
axes[1, 2].set_yticklabels(methods)
axes[1, 2].set_title('Feature Selection Overlap')
# Add text annotations
for i in range(len(methods)):
for j in range(len(methods)):
axes[1, 2].text(j, i, int(overlap_matrix[i, j]),
ha="center", va="center", color="white" if overlap_matrix[i, j] > 5 else "black")
plt.colorbar(im, ax=axes[1, 2])
plt.suptitle('Feature Selection Methods Comparison', fontsize=14)
plt.tight_layout()
plt.show()
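The comparison above shows which features each method picks, but not whether the reduced set helps a model. One way to check is to place the selector inside a `Pipeline` and score it on held-out data. The sketch below assumes a logistic regression as the downstream model and a hypothetical helper `holdout_accuracy`:

```python
from sklearn.datasets import make_classification
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X_sel, y_sel = make_classification(n_samples=500, n_features=30, n_informative=10,
                                   n_redundant=10, random_state=42)
Xs_train, Xs_test, ys_train, ys_test = train_test_split(
    X_sel, y_sel, test_size=0.3, random_state=42, stratify=y_sel)

def holdout_accuracy(k):
    """Fit scaler + selector + classifier on the training split, score on the test split."""
    pipe = Pipeline([
        ("scale", StandardScaler()),
        ("select", SelectKBest(score_func=f_classif, k=k)),
        ("clf", LogisticRegression(max_iter=1000)),
    ])
    pipe.fit(Xs_train, ys_train)
    return pipe.score(Xs_test, ys_test)

# k="all" keeps every feature and serves as the baseline
scores = {k: holdout_accuracy(k) for k in (5, 10, "all")}
for k, acc in scores.items():
    print(f"k={k}: test accuracy = {acc:.3f}")
```

Because the selector is fitted inside the pipeline, the test split never influences which features are kept.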
Feature Extraction
Dimensionality Reduction Techniques
from sklearn.decomposition import PCA, FastICA, NMF, TruncatedSVD
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.manifold import TSNE
import numpy as np
import pandas as pd
# Generate high-dimensional data
X_high, y_high = make_classification(n_samples=500, n_features=100,
n_informative=20, n_redundant=30,
n_classes=3, random_state=42)
# Feature extraction methods
class FeatureExtractor:
"""Apply various feature extraction techniques"""
def __init__(self, X, y=None):
self.X = X
self.y = y
self.extractors = {}
self.results = {}
def apply_pca(self, n_components=0.95):
"""Principal Component Analysis"""
pca = PCA(n_components=n_components, random_state=42)
X_pca = pca.fit_transform(self.X)
self.extractors['PCA'] = pca
self.results['PCA'] = {
'X_transformed': X_pca,
'n_components': pca.n_components_,
'explained_variance_ratio': pca.explained_variance_ratio_,
'cumulative_variance': np.cumsum(pca.explained_variance_ratio_)
}
return X_pca
def apply_lda(self, n_components=None):
"""Linear Discriminant Analysis (requires labels)"""
if self.y is None:
raise ValueError("LDA requires target labels")
# LDA components limited by n_classes - 1
max_components = len(np.unique(self.y)) - 1
if n_components is None or n_components > max_components:
n_components = max_components
lda = LinearDiscriminantAnalysis(n_components=n_components)
X_lda = lda.fit_transform(self.X, self.y)
self.extractors['LDA'] = lda
self.results['LDA'] = {
'X_transformed': X_lda,
'n_components': n_components,
'explained_variance_ratio': lda.explained_variance_ratio_
}
return X_lda
def apply_ica(self, n_components=10):
"""Independent Component Analysis"""
ica = FastICA(n_components=n_components, random_state=42, max_iter=1000)
X_ica = ica.fit_transform(self.X)
self.extractors['ICA'] = ica
self.results['ICA'] = {
'X_transformed': X_ica,
'n_components': n_components,
'mixing_matrix': ica.mixing_
}
return X_ica
def apply_nmf(self, n_components=10):
"""Non-negative Matrix Factorization"""
# NMF requires non-negative values
X_positive = self.X - self.X.min() + 1e-10
nmf = NMF(n_components=n_components, random_state=42, max_iter=1000)
X_nmf = nmf.fit_transform(X_positive)
self.extractors['NMF'] = nmf
self.results['NMF'] = {
'X_transformed': X_nmf,
'n_components': n_components,
'reconstruction_error': nmf.reconstruction_err_
}
return X_nmf
def apply_tsne(self, n_components=2, perplexity=30):
"""t-SNE for visualization"""
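# Iteration count is left at the library default; the n_iter argument was renamed max_iter in newer scikit-learn releases
tsne = TSNE(n_components=n_components, perplexity=perplexity,
random_state=42)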
X_tsne = tsne.fit_transform(self.X)
self.extractors['t-SNE'] = tsne
self.results['t-SNE'] = {
'X_transformed': X_tsne,
'n_components': n_components,
'kl_divergence': tsne.kl_divergence_
}
return X_tsne
# Apply feature extraction
fe = FeatureExtractor(X_high, y_high)
X_pca = fe.apply_pca(n_components=0.95)
X_lda = fe.apply_lda()
X_ica = fe.apply_ica(n_components=10)
X_nmf = fe.apply_nmf(n_components=10)
X_tsne = fe.apply_tsne(n_components=2)
print(f"Original dimensions: {X_high.shape}")
print(f"PCA dimensions: {X_pca.shape} (preserving 95% variance)")
print(f"LDA dimensions: {X_lda.shape}")
print(f"ICA dimensions: {X_ica.shape}")
print(f"NMF dimensions: {X_nmf.shape}")
print(f"t-SNE dimensions: {X_tsne.shape}")
# Visualize results
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
# PCA explained variance
axes[0, 0].plot(range(1, len(fe.results['PCA']['explained_variance_ratio']) + 1),
fe.results['PCA']['cumulative_variance'], 'o-')
axes[0, 0].axhline(y=0.95, color='r', linestyle='--', label='95% threshold')
axes[0, 0].set_xlabel('Number of Components')
axes[0, 0].set_ylabel('Cumulative Explained Variance')
axes[0, 0].set_title('PCA Explained Variance')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# PCA 2D projection
axes[0, 1].scatter(X_pca[:, 0], X_pca[:, 1], c=y_high, cmap='viridis', alpha=0.6)
axes[0, 1].set_xlabel('PC1')
axes[0, 1].set_ylabel('PC2')
axes[0, 1].set_title('PCA Projection (2D)')
# LDA projection
axes[0, 2].scatter(X_lda[:, 0], X_lda[:, 1] if X_lda.shape[1] > 1 else np.zeros(len(X_lda)),
c=y_high, cmap='viridis', alpha=0.6)
axes[0, 2].set_xlabel('LD1')
axes[0, 2].set_ylabel('LD2' if X_lda.shape[1] > 1 else '')
axes[0, 2].set_title('LDA Projection')
# ICA components
axes[1, 0].scatter(X_ica[:, 0], X_ica[:, 1], c=y_high, cmap='viridis', alpha=0.6)
axes[1, 0].set_xlabel('IC1')
axes[1, 0].set_ylabel('IC2')
axes[1, 0].set_title('ICA Components')
# NMF components
axes[1, 1].scatter(X_nmf[:, 0], X_nmf[:, 1], c=y_high, cmap='viridis', alpha=0.6)
axes[1, 1].set_xlabel('NMF1')
axes[1, 1].set_ylabel('NMF2')
axes[1, 1].set_title('NMF Components')
# t-SNE visualization
scatter = axes[1, 2].scatter(X_tsne[:, 0], X_tsne[:, 1], c=y_high, cmap='viridis', alpha=0.6)
axes[1, 2].set_xlabel('t-SNE1')
axes[1, 2].set_ylabel('t-SNE2')
axes[1, 2].set_title('t-SNE Visualization')
plt.colorbar(scatter, ax=axes[1, 2])
plt.suptitle('Feature Extraction Methods Comparison', fontsize=14)
plt.tight_layout()
plt.show()
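Scatter plots give a visual impression, but extracted features can also be compared by how well a simple classifier performs on them. The sketch below does this for PCA and LDA with cross-validation; the logistic regression and the dictionary name `candidates` are assumptions for illustration. t-SNE is left out because scikit-learn's `TSNE` has no `transform` method for new data, so it cannot be used as a pipeline step.

```python
from sklearn.datasets import make_classification
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

X_hd, y_hd = make_classification(n_samples=500, n_features=100, n_informative=20,
                                 n_redundant=30, n_classes=3, random_state=42)

candidates = {
    "PCA (95% variance)": make_pipeline(
        StandardScaler(), PCA(n_components=0.95), LogisticRegression(max_iter=2000)),
    "LDA (2 components)": make_pipeline(
        StandardScaler(), LinearDiscriminantAnalysis(n_components=2),
        LogisticRegression(max_iter=2000)),
}

# The extractor is refitted inside every fold, so no fold sees its own validation data
cv_results = {name: cross_val_score(pipe, X_hd, y_hd, cv=5).mean()
              for name, pipe in candidates.items()}
for name, score in cv_results.items():
    print(f"{name}: mean CV accuracy = {score:.3f}")
```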
Automated Feature Engineering
# Automated feature engineering pipeline
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
class AutoFeatureEngineer:
"""Automated feature engineering pipeline"""
def __init__(self, task='classification'):
self.task = task
self.pipeline = None
self.best_params = None
def create_pipeline(self):
"""Create feature engineering pipeline"""
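# Feature generation and transformation
polynomial = ('poly', PolynomialFeatures(include_bias=False))
# Feature selection
selector = ('selector', SelectKBest())
# Feature extraction
pca = ('pca', PCA())
# Combine features. Each branch gets its own StandardScaler instance:
# sharing one object would let the last fitted branch overwrite the
# statistics of the others and break transform() on the polynomial branch.
feature_union = FeatureUnion([
('original', Pipeline([('scaler', StandardScaler())])),
('polynomial', Pipeline([polynomial, ('scaler', StandardScaler())])),
('pca', Pipeline([('scaler', StandardScaler()), pca]))
])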
# Final pipeline
if self.task == 'classification':
classifier = ('classifier', RandomForestClassifier(random_state=42))
self.pipeline = Pipeline([
('features', feature_union),
selector,
classifier
])
return self.pipeline
def optimize_pipeline(self, X_train, y_train, cv=5):
"""Optimize pipeline hyperparameters"""
param_grid = {
'features__polynomial__poly__degree': [1, 2],
'features__pca__pca__n_components': [0.9, 0.95],
'selector__k': [10, 20, 30],
'classifier__n_estimators': [50, 100],
'classifier__max_depth': [None, 10, 20]
}
grid_search = GridSearchCV(
self.pipeline,
param_grid,
cv=cv,
scoring='accuracy' if self.task == 'classification' else 'r2',
n_jobs=-1,
verbose=1
)
grid_search.fit(X_train, y_train)
self.best_params = grid_search.best_params_
self.pipeline = grid_search.best_estimator_
return grid_search.best_score_, grid_search.best_params_
def get_feature_importance(self):
"""Get feature importance from final model"""
if hasattr(self.pipeline.named_steps['classifier'], 'feature_importances_'):
return self.pipeline.named_steps['classifier'].feature_importances_
return None
# Example usage
auto_fe = AutoFeatureEngineer(task='classification')
pipeline = auto_fe.create_pipeline()
print("Automated Feature Engineering Pipeline:")
print(pipeline)
# Generate sample data
X_auto, y_auto = make_classification(n_samples=500, n_features=20,
n_informative=10, random_state=42)
X_train_auto, X_test_auto, y_train_auto, y_test_auto = train_test_split(
X_auto, y_auto, test_size=0.3, random_state=42
)
# Optimize pipeline (commented out for speed)
# best_score, best_params = auto_fe.optimize_pipeline(X_train_auto, y_train_auto)
# print(f"\nBest CV Score: {best_score:.3f}")
# print(f"Best Parameters: {best_params}")
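Since the grid search is commented out, it may help to see a union-based pipeline fitted directly on a small problem. This is a reduced sketch rather than the `AutoFeatureEngineer` class itself; the data sizes, `k=15`, and the name `small_pipeline` are assumptions chosen to keep it fast.

```python
from sklearn.datasets import make_classification
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.model_selection import train_test_split
from sklearn.pipeline import FeatureUnion, Pipeline, make_pipeline
from sklearn.preprocessing import PolynomialFeatures, StandardScaler

X_u, y_u = make_classification(n_samples=300, n_features=10, n_informative=5,
                               random_state=42)
Xu_train, Xu_test, yu_train, yu_test = train_test_split(
    X_u, y_u, test_size=0.3, random_state=42)

# Three parallel branches, each with its own scaler instance
union = FeatureUnion([
    ("original", make_pipeline(StandardScaler())),
    ("polynomial", make_pipeline(PolynomialFeatures(degree=2, include_bias=False),
                                 StandardScaler())),
    ("pca", make_pipeline(StandardScaler(), PCA(n_components=0.9))),
])
small_pipeline = Pipeline([
    ("features", union),
    ("selector", SelectKBest(score_func=f_classif, k=15)),
    ("classifier", RandomForestClassifier(n_estimators=50, random_state=42)),
])
small_pipeline.fit(Xu_train, yu_train)

n_union = small_pipeline.named_steps["features"].transform(Xu_test).shape[1]
n_kept = int(small_pipeline.named_steps["selector"].get_support().sum())
test_acc = small_pipeline.score(Xu_test, yu_test)
print(f"Union produced {n_union} columns, selector kept {n_kept}")
print(f"Test accuracy: {test_acc:.3f}")
```

The union concatenates 10 scaled originals, 65 polynomial terms, and however many principal components are needed for 90% variance; the selector then trims that to 15 columns before the classifier sees them.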
# Feature engineering best practices
best_practices = """
FEATURE ENGINEERING BEST PRACTICES:
1. Domain Knowledge
- Understand the problem domain
- Create features that capture business logic
- Consult with domain experts
2. Start Simple
- Begin with basic features
- Add complexity gradually
- Validate improvements
3. Handle Missing Values
- Understand why data is missing
- Choose appropriate imputation
- Consider missingness as a feature
4. Scale Features Appropriately
- Normalize/standardize for distance-based algorithms
- Keep original scale for tree-based models
- Consider feature distributions
5. Avoid Data Leakage
- Fit feature transformations on training data only, then apply them to validation/test data
- Be careful with time-based features
- Validate on truly unseen data
6. Feature Selection
- Remove redundant features
- Use multiple selection methods
- Consider computational cost
7. Monitor Overfitting
- Use cross-validation
- Compare train vs validation performance
- Regularize when necessary
8. Document Your Process
- Keep track of feature definitions
- Version control feature engineering code
- Create reproducible pipelines
"""
print(best_practices)
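The data leakage point is easiest to appreciate on data that contains no signal at all. In the sketch below the labels are random, so an honest accuracy estimate should sit near chance. Selecting features on the full dataset before cross-validation typically produces a much higher, misleading score, while putting the selector inside the pipeline does not. The variable names are illustrative assumptions.

```python
import numpy as np
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import make_pipeline

rng = np.random.default_rng(0)
X_noise = rng.normal(size=(100, 2000))   # pure noise features
y_noise = rng.integers(0, 2, size=100)   # labels unrelated to the features

# Leaky: the selector sees every label, including those of the validation folds
X_leaky = SelectKBest(f_classif, k=20).fit_transform(X_noise, y_noise)
leaky = cross_val_score(LogisticRegression(max_iter=1000),
                        X_leaky, y_noise, cv=5).mean()

# Safe: the selector is refitted on the training part of each fold
safe_pipe = make_pipeline(SelectKBest(f_classif, k=20),
                          LogisticRegression(max_iter=1000))
safe = cross_val_score(safe_pipe, X_noise, y_noise, cv=5).mean()

print(f"Selection outside CV (leaky): {leaky:.3f}")
print(f"Selection inside CV (safe):   {safe:.3f}")
```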
Practice Exercises
Exercise 1: Time Series Feature Engineering
Create a comprehensive feature engineering pipeline for time series data that:
- Extracts lag features (1, 7, 30 days)
- Creates rolling statistics (mean, std, min, max)
- Generates seasonal indicators
- Calculates trend features
- Implements Fourier transforms for cyclical patterns
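As a possible starting point for this exercise, the sketch below builds each kind of feature on a synthetic daily series with pandas. The series, the 7-day window, and the column names are assumptions; a full solution would wrap this in a reusable pipeline and choose windows and periods from the data.

```python
import numpy as np
import pandas as pd

dates = pd.date_range("2023-01-01", periods=120, freq="D")
rng = np.random.default_rng(1)
t = np.arange(120)
ts = pd.DataFrame({"y": np.sin(2 * np.pi * t / 7) + rng.normal(0, 0.1, 120)},
                  index=dates)

# Lag features: the value 1, 7 and 30 days earlier
for lag in (1, 7, 30):
    ts[f"lag_{lag}"] = ts["y"].shift(lag)

# Rolling statistics over the previous 7 days; shift(1) keeps today's value out of the window
past = ts["y"].shift(1)
for stat in ("mean", "std", "min", "max"):
    ts[f"roll7_{stat}"] = getattr(past.rolling(window=7), stat)()

# Seasonal indicators (Monday = 0)
ts["day_of_week"] = ts.index.dayofweek
ts["is_weekend"] = (ts["day_of_week"] >= 5).astype(int)

# Simple trend feature: days since the start of the series
ts["t"] = t

# Fourier terms for a weekly cycle
ts["sin_7"] = np.sin(2 * np.pi * ts["t"] / 7)
ts["cos_7"] = np.cos(2 * np.pi * ts["t"] / 7)

# Rows without a full 30-day history contain NaN and are dropped
features = ts.dropna()
print(features.shape)
```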
Exercise 2: Text Feature Engineering
Build a text feature engineering system that:
- Creates bag-of-words features
- Implements TF-IDF transformation
- Extracts n-grams (unigrams, bigrams, trigrams)
- Generates text statistics (length, punctuation, capitals)
- Creates word embeddings
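A possible starting point for this exercise: the sketch combines TF-IDF over unigrams to trigrams with a few hand-written text statistics. The documents and the helper `text_stats` are made up for illustration, and word embeddings are left out because they need a pretrained model.

```python
import string

import numpy as np
from scipy.sparse import csr_matrix, hstack
from sklearn.feature_extraction.text import TfidfVectorizer

docs = [
    "Great product!",
    "Terrible, would NOT buy again.",
    "Okay value for the price",
]

# TF-IDF over unigrams, bigrams and trigrams
tfidf = TfidfVectorizer(ngram_range=(1, 3))
X_tfidf = tfidf.fit_transform(docs)

def text_stats(text):
    """Return [character count, punctuation count, uppercase count]."""
    return [
        len(text),
        sum(ch in string.punctuation for ch in text),
        sum(ch.isupper() for ch in text),
    ]

stats_matrix = np.array([text_stats(d) for d in docs], dtype=float)

# Stack the sparse TF-IDF block and the dense statistics side by side
X_text = hstack([X_tfidf, csr_matrix(stats_matrix)]).tocsr()
print(X_text.shape)
```

In practice the statistics would be scaled before being combined with TF-IDF weights, since raw lengths can dominate distance-based models.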
Exercise 3: Automated Feature Engineering
Develop an automated system that:
- Analyzes feature types automatically
- Applies appropriate transformations
- Generates interaction features
- Performs feature selection
- Evaluates feature importance
- Creates a report of all engineering steps
Key Takeaways
- 🎯 Feature engineering often has more impact than model selection
- 🧠 Domain knowledge is crucial for creating meaningful features
- ➕ Polynomial and interaction features capture non-linear relationships
- 🔄 Transform features to meet algorithm assumptions
- 📊 Use multiple feature selection methods and compare results
- 📉 PCA and other extraction methods reduce dimensionality
- ⚠️ Fit feature transformations inside each cross-validation fold to avoid leakage
- 📝 Document and version control your feature engineering process
- 🔬 Validate that new features improve model performance