MLOps (Machine Learning Operations) bridges the gap between model development and production deployment. It encompasses the practices, tools, and culture needed to deploy, monitor, and maintain ML models at scale. From version control and containerization to CI/CD pipelines and model monitoring, MLOps ensures that your models deliver value reliably in production. This lesson covers the MLOps lifecycle, essential tools, model versioning, experiment tracking, deployment packaging, and model monitoring — the foundations for building production-ready ML systems.
import os
import json
import pickle
import joblib
import hashlib
import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from typing import Dict, List, Any, Optional
import warnings
warnings.filterwarnings('ignore')
# For demonstration purposes
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
print("=" * 60)
print("MLOPS FUNDAMENTALS")
print("=" * 60)

# Reference summary of the core MLOps ideas, printed at the start of the lesson.
mlops_concepts = """
MLOPS KEY CONCEPTS:

1. WHAT IS MLOPS:
   • Set of practices combining ML, DevOps, and Data Engineering
   • Streamlines ML model lifecycle from development to production
   • Ensures reliability, scalability, and maintainability
   • Automates repetitive tasks and workflows

2. MLOPS LIFECYCLE:
   • Data Collection & Preparation
   • Feature Engineering
   • Model Development & Training
   • Model Evaluation & Validation
   • Model Deployment
   • Monitoring & Maintenance
   • Continuous Improvement

3. KEY COMPONENTS:
   • Version Control: Code, data, and model versioning
   • CI/CD: Automated testing and deployment
   • Containerization: Docker, Kubernetes
   • Orchestration: Workflow automation
   • Monitoring: Performance tracking and alerts
   • Infrastructure: Cloud platforms and scaling

4. CHALLENGES ADDRESSED:
   • Model Reproducibility
   • Deployment Complexity
   • Performance Degradation
   • Data/Concept Drift
   • Scaling Issues
   • Team Collaboration

5. TOOLS ECOSYSTEM:
   • Experiment Tracking: MLflow, Weights & Biases, Neptune
   • Model Registry: MLflow, DVC, Kubeflow
   • Deployment: Docker, Kubernetes, Seldon, BentoML
   • Monitoring: Prometheus, Grafana, Evidently AI
   • Orchestration: Airflow, Kubeflow, Prefect
   • Cloud: AWS SageMaker, GCP AI Platform, Azure ML

6. MATURITY LEVELS:
   Level 0: Manual process, no automation
   Level 1: ML pipeline automation
   Level 2: CI/CD pipeline automation
   Level 3: Full MLOps automation

7. BEST PRACTICES:
   • Automate everything possible
   • Version control everything
   • Monitor continuously
   • Test rigorously
   • Document thoroughly
   • Collaborate effectively
"""
print(mlops_concepts)
class ModelVersionManager:
    """Manage model versions, metadata, and lineage.

    Models are serialized with joblib into ``base_path`` and described by a
    JSON registry file (``registry.json``) shaped like
    ``{"models": {model_name: [version_info, ...]}}``. Each ``version_info``
    carries a ``status`` of ``"staging"``, ``"production"`` or ``"archived"``.
    """

    def __init__(self, base_path: str = "./models"):
        self.base_path = Path(base_path)
        # parents=True so a nested base path (e.g. "artifacts/models") works.
        self.base_path.mkdir(parents=True, exist_ok=True)
        self.registry_path = self.base_path / "registry.json"
        self.registry = self.load_registry()

    def load_registry(self) -> Dict:
        """Load the model registry from disk, or return an empty registry."""
        if self.registry_path.exists():
            with open(self.registry_path, "r", encoding="utf-8") as f:
                return json.load(f)
        return {"models": {}}

    def save_registry(self):
        """Write the model registry to disk as indented JSON.

        Values JSON cannot encode natively are stringified (``default=str``).
        """
        with open(self.registry_path, "w", encoding="utf-8") as f:
            json.dump(self.registry, f, indent=2, default=str)

    def _find_version(self, model_id: str):
        """Return ``(versions_list, version_info)`` for *model_id*, or ``(None, None)``."""
        for versions in self.registry["models"].values():
            for version in versions:
                if version["model_id"] == model_id:
                    return versions, version
        return None, None

    def generate_model_id(self, model_name: str) -> str:
        """Generate a model ID that is unique within this registry.

        The ID is ``{model_name}_{YYYYmmdd_HHMMSS}``. Because the timestamp
        only has one-second resolution, several models saved in the same
        second would collide (and overwrite each other's ``.pkl`` file), so a
        numeric suffix ``_1``, ``_2``, ... is appended while the candidate ID
        is already registered or its model file already exists.
        """
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        base_id = f"{model_name}_{timestamp}"
        model_id = base_id
        suffix = 0
        while (self._find_version(model_id)[1] is not None
               or (self.base_path / f"{model_id}.pkl").exists()):
            suffix += 1
            model_id = f"{base_id}_{suffix}"
        return model_id

    def calculate_model_hash(self, model) -> str:
        """Return the SHA-256 hex digest of ``pickle.dumps(model)``.

        Note: this hashes the in-memory pickle, not the joblib file written
        by ``save_model``.
        """
        model_bytes = pickle.dumps(model)
        return hashlib.sha256(model_bytes).hexdigest()

    def save_model(self, model, model_name: str, metrics: Dict,
                   metadata: Optional[Dict] = None) -> str:
        """Save *model* as a new ``staging`` version and register it.

        Args:
            model: Any joblib-serializable estimator.
            model_name: Logical name the version is grouped under.
            metrics: Evaluation metrics stored with the version.
            metadata: Optional extra information (hyperparameters, data info).

        Returns:
            The generated model ID.
        """
        model_id = self.generate_model_id(model_name)
        model_path = self.base_path / f"{model_id}.pkl"

        joblib.dump(model, model_path)
        model_hash = self.calculate_model_hash(model)

        model_info = {
            "model_id": model_id,
            "model_name": model_name,
            "path": str(model_path),
            "hash": model_hash,
            "timestamp": datetime.datetime.now().isoformat(),
            "metrics": metrics,
            "metadata": metadata or {},
            "status": "staging",  # staging, production, archived
        }

        self.registry["models"].setdefault(model_name, []).append(model_info)
        self.save_registry()

        print(f"✅ Model saved: {model_id}")
        print(f"   Path: {model_path}")
        print(f"   Hash: {model_hash[:8]}...")
        print(f"   Metrics: {metrics}")
        return model_id

    def load_model(self, model_id: str):
        """Load a specific model version.

        Returns:
            ``(model, version_info)``.

        Raises:
            ValueError: If *model_id* is not registered.

        Note: joblib/pickle files can execute arbitrary code when loaded;
        only load model files from a trusted directory.
        """
        _, version = self._find_version(model_id)
        if version is None:
            raise ValueError(f"Model {model_id} not found in registry")
        return joblib.load(version["path"]), version

    def promote_model(self, model_id: str, status: str = "production"):
        """Set the status of *model_id* (default ``"production"``).

        When promoting to production, any current production version of the
        same model name is archived first, so at most one is in production.

        Raises:
            ValueError: If *model_id* is not registered.
        """
        versions, version = self._find_version(model_id)
        if version is None:
            raise ValueError(f"Model {model_id} not found")

        if status == "production":
            for other in versions:
                if other["status"] == "production":
                    other["status"] = "archived"

        version["status"] = status
        version["promoted_at"] = datetime.datetime.now().isoformat()
        self.save_registry()
        print(f"✅ Model {model_id} promoted to {status}")

    def get_production_model(self, model_name: str):
        """Load the current production version of *model_name*.

        Returns:
            ``(model, version_info)``.

        Raises:
            ValueError: If no version of *model_name* is in production.
        """
        for version in self.registry["models"].get(model_name, []):
            if version["status"] == "production":
                return joblib.load(version["path"]), version
        raise ValueError(f"No production model found for {model_name}")

    def list_models(self, model_name: Optional[str] = None):
        """List versions of one model, or all registered models.

        Returns an empty DataFrame when nothing matches. Registry entries are
        copied, never modified, so listing has no effect on what gets saved.
        """
        if model_name:
            versions = self.registry["models"].get(model_name)
            if versions:
                return pd.DataFrame(versions)[["model_id", "timestamp", "status", "metrics"]]
        else:
            all_models = [
                {**version, "model_name": name}
                for name, versions in self.registry["models"].items()
                for version in versions
            ]
            if all_models:
                return pd.DataFrame(all_models)[["model_name", "model_id", "timestamp", "status"]]
        return pd.DataFrame()

    def compare_models(self, model_ids: List[str]):
        """Return a DataFrame with ``model_id``, ``status`` and all metrics per ID.

        Unknown IDs are skipped.
        """
        comparisons = []
        for model_id in model_ids:
            _, version = self._find_version(model_id)
            if version is not None:
                comparisons.append({
                    "model_id": model_id,
                    "status": version["status"],
                    **version["metrics"],
                })
        return pd.DataFrame(comparisons)
