Recommendation systems are algorithms that suggest relevant items to users based on their preferences, behavior, and similarities with other users or items. From Netflix movie suggestions to Amazon product recommendations and Spotify playlists, these systems power personalized experiences across digital platforms. This lesson covers collaborative filtering (user-based and item-based), content-based filtering, matrix factorization techniques (SVD, NMF), hybrid approaches, and modern deep learning methods for building effective recommendation systems.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import sparse
from scipy.sparse.linalg import svds
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import NMF, TruncatedSVD
import warnings
warnings.filterwarnings('ignore')
# Set random seed
np.random.seed(42)
# Set style
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
print("="*60)
print("RECOMMENDATION SYSTEMS FUNDAMENTALS")
print("="*60)
recommendation_concepts = """
RECOMMENDATION SYSTEMS KEY CONCEPTS:
1. TYPES OF RECOMMENDATION SYSTEMS:
• Collaborative Filtering (CF)
- User-based: Find similar users
- Item-based: Find similar items
• Content-Based Filtering
- Item features and user preferences
• Hybrid Systems
- Combine multiple approaches
• Knowledge-Based
- Domain knowledge and rules
2. COLLABORATIVE FILTERING:
• Memory-based: Direct similarity computation
• Model-based: Learn latent factors
• Advantages: No domain knowledge needed
• Challenges: Cold start, sparsity
3. CONTENT-BASED FILTERING:
• Feature extraction from items
• User profile learning
• Advantages: No cold start for items
• Challenges: Limited discovery, feature engineering
4. MATRIX FACTORIZATION:
• SVD (Singular Value Decomposition)
• NMF (Non-negative Matrix Factorization)
• ALS (Alternating Least Squares)
• Deep Matrix Factorization
5. EVALUATION METRICS:
• RMSE (Root Mean Squared Error)
• MAE (Mean Absolute Error)
• Precision@K, Recall@K
• NDCG (Normalized Discounted Cumulative Gain)
• Coverage and Diversity
6. CHALLENGES:
• Cold Start Problem
• Data Sparsity
• Scalability
• Popularity Bias
• Filter Bubbles
7. APPLICATIONS:
• E-commerce: Product recommendations
• Streaming: Movies, music, videos
• Social Media: Friend suggestions
• News: Article recommendations
• Education: Course suggestions
"""
print(recommendation_concepts)
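The metrics list above names NDCG, which the rest of this lesson does not compute. The following is a minimal sketch of NDCG@K, assuming a linear gain (the raw relevance value) and a log2 position discount; some references use an exponential gain instead. The helper names and the `held_out` ratings are hypothetical.

```python
import math

def dcg_at_k(relevances, k):
    """Discounted cumulative gain over the first k positions (log2 discount)."""
    return sum(rel / math.log2(pos + 2) for pos, rel in enumerate(relevances[:k]))

def ndcg_at_k(ranked_items, true_relevance, k=5):
    """DCG of the produced ranking divided by the DCG of the ideal ranking.

    ranked_items   -- item ids in the order the recommender returned them
    true_relevance -- dict mapping item id -> graded relevance (e.g. a held-out rating)
    """
    gains = [true_relevance.get(item, 0.0) for item in ranked_items]
    ideal = sorted(true_relevance.values(), reverse=True)
    ideal_dcg = dcg_at_k(ideal, k)
    return dcg_at_k(gains, k) / ideal_dcg if ideal_dcg > 0 else 0.0

# Hypothetical held-out ratings for one user
held_out = {'Movie_3': 5.0, 'Movie_7': 3.0, 'Movie_9': 1.0}
perfect = ndcg_at_k(['Movie_3', 'Movie_7', 'Movie_9'], held_out, k=3)
reversed_order = ndcg_at_k(['Movie_9', 'Movie_7', 'Movie_3'], held_out, k=3)
print(f"perfect order: {perfect:.3f}, reversed order: {reversed_order:.3f}")
```

Unlike Precision@K, the score drops when a highly rated item is pushed down the list, even if the set of recommended items is unchanged.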
class RecommendationDataGenerator:
    """Generate sample data for recommendation systems"""
    def __init__(self):
        self.users = None
        self.items = None
        self.ratings = None
        self.item_features = None
    def generate_movie_ratings(self, n_users=100, n_movies=50, sparsity=0.1):
        """Generate synthetic movie ratings data"""
        # Create user and movie names
        self.users = [f'User_{i}' for i in range(1, n_users + 1)]
        self.items = [f'Movie_{i}' for i in range(1, n_movies + 1)]
        # Generate user preferences (latent factors)
        n_factors = 5
        user_factors = np.random.randn(n_users, n_factors)
        item_factors = np.random.randn(n_movies, n_factors)
        # Generate ratings based on latent factors
        true_ratings = np.dot(user_factors, item_factors.T)
        # Scale to 1-5 range
        true_ratings = 1 + 4 * (true_ratings - true_ratings.min()) / (true_ratings.max() - true_ratings.min())
        # Add noise
        noise = np.random.normal(0, 0.5, (n_users, n_movies))
        ratings_matrix = true_ratings + noise
        # Clip to valid range
        ratings_matrix = np.clip(ratings_matrix, 1, 5)
        # Keep a random subset of ratings; 0 marks "unrated".
        # Note: `sparsity` is the fraction of entries KEPT (the density),
        # so sparsity=0.1 leaves about 90% of the matrix empty.
        mask = np.random.random((n_users, n_movies)) < sparsity
        ratings_matrix[~mask] = 0
        # Convert to DataFrame
        self.ratings = pd.DataFrame(ratings_matrix,
                                    index=self.users,
                                    columns=self.items)
        # Create long format for easier manipulation
        ratings_long = []
        for user_idx, user in enumerate(self.users):
            for movie_idx, movie in enumerate(self.items):
                if ratings_matrix[user_idx, movie_idx] > 0:
                    ratings_long.append({
                        'user': user,
                        'item': movie,
                        'rating': ratings_matrix[user_idx, movie_idx]
                    })
        self.ratings_long = pd.DataFrame(ratings_long)
        return self.ratings, self.ratings_long
    def generate_item_features(self):
        """Generate item features for content-based filtering"""
        genres = ['Action', 'Comedy', 'Drama', 'Horror', 'Sci-Fi', 'Romance']
        # Generate random genre assignments (multiple genres per movie)
        item_features = []
        for movie in self.items:
            movie_genres = np.random.choice(genres,
                                            size=np.random.randint(1, 4),
                                            replace=False)
            # Create feature vector
            feature_vec = {genre: 1 if genre in movie_genres else 0
                           for genre in genres}
            feature_vec['item'] = movie
            # Add some continuous features
            feature_vec['year'] = np.random.randint(1980, 2024)
            feature_vec['duration'] = np.random.randint(80, 180)
            feature_vec['budget'] = np.random.randint(1, 200) # in millions
            item_features.append(feature_vec)
        self.item_features = pd.DataFrame(item_features)
        self.item_features.set_index('item', inplace=True)
        return self.item_features
    def split_data(self, test_size=0.2):
        """Split ratings into train and test sets"""
        train_data = []
        test_data = []
        # Split by user to maintain user behavior
        for user in self.users:
            user_ratings = self.ratings_long[self.ratings_long['user'] == user]
            if len(user_ratings) > 1:
                # Keep at least one rating in training
                n_test = max(1, int(len(user_ratings) * test_size))
                n_train = len(user_ratings) - n_test
                # Random split
                indices = np.random.permutation(len(user_ratings))
                train_indices = indices[:n_train]
                test_indices = indices[n_train:]
                train_data.append(user_ratings.iloc[train_indices])
                test_data.append(user_ratings.iloc[test_indices])
            else:
                # If user has only one rating, keep in training
                train_data.append(user_ratings)
        train_df = pd.concat(train_data, ignore_index=True)
        test_df = pd.concat(test_data, ignore_index=True) if test_data else pd.DataFrame()
        return train_df, test_df
# Generate sample data
data_gen = RecommendationDataGenerator()
ratings_matrix, ratings_df = data_gen.generate_movie_ratings(n_users=100, n_movies=50, sparsity=0.15)
item_features = data_gen.generate_item_features()
print("\n" + "="*60)
print("SAMPLE DATA GENERATED")
print("="*60)
print(f"\nRatings Matrix Shape: {ratings_matrix.shape}")
print(f"Number of ratings: {(ratings_matrix > 0).sum().sum()}")
print(f"Sparsity: {1 - (ratings_matrix > 0).sum().sum() / (ratings_matrix.shape[0] * ratings_matrix.shape[1]):.2%}")
print("\nSample Ratings:")
print(ratings_df.head())
print("\nRating Distribution:")
print(ratings_df['rating'].describe())
# Visualize ratings distribution
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
# Rating distribution
axes[0].hist(ratings_df['rating'], bins=20, edgecolor='black', alpha=0.7)
axes[0].set_xlabel('Rating')
axes[0].set_ylabel('Frequency')
axes[0].set_title('Rating Distribution')
axes[0].grid(True, alpha=0.3)
# Ratings per user
ratings_per_user = ratings_df.groupby('user').size()
axes[1].hist(ratings_per_user, bins=20, edgecolor='black', alpha=0.7, color='green')
axes[1].set_xlabel('Number of Ratings')
axes[1].set_ylabel('Number of Users')
axes[1].set_title('Ratings per User Distribution')
axes[1].grid(True, alpha=0.3)
plt.suptitle('Recommendation System Data Characteristics', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
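The dense DataFrame above stores a zero for every unrated pair, which stops being practical once the catalog grows. As a rough sketch, the long-format ratings can instead be packed into a SciPy CSR matrix that stores only observed entries. The `toy_ratings` frame below is a hypothetical stand-in shaped like `ratings_df`.

```python
import pandas as pd
from scipy import sparse

# Hypothetical long-format ratings, shaped like ratings_df above
toy_ratings = pd.DataFrame({
    'user': ['User_1', 'User_1', 'User_2', 'User_3'],
    'item': ['Movie_1', 'Movie_3', 'Movie_2', 'Movie_3'],
    'rating': [4.0, 2.5, 5.0, 3.0],
})

# Map string labels to consecutive integer codes (sorted for a stable order)
user_codes, user_labels = pd.factorize(toy_ratings['user'], sort=True)
item_codes, item_labels = pd.factorize(toy_ratings['item'], sort=True)

# Build a CSR matrix that stores only the observed ratings
ratings_csr = sparse.csr_matrix(
    (toy_ratings['rating'].to_numpy(), (user_codes, item_codes)),
    shape=(len(user_labels), len(item_labels)),
)

density = ratings_csr.nnz / (ratings_csr.shape[0] * ratings_csr.shape[1])
print(ratings_csr.shape, ratings_csr.nnz, f"density={density:.2%}")
```

`user_labels` and `item_labels` keep the mapping back from row and column positions to the original names.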
class CollaborativeFiltering:
    """Collaborative filtering recommendation methods"""
    def __init__(self, ratings_matrix):
        self.ratings_matrix = ratings_matrix
        self.similarity_matrix = None
    def user_based_cf(self, target_user, n_recommendations=5):
        """User-based collaborative filtering"""
        user_ratings = self.ratings_matrix.values
        # Boolean mask of rated entries (0 marks an unrated item)
        rated_mask = user_ratings > 0
        # Cosine similarity between each pair of users, computed on co-rated items only
        n_users = len(user_ratings)
        similarity_matrix = np.zeros((n_users, n_users))
        for i in range(n_users):
            for j in range(n_users):
                # Find common rated items
                common_items = rated_mask[i] & rated_mask[j]
                if common_items.sum() > 0:
                    user_i = user_ratings[i][common_items]
                    user_j = user_ratings[j][common_items]
                    similarity_matrix[i, j] = np.dot(user_i, user_j) / (
                        np.linalg.norm(user_i) * np.linalg.norm(user_j))
        self.similarity_matrix = similarity_matrix
        # Get target user index
        user_idx = self.ratings_matrix.index.get_loc(target_user)
        # Get similar users (excluding self)
        user_similarities = similarity_matrix[user_idx].copy()
        user_similarities[user_idx] = -1 # Exclude self
        # Get top similar users
        similar_users_idx = np.argsort(user_similarities)[::-1][:10]
        # Predict ratings for unrated items
        target_ratings = user_ratings[user_idx]
        predictions = {}
        for item_idx in range(len(target_ratings)):
            if target_ratings[item_idx] == 0: # Unrated item
                # Weighted average of similar users' ratings
                weighted_sum = 0
                similarity_sum = 0
                for similar_user_idx in similar_users_idx:
                    if user_ratings[similar_user_idx, item_idx] > 0:
                        weighted_sum += (user_similarities[similar_user_idx] *
                                         user_ratings[similar_user_idx, item_idx])
                        similarity_sum += abs(user_similarities[similar_user_idx])
                if similarity_sum > 0:
                    predicted_rating = weighted_sum / similarity_sum
                    predictions[self.ratings_matrix.columns[item_idx]] = predicted_rating
        # Sort and return top recommendations
        recommendations = sorted(predictions.items(), key=lambda x: x[1], reverse=True)[:n_recommendations]
        # Return the full user-user matrix: the similarity heatmap slices it in two
        # dimensions, which fails on the 1-D vector of the target user's similarities
        return recommendations, similarity_matrix
    def item_based_cf(self, target_user, n_recommendations=5):
        """Item-based collaborative filtering"""
        # Calculate item similarity matrix
        item_ratings = self.ratings_matrix.T.values
        # Calculate cosine similarity between items
        item_similarity = cosine_similarity(item_ratings)
        # Get user's rated items
        user_idx = self.ratings_matrix.index.get_loc(target_user)
        user_ratings = self.ratings_matrix.values[user_idx]
        rated_items = np.where(user_ratings > 0)[0]
        # Predict ratings for unrated items
        predictions = {}
        for item_idx in range(len(user_ratings)):
            if user_ratings[item_idx] == 0: # Unrated item
                # Weighted average based on item similarity
                weighted_sum = 0
                similarity_sum = 0
                for rated_item_idx in rated_items:
                    similarity = item_similarity[item_idx, rated_item_idx]
                    weighted_sum += similarity * user_ratings[rated_item_idx]
                    similarity_sum += abs(similarity)
                if similarity_sum > 0:
                    predicted_rating = weighted_sum / similarity_sum
                    predictions[self.ratings_matrix.columns[item_idx]] = predicted_rating
        # Sort and return top recommendations
        recommendations = sorted(predictions.items(), key=lambda x: x[1], reverse=True)[:n_recommendations]
        return recommendations, item_similarity
    def matrix_factorization_svd(self, n_factors=10, n_recommendations=5):
        """Matrix factorization using SVD"""
        # Convert to numpy array
        ratings = self.ratings_matrix.values
        # Replace 0s with mean rating
        ratings_filled = ratings.copy()
        mean_rating = ratings[ratings > 0].mean()
        ratings_filled[ratings_filled == 0] = mean_rating
        # Normalize by subtracting mean
        user_ratings_mean = np.mean(ratings_filled, axis=1)
        ratings_normalized = ratings_filled - user_ratings_mean.reshape(-1, 1)
        # Perform SVD
        U, sigma, Vt = svds(ratings_normalized, k=n_factors)
        # Convert sigma to diagonal matrix
        sigma = np.diag(sigma)
        # Reconstruct ratings matrix
        predicted_ratings = np.dot(np.dot(U, sigma), Vt) + user_ratings_mean.reshape(-1, 1)
        # Create predictions DataFrame
        predictions_df = pd.DataFrame(predicted_ratings,
                                      index=self.ratings_matrix.index,
                                      columns=self.ratings_matrix.columns)
        return predictions_df, U, sigma, Vt
    def evaluate_predictions(self, test_data, predictions_df):
        """Evaluate recommendation predictions"""
        actual_ratings = []
        predicted_ratings = []
        for _, row in test_data.iterrows():
            user = row['user']
            item = row['item']
            actual = row['rating']
            if user in predictions_df.index and item in predictions_df.columns:
                predicted = predictions_df.loc[user, item]
                actual_ratings.append(actual)
                predicted_ratings.append(predicted)
        if actual_ratings:
            rmse = np.sqrt(mean_squared_error(actual_ratings, predicted_ratings))
            mae = mean_absolute_error(actual_ratings, predicted_ratings)
            return {'rmse': rmse, 'mae': mae}
        else:
            return {'rmse': None, 'mae': None}
# Collaborative filtering demonstration
cf = CollaborativeFiltering(ratings_matrix)
print("\n" + "="*60)
print("COLLABORATIVE FILTERING")
print("="*60)
# User-based CF
target_user = 'User_1'
user_recommendations, user_sims = cf.user_based_cf(target_user, n_recommendations=5)
print(f"\nUser-based Recommendations for {target_user}:")
for item, score in user_recommendations:
    print(f" {item}: {score:.3f}")
# Item-based CF
item_recommendations, item_sims = cf.item_based_cf(target_user, n_recommendations=5)
print(f"\nItem-based Recommendations for {target_user}:")
for item, score in item_recommendations:
    print(f" {item}: {score:.3f}")
# Matrix Factorization
predictions_df, U, sigma, Vt = cf.matrix_factorization_svd(n_factors=10)
print("\nMatrix Factorization (SVD) Results:")
print(f" User factors shape: {U.shape}")
print(f" Singular values shape: {sigma.shape}")
print(f" Item factors shape: {Vt.shape}")
# Visualize similarity matrices
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# User similarity heatmap (sample)
sample_users = 20
im1 = axes[0].imshow(user_sims[:sample_users, :sample_users], cmap='coolwarm', aspect='auto')
axes[0].set_title('User Similarity Matrix (Sample)')
axes[0].set_xlabel('User Index')
axes[0].set_ylabel('User Index')
plt.colorbar(im1, ax=axes[0])
# Item similarity heatmap (sample)
sample_items = 20
im2 = axes[1].imshow(item_sims[:sample_items, :sample_items], cmap='coolwarm', aspect='auto')
axes[1].set_title('Item Similarity Matrix (Sample)')
axes[1].set_xlabel('Item Index')
axes[1].set_ylabel('Item Index')
plt.colorbar(im2, ax=axes[1])
# Predicted vs Original ratings
original_sample = ratings_matrix.values[:10, :10]
predicted_sample = predictions_df.values[:10, :10]
im3 = axes[2].imshow(np.abs(predicted_sample - original_sample), cmap='viridis', aspect='auto')
axes[2].set_title('Prediction Error Matrix (Sample)')
axes[2].set_xlabel('Item Index')
axes[2].set_ylabel('User Index')
plt.colorbar(im3, ax=axes[2])
plt.suptitle('Collaborative Filtering Analysis', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
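Because every rating here is positive, raw cosine similarity on co-rated items is always positive and often close to 1, so it separates like-minded users from opposite-minded ones poorly. A common alternative is to mean-center each user's co-rated ratings first, which gives the Pearson correlation. The sketch below is one possible variant: it assumes 0 marks an unrated item and centers on the mean of the co-rated items only. The function name and the `min_common` cutoff are hypothetical.

```python
import numpy as np

def pearson_on_common(ratings_a, ratings_b, min_common=2):
    """Pearson correlation over items both users rated (0 marks 'unrated')."""
    ratings_a = np.asarray(ratings_a, dtype=float)
    ratings_b = np.asarray(ratings_b, dtype=float)
    common = (ratings_a > 0) & (ratings_b > 0)
    if common.sum() < min_common:
        return 0.0  # too little overlap to estimate a correlation
    a = ratings_a[common] - ratings_a[common].mean()
    b = ratings_b[common] - ratings_b[common].mean()
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    return float(np.dot(a, b) / denom) if denom > 0 else 0.0

# Two hypothetical users with opposite tastes on the three items both rated
alice = [5, 1, 4, 0]
bob = [1, 5, 2, 3]

raw_cosine = float(np.dot(alice[:3], bob[:3]) /
                   (np.linalg.norm(alice[:3]) * np.linalg.norm(bob[:3])))
pearson = pearson_on_common(alice, bob)
print(f"raw cosine: {raw_cosine:.3f}, Pearson: {pearson:.3f}")
```

The raw cosine stays positive even though the two users disagree on every shared item, while the centered version reports a strong negative correlation.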
class ContentBasedFiltering:
    """Content-based recommendation methods"""
    def __init__(self, ratings_df, item_features):
        self.ratings_df = ratings_df
        # Min-max scale every feature column to [0, 1]. Without this, the large
        # raw values of year, duration and budget dominate the cosine similarity
        # and the 0/1 genre flags have almost no effect.
        feature_range = (item_features.max() - item_features.min()).replace(0, 1)
        self.item_features = (item_features - item_features.min()) / feature_range
        self.user_profiles = {}
    def build_user_profile(self, user):
        """Build user profile based on rated items"""
        # Get user's rated items
        user_ratings = self.ratings_df[self.ratings_df['user'] == user]
        if len(user_ratings) == 0:
            return None
        # Weight item features by user ratings
        weighted_features = []
        for _, row in user_ratings.iterrows():
            item = row['item']
            rating = row['rating']
            if item in self.item_features.index:
                item_feat = self.item_features.loc[item].values
                weighted_features.append(rating * item_feat)
        if weighted_features:
            # Average weighted features
            user_profile = np.mean(weighted_features, axis=0)
            self.user_profiles[user] = user_profile
            return user_profile
        return None
    def recommend_content_based(self, user, n_recommendations=5):
        """Generate recommendations based on content similarity"""
        # Build or retrieve user profile
        if user not in self.user_profiles:
            user_profile = self.build_user_profile(user)
            if user_profile is None:
                return []
        else:
            user_profile = self.user_profiles[user]
        # Get user's already rated items
        user_items = set(self.ratings_df[self.ratings_df['user'] == user]['item'].values)
        # Calculate similarity between user profile and all items
        item_similarities = {}
        for item in self.item_features.index:
            if item not in user_items: # Only unrated items
                item_feat = self.item_features.loc[item].values
                # Cosine similarity
                similarity = np.dot(user_profile, item_feat) / (
                    np.linalg.norm(user_profile) * np.linalg.norm(item_feat)
                )
                item_similarities[item] = similarity
        # Sort and return top recommendations
        recommendations = sorted(item_similarities.items(),
                                 key=lambda x: x[1], reverse=True)[:n_recommendations]
        return recommendations
    def hybrid_recommendation(self, user, cf_predictions, weight_cf=0.5, n_recommendations=5):
        """Hybrid approach combining collaborative and content-based"""
        # Get content-based recommendations
        cb_recommendations = self.recommend_content_based(user, n_recommendations=20)
        cb_scores = {item: score for item, score in cb_recommendations}
        # Normalize scores
        if cb_scores:
            max_cb = max(cb_scores.values())
            min_cb = min(cb_scores.values())
            if max_cb > min_cb:
                cb_scores = {item: (score - min_cb) / (max_cb - min_cb)
                             for item, score in cb_scores.items()}
        # Get CF predictions for the user
        cf_scores = {}
        if user in cf_predictions.index:
            user_cf_predictions = cf_predictions.loc[user]
            # Get unrated items
            user_items = set(self.ratings_df[self.ratings_df['user'] == user]['item'].values)
            for item in cf_predictions.columns:
                if item not in user_items:
                    cf_scores[item] = user_cf_predictions[item]
        # Normalize CF scores
        if cf_scores:
            max_cf = max(cf_scores.values())
            min_cf = min(cf_scores.values())
            if max_cf > min_cf:
                cf_scores = {item: (score - min_cf) / (max_cf - min_cf)
                             for item, score in cf_scores.items()}
        # Combine scores
        hybrid_scores = {}
        all_items = set(cb_scores.keys()) | set(cf_scores.keys())
        for item in all_items:
            cb_score = cb_scores.get(item, 0)
            cf_score = cf_scores.get(item, 0)
            hybrid_score = weight_cf * cf_score + (1 - weight_cf) * cb_score
            hybrid_scores[item] = hybrid_score
        # Sort and return top recommendations
        recommendations = sorted(hybrid_scores.items(),
                                 key=lambda x: x[1], reverse=True)[:n_recommendations]
        return recommendations
# Content-based filtering demonstration
cbf = ContentBasedFiltering(ratings_df, item_features)
print("\n" + "="*60)
print("CONTENT-BASED FILTERING")
print("="*60)
# Build user profile
user_profile = cbf.build_user_profile(target_user)
print(f"\nUser Profile for {target_user}:")
print(f" Profile shape: {user_profile.shape}")
print(f" Average genre preferences:")
for idx, genre in enumerate(['Action', 'Comedy', 'Drama', 'Horror', 'Sci-Fi', 'Romance']):
    print(f" {genre}: {user_profile[idx]:.3f}")
# Content-based recommendations
cb_recommendations = cbf.recommend_content_based(target_user, n_recommendations=5)
print(f"\nContent-based Recommendations for {target_user}:")
for item, score in cb_recommendations:
    print(f" {item}: {score:.3f}")
# Hybrid recommendations
hybrid_recommendations = cbf.hybrid_recommendation(target_user, predictions_df,
                                                   weight_cf=0.6, n_recommendations=5)
print(f"\nHybrid Recommendations for {target_user} (60% CF, 40% CB):")
for item, score in hybrid_recommendations:
    print(f" {item}: {score:.3f}")
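The content-based example above relies on genre flags and numeric attributes. When items carry free text such as plot summaries, a TF-IDF representation is a common way to extract features. The sketch below uses scikit-learn's `TfidfVectorizer` (imported at the top of the lesson but not otherwise used). The movie titles and descriptions are invented for illustration, since the synthetic dataset has no text fields.

```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Hypothetical plot summaries; not part of the synthetic dataset above
descriptions = {
    'Movie_A': 'space crew explores a distant planet and fights aliens',
    'Movie_B': 'aliens invade earth and a space pilot fights back',
    'Movie_C': 'two friends fall in love during a summer in paris',
}
titles = list(descriptions)

# Turn each description into a TF-IDF vector, dropping common English stop words
vectorizer = TfidfVectorizer(stop_words='english')
tfidf = vectorizer.fit_transform(list(descriptions.values()))

# Pairwise cosine similarity between all descriptions
text_similarity = cosine_similarity(tfidf)

def most_similar(title, top_n=1):
    """Return the top_n other titles ranked by text similarity."""
    idx = titles.index(title)
    order = text_similarity[idx].argsort()[::-1]
    return [(titles[j], float(text_similarity[idx, j])) for j in order if j != idx][:top_n]

print(most_similar('Movie_A'))
```

The resulting vectors could stand in for the rows of `item_features` when building a user profile.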
class AdvancedRecommendation:
    """Advanced recommendation techniques"""
    def __init__(self, ratings_matrix):
        self.ratings_matrix = ratings_matrix
    def nmf_factorization(self, n_components=10):
        """Non-negative Matrix Factorization"""
        # Prepare data
        ratings = self.ratings_matrix.values
        ratings_filled = ratings.copy()
        # Fill missing values with small positive value
        ratings_filled[ratings_filled == 0] = 0.01
        # Apply NMF
        nmf = NMF(n_components=n_components, init='random', random_state=42)
        user_factors = nmf.fit_transform(ratings_filled)
        item_factors = nmf.components_
        # Reconstruct ratings
        predicted_ratings = np.dot(user_factors, item_factors)
        predictions_df = pd.DataFrame(predicted_ratings,
                                      index=self.ratings_matrix.index,
                                      columns=self.ratings_matrix.columns)
        return predictions_df, user_factors, item_factors
    def popularity_based(self, n_recommendations=5):
        """Simple popularity-based recommendations"""
        # Calculate item popularity
        ratings = self.ratings_matrix.values
        item_popularity = {}
        for item_idx, item_name in enumerate(self.ratings_matrix.columns):
            item_ratings = ratings[:, item_idx]
            rated_count = np.sum(item_ratings > 0)
            avg_rating = np.mean(item_ratings[item_ratings > 0]) if rated_count > 0 else 0
            # Combine rating count and average rating
            popularity_score = rated_count * 0.3 + avg_rating * 0.7
            item_popularity[item_name] = {
                'score': popularity_score,
                'count': rated_count,
                'avg_rating': avg_rating
            }
        # Sort by popularity
        popular_items = sorted(item_popularity.items(),
                               key=lambda x: x[1]['score'], reverse=True)
        return popular_items[:n_recommendations]
    def association_rules(self, min_support=0.1):
        """Association rule-based recommendations"""
        # Convert to binary matrix (rated/not rated)
        binary_matrix = (self.ratings_matrix.values > 0).astype(int)
        # Find frequent itemsets
        item_sets = {}
        n_users = len(binary_matrix)
        # Single items
        for item_idx, item_name in enumerate(self.ratings_matrix.columns):
            support = np.sum(binary_matrix[:, item_idx]) / n_users
            if support >= min_support:
                item_sets[frozenset([item_name])] = support
        # Pairs of items
        for i in range(len(self.ratings_matrix.columns)):
            for j in range(i+1, len(self.ratings_matrix.columns)):
                item_i = self.ratings_matrix.columns[i]
                item_j = self.ratings_matrix.columns[j]
                # Users who rated both items
                both_rated = binary_matrix[:, i] & binary_matrix[:, j]
                support = np.sum(both_rated) / n_users
                if support >= min_support:
                    item_sets[frozenset([item_i, item_j])] = support
        # Generate association rules
        rules = []
        for itemset, support in item_sets.items():
            if len(itemset) == 2:
                items = list(itemset)
                # Rule: item_i -> item_j
                for i in range(2):
                    antecedent = items[i]
                    consequent = items[1-i]
                    # Calculate confidence
                    antecedent_support = item_sets.get(frozenset([antecedent]), 0)
                    if antecedent_support > 0:
                        confidence = support / antecedent_support
                        # Calculate lift
                        consequent_support = item_sets.get(frozenset([consequent]), 0)
                        if consequent_support > 0:
                            lift = confidence / consequent_support
                        else:
                            lift = 0
                        rules.append({
                            'antecedent': antecedent,
                            'consequent': consequent,
                            'support': support,
                            'confidence': confidence,
                            'lift': lift
                        })
        # Sort by lift
        rules = sorted(rules, key=lambda x: x['lift'], reverse=True)
        return rules
    def diversity_reranking(self, recommendations, item_features, diversity_weight=0.3):
        """Re-rank recommendations to increase diversity"""
        if len(recommendations) <= 1:
            return recommendations
        # Extract item names from recommendations
        items = [item for item, _ in recommendations[:10]] # Consider top 10
        # Calculate pairwise similarities
        similarities = []
        for i, item_i in enumerate(items):
            for j, item_j in enumerate(items):
                if i != j and item_i in item_features.index and item_j in item_features.index:
                    feat_i = item_features.loc[item_i].values
                    feat_j = item_features.loc[item_j].values
                    sim = np.dot(feat_i, feat_j) / (np.linalg.norm(feat_i) * np.linalg.norm(feat_j))
                    similarities.append(sim)
        avg_similarity = np.mean(similarities) if similarities else 0
        # Re-rank with diversity penalty
        reranked = []
        remaining = list(recommendations)
        # Add first item
        reranked.append(remaining.pop(0))
        while remaining and len(reranked) < 5:
            best_score = -np.inf
            best_idx = 0
            for idx, (item, score) in enumerate(remaining):
                # Calculate diversity from already selected items
                diversity = 0
                for selected_item, _ in reranked:
                    if item in item_features.index and selected_item in item_features.index:
                        feat_i = item_features.loc[item].values
                        feat_s = item_features.loc[selected_item].values
                        sim = np.dot(feat_i, feat_s) / (np.linalg.norm(feat_i) * np.linalg.norm(feat_s))
                        diversity += (1 - sim)
                diversity = diversity / len(reranked)
                # Combine score and diversity
                combined_score = (1 - diversity_weight) * score + diversity_weight * diversity
                if combined_score > best_score:
                    best_score = combined_score
                    best_idx = idx
            reranked.append(remaining.pop(best_idx))
        return reranked
# Advanced techniques
advanced = AdvancedRecommendation(ratings_matrix)
print("\n" + "="*60)
print("ADVANCED RECOMMENDATION TECHNIQUES")
print("="*60)
# NMF
nmf_predictions, user_factors, item_factors = advanced.nmf_factorization(n_components=10)
print("\nNMF Factorization:")
print(f" User factors shape: {user_factors.shape}")
print(f" Item factors shape: {item_factors.shape}")
# Popularity-based
popular_items = advanced.popularity_based(n_recommendations=5)
print("\nMost Popular Items:")
for item, stats in popular_items:
    print(f" {item}: Score={stats['score']:.2f}, Count={stats['count']}, Avg={stats['avg_rating']:.2f}")
# Association rules
rules = advanced.association_rules(min_support=0.05)
print("\nTop Association Rules:")
for rule in rules[:5]:
    print(f" {rule['antecedent']} → {rule['consequent']}")
    print(f" Support: {rule['support']:.3f}, Confidence: {rule['confidence']:.3f}, Lift: {rule['lift']:.3f}")
# Diversity re-ranking
print("\nDiversity Re-ranking Example:")
original_recs = cb_recommendations[:5]
diverse_recs = advanced.diversity_reranking(cb_recommendations, item_features, diversity_weight=0.4)
print(" Original order:")
for item, score in original_recs:
    print(f" {item}: {score:.3f}")
print(" After diversity re-ranking:")
for item, score in diverse_recs:
    print(f" {item}: {score:.3f}")
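The concepts overview lists ALS (Alternating Least Squares) among the matrix factorization techniques, but the lesson code does not implement it. The following is a small NumPy sketch of one way ALS can be written, fitting only the observed (non-zero) entries with an L2 penalty. The function name, the hyperparameter defaults and the `toy` matrix are assumptions made for illustration, not tuned values.

```python
import numpy as np

def als_factorize(ratings, n_factors=2, reg=0.1, n_iters=20, seed=0):
    """Alternating least squares on the observed (non-zero) entries only."""
    rng = np.random.default_rng(seed)
    observed = ratings > 0
    n_users, n_items = ratings.shape
    user_f = rng.normal(scale=0.1, size=(n_users, n_factors))
    item_f = rng.normal(scale=0.1, size=(n_items, n_factors))
    ridge = reg * np.eye(n_factors)

    for _ in range(n_iters):
        # Hold item factors fixed and solve a small ridge regression per user
        for u in range(n_users):
            cols = observed[u]
            if cols.any():
                V = item_f[cols]
                user_f[u] = np.linalg.solve(V.T @ V + ridge, V.T @ ratings[u, cols])
        # Hold user factors fixed and solve a small ridge regression per item
        for i in range(n_items):
            rows = observed[:, i]
            if rows.any():
                U = user_f[rows]
                item_f[i] = np.linalg.solve(U.T @ U + ridge, U.T @ ratings[rows, i])
    return user_f, item_f

# Hypothetical 4x4 rating matrix; 0 marks an unrated pair
toy = np.array([
    [5, 4, 0, 1],
    [4, 0, 1, 1],
    [1, 1, 5, 0],
    [0, 1, 4, 5],
], dtype=float)

user_f, item_f = als_factorize(toy)
reconstructed = user_f @ item_f.T
mask = toy > 0
train_rmse = float(np.sqrt(np.mean((reconstructed[mask] - toy[mask]) ** 2)))
print(f"training RMSE on observed entries: {train_rmse:.3f}")
```

Unlike the SVD and NMF examples above, this variant never imputes a value for missing ratings, so unobserved pairs do not pull the factors toward the fill value.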
class RecommendationEvaluator:
    """Evaluate recommendation system performance"""
    def __init__(self):
        self.metrics = {}
    def evaluate_rating_prediction(self, actual_ratings, predicted_ratings):
        """Evaluate rating prediction accuracy"""
        # RMSE
        rmse = np.sqrt(mean_squared_error(actual_ratings, predicted_ratings))
        # MAE
        mae = mean_absolute_error(actual_ratings, predicted_ratings)
        # Correlation
        correlation = np.corrcoef(actual_ratings, predicted_ratings)[0, 1]
        return {
            'rmse': rmse,
            'mae': mae,
            'correlation': correlation
        }
    def precision_recall_at_k(self, recommendations, relevant_items, k=5):
        """Calculate precision and recall at K"""
        # Get top K recommendations
        top_k = recommendations[:k]
        recommended_items = set([item for item, _ in top_k])
        # Calculate metrics
        relevant_set = set(relevant_items)
        if len(recommended_items) > 0:
            precision = len(recommended_items & relevant_set) / len(recommended_items)
        else:
            precision = 0
        if len(relevant_set) > 0:
            recall = len(recommended_items & relevant_set) / len(relevant_set)
        else:
            recall = 0
        # F1 score
        if precision + recall > 0:
            f1 = 2 * precision * recall / (precision + recall)
        else:
            f1 = 0
        return {
            'precision@k': precision,
            'recall@k': recall,
            'f1@k': f1
        }
    def coverage(self, all_recommendations, all_items):
        """Calculate catalog coverage"""
        recommended_items = set()
        for recs in all_recommendations:
            for item, _ in recs:
                recommended_items.add(item)
        coverage = len(recommended_items) / len(all_items)
        return coverage
    def novelty(self, recommendations, item_popularity):
        """Calculate recommendation novelty"""
        novelty_scores = []
        for item, _ in recommendations:
            if item in item_popularity:
                # Novelty inversely related to popularity
                novelty = 1 / (1 + item_popularity[item])
                novelty_scores.append(novelty)
        return np.mean(novelty_scores) if novelty_scores else 0
    def compare_methods(self, methods_results):
        """Compare different recommendation methods"""
        comparison_df = pd.DataFrame(methods_results).T
        # Visualize comparison
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        # RMSE comparison
        if 'rmse' in comparison_df.columns:
            axes[0, 0].bar(comparison_df.index, comparison_df['rmse'])
            axes[0, 0].set_ylabel('RMSE')
            axes[0, 0].set_title('Root Mean Squared Error (Lower is Better)')
            axes[0, 0].tick_params(axis='x', rotation=45)
        # Precision comparison
        if 'precision' in comparison_df.columns:
            axes[0, 1].bar(comparison_df.index, comparison_df['precision'])
            axes[0, 1].set_ylabel('Precision')
            axes[0, 1].set_title('Precision (Higher is Better)')
            axes[0, 1].tick_params(axis='x', rotation=45)
        # Coverage comparison
        if 'coverage' in comparison_df.columns:
            axes[1, 0].bar(comparison_df.index, comparison_df['coverage'])
            axes[1, 0].set_ylabel('Coverage')
            axes[1, 0].set_title('Catalog Coverage (Higher is Better)')
            axes[1, 0].tick_params(axis='x', rotation=45)
        # Overall scores (normalized)
        normalized_scores = comparison_df.copy()
        for col in normalized_scores.columns:
            if col in ['rmse', 'mae']: # Lower is better
                normalized_scores[col] = 1 / (1 + normalized_scores[col])
            else: # Higher is better
                max_val = normalized_scores[col].max()
                if max_val > 0:
                    normalized_scores[col] = normalized_scores[col] / max_val
        overall_scores = normalized_scores.mean(axis=1)
        axes[1, 1].bar(overall_scores.index, overall_scores)
        axes[1, 1].set_ylabel('Score')
        axes[1, 1].set_title('Overall Performance (Normalized)')
        axes[1, 1].tick_params(axis='x', rotation=45)
        plt.suptitle('Recommendation Methods Comparison', fontsize=14, y=1.02)
        plt.tight_layout()
        plt.show()
        return comparison_df
# Evaluation
evaluator = RecommendationEvaluator()
print("\n" + "="*60)
print("EVALUATION AND COMPARISON")
print("="*60)
# Split data for evaluation
train_data, test_data = data_gen.split_data(test_size=0.2)
print(f"\nData Split:")
print(f" Training ratings: {len(train_data)}")
print(f" Test ratings: {len(test_data)}")
# Evaluate different methods
methods_results = {}
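# Refit SVD and NMF on the training ratings only. The earlier predictions_df and
# nmf_predictions were fit on the full matrix, which already contains the test
# ratings, so scoring them here would leak test data and understate the error.
train_matrix = (train_data.pivot(index='user', columns='item', values='rating')
                .reindex(index=ratings_matrix.index, columns=ratings_matrix.columns)
                .fillna(0))
svd_train_predictions, _, _, _ = CollaborativeFiltering(train_matrix).matrix_factorization_svd(n_factors=10)
nmf_train_predictions, _, _ = AdvancedRecommendation(train_matrix).nmf_factorization(n_components=10)
# Evaluate SVD
if len(test_data) > 0:
    svd_eval = cf.evaluate_predictions(test_data, svd_train_predictions)
    if svd_eval['rmse'] is not None:
        methods_results['SVD'] = {
            'rmse': svd_eval['rmse'],
            'mae': svd_eval['mae'],
            'coverage': 0.8, # Example value
            'precision': 0.75 # Example value
        }
    # Evaluate NMF
    nmf_eval = cf.evaluate_predictions(test_data, nmf_train_predictions)
    if nmf_eval['rmse'] is not None:
        methods_results['NMF'] = {
            'rmse': nmf_eval['rmse'],
            'mae': nmf_eval['mae'],
            'coverage': 0.75, # Example value
            'precision': 0.7 # Example value
        }
# Illustrative placeholder values (not measured) for the other methods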
methods_results['User-based CF'] = {
'rmse': 0.95,
'mae': 0.72,
'coverage': 0.6,
'precision': 0.65
}
methods_results['Content-based'] = {
'rmse': 1.05,
'mae': 0.8,
'coverage': 0.9,
'precision': 0.6
}
methods_results['Hybrid'] = {
'rmse': 0.9,
'mae': 0.7,
'coverage': 0.85,
'precision': 0.8
}
print("\nMethods Comparison:")
comparison_df = evaluator.compare_methods(methods_results)
print(comparison_df)
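The comparison above uses placeholder precision values. As a small worked example of how Precision@K and Recall@K are derived from held-out ratings, the sketch below treats a held-out item as relevant when its rating is at least 4.0. That threshold, the standalone function and the sample data are assumptions for illustration.

```python
def precision_recall_at_k(ranked_items, relevant_items, k):
    """Precision and recall of the first k recommended items."""
    top_k = ranked_items[:k]
    hits = len(set(top_k) & set(relevant_items))
    precision = hits / len(top_k) if top_k else 0.0
    recall = hits / len(relevant_items) if relevant_items else 0.0
    return precision, recall

# Hypothetical held-out ratings for one user; "relevant" means rating >= 4.0
held_out = {'Movie_2': 4.5, 'Movie_5': 2.0, 'Movie_8': 4.0, 'Movie_9': 5.0}
relevant = {item for item, rating in held_out.items() if rating >= 4.0}

# Hypothetical ranked output of a recommender for the same user
ranked = ['Movie_9', 'Movie_1', 'Movie_2', 'Movie_5', 'Movie_7']

precision, recall = precision_recall_at_k(ranked, relevant, k=3)
print(f"precision@3={precision:.3f}, recall@3={recall:.3f}")
```

Here two of the top three recommendations are relevant and two of the three relevant items are retrieved, so both values come out to 2/3. Averaging these per-user values over all test users gives the system-level metric.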
print("\n" + "="*60)
print("RECOMMENDATION SYSTEM BEST PRACTICES")
print("="*60)
best_practices = """
KEY GUIDELINES:
1. DATA QUALITY:
• Handle implicit vs explicit feedback
• Address rating bias (normalization)
• Consider temporal dynamics
• Handle missing data appropriately
• Validate data consistency
2. COLD START SOLUTIONS:
• New Users: Use demographics, popular items
• New Items: Use content features, similar items
• Hybrid approaches for robustness
• Onboarding questionnaires
• Transfer learning from similar domains
3. SCALABILITY:
• Use approximate algorithms (LSH, random sampling)
• Distributed computing (Spark, Dask)
• Caching and pre-computation
• Incremental learning
• Efficient data structures
4. ALGORITHM SELECTION:
• Sparse data: Use model-based CF
• Rich features: Content-based or hybrid
• Large scale: Matrix factorization
• Real-time: Pre-compute or simple methods
• Explainability needed: Item-based or content
5. EVALUATION STRATEGY:
• Offline: Historical data, cross-validation
• Online: A/B testing, interleaving
• Business metrics: CTR, conversion, retention
• User satisfaction surveys
• Long-term engagement tracking
6. DIVERSITY & FAIRNESS:
• Avoid filter bubbles
• Promote long-tail items
• Consider demographic parity
• Temporal diversity
• Explanation diversity
7. IMPLEMENTATION TIPS:
• Start simple, iterate
• Monitor performance metrics
• Handle edge cases gracefully
• Implement fallback strategies
• Regular model updates
"""
print(best_practices)
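The data-quality guideline on rating bias can be made concrete with a baseline predictor: estimate each rating as the global mean plus a per-user offset plus a per-item offset, using observed entries only. The sketch below is one simple, unregularized way to compute it. The function name and `toy` matrix are hypothetical, and 0 is assumed to mark a missing rating.

```python
import numpy as np

def baseline_predictions(ratings):
    """Predict r_ui as global_mean + user_bias + item_bias from observed entries."""
    observed = ratings > 0
    global_mean = ratings[observed].mean()
    # User bias: average deviation of each user's ratings from the global mean
    centered = np.where(observed, ratings - global_mean, 0.0)
    user_counts = np.maximum(observed.sum(axis=1), 1)
    user_bias = centered.sum(axis=1) / user_counts
    # Item bias: average residual left after removing the user bias
    residual = np.where(observed, centered - user_bias[:, None], 0.0)
    item_counts = np.maximum(observed.sum(axis=0), 1)
    item_bias = residual.sum(axis=0) / item_counts
    preds = global_mean + user_bias[:, None] + item_bias[None, :]
    return np.clip(preds, 1, 5), global_mean, user_bias, item_bias

# Hypothetical ratings: the first user rates high, the second rates low
toy = np.array([
    [5, 4, 0],
    [2, 0, 1],
    [0, 3, 3],
], dtype=float)

preds, global_mean, user_bias, item_bias = baseline_predictions(toy)
print("global mean:", global_mean)
print("user bias:", user_bias)
print("item bias:", item_bias)
```

Subtracting this baseline before computing similarities or factorizing lets the model focus on preferences rather than on how generously each user rates.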
# Implementation checklist
implementation_checklist = """
IMPLEMENTATION CHECKLIST:
□ Data Pipeline
- Data collection and storage
- ETL processes
- Real-time data streaming
- Data quality monitoring
□ Model Development
- Baseline model (popularity)
- Collaborative filtering
- Content-based filtering
- Hybrid approach
- Deep learning (if applicable)
□ Evaluation Framework
- Offline evaluation metrics
- A/B testing infrastructure
- Business KPI tracking
- User feedback collection
□ Production System
- Model serving API
- Caching layer
- Load balancing
- Monitoring and alerting
- Fallback mechanisms
□ Optimization
- Query optimization
- Model compression
- Distributed computing
- Incremental updates
- Edge computing
□ User Experience
- Explanation generation
- Diversity controls
- Feedback collection
- Personalization settings
- Privacy controls
"""
print(implementation_checklist)
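The checklist items "Baseline model (popularity)" and "Fallback mechanisms" can be combined in a few lines: serve personalized results when they exist, and pad with popular items otherwise, which also covers brand-new users. The sketch below is a minimal illustration. The function name and the sample dictionaries are hypothetical.

```python
def recommend_with_fallback(user, personalized, popular_items, n=3):
    """Return personalized picks when available, padded with popular items."""
    picks = list(personalized.get(user, []))[:n]
    for item in popular_items:
        if len(picks) >= n:
            break
        if item not in picks:  # avoid recommending the same item twice
            picks.append(item)
    return picks

# Hypothetical precomputed outputs of a personalized model and a popularity model
personalized = {'User_1': ['Movie_4', 'Movie_9']}
popular = ['Movie_1', 'Movie_4', 'Movie_2']

known_user_recs = recommend_with_fallback('User_1', personalized, popular)
new_user_recs = recommend_with_fallback('User_999', personalized, popular)
print(known_user_recs)
print(new_user_recs)
```

A production system would typically also filter out items the user has already seen before padding the list.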
Create a complete movie recommendation system:
Build a product recommendation engine:
Implement neural recommendation models: