
🚀 MLOps: Machine Learning Operations & Production

Introduction

MLOps (Machine Learning Operations) bridges the gap between model development and production deployment. It encompasses the practices, tools, and culture needed to deploy, monitor, and maintain ML models at scale. From version control and containerization to CI/CD pipelines and model monitoring, MLOps ensures that your models deliver value reliably in production. This lesson covers the MLOps lifecycle, essential tools, model versioning, experiment tracking, and the foundations for building production-ready ML systems.

MLOps Fundamentals

import os
import json
import pickle
import joblib
import hashlib
import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from typing import Dict, List, Any, Optional
import warnings
warnings.filterwarnings('ignore')

# For demonstration purposes
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

print("="*60)
print("MLOPS FUNDAMENTALS")
print("="*60)

mlops_concepts = """
MLOPS KEY CONCEPTS:

1. WHAT IS MLOPS:
   • Set of practices combining ML, DevOps, and Data Engineering
   • Streamlines ML model lifecycle from development to production
   • Ensures reliability, scalability, and maintainability
   • Automates repetitive tasks and workflows

2. MLOPS LIFECYCLE:
   • Data Collection & Preparation
   • Feature Engineering
   • Model Development & Training
   • Model Evaluation & Validation
   • Model Deployment
   • Monitoring & Maintenance
   • Continuous Improvement

3. KEY COMPONENTS:
   • Version Control: Code, data, and model versioning
   • CI/CD: Automated testing and deployment
   • Containerization: Docker, Kubernetes
   • Orchestration: Workflow automation
   • Monitoring: Performance tracking and alerts
   • Infrastructure: Cloud platforms and scaling

4. CHALLENGES ADDRESSED:
   • Model Reproducibility
   • Deployment Complexity
   • Performance Degradation
   • Data/Concept Drift
   • Scaling Issues
   • Team Collaboration

5. TOOLS ECOSYSTEM:
   • Experiment Tracking: MLflow, Weights & Biases, Neptune
   • Model Registry: MLflow, DVC, Kubeflow
   • Deployment: Docker, Kubernetes, Seldon, BentoML
   • Monitoring: Prometheus, Grafana, Evidently AI
   • Orchestration: Airflow, Kubeflow, Prefect
   • Cloud: AWS SageMaker, GCP Vertex AI (formerly AI Platform), Azure ML

6. MATURITY LEVELS:
   Level 0: Manual process, no automation
   Level 1: ML pipeline automation
   Level 2: CI/CD pipeline automation
   Level 3: Full MLOps automation

7. BEST PRACTICES:
   • Automate everything possible
   • Version control everything
   • Monitor continuously
   • Test rigorously
   • Document thoroughly
   • Collaborate effectively
"""

print(mlops_concepts)
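
The overview above lists data/concept drift among the challenges MLOps addresses. One common way to quantify drift in a single numeric feature is the Population Stability Index (PSI), which compares how a reference sample and a current sample fall into the same bins. The sketch below is a minimal pure-Python version. The function name `population_stability_index`, the equal-width binning, and the `eps` floor for empty bins are illustrative assumptions, not part of the lesson's code.

```python
import math
from typing import List, Sequence


def population_stability_index(
    expected: Sequence[float],
    actual: Sequence[float],
    n_bins: int = 10,
    eps: float = 1e-6,
) -> float:
    """PSI between a reference sample (expected) and a current sample (actual).

    Bins are equal-width and derived from the reference sample only.
    """
    lo, hi = min(expected), max(expected)
    if hi == lo:
        raise ValueError("reference sample has zero range; cannot build bins")
    width = (hi - lo) / n_bins

    def bin_fractions(values: Sequence[float]) -> List[float]:
        counts = [0] * n_bins
        for v in values:
            idx = int((v - lo) / width)
            # Values outside the reference range are clamped into the edge bins.
            idx = min(max(idx, 0), n_bins - 1)
            counts[idx] += 1
        # Floor each fraction at eps so the logarithm is always defined.
        return [max(c / len(values), eps) for c in counts]

    e = bin_fractions(expected)
    a = bin_fractions(actual)
    # Each term is non-negative, so PSI is zero only when the bin fractions match.
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))
```

A commonly quoted rule of thumb treats PSI below 0.1 as stable and above 0.25 as a significant shift, but suitable thresholds depend on the feature and the application.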

Model Versioning and Management

class ModelVersionManager:
    """Manage model versions, metadata, and lineage"""
    
    def __init__(self, base_path: str = "./models"):
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)
        self.registry_path = self.base_path / "registry.json"
        self.registry = self.load_registry()
        
    def load_registry(self) -> Dict:
        """Load model registry from disk"""
        if self.registry_path.exists():
            with open(self.registry_path, 'r') as f:
                return json.load(f)
        return {"models": {}}
    
    def save_registry(self):
        """Save model registry to disk"""
        with open(self.registry_path, 'w') as f:
            json.dump(self.registry, f, indent=2, default=str)
    
    def generate_model_id(self, model_name: str) -> str:
        """Generate unique model ID"""
        # Include microseconds so models saved within the same second get distinct IDs
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        return f"{model_name}_{timestamp}"
    
    def calculate_model_hash(self, model) -> str:
        """Calculate model hash for integrity checking"""
        model_bytes = pickle.dumps(model)
        return hashlib.sha256(model_bytes).hexdigest()
    
    def save_model(self, model, model_name: str, metrics: Dict, 
                   metadata: Optional[Dict] = None) -> str:
        """Save model with versioning and metadata"""
        
        # Generate model ID
        model_id = self.generate_model_id(model_name)
        model_path = self.base_path / f"{model_id}.pkl"
        
        # Save model
        joblib.dump(model, model_path)
        
        # Calculate model hash
        model_hash = self.calculate_model_hash(model)
        
        # Prepare model metadata
        model_info = {
            "model_id": model_id,
            "model_name": model_name,
            "path": str(model_path),
            "hash": model_hash,
            "timestamp": datetime.datetime.now().isoformat(),
            "metrics": metrics,
            "metadata": metadata or {},
            "status": "staging"  # staging, production, archived
        }
        
        # Update registry
        if model_name not in self.registry["models"]:
            self.registry["models"][model_name] = []
        
        self.registry["models"][model_name].append(model_info)
        self.save_registry()
        
        print(f"✅ Model saved: {model_id}")
        print(f"   Path: {model_path}")
        print(f"   Hash: {model_hash[:8]}...")
        print(f"   Metrics: {metrics}")
        
        return model_id
    
    def load_model(self, model_id: str):
        """Load specific model version"""
        # Find model in registry
        for model_name, versions in self.registry["models"].items():
            for version in versions:
                if version["model_id"] == model_id:
                    model = joblib.load(version["path"])
                    return model, version
        
        raise ValueError(f"Model {model_id} not found in registry")
    
    def promote_model(self, model_id: str, status: str = "production"):
        """Promote model to production or other status"""
        for model_name, versions in self.registry["models"].items():
            for version in versions:
                if version["model_id"] == model_id:
                    # Demote current production model if exists
                    if status == "production":
                        for v in versions:
                            if v["status"] == "production":
                                v["status"] = "archived"
                    
                    # Promote selected model
                    version["status"] = status
                    version["promoted_at"] = datetime.datetime.now().isoformat()
                    self.save_registry()
                    
                    print(f"✅ Model {model_id} promoted to {status}")
                    return
        
        raise ValueError(f"Model {model_id} not found")
    
    def get_production_model(self, model_name: str):
        """Get current production model"""
        if model_name in self.registry["models"]:
            for version in self.registry["models"][model_name]:
                if version["status"] == "production":
                    model = joblib.load(version["path"])
                    return model, version
        
        raise ValueError(f"No production model found for {model_name}")
    
    def list_models(self, model_name: Optional[str] = None):
        """List all models or specific model versions"""
        if model_name:
            if model_name in self.registry["models"]:
                versions = self.registry["models"][model_name]
                df = pd.DataFrame(versions)
                return df[['model_id', 'timestamp', 'status', 'metrics']]
        else:
            all_models = []
            for name, versions in self.registry["models"].items():
                for v in versions:
                    v['model_name'] = name
                    all_models.append(v)
            
            if all_models:
                df = pd.DataFrame(all_models)
                return df[['model_name', 'model_id', 'timestamp', 'status']]
        
        return pd.DataFrame()
    
    def compare_models(self, model_ids: List[str]):
        """Compare metrics across model versions"""
        comparisons = []
        
        for model_id in model_ids:
            for model_name, versions in self.registry["models"].items():
                for version in versions:
                    if version["model_id"] == model_id:
                        comparison = {
                            "model_id": model_id,
                            "status": version["status"],
                            **version["metrics"]
                        }
                        comparisons.append(comparison)
                        break
        
        return pd.DataFrame(comparisons)

# Demonstrate model versioning
print("\n" + "="*60)
print("MODEL VERSIONING DEMO")
print("="*60)

# Create model manager
model_manager = ModelVersionManager()

# Generate sample data and train models
X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train multiple model versions
models_trained = []

for i in range(3):
    # Train model with different parameters
    model = RandomForestClassifier(
        n_estimators=50 + i*50,
        max_depth=5 + i*2,
        random_state=42
    )
    model.fit(X_train, y_train)
    
    # Calculate metrics
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": round(accuracy_score(y_test, y_pred), 4),
        "precision": round(precision_score(y_test, y_pred), 4),
        "recall": round(recall_score(y_test, y_pred), 4),
        "f1_score": round(f1_score(y_test, y_pred), 4)
    }
    
    # Save model
    metadata = {
        "n_estimators": 50 + i*50,
        "max_depth": 5 + i*2,
        "training_samples": len(X_train),
        "features": X.shape[1]
    }
    
    model_id = model_manager.save_model(
        model, 
        "classifier_v1",
        metrics,
        metadata
    )
    models_trained.append(model_id)
    print()

# List models
print("\nModel Registry:")
print(model_manager.list_models())

# Compare models
print("\nModel Comparison:")
print(model_manager.compare_models(models_trained))

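# Promote the model with the highest F1 score to production
comparison_df = model_manager.compare_models(models_trained)
best_model_id = comparison_df.loc[comparison_df["f1_score"].idxmax(), "model_id"]
model_manager.promote_model(best_model_id, "production")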
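
One caveat about the integrity hash above: `calculate_model_hash` hashes the bytes from `pickle.dumps(model)`, while the file on disk is written by `joblib.dump`, so the stored hash does not necessarily match the bytes of the saved file. If the goal is to detect a corrupted or replaced artifact, hashing the file itself is one option. The following is a minimal sketch under that assumption. The helper names `file_sha256` and `verify_artifact` are hypothetical and not part of `ModelVersionManager`.

```python
import hashlib


def file_sha256(path, chunk_size: int = 1 << 20) -> str:
    """Return the SHA-256 hex digest of a file, read in chunks to bound memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_artifact(path, expected_hash: str) -> bool:
    """Check that the file at `path` still matches the hash recorded at save time."""
    return file_sha256(path) == expected_hash
```

With this approach, the registry would record `file_sha256(model_path)` right after the model is written, and a loader could call `verify_artifact(version["path"], version["hash"])` before unpickling.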

Experiment Tracking

class ExperimentTracker:
    """Track ML experiments, hyperparameters, and results"""
    
    def __init__(self, experiment_name: str, base_path: str = "./experiments"):
        self.experiment_name = experiment_name
        self.base_path = Path(base_path)
        self.experiment_path = self.base_path / experiment_name
        self.experiment_path.mkdir(parents=True, exist_ok=True)
        self.runs = []
        self.current_run = None
        
    def start_run(self, run_name: Optional[str] = None):
        """Start a new experimental run"""
        run_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        if run_name:
            run_id = f"{run_name}_{run_id}"
        
        self.current_run = {
            "run_id": run_id,
            "start_time": datetime.datetime.now(),
            "parameters": {},
            "metrics": {},
            "artifacts": [],
            "tags": []
        }
        
        print(f"🔬 Starting run: {run_id}")
        return run_id
    
    def log_params(self, params: Dict[str, Any]):
        """Log hyperparameters"""
        if self.current_run is None:
            raise ValueError("No active run. Call start_run() first.")
        
        self.current_run["parameters"].update(params)
        print(f"📝 Parameters logged: {params}")
    
    def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None):
        """Log metrics"""
        if self.current_run is None:
            raise ValueError("No active run. Call start_run() first.")
        
        if step is not None:
            # Log metrics at specific step (for training curves)
            for key, value in metrics.items():
                if key not in self.current_run["metrics"]:
                    self.current_run["metrics"][key] = []
                self.current_run["metrics"][key].append((step, value))
        else:
            # Log single metrics
            for key, value in metrics.items():
                self.current_run["metrics"][key] = value
        
        print(f"📊 Metrics logged: {metrics}")
    
    def log_artifact(self, artifact_path: str, artifact_type: str = "file"):
        """Log artifacts (models, plots, data)"""
        if self.current_run is None:
            raise ValueError("No active run. Call start_run() first.")
        
        artifact_info = {
            "path": artifact_path,
            "type": artifact_type,
            "timestamp": datetime.datetime.now().isoformat()
        }
        self.current_run["artifacts"].append(artifact_info)
        print(f"📦 Artifact logged: {artifact_path}")
    
    def add_tags(self, tags: List[str]):
        """Add tags to current run"""
        if self.current_run is None:
            raise ValueError("No active run. Call start_run() first.")
        
        self.current_run["tags"].extend(tags)
        print(f"🏷️  Tags added: {tags}")
    
    def end_run(self):
        """End current run and save results"""
        if self.current_run is None:
            raise ValueError("No active run to end.")
        
        self.current_run["end_time"] = datetime.datetime.now()
        self.current_run["duration"] = (
            self.current_run["end_time"] - self.current_run["start_time"]
        ).total_seconds()
        
        # Save run data
        run_file = self.experiment_path / f"{self.current_run['run_id']}.json"
        with open(run_file, 'w') as f:
            json.dump(self.current_run, f, indent=2, default=str)
        
        self.runs.append(self.current_run)
        print(f"✅ Run ended: {self.current_run['run_id']}")
        print(f"   Duration: {self.current_run['duration']:.2f} seconds")
        
        self.current_run = None
    
    def get_all_runs(self) -> pd.DataFrame:
        """Get all runs as DataFrame"""
        all_runs = []
        
        for run_file in self.experiment_path.glob("*.json"):
            with open(run_file, 'r') as f:
                run_data = json.load(f)
                
                # Flatten run data for DataFrame
                flat_run = {
                    "run_id": run_data["run_id"],
                    "duration": run_data.get("duration", 0),
                    **run_data["parameters"],
                    **{k: v if not isinstance(v, list) else v[-1][1] 
                       for k, v in run_data["metrics"].items()}
                }
                all_runs.append(flat_run)
        
        return pd.DataFrame(all_runs)
    
    def plot_metric_history(self, metric_name: str):
        """Plot metric history across runs"""
        fig, ax = plt.subplots(figsize=(10, 6))
        
        for run_file in self.experiment_path.glob("*.json"):
            with open(run_file, 'r') as f:
                run_data = json.load(f)
                
                if metric_name in run_data["metrics"]:
                    metric_data = run_data["metrics"][metric_name]
                    
                    if isinstance(metric_data, list):
                        # Plot training curve
                        steps, values = zip(*metric_data)
                        ax.plot(steps, values, label=run_data["run_id"], 
                               marker='o', markersize=4)
                    else:
                        # Single value
                        ax.axhline(y=metric_data, label=run_data["run_id"], 
                                  linestyle='--')
        
        ax.set_xlabel('Step')
        ax.set_ylabel(metric_name)
        ax.set_title(f'{metric_name} History - {self.experiment_name}')
        ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
    
    def find_best_run(self, metric_name: str, mode: str = 'max') -> Dict:
        """Find best run based on metric"""
        best_run = None
        best_value = float('-inf') if mode == 'max' else float('inf')
        
        for run_file in self.experiment_path.glob("*.json"):
            with open(run_file, 'r') as f:
                run_data = json.load(f)
                
                if metric_name in run_data["metrics"]:
                    metric_value = run_data["metrics"][metric_name]
                    
                    if isinstance(metric_value, list):
                        metric_value = metric_value[-1][1]  # Last value
                    
                    if (mode == 'max' and metric_value > best_value) or \
                       (mode == 'min' and metric_value < best_value):
                        best_value = metric_value
                        best_run = run_data
        
        return best_run

# Demonstrate experiment tracking
print("\n" + "="*60)
print("EXPERIMENT TRACKING DEMO")
print("="*60)

# Create experiment tracker
tracker = ExperimentTracker("hyperparameter_tuning")

# Run multiple experiments
hyperparameters = [
    {"n_estimators": 50, "max_depth": 5, "min_samples_split": 5},
    {"n_estimators": 100, "max_depth": 10, "min_samples_split": 2},
    {"n_estimators": 200, "max_depth": 15, "min_samples_split": 10}
]

for i, params in enumerate(hyperparameters):
    # Start run
    tracker.start_run(f"run_{i+1}")
    
    # Log parameters
    tracker.log_params(params)
    
    # Train model
    model = RandomForestClassifier(**params, random_state=42)
    
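    # Simulate step-wise logging. A random forest has no epochs: each refit
    # with the same random_state yields the same model, so the logged curve is flat.
    for epoch in range(5):
        # Training step
        model.fit(X_train, y_train)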
        
        # Calculate metrics
        train_pred = model.predict(X_train)
        val_pred = model.predict(X_test)
        
        train_acc = accuracy_score(y_train, train_pred)
        val_acc = accuracy_score(y_test, val_pred)
        
        # Log metrics
        tracker.log_metrics({
            "train_accuracy": train_acc,
            "val_accuracy": val_acc
        }, step=epoch)
    
    # Log final metrics
    final_metrics = {
        "final_accuracy": accuracy_score(y_test, val_pred),
        "final_f1": f1_score(y_test, val_pred)
    }
    tracker.log_metrics(final_metrics)
    
    # Add tags
    tracker.add_tags([f"depth_{params['max_depth']}", "random_forest"])
    
    # End run
    tracker.end_run()
    print()

# Visualize results
print("\nAll Experiment Runs:")
print(tracker.get_all_runs())

print("\nPlotting validation accuracy history:")
tracker.plot_metric_history("val_accuracy")

# Find best run
best_run = tracker.find_best_run("final_accuracy", mode='max')
if best_run:
    print(f"\nBest Run: {best_run['run_id']}")
    print(f"Parameters: {best_run['parameters']}")
    print(f"Best Accuracy: {best_run['metrics']['final_accuracy']}")
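
In the demo loop above, `end_run()` is only reached if training succeeds. An exception between `start_run()` and `end_run()` would leave the run unsaved. One way to guard against that is a small context manager, sketched below. The name `tracked_run` and the `"failed"` tag are illustrative assumptions. The sketch only relies on the tracker exposing `start_run`, `add_tags`, and `end_run`, as `ExperimentTracker` does.

```python
from contextlib import contextmanager


@contextmanager
def tracked_run(tracker, run_name=None):
    """Start a run and guarantee it is ended, even if the body raises."""
    run_id = tracker.start_run(run_name)
    try:
        yield run_id
    except Exception:
        # Mark the run so failed attempts are easy to filter out later.
        tracker.add_tags(["failed"])
        raise
    finally:
        # Always persist whatever was logged before the run finished or failed.
        tracker.end_run()
```

A run would then be written as `with tracked_run(tracker, "run_1"): ...`, with the `log_params` and `log_metrics` calls placed inside the block.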

Model Deployment Pipeline

class ModelDeploymentPipeline:
    """Pipeline for model deployment and serving"""
    
    def __init__(self, model_name: str):
        self.model_name = model_name
        self.deployment_path = Path("./deployments") / model_name
        self.deployment_path.mkdir(parents=True, exist_ok=True)
        
    def create_model_api(self, model, preprocessing_fn=None):
        """Create API wrapper for model serving"""
        
        api_code = '''
import pickle
import numpy as np
from flask import Flask, request, jsonify
import joblib

app = Flask(__name__)

# Load model
model = joblib.load('model.pkl')

@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    return jsonify({"status": "healthy", "model": "''' + self.model_name + '''"})

@app.route('/predict', methods=['POST'])
def predict():
    """Prediction endpoint"""
    try:
        # Get data from request
        data = request.json
        
        # Convert to numpy array
        features = np.array(data['features']).reshape(1, -1)
        
        # Make prediction
        prediction = model.predict(features)[0]
        probability = model.predict_proba(features)[0].tolist()
        
        return jsonify({
            "prediction": int(prediction),
            "probability": probability,
            "model_name": "''' + self.model_name + '''"
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 400

@app.route('/batch_predict', methods=['POST'])
def batch_predict():
    """Batch prediction endpoint"""
    try:
        # Get data from request
        data = request.json
        
        # Convert to numpy array
        features = np.array(data['features'])
        
        # Make predictions
        predictions = model.predict(features).tolist()
        probabilities = model.predict_proba(features).tolist()
        
        return jsonify({
            "predictions": predictions,
            "probabilities": probabilities,
            "batch_size": len(predictions)
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 400

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=False)
'''
        
        # Save API code
        api_path = self.deployment_path / "app.py"
        with open(api_path, 'w') as f:
            f.write(api_code)
        
        print(f"✅ API code created: {api_path}")
        
        return api_path
    
    def create_dockerfile(self):
        """Create Dockerfile for containerization"""
        
        dockerfile_content = f'''
FROM python:3.9-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model and app
COPY model.pkl .
COPY app.py .

# Expose port
EXPOSE 5000

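# Health check (python:3.9-slim does not ship curl, so use the standard library)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \\
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:5000/health')" || exit 1

# Run app with gunicorn (already pinned in requirements.txt)
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]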
'''
        
        # Save Dockerfile
        dockerfile_path = self.deployment_path / "Dockerfile"
        with open(dockerfile_path, 'w') as f:
            f.write(dockerfile_content.strip())
        
        print(f"✅ Dockerfile created: {dockerfile_path}")
        
        return dockerfile_path
    
    def create_requirements(self):
        """Create requirements.txt for dependencies"""
        
        requirements = '''
flask==2.3.0
numpy==1.24.0
scikit-learn==1.3.0
joblib==1.3.0
gunicorn==21.2.0
'''
        
        # Save requirements
        req_path = self.deployment_path / "requirements.txt"
        with open(req_path, 'w') as f:
            f.write(requirements.strip())
        
        print(f"✅ Requirements file created: {req_path}")
        
        return req_path
    
    def create_kubernetes_deployment(self):
        """Create Kubernetes deployment manifest"""
        
        # Kubernetes object and container names must be DNS-compatible
        # (lowercase letters, digits, and '-'), so underscores are replaced.
        k8s_name = self.model_name.lower().replace("_", "-")
        
        k8s_yaml = f'''
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {k8s_name}-deployment
  labels:
    app: {k8s_name}
spec:
  replicas: 3
  selector:
    matchLabels:
      app: {k8s_name}
  template:
    metadata:
      labels:
        app: {k8s_name}
    spec:
      containers:
      - name: {k8s_name}
        image: {self.model_name}:latest
        ports:
        - containerPort: 5000
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: {k8s_name}-service
spec:
  selector:
    app: {k8s_name}
  ports:
    - protocol: TCP
      port: 80
      targetPort: 5000
  type: LoadBalancer
'''
        
        # Save Kubernetes manifest
        k8s_path = self.deployment_path / "kubernetes.yaml"
        with open(k8s_path, 'w') as f:
            f.write(k8s_yaml.strip())
        
        print(f"✅ Kubernetes manifest created: {k8s_path}")
        
        return k8s_path
    
    def create_deployment_package(self, model):
        """Create complete deployment package"""
        
        print(f"\n📦 Creating deployment package for {self.model_name}")
        print("-" * 40)
        
        # Save model
        model_path = self.deployment_path / "model.pkl"
        joblib.dump(model, model_path)
        print(f"✅ Model saved: {model_path}")
        
        # Create API
        self.create_model_api(model)
        
        # Create Dockerfile
        self.create_dockerfile()
        
        # Create requirements
        self.create_requirements()
        
        # Create Kubernetes manifest
        self.create_kubernetes_deployment()
        
        # Create deployment script
        deploy_script = f'''#!/bin/bash

echo "Deploying {self.model_name}..."

# Build Docker image
docker build -t {self.model_name}:latest .

# Run locally for testing
# docker run -p 5000:5000 {self.model_name}:latest

# Push to registry (update with your registry)
# docker tag {self.model_name}:latest your-registry/{self.model_name}:latest
# docker push your-registry/{self.model_name}:latest

# Deploy to Kubernetes
# kubectl apply -f kubernetes.yaml

echo "Deployment complete!"
'''
        
        deploy_script_path = self.deployment_path / "deploy.sh"
        with open(deploy_script_path, 'w') as f:
            f.write(deploy_script.strip())
        
        print(f"✅ Deployment script created: {deploy_script_path}")
        
        print(f"\n✨ Deployment package ready at: {self.deployment_path}")
        print("\nNext steps:")
        print("1. cd", self.deployment_path)
        print("2. chmod +x deploy.sh")
        print("3. ./deploy.sh")

# Create deployment pipeline
print("\n" + "="*60)
print("MODEL DEPLOYMENT PIPELINE")
print("="*60)

# Train a model for deployment
model_for_deployment = RandomForestClassifier(n_estimators=100, random_state=42)
model_for_deployment.fit(X_train, y_train)

# Create deployment package
deployment = ModelDeploymentPipeline("production_classifier")
deployment.create_deployment_package(model_for_deployment)
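
The generated `/predict` endpoint passes `data['features']` straight to NumPy and relies on a catch-all `except` to turn any failure into a 400 response. Validating the payload first gives callers clearer error messages. The sketch below shows one way to do that with the standard library only. The function name `parse_features` and the `n_features` argument are hypothetical and would need to be wired into the generated `app.py` by hand.

```python
from numbers import Real
from typing import Any, List


def parse_features(payload: Any, n_features: int) -> List[float]:
    """Validate a /predict request body and return the features as floats.

    Raises ValueError with a descriptive message when the payload is malformed.
    """
    if not isinstance(payload, dict) or "features" not in payload:
        raise ValueError("request body must be a JSON object with a 'features' key")

    features = payload["features"]
    if not isinstance(features, list) or len(features) != n_features:
        raise ValueError(f"'features' must be a list of {n_features} numbers")

    for value in features:
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, Real):
            raise ValueError("'features' must contain only numbers")

    return [float(v) for v in features]
```

For the classifier trained in this lesson, `n_features` would be 20, matching the `make_classification` call used to generate the data.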

Model Monitoring and Observability

class ModelMonitor:
    """Monitor model performance and data drift"""
    
    def __init__(self, model_name: str):
        self.model_name = model_name
        self.metrics_history = []
        self.alerts = []
        self.baseline_metrics = None
        
    def set_baseline(self, metrics: Dict[str, float]):
        """Set baseline metrics for comparison"""
        self.baseline_metrics = metrics
        print(f"📊 Baseline metrics set: {metrics}")
    
    def log_prediction(self, features, prediction, actual=None, timestamp=None):
        """Log individual prediction for monitoring"""
        if timestamp is None:
            timestamp = datetime.datetime.now()
        
        log_entry = {
            "timestamp": timestamp,
            "features": features.tolist() if hasattr(features, 'tolist') else features,
            "prediction": prediction,
            "actual": actual
        }
        
        # In production, this would go to a database or monitoring service
        return log_entry
    
    def calculate_metrics(self, predictions, actuals):
        """Calculate performance metrics"""
        metrics = {
            "accuracy": accuracy_score(actuals, predictions),
            "precision": precision_score(actuals, predictions, average='weighted'),
            "recall": recall_score(actuals, predictions, average='weighted'),
            "f1_score": f1_score(actuals, predictions, average='weighted')
        }
        return metrics
    
    def check_drift(self, current_metrics: Dict[str, float], threshold: float = 0.1):
        """Check for performance drift.

        `threshold` is an absolute difference from the baseline value
        (0.1 means a change of 0.10 in the metric), not a percentage.
        """
        if self.baseline_metrics is None:
            print("⚠️  No baseline metrics set")
            return []
        
        drift_alerts = []
        
        for metric_name, current_value in current_metrics.items():
            if metric_name in self.baseline_metrics:
                baseline_value = self.baseline_metrics[metric_name]
                drift = abs(current_value - baseline_value)
                # Guard against a zero baseline when computing the relative drift
                drift_percentage = (
                    (drift / abs(baseline_value)) * 100 if baseline_value else float('inf')
                )
                
                if drift > threshold:
                    alert = {
                        "metric": metric_name,
                        "baseline": baseline_value,
                        "current": current_value,
                        "drift": drift,
                        "drift_percentage": drift_percentage,
                        "timestamp": datetime.datetime.now()
                    }
                    drift_alerts.append(alert)
                    self.alerts.append(alert)
                    
                    print(f"⚠️  DRIFT ALERT: {metric_name}")
                    print(f"   Baseline: {baseline_value:.4f}")
                    print(f"   Current: {current_value:.4f}")
                    print(f"   Drift: {drift_percentage:.2f}%")
        
        return drift_alerts
    
    def visualize_monitoring_dashboard(self):
        """Create monitoring dashboard visualization"""
        
        # Simulate historical metrics
        np.random.seed(42)
        hours = 24
        timestamps = pd.date_range(end=datetime.datetime.now(), periods=hours, freq='H')
        
        # Simulate metrics with some drift
        accuracy_values = 0.95 - np.random.exponential(0.01, hours) * 0.1
        precision_values = 0.93 - np.random.exponential(0.01, hours) * 0.08
        recall_values = 0.94 - np.random.exponential(0.01, hours) * 0.09
        latency_values = 50 + np.random.exponential(10, hours)
        
        # Create dashboard
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        
        # Accuracy over time
        axes[0, 0].plot(timestamps, accuracy_values, marker='o', markersize=4)
        axes[0, 0].axhline(y=0.95, color='g', linestyle='--', label='Baseline')
        axes[0, 0].axhline(y=0.85, color='r', linestyle='--', label='Alert Threshold')
        axes[0, 0].fill_between(timestamps, 0.85, accuracy_values, 
                               where=(accuracy_values < 0.85), 
                               color='red', alpha=0.3, label='Alert Zone')
        axes[0, 0].set_xlabel('Time')
        axes[0, 0].set_ylabel('Accuracy')
        axes[0, 0].set_title('Model Accuracy Monitoring')
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)
        axes[0, 0].tick_params(axis='x', rotation=45)
        
        # Precision vs Recall
        axes[0, 1].scatter(precision_values, recall_values, c=range(hours), 
                          cmap='viridis', alpha=0.6)
        axes[0, 1].set_xlabel('Precision')
        axes[0, 1].set_ylabel('Recall')
        axes[0, 1].set_title('Precision vs Recall Drift')
        axes[0, 1].grid(True, alpha=0.3)
        
        # Latency monitoring
        axes[1, 0].plot(timestamps, latency_values, color='orange', 
                       marker='s', markersize=4)
        axes[1, 0].axhline(y=100, color='r', linestyle='--', label='SLA Limit')
        axes[1, 0].fill_between(timestamps, 100, latency_values,
                               where=(latency_values > 100),
                               color='red', alpha=0.3, label='SLA Violation')
        axes[1, 0].set_xlabel('Time')
        axes[1, 0].set_ylabel('Latency (ms)')
        axes[1, 0].set_title('Prediction Latency')
        axes[1, 0].legend()
        axes[1, 0].grid(True, alpha=0.3)
        axes[1, 0].tick_params(axis='x', rotation=45)
        
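        # Alert summary (counts are hardcoded for illustration; a real
        # dashboard would derive them from the logged alerts)
        alert_types = ['Accuracy', 'Precision', 'Recall', 'Latency']
        alert_counts = [3, 1, 2, 5]
        colors = ['red' if c > 2 else 'orange' for c in alert_counts]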
        
        axes[1, 1].bar(alert_types, alert_counts, color=colors)
        axes[1, 1].set_ylabel('Alert Count (24h)')
        axes[1, 1].set_title('Alert Summary')
        axes[1, 1].grid(True, alpha=0.3, axis='y')
        
        # Annotate each bar with its alert count
        for i, count in enumerate(alert_counts):
            axes[1, 1].text(i, count + 0.1, str(count), ha='center')
        
        plt.suptitle(f'Model Monitoring Dashboard - {self.model_name}', 
                    fontsize=14, y=1.02)
        plt.tight_layout()
        plt.show()
    
    def generate_monitoring_report(self):
        """Generate monitoring report"""
        
        report = f"""
MODEL MONITORING REPORT
=======================
Model: {self.model_name}
Generated: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

CURRENT STATUS
--------------
Overall Health: {'⚠️ DEGRADED' if self.alerts else '✅ HEALTHY'}
Active Alerts: {len(self.alerts)}

BASELINE METRICS
----------------"""
        
        if self.baseline_metrics:
            for metric, value in self.baseline_metrics.items():
                report += f"\n{metric}: {value:.4f}"
        else:
            report += "\nNo baseline metrics set"
        
        report += "\n\nRECENT ALERTS\n-------------"
        
        if self.alerts:
            for alert in self.alerts[-5:]:  # Last 5 alerts
                report += f"\n- {alert['metric']}: {alert['drift_percentage']:.2f}% drift"
        else:
            report += "\nNo recent alerts"
        
        report += """

RECOMMENDATIONS
---------------
1. Review training data distribution
2. Check for data quality issues
3. Consider model retraining
4. Validate feature engineering pipeline
5. Monitor business metrics correlation
"""
        
        return report

# Demonstrate monitoring
print("\n" + "="*60)
print("MODEL MONITORING DEMO")
print("="*60)

# Create monitor
monitor = ModelMonitor("production_classifier")

# Set baseline metrics
baseline = {
    "accuracy": 0.95,
    "precision": 0.93,
    "recall": 0.94,
    "f1_score": 0.935
}
monitor.set_baseline(baseline)

# Simulate production metrics with drift
print("\n📊 Checking for drift...")
current_metrics = {
    "accuracy": 0.87,  # Significant drift
    "precision": 0.91,
    "recall": 0.88,  # Significant drift
    "f1_score": 0.895
}

# Check for drift
drift_alerts = monitor.check_drift(current_metrics, threshold=0.05)

# Visualize dashboard
print("\n📈 Generating monitoring dashboard...")
monitor.visualize_monitoring_dashboard()

# Generate report
print("\n📄 Monitoring Report:")
print(monitor.generate_monitoring_report())
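
The drift check in the demo can be reproduced by hand. The sketch below assumes drift is measured as the relative change of each metric from its baseline; `relative_drift` and `flag_drift` are hypothetical helpers written for illustration, and `ModelMonitor.check_drift` may compute drift differently.

```python
from typing import Dict, List


def relative_drift(baseline: Dict[str, float],
                   current: Dict[str, float]) -> Dict[str, float]:
    """Relative change of each shared metric versus its baseline (assumed definition)."""
    return {
        name: abs(current[name] - base) / abs(base)
        for name, base in baseline.items()
        if name in current and base != 0
    }


def flag_drift(baseline: Dict[str, float],
               current: Dict[str, float],
               threshold: float = 0.05) -> List[str]:
    """Names of metrics whose relative drift exceeds the threshold, sorted."""
    drift = relative_drift(baseline, current)
    return sorted(name for name, value in drift.items() if value > threshold)


# Same numbers as the demo above, under separate names
baseline_demo = {"accuracy": 0.95, "precision": 0.93, "recall": 0.94, "f1_score": 0.935}
current_demo = {"accuracy": 0.87, "precision": 0.91, "recall": 0.88, "f1_score": 0.895}

flagged = flag_drift(baseline_demo, current_demo, threshold=0.05)
print(flagged)
```

Under this definition, accuracy (about 8.4% below baseline) and recall (about 6.4%) exceed the 5% threshold, while precision (about 2.2%) and F1 (about 4.3%) do not.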

Best Practices and Production Checklist

print("\n" + "="*60)
print("MLOPS BEST PRACTICES")
print("="*60)

best_practices = """
KEY BEST PRACTICES:

1. VERSION CONTROL:
   • Version code, data, and models
   • Use semantic versioning (major.minor.patch)
   • Tag releases and maintain changelog
   • Store model artifacts with metadata

2. REPRODUCIBILITY:
   • Set random seeds for every source of randomness (Python, NumPy, ML framework)
   • Pin dependency versions
   • Document data sources and preprocessing
   • Use configuration files over hardcoding

3. TESTING:
   • Unit tests for data processing
   • Integration tests for pipelines
   • Model performance tests
   • A/B testing in production
   • Shadow deployment before full rollout

4. MONITORING:
   • Track model performance metrics
   • Monitor data distribution drift
   • Log predictions and errors
   • Set up alerts for anomalies
   • Regular model audits

5. DOCUMENTATION:
   • Model cards with capabilities/limitations
   • API documentation
   • Data dictionaries
   • Architecture decision records
   • Runbooks for common issues

6. SECURITY:
   • Encrypt sensitive data
   • Secure API endpoints
   • Implement rate limiting
   • Regular security audits
   • Access control and authentication

7. SCALABILITY:
   • Horizontal scaling with containers
   • Load balancing
   • Caching strategies
   • Asynchronous processing
   • Auto-scaling policies
"""

print(best_practices)

production_checklist = """
PRODUCTION DEPLOYMENT CHECKLIST:

Pre-Deployment:
□ Model performance meets requirements
□ Code review completed
□ Unit and integration tests pass
□ Documentation updated
□ Security review done
□ Load testing completed
□ Rollback plan prepared
□ Monitoring configured

Deployment:
□ Create deployment artifacts
□ Tag release version
□ Deploy to staging environment
□ Run smoke tests
□ Perform canary deployment
□ Monitor initial performance
□ Gradual traffic rollout

Post-Deployment:
□ Monitor metrics closely (first 24h)
□ Check for data drift
□ Validate business metrics
□ Gather user feedback
□ Document lessons learned
□ Update runbooks
□ Plan next iteration

Maintenance:
□ Regular model evaluation
□ Scheduled retraining
□ Update dependencies
□ Performance optimization
□ Cost monitoring
□ Compliance checks
□ Disaster recovery testing
"""

print("\nPRODUCTION CHECKLIST:")
print(production_checklist)
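
The "store model artifacts with metadata" practice can be sketched with the standard library alone. The file layout, the metadata field names, and the helpers `save_artifact_with_metadata` and `verify_artifact` are assumptions made for illustration; a model registry such as the one in an MLflow setup would normally handle this for you.

```python
import hashlib
import json
import pickle
import tempfile
from pathlib import Path
from typing import Any, Dict


def save_artifact_with_metadata(obj: Any, directory: Path, name: str,
                                version: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Pickle obj and write a JSON sidecar holding version, params, and a SHA-256 hash."""
    directory = Path(directory)
    directory.mkdir(parents=True, exist_ok=True)

    payload = pickle.dumps(obj)
    artifact_path = directory / f"{name}-{version}.pkl"
    artifact_path.write_bytes(payload)

    metadata = {
        "name": name,
        "version": version,  # semantic version: major.minor.patch
        "artifact": artifact_path.name,
        "sha256": hashlib.sha256(payload).hexdigest(),
        "params": params,
    }
    (directory / f"{name}-{version}.json").write_text(json.dumps(metadata, indent=2))
    return metadata


def verify_artifact(directory: Path, name: str, version: str) -> bool:
    """Recompute the artifact hash and compare it with the recorded one."""
    directory = Path(directory)
    metadata = json.loads((directory / f"{name}-{version}.json").read_text())
    payload = (directory / metadata["artifact"]).read_bytes()
    return hashlib.sha256(payload).hexdigest() == metadata["sha256"]


# Usage with a stand-in object instead of a trained model
with tempfile.TemporaryDirectory() as tmp:
    meta = save_artifact_with_metadata(
        {"weights": [0.1, 0.2]}, Path(tmp), "demo_model", "1.0.0",
        params={"n_estimators": 100, "random_state": 42},
    )
    artifact_ok = verify_artifact(Path(tmp), "demo_model", "1.0.0")
    print(meta["version"], artifact_ok)
```

Recording the hash alongside the parameters lets a deployment step confirm that the artifact it loads is the one that was reviewed, which supports both the rollback plan and the audit items in the checklist.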

Practice Exercises

Exercise 1: Build Complete MLOps Pipeline

Create an end-to-end MLOps pipeline:

  1. Set up experiment tracking with MLflow
  2. Implement automated model training
  3. Create model registry with versioning
  4. Build CI/CD pipeline with GitHub Actions
  5. Deploy to cloud platform (AWS/GCP/Azure)
  6. Set up monitoring and alerting

Exercise 2: Implement Model A/B Testing

Build an A/B testing framework:

  1. Deploy multiple model versions
  2. Implement traffic splitting
  3. Track performance metrics per version
  4. Test for statistical significance
  5. Automate winner selection
  6. Define a gradual rollout strategy
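
One possible starting point for the traffic-splitting and significance-testing steps is sketched below. It assumes users are identified by a string ID and that each version is judged on a binary success metric; `assign_variant` and `two_proportion_z_test` are hypothetical helper names, and the hash-based split and pooled two-proportion z-test are one choice among several.

```python
import hashlib
import math
from typing import Tuple


def assign_variant(user_id: str, treatment_share: float = 0.5) -> str:
    """Deterministically map a user to 'A' (control) or 'B' (treatment)."""
    digest = hashlib.sha256(user_id.encode("utf-8")).hexdigest()
    bucket = int(digest[:8], 16) / 0x100000000  # value in [0, 1)
    return "B" if bucket < treatment_share else "A"


def two_proportion_z_test(success_a: int, n_a: int,
                          success_b: int, n_b: int) -> Tuple[float, float]:
    """Two-sided pooled z-test for a difference in success rates; returns (z, p_value)."""
    p_a = success_a / n_a
    p_b = success_b / n_b
    pooled = (success_a + success_b) / (n_a + n_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    if se == 0:
        # All outcomes identical in both groups: no evidence of a difference
        return 0.0, 1.0
    z = (p_b - p_a) / se
    p_value = math.erfc(abs(z) / math.sqrt(2))  # equals 2 * (1 - Phi(|z|))
    return z, p_value


# Route a handful of made-up users, then compare made-up outcome counts
for uid in ["user-1", "user-2", "user-3"]:
    print(uid, assign_variant(uid, treatment_share=0.2))

z_stat, p_val = two_proportion_z_test(success_a=500, n_a=1000,
                                      success_b=560, n_b=1000)
print(f"z={z_stat:.2f}, p={p_val:.4f}")
```

Hashing the user ID keeps each user on the same version across requests, and raising `treatment_share` in steps gives a simple gradual rollout.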

Exercise 3: Data Drift Detection System

Create a comprehensive drift detection system:

  1. Implement statistical drift tests
  2. Monitor feature distributions
  3. Detect concept drift
  4. Build automated retraining triggers
  5. Create drift visualization dashboard
  6. Set up alerting system
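
As a starting point for the statistical drift tests, the sketch below computes the Population Stability Index (PSI) for a single numeric feature. PSI is one option among several (a Kolmogorov-Smirnov test is another); the function name, the quantile binning, and the `eps` floor are assumptions made for illustration.

```python
import numpy as np


def population_stability_index(reference, current, bins: int = 10,
                               eps: float = 1e-6) -> float:
    """PSI between a reference sample and a current sample of one numeric feature."""
    reference = np.asarray(reference, dtype=float)
    current = np.asarray(current, dtype=float)

    # Bin edges from reference quantiles; np.unique drops duplicate edges from ties
    edges = np.unique(np.quantile(reference, np.linspace(0, 1, bins + 1)))
    interior = edges[1:-1]  # outer bins are open-ended, so out-of-range values still count

    n_bins = len(interior) + 1
    ref_counts = np.bincount(np.searchsorted(interior, reference, side="right"),
                             minlength=n_bins)
    cur_counts = np.bincount(np.searchsorted(interior, current, side="right"),
                             minlength=n_bins)

    # Floor the fractions so empty bins do not produce log(0)
    ref_frac = np.clip(ref_counts / ref_counts.sum(), eps, None)
    cur_frac = np.clip(cur_counts / cur_counts.sum(), eps, None)
    return float(np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac)))


# Synthetic data: one sample from the training distribution, one shifted by one std
rng = np.random.default_rng(0)
train_feature = rng.normal(0.0, 1.0, 5000)
same_feature = rng.normal(0.0, 1.0, 5000)
shifted_feature = rng.normal(1.0, 1.0, 5000)

psi_same = population_stability_index(train_feature, same_feature)
psi_shifted = population_stability_index(train_feature, shifted_feature)
print(f"PSI (same distribution): {psi_same:.4f}")
print(f"PSI (shifted mean):      {psi_shifted:.4f}")
```

A commonly cited rule of thumb treats PSI below 0.1 as stable and above 0.25 as a significant shift, but thresholds should be tuned per feature before they drive automated retraining triggers.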

Summary and Key Takeaways

🎯 Key Points to Remember