# Demonstrate model versioning
print("\n" + "=" * 60)
print("MODEL VERSIONING DEMO")
print("=" * 60)

# Create model manager
model_manager = ModelVersionManager()

# Generate sample data and train models
X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train multiple model versions with increasingly large forests.
models_trained = []
for i in range(3):
    # One params dict feeds both the estimator and the stored metadata, so
    # the registry can never disagree with what was actually trained.
    params = {"n_estimators": 50 + i * 50, "max_depth": 5 + i * 2}
    model = RandomForestClassifier(**params, random_state=42)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": round(accuracy_score(y_test, y_pred), 4),
        "precision": round(precision_score(y_test, y_pred), 4),
        "recall": round(recall_score(y_test, y_pred), 4),
        "f1_score": round(f1_score(y_test, y_pred), 4),
    }

    metadata = {
        **params,
        "training_samples": len(X_train),
        "features": X.shape[1],
    }
    model_id = model_manager.save_model(model, "classifier_v1", metrics, metadata)
    models_trained.append(model_id)
    print()

# List models
print("\nModel Registry:")
print(model_manager.list_models())

# Compare models
print("\nModel Comparison:")
comparison = model_manager.compare_models(models_trained)
print(comparison)

# Promote the version with the best held-out F1 score instead of assuming
# which one is best; idxmax resolves ties to the earliest trained version.
best_model_id = comparison.loc[comparison["f1_score"].idxmax(), "model_id"]
model_manager.promote_model(best_model_id, "production")
class ExperimentTracker:
    """Track ML experiments, hyperparameters, and results.

    Each finished run is written to
    ``<base_path>/<experiment_name>/<run_id>.json``. The query helpers
    (``get_all_runs``, ``plot_metric_history``, ``find_best_run``) read those
    files, so they also see runs saved earlier into the same directory.
    """

    def __init__(self, experiment_name: str, base_path: str = "./experiments"):
        self.experiment_name = experiment_name
        self.base_path = Path(base_path)
        self.experiment_path = self.base_path / experiment_name
        self.experiment_path.mkdir(parents=True, exist_ok=True)
        self.runs = []           # runs ended in this session
        self.current_run = None  # active run dict, or None between runs

    def _require_active_run(self, message: str = "No active run. Call start_run() first."):
        """Raise ValueError(message) when no run is active."""
        if self.current_run is None:
            raise ValueError(message)

    def _iter_saved_runs(self):
        """Yield the saved run dicts, ordered by file name for determinism."""
        for run_file in sorted(self.experiment_path.glob("*.json")):
            with open(run_file, "r", encoding="utf-8") as f:
                yield json.load(f)

    def start_run(self, run_name: Optional[str] = None):
        """Start a new run and return its ID (``[run_name_]YYYYmmdd_HHMMSS``).

        If a run file with that ID already exists (same name within the same
        second), a ``_1``, ``_2``, ... suffix is added so it is not overwritten.
        """
        now = datetime.datetime.now()
        base_id = now.strftime("%Y%m%d_%H%M%S")
        if run_name:
            base_id = f"{run_name}_{base_id}"

        run_id = base_id
        suffix = 0
        while (self.experiment_path / f"{run_id}.json").exists():
            suffix += 1
            run_id = f"{base_id}_{suffix}"

        self.current_run = {
            "run_id": run_id,
            "start_time": now,
            "parameters": {},
            "metrics": {},
            "artifacts": [],
            "tags": [],
        }
        print(f"🔬 Starting run: {run_id}")
        return run_id

    def log_params(self, params: Dict[str, Any]):
        """Record hyperparameters on the active run."""
        self._require_active_run()
        self.current_run["parameters"].update(params)
        print(f"📝 Parameters logged: {params}")

    def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None):
        """Record metrics on the active run.

        With *step*, each value is appended as ``(step, value)`` to a history
        list (for training curves); without it, the value is stored as a scalar.
        """
        self._require_active_run()
        run_metrics = self.current_run["metrics"]
        if step is not None:
            for key, value in metrics.items():
                run_metrics.setdefault(key, []).append((step, value))
        else:
            run_metrics.update(metrics)
        print(f"📊 Metrics logged: {metrics}")

    def log_artifact(self, artifact_path: str, artifact_type: str = "file"):
        """Record an artifact reference (path and type) on the active run."""
        self._require_active_run()
        self.current_run["artifacts"].append({
            "path": artifact_path,
            "type": artifact_type,
            "timestamp": datetime.datetime.now().isoformat(),
        })
        print(f"📦 Artifact logged: {artifact_path}")

    def add_tags(self, tags: List[str]):
        """Append tags to the active run."""
        self._require_active_run()
        self.current_run["tags"].extend(tags)
        print(f"🏷️ Tags added: {tags}")

    def end_run(self):
        """Finish the active run, write it to JSON and clear it."""
        self._require_active_run("No active run to end.")
        run = self.current_run
        run["end_time"] = datetime.datetime.now()
        run["duration"] = (run["end_time"] - run["start_time"]).total_seconds()

        run_file = self.experiment_path / f"{run['run_id']}.json"
        with open(run_file, "w", encoding="utf-8") as f:
            # default=str serializes the datetime start/end times.
            json.dump(run, f, indent=2, default=str)

        self.runs.append(run)
        print(f"✅ Run ended: {run['run_id']}")
        print(f"   Duration: {run['duration']:.2f} seconds")
        self.current_run = None

    def get_all_runs(self) -> pd.DataFrame:
        """Return one row per saved run: id, duration, parameters, and metrics.

        Stepped metrics are reduced to their last logged value.
        """
        rows = []
        for run_data in self._iter_saved_runs():
            latest_metrics = {
                key: (value[-1][1] if isinstance(value, list) else value)
                for key, value in run_data["metrics"].items()
            }
            rows.append({
                "run_id": run_data["run_id"],
                "duration": run_data.get("duration", 0),
                **run_data["parameters"],
                **latest_metrics,
            })
        return pd.DataFrame(rows)

    def plot_metric_history(self, metric_name: str):
        """Plot *metric_name* for every saved run.

        Stepped metrics are drawn as curves; scalar metrics as dashed
        horizontal lines.
        """
        fig, ax = plt.subplots(figsize=(10, 6))
        for run_data in self._iter_saved_runs():
            if metric_name not in run_data["metrics"]:
                continue
            metric_data = run_data["metrics"][metric_name]
            if isinstance(metric_data, list):
                steps, values = zip(*metric_data)
                ax.plot(steps, values, label=run_data["run_id"], marker="o", markersize=4)
            else:
                ax.axhline(y=metric_data, label=run_data["run_id"], linestyle="--")

        ax.set_xlabel("Step")
        ax.set_ylabel(metric_name)
        ax.set_title(f"{metric_name} History - {self.experiment_name}")
        # Only draw a legend when something was plotted (avoids an empty-legend warning).
        if ax.get_legend_handles_labels()[0]:
            ax.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
        ax.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

    def find_best_run(self, metric_name: str, mode: str = "max") -> Dict:
        """Return the saved run with the best *metric_name*, or None.

        Stepped metrics are compared by their last value. Ties keep the first
        run in file-name order.

        Raises:
            ValueError: If *mode* is not ``"max"`` or ``"min"``.
        """
        if mode not in ("max", "min"):
            raise ValueError(f"mode must be 'max' or 'min', got {mode!r}")

        best_run = None
        best_value = float("-inf") if mode == "max" else float("inf")
        for run_data in self._iter_saved_runs():
            if metric_name not in run_data["metrics"]:
                continue
            metric_value = run_data["metrics"][metric_name]
            if isinstance(metric_value, list):
                metric_value = metric_value[-1][1]  # last logged value
            improved = metric_value > best_value if mode == "max" else metric_value < best_value
            if improved:
                best_value = metric_value
                best_run = run_data
        return best_run
