Why Monitor ML Models?
ML models degrade over time. Data distributions shift, user behavior changes, and the world evolves. Without monitoring, you won't know when your model stops working well.
- Data Drift: Input data distribution changes
- Concept Drift: Relationship between features and target changes
- Model Decay: Performance degradation over time
Key Metrics to Monitor
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

class ModelMonitor:
    def __init__(self, model):
        self.model = model
        self.predictions = []          # every prediction served
        self.labeled_predictions = []  # predictions for which ground truth arrived
        self.actuals = []
        self.latencies = []

    def log_prediction(self, features, prediction, actual=None, latency=None):
        """Log a single prediction for monitoring."""
        self.predictions.append(prediction)
        if actual is not None:
            # Keep labeled predictions paired with their ground truth so the
            # two lists stay the same length for metric calculations.
            self.labeled_predictions.append(prediction)
            self.actuals.append(actual)
        if latency is not None:
            self.latencies.append(latency)

    def get_performance_metrics(self):
        """Calculate performance metrics over predictions with ground truth."""
        if not self.actuals:
            return {"error": "No ground truth available"}
        return {
            "accuracy": accuracy_score(self.actuals, self.labeled_predictions),
            "precision": precision_score(self.actuals, self.labeled_predictions, average='weighted'),
            "recall": recall_score(self.actuals, self.labeled_predictions, average='weighted'),
            "f1": f1_score(self.actuals, self.labeled_predictions, average='weighted'),
            "sample_count": len(self.labeled_predictions)
        }

    def get_operational_metrics(self):
        """Calculate operational metrics."""
        return {
            "avg_latency_ms": np.mean(self.latencies) if self.latencies else 0,
            "p95_latency_ms": np.percentile(self.latencies, 95) if self.latencies else 0,
            "p99_latency_ms": np.percentile(self.latencies, 99) if self.latencies else 0,
            "prediction_count": len(self.predictions)
        }
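A minimal usage sketch; the model object and logged values here are placeholders, not part of a real serving stack:

# Placeholder usage: in production these calls come from your serving code.
monitor = ModelMonitor(model=None)
monitor.log_prediction(features=[1.2, 3.4], prediction=1, actual=1, latency=12.5)
monitor.log_prediction(features=[0.7, 2.1], prediction=0, latency=9.8)  # label not yet known
print(monitor.get_performance_metrics())
print(monitor.get_operational_metrics())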
Data Drift Detection
from scipy import stats
import numpy as np

def detect_drift_ks(reference_data, current_data, threshold=0.05):
    """
    Detect drift using the two-sample Kolmogorov-Smirnov test.
    Returns (drift_detected, statistic, p_value); drift_detected is True
    when the p-value falls below the threshold.
    """
    statistic, p_value = stats.ks_2samp(reference_data, current_data)
    return p_value < threshold, statistic, p_value

def detect_drift_psi(reference_data, current_data, buckets=10):
    """
    Calculate the Population Stability Index (PSI).
    PSI < 0.1: no drift
    0.1 <= PSI < 0.2: moderate drift
    PSI >= 0.2: significant drift
    """
    def get_bucket_percentages(data, bins):
        # Clip so current values outside the reference range still land in
        # the outermost buckets instead of being silently dropped.
        clipped = np.clip(data, bins[0], bins[-1])
        hist, _ = np.histogram(clipped, bins=bins)
        return hist / len(data)

    # Create bins from the reference data
    _, bins = np.histogram(reference_data, bins=buckets)
    ref_pct = get_bucket_percentages(reference_data, bins)
    curr_pct = get_bucket_percentages(current_data, bins)

    # Avoid division by zero in the log ratio
    ref_pct = np.where(ref_pct == 0, 0.0001, ref_pct)
    curr_pct = np.where(curr_pct == 0, 0.0001, curr_pct)

    psi = np.sum((curr_pct - ref_pct) * np.log(curr_pct / ref_pct))
    return psi

# Example usage (training_data and production_data are placeholder DataFrames)
reference = training_data['feature1']
current = production_data['feature1']

drift_detected, ks_stat, p_value = detect_drift_ks(reference, current)
psi_score = detect_drift_psi(reference, current)

print(f"KS Test - Drift: {drift_detected}, Statistic: {ks_stat:.4f}")
print(f"PSI Score: {psi_score:.4f}")
Using Evidently for Monitoring
# pip install evidently
# Note: this matches the older Report-based API (evidently 0.4.x);
# import paths have changed in newer releases, so check the docs
# for your installed version.
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, ClassificationPreset

# Define column mapping
column_mapping = ColumnMapping(
    target='target',
    prediction='prediction',
    numerical_features=['feature1', 'feature2', 'feature3'],
    categorical_features=['category']
)

# Data Drift Report
data_drift_report = Report(metrics=[DataDriftPreset()])
data_drift_report.run(
    reference_data=reference_df,
    current_data=current_df,
    column_mapping=column_mapping
)
data_drift_report.save_html('data_drift_report.html')

# Model Performance Report
classification_report = Report(metrics=[ClassificationPreset()])
classification_report.run(
    reference_data=reference_df,
    current_data=current_df,
    column_mapping=column_mapping
)
classification_report.save_html('classification_report.html')

# Get metrics as a dict
metrics = data_drift_report.as_dict()
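From there you can feed selected values into your own alerting. The exact key layout of the dict varies across Evidently versions, so the sketch below accesses it defensively; the keys shown ('dataset_drift', 'number_of_drifted_columns') are assumptions to verify against your installed release.

# Hedged sketch: pull the dataset-level drift flag out of the metrics dict.
# Key names may differ by Evidently version.
result = metrics.get('metrics', [{}])[0].get('result', {})
if result.get('dataset_drift'):
    print(f"Drift detected in {result.get('number_of_drifted_columns')} columns")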
Prometheus + Grafana Setup
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

# Define metrics
PREDICTIONS_TOTAL = Counter(
    'model_predictions_total',
    'Total number of predictions',
    ['model_version', 'prediction_class']
)

PREDICTION_LATENCY = Histogram(
    'model_prediction_latency_seconds',
    'Prediction latency in seconds',
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)

MODEL_ACCURACY = Gauge(
    'model_accuracy',
    'Current model accuracy',
    ['model_version']
)

DATA_DRIFT_SCORE = Gauge(
    'data_drift_psi',
    'PSI score for data drift',
    ['feature']
)

# Start the metrics server; Prometheus scrapes http://<host>:8000/metrics
start_http_server(8000)

# Use in a prediction endpoint (assumes a FastAPI `app`, a loaded `model`,
# and a `PredictionRequest` schema defined elsewhere)
@app.post('/predict')
async def predict(request: PredictionRequest):
    start_time = time.time()

    # Make prediction
    prediction = model.predict(request.features)

    # Record metrics
    latency = time.time() - start_time
    PREDICTION_LATENCY.observe(latency)
    PREDICTIONS_TOTAL.labels(
        model_version='v1',
        prediction_class=str(prediction)
    ).inc()

    return {'prediction': prediction}
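The two gauges above are never set inside the endpoint; they are meant to be refreshed by a periodic job. A minimal sketch, assuming the ModelMonitor and PSI function from the earlier sections:

# Hedged sketch: refresh the gauges from a periodic monitoring job.
def update_monitoring_gauges(monitor, drift_scores, model_version='v1'):
    perf = monitor.get_performance_metrics()
    if 'accuracy' in perf:
        MODEL_ACCURACY.labels(model_version=model_version).set(perf['accuracy'])
    for feature, psi in drift_scores.items():
        DATA_DRIFT_SCORE.labels(feature=feature).set(psi)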
Alerting System
import smtplib
from email.mime.text import MIMEText
import slack_sdk

class AlertManager:
    def __init__(self, slack_token=None, email_config=None):
        self.slack_client = slack_sdk.WebClient(token=slack_token) if slack_token else None
        self.email_config = email_config

    def send_alert(self, title, message, severity='warning'):
        """Send alert to all configured channels."""
        if self.slack_client:
            self._send_slack_alert(title, message, severity)
        if self.email_config:
            self._send_email_alert(title, message)

    def _send_slack_alert(self, title, message, severity):
        color = {'critical': 'danger', 'warning': 'warning', 'info': 'good'}
        self.slack_client.chat_postMessage(
            channel='#ml-alerts',
            attachments=[{
                'color': color.get(severity, 'warning'),
                'title': title,
                'text': message
            }]
        )

    def _send_email_alert(self, title, message):
        # Minimal SMTP sender; the email_config keys used here ('host',
        # 'port', 'user', 'password', 'from', 'to') are assumed by this sketch.
        msg = MIMEText(message)
        msg['Subject'] = title
        msg['From'] = self.email_config['from']
        msg['To'] = self.email_config['to']
        with smtplib.SMTP(self.email_config['host'], self.email_config.get('port', 587)) as server:
            server.starttls()
            server.login(self.email_config['user'], self.email_config['password'])
            server.send_message(msg)

    def check_drift_and_alert(self, feature_name, psi_score):
        """Check drift and send an alert if a threshold is exceeded."""
        if psi_score >= 0.2:
            self.send_alert(
                title=f"Critical Data Drift Detected: {feature_name}",
                message=f"PSI score: {psi_score:.4f}. Immediate action required.",
                severity='critical'
            )
        elif psi_score >= 0.1:
            self.send_alert(
                title=f"Moderate Data Drift Detected: {feature_name}",
                message=f"PSI score: {psi_score:.4f}. Consider retraining.",
                severity='warning'
            )

    def check_performance_and_alert(self, metric_name, current_value, threshold):
        """Alert if a performance metric drops below its threshold."""
        if current_value < threshold:
            self.send_alert(
                title=f"Model Performance Degradation: {metric_name}",
                message=f"Current: {current_value:.4f}, Threshold: {threshold:.4f}",
                severity='critical'
            )
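Wiring it together (the token and scores here are placeholders):

# Usage sketch with placeholder credentials and values.
alert_manager = AlertManager(slack_token='xoxb-placeholder-token')
alert_manager.check_drift_and_alert('feature1', psi_score=0.27)
alert_manager.check_performance_and_alert('accuracy', current_value=0.81, threshold=0.85)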
Automated Retraining Pipeline
class RetrainingTrigger:
    def __init__(self, drift_threshold=0.2, performance_threshold=0.85):
        self.drift_threshold = drift_threshold
        self.performance_threshold = performance_threshold

    def should_retrain(self, drift_scores, performance_metrics):
        """Determine whether the model should be retrained."""
        # Check drift (default=0 handles an empty score dict)
        max_drift = max(drift_scores.values(), default=0.0)
        if max_drift >= self.drift_threshold:
            return True, f"Drift detected: {max_drift:.4f}"

        # Check performance
        accuracy = performance_metrics.get('accuracy', 1.0)
        if accuracy < self.performance_threshold:
            return True, f"Accuracy below threshold: {accuracy:.4f}"

        return False, "No retraining needed"

    def trigger_retraining(self):
        """Trigger the retraining pipeline."""
        # Option 1: Call an Airflow DAG
        # requests.post('http://airflow/api/v1/dags/retrain/dagRuns', ...)
        # Option 2: GitHub Actions workflow
        # github_api.dispatch_workflow('retrain.yml')
        # Option 3: AWS Step Functions
        # stepfunctions.start_execution(...)
        pass
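As one concrete version of Option 1, here is a hedged sketch of triggering an Airflow DAG run through the Airflow 2.x stable REST API; the URL, credentials, and dag_id ('retrain') are placeholders for your own deployment:

import requests

def trigger_airflow_retraining(airflow_url='http://airflow:8080', dag_id='retrain'):
    # POST /api/v1/dags/{dag_id}/dagRuns creates a new DAG run;
    # the auth method depends on how your Airflow instance is configured.
    response = requests.post(
        f'{airflow_url}/api/v1/dags/{dag_id}/dagRuns',
        json={'conf': {'reason': 'drift_detected'}},
        auth=('airflow_user', 'airflow_password'),  # placeholder credentials
        timeout=10
    )
    response.raise_for_status()
    return response.json()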
# Scheduled monitoring job (assumes `features`, `reference`, `current`,
# `monitor`, and `alert_manager` are defined as in the sections above)
def monitoring_job():
    # Calculate drift per feature
    drift_scores = {}
    for feature in features:
        psi = detect_drift_psi(reference[feature], current[feature])
        drift_scores[feature] = psi

    # Calculate performance (if ground truth is available)
    performance = monitor.get_performance_metrics()

    # Check whether retraining is needed
    trigger = RetrainingTrigger()
    should_retrain, reason = trigger.should_retrain(drift_scores, performance)

    if should_retrain:
        alert_manager.send_alert("Retraining Triggered", reason)
        trigger.trigger_retraining()
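To run this on a cadence, any scheduler works: cron, Airflow, or a Kubernetes CronJob. A minimal in-process sketch with the third-party `schedule` package (pip install schedule); the six-hour interval is an arbitrary choice:

import schedule
import time

# Run the monitoring job every six hours.
schedule.every(6).hours.do(monitoring_job)

while True:
    schedule.run_pending()
    time.sleep(60)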
Monitoring Dashboard Checklist
- Prediction Volume: Track request counts over time
- Latency: p50, p95, p99 latencies
- Error Rate: Percentage of failed predictions
- Feature Distributions: Histograms of input features
- Prediction Distribution: Output class balance
- Drift Scores: PSI/KS test results per feature
- Model Performance: Accuracy, F1 (when labels available)
- Resource Usage: CPU, memory, GPU utilization
Master MLOps Monitoring
Our Data Science program covers production ML monitoring, drift detection, and maintaining healthy ML systems.
Explore Data Science Program