Why Monitor ML Models?

ML models degrade over time. Data distributions shift, user behavior changes, and the world evolves. Without monitoring, you won't know when your model stops working well.

  • Data Drift: Input data distribution changes
  • Concept Drift: Relationship between features and target changes
  • Model Decay: Performance degradation over time

Key Metrics to Monitor

import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

class ModelMonitor:
    def __init__(self, model):
        self.model = model
        self.predictions = []
        self.labeled_pairs = []  # (prediction, actual) where ground truth exists
        self.latencies = []

    def log_prediction(self, features, prediction, actual=None, latency=None):
        """Log a single prediction for monitoring."""
        self.predictions.append(prediction)
        if actual is not None:
            # Pair the prediction with its label so the labeled subset stays
            # aligned even when most predictions lack ground truth.
            self.labeled_pairs.append((prediction, actual))
        if latency is not None:
            self.latencies.append(latency)

    def get_performance_metrics(self):
        """Calculate performance metrics on the labeled subset."""
        if not self.labeled_pairs:
            return {"error": "No ground truth available"}

        preds, actuals = zip(*self.labeled_pairs)
        return {
            "accuracy": accuracy_score(actuals, preds),
            "precision": precision_score(actuals, preds, average='weighted'),
            "recall": recall_score(actuals, preds, average='weighted'),
            "f1": f1_score(actuals, preds, average='weighted'),
            "labeled_count": len(self.labeled_pairs),
            "sample_count": len(self.predictions)
        }

    def get_operational_metrics(self):
        """Calculate operational metrics."""
        return {
            "avg_latency_ms": np.mean(self.latencies) if self.latencies else 0,
            "p95_latency_ms": np.percentile(self.latencies, 95) if self.latencies else 0,
            "p99_latency_ms": np.percentile(self.latencies, 99) if self.latencies else 0,
            "prediction_count": len(self.predictions)
        }
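
A quick usage sketch of the monitor (the DummyClassifier and toy data below are stand-ins for illustration; in production, ground truth usually arrives with a delay, so actual= is often logged well after the prediction):

import time
from sklearn.dummy import DummyClassifier

# Toy model purely for demonstration; substitute your real classifier.
clf = DummyClassifier(strategy='most_frequent').fit([[0], [1]], [0, 1])
monitor = ModelMonitor(clf)

for features, actual in [([0.2], 0), ([0.9], 1)]:
    start = time.time()
    prediction = clf.predict([features])[0]
    monitor.log_prediction(features, prediction, actual=actual,
                           latency=(time.time() - start) * 1000)

print(monitor.get_performance_metrics())
print(monitor.get_operational_metrics())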

Data Drift Detection

from scipy import stats
import numpy as np

def detect_drift_ks(reference_data, current_data, threshold=0.05):
    """
    Detect drift using the two-sample Kolmogorov-Smirnov test.
    Returns a (drift_detected, statistic, p_value) tuple; drift_detected
    is True when the p-value falls below the significance threshold.
    """
    statistic, p_value = stats.ks_2samp(reference_data, current_data)
    return p_value < threshold, statistic, p_value

def detect_drift_psi(reference_data, current_data, buckets=10):
    """
    Calculate Population Stability Index (PSI).
    PSI < 0.1: No drift
    0.1 <= PSI < 0.2: Moderate drift
    PSI >= 0.2: Significant drift
    """
    def get_bucket_percentages(data, bins):
        hist, _ = np.histogram(data, bins=bins)
        return hist / len(data)

    # Create bins from reference data, widening the outer edges so that
    # current values outside the reference range are still counted
    _, bins = np.histogram(reference_data, bins=buckets)
    bins[0], bins[-1] = -np.inf, np.inf

    ref_pct = get_bucket_percentages(reference_data, bins)
    curr_pct = get_bucket_percentages(current_data, bins)

    # Avoid division by zero
    ref_pct = np.where(ref_pct == 0, 0.0001, ref_pct)
    curr_pct = np.where(curr_pct == 0, 0.0001, curr_pct)

    psi = np.sum((curr_pct - ref_pct) * np.log(curr_pct / ref_pct))
    return psi

# Example usage (assumes training_data and production_data are pandas
# DataFrames holding the reference and live values for each feature)
reference = training_data['feature1']
current = production_data['feature1']

drift_detected, ks_stat, p_value = detect_drift_ks(reference, current)
psi_score = detect_drift_psi(reference, current)

print(f"KS Test - Drift: {drift_detected}, Statistic: {ks_stat:.4f}")
print(f"PSI Score: {psi_score:.4f}")

Using Evidently for Monitoring

# pip install evidently
# Note: this example uses the Evidently 0.4.x Report API; later releases
# reorganized the package, so check your installed version.
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, ClassificationPreset

# Define column mapping
column_mapping = ColumnMapping(
    target='target',
    prediction='prediction',
    numerical_features=['feature1', 'feature2', 'feature3'],
    categorical_features=['category']
)

# Data Drift Report
data_drift_report = Report(metrics=[DataDriftPreset()])
data_drift_report.run(
    reference_data=reference_df,
    current_data=current_df,
    column_mapping=column_mapping
)
data_drift_report.save_html('data_drift_report.html')

# Model Performance Report
classification_report = Report(metrics=[ClassificationPreset()])
classification_report.run(
    reference_data=reference_df,
    current_data=current_df,
    column_mapping=column_mapping
)
classification_report.save_html('classification_report.html')

# Get metrics as dict
metrics = data_drift_report.as_dict()
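
The exact dictionary layout varies across Evidently releases, so the key names below are assumptions; a defensive sketch for pulling out a dataset-level drift flag:

# Probe the report dict for a dataset-level drift flag rather than
# hard-coding a path, since key names differ between versions.
for metric in metrics.get('metrics', []):
    result = metric.get('result', {})
    if isinstance(result, dict) and 'dataset_drift' in result:
        print('Dataset drift detected:', result['dataset_drift'])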

Prometheus + Grafana Setup

from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

# Define metrics
PREDICTIONS_TOTAL = Counter(
    'model_predictions_total',
    'Total number of predictions',
    ['model_version', 'prediction_class']
)

PREDICTION_LATENCY = Histogram(
    'model_prediction_latency_seconds',
    'Prediction latency in seconds',
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)

MODEL_ACCURACY = Gauge(
    'model_accuracy',
    'Current model accuracy',
    ['model_version']
)

DATA_DRIFT_SCORE = Gauge(
    'data_drift_psi',
    'PSI score for data drift',
    ['feature']
)

# Start the Prometheus metrics server; it exposes /metrics on its own
# port (8000 here), separate from the API's port
start_http_server(8000)

# Use in a prediction endpoint (assumes a FastAPI `app`, a Pydantic
# `PredictionRequest` model, and a loaded `model` from your serving code)
@app.post('/predict')
async def predict(request: PredictionRequest):
    start_time = time.time()

    # Make prediction
    prediction = model.predict(request.features)

    # Record metrics
    latency = time.time() - start_time
    PREDICTION_LATENCY.observe(latency)
    PREDICTIONS_TOTAL.labels(
        model_version='v1',
        prediction_class=str(prediction)
    ).inc()

    return {'prediction': prediction}
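
The MODEL_ACCURACY and DATA_DRIFT_SCORE gauges defined above are set by a periodic job rather than per request. A minimal sketch, assuming the monitor, reference, and current objects from the earlier sections:

# Push model-quality and drift numbers into the gauges; run this on a
# schedule alongside the serving process.
def update_quality_gauges():
    perf = monitor.get_performance_metrics()
    if 'accuracy' in perf:
        MODEL_ACCURACY.labels(model_version='v1').set(perf['accuracy'])

    for feature in ['feature1', 'feature2', 'feature3']:
        psi = detect_drift_psi(reference[feature], current[feature])
        DATA_DRIFT_SCORE.labels(feature=feature).set(psi)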

Alerting System

import smtplib
from email.mime.text import MIMEText
import slack_sdk

class AlertManager:
    def __init__(self, slack_token=None, email_config=None):
        self.slack_client = slack_sdk.WebClient(token=slack_token) if slack_token else None
        self.email_config = email_config

    def send_alert(self, title, message, severity='warning'):
        """Send alert to configured channels."""
        if self.slack_client:
            self._send_slack_alert(title, message, severity)
        if self.email_config:
            self._send_email_alert(title, message)

    def _send_slack_alert(self, title, message, severity):
        color = {'critical': 'danger', 'warning': 'warning', 'info': 'good'}
        self.slack_client.chat_postMessage(
            channel='#ml-alerts',
            attachments=[{
                'color': color.get(severity, 'warning'),
                'title': title,
                'text': message
            }]
        )

    def _send_email_alert(self, title, message):
        # Minimal SMTP sender; assumes email_config is a dict with
        # host, port, from_addr, and to_addrs keys.
        msg = MIMEText(message)
        msg['Subject'] = title
        msg['From'] = self.email_config['from_addr']
        msg['To'] = ', '.join(self.email_config['to_addrs'])
        with smtplib.SMTP(self.email_config['host'], self.email_config['port']) as server:
            server.send_message(msg)

    def check_drift_and_alert(self, feature_name, psi_score):
        """Check drift and send alert if threshold exceeded."""
        if psi_score >= 0.2:
            self.send_alert(
                title=f"Critical Data Drift Detected: {feature_name}",
                message=f"PSI score: {psi_score:.4f}. Immediate action required.",
                severity='critical'
            )
        elif psi_score >= 0.1:
            self.send_alert(
                title=f"Moderate Data Drift Detected: {feature_name}",
                message=f"PSI score: {psi_score:.4f}. Consider retraining.",
                severity='warning'
            )

    def check_performance_and_alert(self, metric_name, current_value, threshold):
        """Alert if performance drops below threshold."""
        if current_value < threshold:
            self.send_alert(
                title=f"Model Performance Degradation: {metric_name}",
                message=f"Current: {current_value:.4f}, Threshold: {threshold:.4f}",
                severity='critical'
            )
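
Wiring it up (the Slack token below is a placeholder; read real credentials from a secret store):

# Placeholder token for illustration only.
alert_manager = AlertManager(slack_token='xoxb-placeholder')
alert_manager.check_drift_and_alert('feature1', psi_score)
alert_manager.check_performance_and_alert('accuracy', 0.82, threshold=0.85)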

Automated Retraining Pipeline

class RetrainingTrigger:
    def __init__(self, drift_threshold=0.2, performance_threshold=0.85):
        self.drift_threshold = drift_threshold
        self.performance_threshold = performance_threshold

    def should_retrain(self, drift_scores, performance_metrics):
        """Determine if model should be retrained."""
        # Check drift across all monitored features
        max_drift = max(drift_scores.values(), default=0.0)
        if max_drift >= self.drift_threshold:
            return True, f"Drift detected: {max_drift:.4f}"

        # Check performance
        accuracy = performance_metrics.get('accuracy', 1.0)
        if accuracy < self.performance_threshold:
            return True, f"Accuracy {accuracy:.4f} below threshold {self.performance_threshold}"

        return False, "No retraining needed"

    def trigger_retraining(self):
        """Trigger retraining pipeline."""
        # Option 1: Call Airflow DAG
        # requests.post('http://airflow/api/v1/dags/retrain/dagRuns', ...)

        # Option 2: GitHub Actions workflow
        # github_api.dispatch_workflow('retrain.yml')

        # Option 3: AWS Step Functions
        # stepfunctions.start_execution(...)
        pass
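
As a concrete sketch of the Airflow option (the host, DAG id, and credentials are assumptions for illustration; the endpoint shape follows the Airflow 2.x stable REST API):

import requests

# Hypothetical host, DAG id, and credentials; adjust for your deployment.
response = requests.post(
    'http://airflow.internal:8080/api/v1/dags/retrain_model/dagRuns',
    json={'conf': {'reason': 'drift_detected'}},
    auth=('airflow_user', 'airflow_password'),
    timeout=30,
)
response.raise_for_status()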

# Scheduled monitoring job (assumes features, reference, current, monitor,
# and alert_manager are defined as in the sections above)
def monitoring_job():
    # Calculate drift
    drift_scores = {}
    for feature in features:
        psi = detect_drift_psi(reference[feature], current[feature])
        drift_scores[feature] = psi

    # Calculate performance (if ground truth available)
    performance = monitor.get_performance_metrics()

    # Check if retraining needed
    trigger = RetrainingTrigger()
    should_retrain, reason = trigger.should_retrain(drift_scores, performance)

    if should_retrain:
        alert_manager.send_alert("Retraining Triggered", reason)
        trigger.trigger_retraining()
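
To run the job on a cadence, cron or your orchestrator is typical; as an in-process alternative, here is a sketch using the third-party schedule library (the hourly interval is an arbitrary choice):

# pip install schedule
import schedule
import time

schedule.every(1).hours.do(monitoring_job)  # tune cadence to label arrival

while True:
    schedule.run_pending()
    time.sleep(60)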

Monitoring Dashboard Checklist

  • Prediction Volume: Track request counts over time
  • Latency: p50, p95, p99 latencies
  • Error Rate: Failed predictions percentage (see the sketch after this list)
  • Feature Distributions: Histograms of input features
  • Prediction Distribution: Output class balance
  • Drift Scores: PSI/KS test results per feature
  • Model Performance: Accuracy, F1 (when labels available)
  • Resource Usage: CPU, memory, GPU utilization
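
Most of these map onto the metrics defined earlier; error rate is the one gap. A sketch (the counter name is an assumption) that pairs with PREDICTIONS_TOTAL so a Grafana panel can divide errors by totals over a time window:

from prometheus_client import Counter

# Hypothetical counter; chart error rate in Grafana as
# model_prediction_errors_total / model_predictions_total.
PREDICTION_ERRORS_TOTAL = Counter(
    'model_prediction_errors_total',
    'Total number of failed predictions',
    ['model_version']
)

# Inside the endpoint, wrap inference so failures are counted:
# try:
#     prediction = model.predict(request.features)
# except Exception:
#     PREDICTION_ERRORS_TOTAL.labels(model_version='v1').inc()
#     raise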