# Demonstrate experiment tracking
print("\n" + "=" * 60)
print("EXPERIMENT TRACKING DEMO")
print("=" * 60)

# Create experiment tracker
tracker = ExperimentTracker("hyperparameter_tuning")

# Run multiple experiments
hyperparameters = [
    {"n_estimators": 50, "max_depth": 5, "min_samples_split": 5},
    {"n_estimators": 100, "max_depth": 10, "min_samples_split": 2},
    {"n_estimators": 200, "max_depth": 15, "min_samples_split": 10},
]
n_epochs = 5

for i, params in enumerate(hyperparameters):
    tracker.start_run(f"run_{i+1}")
    tracker.log_params(params)

    # warm_start=True makes each fit() add trees to the existing forest, so
    # every "epoch" grows the ensemble toward params["n_estimators"]. Refitting
    # the same seeded model each epoch would log identical metrics at every step.
    model = RandomForestClassifier(**params, random_state=42, warm_start=True)

    for epoch in range(n_epochs):
        model.set_params(
            n_estimators=max(1, params["n_estimators"] * (epoch + 1) // n_epochs)
        )
        model.fit(X_train, y_train)

        train_pred = model.predict(X_train)
        val_pred = model.predict(X_test)
        train_acc = accuracy_score(y_train, train_pred)
        val_acc = accuracy_score(y_test, val_pred)

        tracker.log_metrics({
            "train_accuracy": train_acc,
            "val_accuracy": val_acc,
        }, step=epoch)

    # Final metrics come from the fully grown forest (last epoch).
    final_metrics = {
        "final_accuracy": accuracy_score(y_test, val_pred),
        "final_f1": f1_score(y_test, val_pred),
    }
    tracker.log_metrics(final_metrics)

    tracker.add_tags([f"depth_{params['max_depth']}", "random_forest"])
    tracker.end_run()
    print()

# Visualize results
print("\nAll Experiment Runs:")
print(tracker.get_all_runs())

print("\nPlotting validation accuracy history:")
tracker.plot_metric_history("val_accuracy")

# Find best run
best_run = tracker.find_best_run("final_accuracy", mode="max")
if best_run:
    print(f"\nBest Run: {best_run['run_id']}")
    print(f"Parameters: {best_run['parameters']}")
    print(f"Best Accuracy: {best_run['metrics']['final_accuracy']}")
class ModelDeploymentPipeline:
"""Pipeline for model deployment and serving"""
def __init__(self, model_name: str):
self.model_name = model_name
self.deployment_path = Path("./deployments") / model_name
self.deployment_path.mkdir(parents=True, exist_ok=True)
def create_model_api(self, model, preprocessing_fn=None):
"""Create API wrapper for model serving"""
api_code = '''
import pickle
import numpy as np
from flask import Flask, request, jsonify
import joblib
app = Flask(__name__)
# Load model
model = joblib.load('model.pkl')
@app.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint"""
return jsonify({"status": "healthy", "model": "''' + self.model_name + '''"})
@app.route('/predict', methods=['POST'])
def predict():
"""Prediction endpoint"""
try:
# Get data from request
data = request.json
# Convert to numpy array
features = np.array(data['features']).reshape(1, -1)
# Make prediction
prediction = model.predict(features)[0]
probability = model.predict_proba(features)[0].tolist()
return jsonify({
"prediction": int(prediction),
"probability": probability,
"model_name": "''' + self.model_name + '''"
})
except Exception as e:
return jsonify({"error": str(e)}), 400
@app.route('/batch_predict', methods=['POST'])
def batch_predict():
"""Batch prediction endpoint"""
try:
# Get data from request
data = request.json
# Convert to numpy array
features = np.array(data['features'])
# Make predictions
predictions = model.predict(features).tolist()
probabilities = model.predict_proba(features).tolist()
return jsonify({
"predictions": predictions,
"probabilities": probabilities,
"batch_size": len(predictions)
})
except Exception as e:
return jsonify({"error": str(e)}), 400
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=False)
'''
# Save API code
api_path = self.deployment_path / "app.py"
with open(api_path, 'w') as f:
f.write(api_code)
print(f"ā
API code created: {api_path}")
return api_path
def create_dockerfile(self):
"""Create Dockerfile for containerization"""
dockerfile_content = f'''
FROM python:3.9-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy model and app
COPY model.pkl .
COPY app.py .
# Expose port
EXPOSE 5000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \\
CMD curl -f http://localhost:5000/health || exit 1
# Run app
CMD ["python", "app.py"]
'''
# Save Dockerfile
dockerfile_path = self.deployment_path / "Dockerfile"
with open(dockerfile_path, 'w') as f:
f.write(dockerfile_content.strip())
print(f"ā
Dockerfile created: {dockerfile_path}")
return dockerfile_path
def create_requirements(self):
"""Create requirements.txt for dependencies"""
requirements = '''
flask==2.3.0
numpy==1.24.0
scikit-learn==1.3.0
joblib==1.3.0
gunicorn==21.2.0
'''
# Save requirements
req_path = self.deployment_path / "requirements.txt"
with open(req_path, 'w') as f:
f.write(requirements.strip())
print(f"ā
Requirements file created: {req_path}")
return req_path
def create_kubernetes_deployment(self):
"""Create Kubernetes deployment manifest"""
k8s_yaml = f'''
apiVersion: apps/v1
kind: Deployment
metadata:
name: {self.model_name}-deployment
labels:
app: {self.model_name}
spec:
replicas: 3
selector:
matchLabels:
app: {self.model_name}
template:
metadata:
labels:
app: {self.model_name}
spec:
containers:
- name: {self.model_name}
image: {self.model_name}:latest
ports:
- containerPort: 5000
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: {self.model_name}-service
spec:
selector:
app: {self.model_name}
ports:
- protocol: TCP
port: 80
targetPort: 5000
type: LoadBalancer
'''
# Save Kubernetes manifest
k8s_path = self.deployment_path / "kubernetes.yaml"
with open(k8s_path, 'w') as f:
f.write(k8s_yaml.strip())
print(f"ā
Kubernetes manifest created: {k8s_path}")
return k8s_path
def create_deployment_package(self, model):
"""Create complete deployment package"""
print(f"\nš¦ Creating deployment package for {self.model_name}")
print("-" * 40)
# Save model
model_path = self.deployment_path / "model.pkl"
joblib.dump(model, model_path)
print(f"ā
Model saved: {model_path}")
# Create API
self.create_model_api(model)
# Create Dockerfile
self.create_dockerfile()
# Create requirements
self.create_requirements()
# Create Kubernetes manifest
self.create_kubernetes_deployment()
# Create deployment script
deploy_script = f'''#!/bin/bash
echo "Deploying {self.model_name}..."
# Build Docker image
docker build -t {self.model_name}:latest .
# Run locally for testing
# docker run -p 5000:5000 {self.model_name}:latest
# Push to registry (update with your registry)
# docker tag {self.model_name}:latest your-registry/{self.model_name}:latest
# docker push your-registry/{self.model_name}:latest
# Deploy to Kubernetes
# kubectl apply -f kubernetes.yaml
echo "Deployment complete!"
'''
deploy_script_path = self.deployment_path / "deploy.sh"
with open(deploy_script_path, 'w') as f:
f.write(deploy_script.strip())
print(f"ā
Deployment script created: {deploy_script_path}")
print(f"\n⨠Deployment package ready at: {self.deployment_path}")
print("\nNext steps:")
print("1. cd", self.deployment_path)
print("2. chmod +x deploy.sh")
print("3. ./deploy.sh")
# --- Deployment pipeline demo ------------------------------------------------
print("\n" + "=" * 60, "MODEL DEPLOYMENT PIPELINE", "=" * 60, sep="\n")

# Fit a fresh forest for packaging, reusing the train split created above.
model_for_deployment = RandomForestClassifier(random_state=42, n_estimators=100)
model_for_deployment.fit(X_train, y_train)

# Write model.pkl plus the API, Dockerfile, requirements, manifest and deploy.sh.
deployment = ModelDeploymentPipeline("production_classifier")
deployment.create_deployment_package(model_for_deployment)
class ModelMonitor:
    """Monitor model performance against baseline metrics and report drift."""

    def __init__(self, model_name: str):
        self.model_name = model_name
        self.metrics_history = []
        self.alerts = []              # every drift alert raised by check_drift
        self.baseline_metrics = None  # set via set_baseline()

    def set_baseline(self, metrics: Dict[str, float]):
        """Store the reference metrics used by ``check_drift``."""
        self.baseline_metrics = metrics
        print(f"📊 Baseline metrics set: {metrics}")

    def log_prediction(self, features, prediction, actual=None, timestamp=None):
        """Build and return a log record for a single prediction.

        The record is only returned, not stored. In production it would be
        sent to a database or monitoring service.
        """
        if timestamp is None:
            timestamp = datetime.datetime.now()
        return {
            "timestamp": timestamp,
            "features": features.tolist() if hasattr(features, "tolist") else features,
            "prediction": prediction,
            "actual": actual,
        }

    def calculate_metrics(self, predictions, actuals):
        """Return accuracy and weighted precision/recall/F1."""
        return {
            "accuracy": accuracy_score(actuals, predictions),
            "precision": precision_score(actuals, predictions, average="weighted"),
            "recall": recall_score(actuals, predictions, average="weighted"),
            "f1_score": f1_score(actuals, predictions, average="weighted"),
        }

    def check_drift(self, current_metrics: Dict[str, float], threshold: float = 0.1):
        """Compare *current_metrics* with the baseline and return drift alerts.

        A metric alerts when its absolute change from the baseline exceeds
        *threshold*. Metrics absent from the baseline are ignored. Each alert
        is also appended to ``self.alerts``. Returns ``[]`` (after printing a
        warning) when no baseline is set.
        """
        if self.baseline_metrics is None:
            print("⚠️ No baseline metrics set")
            return []

        drift_alerts = []
        for metric_name, current_value in current_metrics.items():
            if metric_name not in self.baseline_metrics:
                continue
            baseline_value = self.baseline_metrics[metric_name]
            drift = abs(current_value - baseline_value)
            if drift <= threshold:
                continue

            # Relative drift is undefined for a zero baseline; report it as infinite
            # instead of raising ZeroDivisionError.
            if baseline_value:
                drift_percentage = drift / abs(baseline_value) * 100
            else:
                drift_percentage = float("inf")

            alert = {
                "metric": metric_name,
                "baseline": baseline_value,
                "current": current_value,
                "drift": drift,
                "drift_percentage": drift_percentage,
                "timestamp": datetime.datetime.now(),
            }
            drift_alerts.append(alert)
            self.alerts.append(alert)

            print(f"⚠️ DRIFT ALERT: {metric_name}")
            print(f"   Baseline: {baseline_value:.4f}")
            print(f"   Current: {current_value:.4f}")
            print(f"   Drift: {drift_percentage:.2f}%")

        return drift_alerts

    def visualize_monitoring_dashboard(self):
        """Plot a demo dashboard from 24 hours of simulated metrics.

        The series are synthetic (seeded, reproducible) and the alert counts
        are illustrative placeholders. Neither is read from the monitor's state.
        """
        # A local generator keeps the demo reproducible without reseeding
        # NumPy's global random state.
        rng = np.random.default_rng(42)
        hours = 24
        timestamps = pd.date_range(end=datetime.datetime.now(), periods=hours,
                                   freq=pd.Timedelta(hours=1))

        accuracy_values = 0.95 - rng.exponential(0.01, hours) * 0.1
        precision_values = 0.93 - rng.exponential(0.01, hours) * 0.08
        recall_values = 0.94 - rng.exponential(0.01, hours) * 0.09
        latency_values = 50 + rng.exponential(10, hours)

        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # Accuracy over time
        ax = axes[0, 0]
        ax.plot(timestamps, accuracy_values, marker="o", markersize=4)
        ax.axhline(y=0.95, color="g", linestyle="--", label="Baseline")
        ax.axhline(y=0.85, color="r", linestyle="--", label="Alert Threshold")
        ax.fill_between(timestamps, 0.85, accuracy_values,
                        where=(accuracy_values < 0.85),
                        color="red", alpha=0.3, label="Alert Zone")
        ax.set_xlabel("Time")
        ax.set_ylabel("Accuracy")
        ax.set_title("Model Accuracy Monitoring")
        ax.legend()
        ax.grid(True, alpha=0.3)
        ax.tick_params(axis="x", rotation=45)

        # Precision vs Recall, coloured by hour
        ax = axes[0, 1]
        ax.scatter(precision_values, recall_values, c=range(hours),
                   cmap="viridis", alpha=0.6)
        ax.set_xlabel("Precision")
        ax.set_ylabel("Recall")
        ax.set_title("Precision vs Recall Drift")
        ax.grid(True, alpha=0.3)

        # Latency monitoring against a 100 ms SLA
        ax = axes[1, 0]
        ax.plot(timestamps, latency_values, color="orange", marker="s", markersize=4)
        ax.axhline(y=100, color="r", linestyle="--", label="SLA Limit")
        ax.fill_between(timestamps, 100, latency_values,
                        where=(latency_values > 100),
                        color="red", alpha=0.3, label="SLA Violation")
        ax.set_xlabel("Time")
        ax.set_ylabel("Latency (ms)")
        ax.set_title("Prediction Latency")
        ax.legend()
        ax.grid(True, alpha=0.3)
        ax.tick_params(axis="x", rotation=45)

        # Alert summary (illustrative counts)
        ax = axes[1, 1]
        alert_types = ["Accuracy", "Precision", "Recall", "Latency"]
        alert_counts = [3, 1, 2, 5]
        colors = ["red" if count > 2 else "orange" for count in alert_counts]
        ax.bar(alert_types, alert_counts, color=colors)
        ax.set_ylabel("Alert Count (24h)")
        ax.set_title("Alert Summary")
        ax.grid(True, alpha=0.3, axis="y")
        for i, count in enumerate(alert_counts):
            ax.text(i, count + 0.1, str(count), ha="center")

        plt.suptitle(f"Model Monitoring Dashboard - {self.model_name}",
                     fontsize=14, y=1.02)
        plt.tight_layout()
        plt.show()

    def generate_monitoring_report(self):
        """Return a plain-text report of health, baseline metrics and recent alerts.

        Health is DEGRADED when any alert has been recorded; only the last
        five alerts are listed.
        """
        status = "⚠️ DEGRADED" if self.alerts else "✅ HEALTHY"
        lines = [
            "",
            "MODEL MONITORING REPORT",
            "=======================",
            f"Model: {self.model_name}",
            f"Generated: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "",
            "CURRENT STATUS",
            "--------------",
            f"Overall Health: {status}",
            f"Active Alerts: {len(self.alerts)}",
            "",
            "BASELINE METRICS",
            "----------------",
        ]

        if self.baseline_metrics:
            lines += [f"{metric}: {value:.4f}" for metric, value in self.baseline_metrics.items()]
        else:
            lines.append("No baseline metrics set")

        lines += ["", "RECENT ALERTS", "-------------"]
        if self.alerts:
            lines += [
                f"- {alert['metric']}: {alert['drift_percentage']:.2f}% drift"
                for alert in self.alerts[-5:]
            ]
        else:
            lines.append("No recent alerts")

        lines += [
            "",
            "RECOMMENDATIONS",
            "---------------",
            "1. Review training data distribution",
            "2. Check for data quality issues",
            "3. Consider model retraining",
            "4. Validate feature engineering pipeline",
            "5. Monitor business metrics correlation",
            "",
        ]
        return "\n".join(lines)
# Demonstrate monitoring
print("\n" + "=" * 60)
print("MODEL MONITORING DEMO")
print("=" * 60)

# Create monitor
monitor = ModelMonitor("production_classifier")

# Set baseline metrics
baseline = {
    "accuracy": 0.95,
    "precision": 0.93,
    "recall": 0.94,
    "f1_score": 0.935,
}
monitor.set_baseline(baseline)

# Simulate production metrics with drift
print("\n🔍 Checking for drift...")
current_metrics = {
    "accuracy": 0.87,   # drifts by 0.08 (> 0.05 threshold)
    "precision": 0.91,
    "recall": 0.88,     # drifts by 0.06 (> 0.05 threshold)
    "f1_score": 0.895,
}

# Check for drift
drift_alerts = monitor.check_drift(current_metrics, threshold=0.05)

# Visualize dashboard
print("\n📈 Generating monitoring dashboard...")
monitor.visualize_monitoring_dashboard()

# Generate report
print("\n📋 Monitoring Report:")
print(monitor.generate_monitoring_report())
print("\n" + "=" * 60)
print("MLOPS BEST PRACTICES")
print("=" * 60)

# Summary of recommended practices, grouped by concern.
best_practices = """
KEY BEST PRACTICES:

1. VERSION CONTROL:
   • Version code, data, and models
   • Use semantic versioning (major.minor.patch)
   • Tag releases and maintain changelog
   • Store model artifacts with metadata

2. REPRODUCIBILITY:
   • Set random seeds everywhere
   • Pin dependency versions
   • Document data sources and preprocessing
   • Use configuration files over hardcoding

3. TESTING:
   • Unit tests for data processing
   • Integration tests for pipelines
   • Model performance tests
   • A/B testing in production
   • Shadow deployment before full rollout

4. MONITORING:
   • Track model performance metrics
   • Monitor data distribution drift
   • Log predictions and errors
   • Set up alerts for anomalies
   • Regular model audits

5. DOCUMENTATION:
   • Model cards with capabilities/limitations
   • API documentation
   • Data dictionaries
   • Architecture decision records
   • Runbooks for common issues

6. SECURITY:
   • Encrypt sensitive data
   • Secure API endpoints
   • Implement rate limiting
   • Regular security audits
   • Access control and authentication

7. SCALABILITY:
   • Horizontal scaling with containers
   • Load balancing
   • Caching strategies
   • Asynchronous processing
   • Auto-scaling policies
"""
print(best_practices)
# Step-by-step checklist covering the full deployment lifecycle.
production_checklist = """
PRODUCTION DEPLOYMENT CHECKLIST:

Pre-Deployment:
   ✔ Model performance meets requirements
   ✔ Code review completed
   ✔ Unit and integration tests pass
   ✔ Documentation updated
   ✔ Security review done
   ✔ Load testing completed
   ✔ Rollback plan prepared
   ✔ Monitoring configured

Deployment:
   ✔ Create deployment artifacts
   ✔ Tag release version
   ✔ Deploy to staging environment
   ✔ Run smoke tests
   ✔ Perform canary deployment
   ✔ Monitor initial performance
   ✔ Gradual traffic rollout

Post-Deployment:
   ✔ Monitor metrics closely (first 24h)
   ✔ Check for data drift
   ✔ Validate business metrics
   ✔ Gather user feedback
   ✔ Document lessons learned
   ✔ Update runbooks
   ✔ Plan next iteration

Maintenance:
   ✔ Regular model evaluation
   ✔ Scheduled retraining
   ✔ Update dependencies
   ✔ Performance optimization
   ✔ Cost monitoring
   ✔ Compliance checks
   ✔ Disaster recovery testing
"""
print("\nPRODUCTION CHECKLIST:")
print(production_checklist)
Practice exercises:
1. Create an end-to-end MLOps pipeline.
2. Build an A/B testing framework.
3. Create a comprehensive drift-detection system.