Model Serving Patterns

Model serving is about making your trained ML models available for predictions. There are several patterns depending on your requirements:

  • REST API: HTTP endpoints for synchronous predictions
  • Batch Processing: Process large datasets offline
  • Streaming: Real-time predictions on data streams
  • Edge Deployment: Run models on devices

Flask Model Server

from flask import Flask, request, jsonify
import joblib
import numpy as np

# Flask application object; the Gunicorn command further down refers to it as `app:app`.
app = Flask(__name__)

# Load model at startup
# NOTE: joblib.load unpickles the file, so only load model files from a trusted source.
model = joblib.load('model.pkl')

@app.route('/health', methods=['GET'])
def health():
    """Return a fixed JSON body ``{"status": "healthy"}`` for GET /health."""
    status_payload = {'status': 'healthy'}
    return jsonify(status_payload)

@app.route('/predict', methods=['POST'])
def predict():
    """Predict the class and class probabilities for one feature vector.

    Expects a JSON body of the form ``{"features": [f1, f2, ...]}``.

    Returns:
        200 with ``{"prediction": int, "probability": [float, ...]}`` on success,
        400 with ``{"error": str}`` when the request body or features are invalid,
        500 with ``{"error": str}`` when inference fails for any other reason.
    """
    # silent=True yields None for a missing or malformed JSON body instead of raising
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or 'features' not in payload:
        return jsonify({'error': "Request body must be a JSON object with a 'features' list"}), 400

    try:
        features = np.asarray(payload['features'], dtype=float).reshape(1, -1)
    except (TypeError, ValueError) as e:
        return jsonify({'error': f'Invalid features: {e}'}), 400

    try:
        prediction = model.predict(features)
        probability = model.predict_proba(features)
    except ValueError as e:
        # Treated as the model rejecting the input (e.g. wrong feature count) -> client error
        return jsonify({'error': str(e)}), 400
    except Exception:
        # Anything else is a server-side fault: log the details, do not leak them to the client
        app.logger.exception('Prediction failed')
        return jsonify({'error': 'Internal server error'}), 500

    return jsonify({
        'prediction': int(prediction[0]),
        'probability': probability[0].tolist()
    })

# Runs only when the file is executed directly (not when imported by a WSGI server).
# host='0.0.0.0' listens on all network interfaces; see the Gunicorn command below for production.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

# Production: use Gunicorn
# gunicorn -w 4 -b 0.0.0.0:5000 app:app

FastAPI Model Server

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
import joblib
import numpy as np

# ASGI application object; the uvicorn command further down refers to it as `app:app`.
app = FastAPI(title="ML Model API")

# Loaded once at import time and shared by all requests.
# NOTE: joblib.load unpickles the file, so only load model files from a trusted source.
model = joblib.load('model.pkl')

# Request body schema for POST /predict: one flat feature vector.
class PredictionRequest(BaseModel):
    # Feature values for a single sample; the endpoint reshapes them to one row.
    features: List[float]

# Response body schema for POST /predict.
class PredictionResponse(BaseModel):
    # Predicted label, cast to int by the endpoint.
    prediction: int
    # First row of model.predict_proba(...) as a plain list of floats.
    probability: List[float]

@app.post('/predict', response_model=PredictionResponse)
def predict(request: PredictionRequest):
    """Predict the class and class probabilities for one feature vector.

    Declared with plain ``def`` rather than ``async def``: model inference is
    blocking CPU work, and FastAPI runs plain ``def`` path operations in a
    worker thread pool so they do not stall the event loop.

    Args:
        request: Validated request body holding the feature vector.

    Returns:
        PredictionResponse with the predicted label and class probabilities.

    Raises:
        HTTPException: 400 when the model rejects the input with a ValueError
            (presumably a feature-count or value problem).
    """
    features = np.array(request.features).reshape(1, -1)
    try:
        prediction = model.predict(features)[0]
        probability = model.predict_proba(features)[0].tolist()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    return PredictionResponse(
        prediction=int(prediction),
        probability=probability
    )

# Run: uvicorn app:app --host 0.0.0.0 --port 8000

TensorFlow Serving

# Save model in SavedModel format
import tensorflow as tf

# The trailing "1" is the version directory shown in the layout below.
# NOTE(review): assumes `model` here is a TensorFlow/Keras model, not the joblib model above — confirm.
model.save('models/my_model/1/')  # Version 1

# Directory structure:
# models/
#   my_model/
#     1/
#       saved_model.pb
#       variables/

# Run TensorFlow Serving with Docker
# Publish both ports: 8500 for gRPC (used by the gRPC client below) and 8501 for REST
docker run -p 8500:8500 -p 8501:8501 \
    -v "$(pwd)/models:/models" \
    -e MODEL_NAME=my_model \
    tensorflow/serving

# REST API prediction
import requests
import json

# One row of four feature values, in TensorFlow Serving's "instances" request format
data = {"instances": [[1.0, 2.0, 3.0, 4.0]]}
response = requests.post(
    'http://localhost:8501/v1/models/my_model:predict',
    json=data,
    timeout=10,  # requests waits indefinitely unless a timeout is given
)
# Surface HTTP errors here instead of as a confusing KeyError on the next line
response.raise_for_status()
predictions = response.json()['predictions']

# gRPC client (faster)
import grpc
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc

# Plain-text (non-TLS) channel to the TensorFlow Serving gRPC port
channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

request = predict_pb2.PredictRequest()
request.model_spec.name = 'my_model'
# make_tensor_proto needs array-like values, so pass the instance rows rather
# than the whole REST payload dict.
# NOTE(review): the 'input' key must match the model's serving signature — confirm.
request.inputs['input'].CopyFrom(
    tf.make_tensor_proto(data['instances'], dtype=tf.float32)
)
# To send the request: response = stub.Predict(request, timeout=10.0)

NVIDIA Triton Inference Server

# Model repository structure
model_repository/
  my_model/
    config.pbtxt
    1/
      model.onnx

# config.pbtxt
# Must match the model's directory name in the repository layout above.
name: "my_model"
platform: "onnxruntime_onnx"
# NOTE(review): with batching enabled, dims below presumably exclude the batch
# dimension (the Python client sends shape [1, 4]) — confirm against Triton docs.
max_batch_size: 8
input [
  {
    # Tensor name the client uses in InferInput("input", ...)
    name: "input"
    data_type: TYPE_FP32
    dims: [ 4 ]
  }
]
output [
  {
    # Tensor name the client requests via InferRequestedOutput("output")
    name: "output"
    data_type: TYPE_FP32
    dims: [ 3 ]
  }
]

# Run Triton
# Quote the volume path so a working directory containing spaces still mounts correctly
docker run --gpus=all -p 8000:8000 -p 8001:8001 -p 8002:8002 \
    -v "$(pwd)/model_repository:/models" \
    nvcr.io/nvidia/tritonserver:23.10-py3 \
    tritonserver --model-repository=/models

# Python client
import tritonclient.http as httpclient

# HTTP client for the Triton server; port 8000 is the first port published by the docker command above.
client = httpclient.InferenceServerClient(url="localhost:8000")

# One input tensor named "input" with shape [1, 4] (a single row of four FP32 values),
# matching the input name declared in config.pbtxt.
inputs = [httpclient.InferInput("input", [1, 4], "FP32")]
inputs[0].set_data_from_numpy(np.array([[1.0, 2.0, 3.0, 4.0]], dtype=np.float32))

# Request only the tensor named "output" and read it back as a NumPy array.
outputs = [httpclient.InferRequestedOutput("output")]
result = client.infer("my_model", inputs, outputs=outputs)
predictions = result.as_numpy("output")

Batch Prediction

import pandas as pd
import joblib
from pathlib import Path

def batch_predict(input_path, output_path, model_path,
                  feature_columns=('feature1', 'feature2', 'feature3')):
    """Run batch predictions on a CSV file.

    Args:
        input_path: CSV file to read; must contain every column in
            ``feature_columns``.
        output_path: CSV file to write. It holds all input columns plus
            ``prediction`` and ``probability`` (the highest class probability
            per row). Missing parent directories are created.
        model_path: Path to a joblib-serialized model exposing ``predict``
            and ``predict_proba``. joblib.load unpickles the file, so only
            load models from a trusted source.
        feature_columns: Names of the columns fed to the model, in order.
            Defaults to the original hard-coded column set.

    Raises:
        KeyError: If any of ``feature_columns`` is absent from the input file.
    """
    # Load model
    model = joblib.load(model_path)

    # Load data
    df = pd.read_csv(input_path)

    # Preprocess: fail with a clear message when expected columns are absent
    feature_columns = list(feature_columns)
    missing = [col for col in feature_columns if col not in df.columns]
    if missing:
        raise KeyError(f"Missing feature columns in {input_path}: {missing}")
    features = df[feature_columns]

    # Predict
    predictions = model.predict(features)
    probabilities = model.predict_proba(features)

    # Add predictions to dataframe
    df['prediction'] = predictions
    df['probability'] = probabilities.max(axis=1)

    # Save results
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(output_path, index=False)
    print(f"Predictions saved to {output_path}")

# Using Apache Spark for large-scale batch
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel

# Create a Spark session, or reuse an existing one, for the batch job.
spark = SparkSession.builder.appName("BatchPrediction").getOrCreate()
# Load a saved Spark ML pipeline. NOTE: this rebinds the module-level name `model`.
model = PipelineModel.load("spark_model")
# Read the input as Parquet, run the pipeline over it, and write the result as Parquet.
df = spark.read.parquet("input_data")
predictions = model.transform(df)
predictions.write.parquet("predictions")

Model Caching and Optimization

from functools import lru_cache
import hashlib
import redis
import json

# In-memory caching with LRU
@lru_cache(maxsize=1000)
def predict_cached(features_tuple):
    """Return the model's prediction for one sample, memoized per process.

    ``features_tuple`` must be hashable (a tuple of feature values) because
    ``lru_cache`` uses the argument as the cache key; at most 1000 distinct
    inputs are kept.
    """
    single_row = np.array(features_tuple).reshape(1, -1)
    labels = model.predict(single_row)
    return labels[0]

# Redis caching for distributed systems
# Module-level client shared by the caching helper below; points at localhost:6379.
redis_client = redis.Redis(host='localhost', port=6379)

def predict_with_redis_cache(features):
    """Return the prediction for one sample, using Redis as a shared cache.

    Args:
        features: Feature values for a single sample (list, tuple or array).

    Returns:
        ``{'prediction': int}``, taken from Redis when cached, otherwise
        computed with the model and stored for one hour.
    """
    row = np.array(features).reshape(1, -1)

    # Create cache key from a canonical encoding of the values. str(features)
    # is unsafe here: NumPy abbreviates large arrays with "...", so different
    # inputs could share a key, and list/tuple/array spellings of the same
    # values would each get their own key.
    canonical = json.dumps(row.ravel().tolist())
    key = 'prediction:' + hashlib.sha256(canonical.encode('utf-8')).hexdigest()

    # Check cache (compare against None so only a true miss falls through)
    cached = redis_client.get(key)
    if cached is not None:
        return json.loads(cached)

    # Compute prediction
    prediction = model.predict(row)
    result = {'prediction': int(prediction[0])}

    # Store in cache (expire in 1 hour)
    redis_client.setex(key, 3600, json.dumps(result))

    return result

# Model optimization with ONNX
import onnx
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType  # needed for initial_types below

# Convert to ONNX
# The input tensor is named 'input': float32, any number of rows, 4 features per row.
onnx_model = convert_sklearn(model, initial_types=[('input', FloatTensorType([None, 4]))])
onnx.save_model(onnx_model, 'model.onnx')

# Run with ONNX Runtime (faster inference)
import onnxruntime as ort

session = ort.InferenceSession('model.onnx')
# Passing None as the first argument requests all model outputs; the feed key 'input'
# matches the tensor name given to convert_sklearn above.
# NOTE(review): `features` is not defined at module scope in this snippet — it presumably
# stands for a 2-D NumPy array with 4 columns; confirm before running.
result = session.run(None, {'input': features.astype(np.float32)})

A/B Testing Models

import random

class ABTestingServer:
    """Route prediction requests between a control and a treatment model.

    ``models`` maps a variant name to a loaded model; ``traffic_split`` maps a
    variant name to the fraction of users assigned to it.
    """

    def __init__(self):
        # NOTE: joblib.load unpickles the files; load only trusted model artifacts.
        self.models = {
            'control': joblib.load('model_v1.pkl'),
            'treatment': joblib.load('model_v2.pkl')
        }
        self.traffic_split = {'control': 0.5, 'treatment': 0.5}

    def get_model(self, user_id):
        """Deterministic assignment based on user_id.

        The bucket comes from a SHA-256 digest rather than the built-in
        ``hash()``, which is salted per process for strings and would assign
        the same user to different variants across workers and restarts.

        Returns:
            Tuple ``(variant_name, model)``.
        """
        import hashlib  # function-scope import keeps the class self-contained

        digest = hashlib.sha256(str(user_id).encode('utf-8')).hexdigest()
        bucket = int(digest, 16) % 100
        if bucket < self.traffic_split['control'] * 100:
            return 'control', self.models['control']
        return 'treatment', self.models['treatment']

    def log_prediction(self, user_id, model_name, prediction):
        """Record which variant served which user, for later analysis."""
        import logging

        logging.getLogger(__name__).info(
            'ab_prediction user_id=%s model_version=%s prediction=%s',
            user_id, model_name, prediction,
        )

    def predict(self, user_id, features):
        """Predict with the variant assigned to ``user_id``.

        Args:
            user_id: Identifier used for variant assignment.
            features: One flat feature vector, or an already 2-D batch.

        Returns:
            ``{'prediction': <model output>, 'model_version': <variant name>}``.
        """
        model_name, model = self.get_model(user_id)
        # A single flat feature vector becomes one row; 2-D input is passed through untouched
        if np.ndim(features) == 1:
            features = np.asarray(features).reshape(1, -1)
        prediction = model.predict(features)

        # Log for analysis
        self.log_prediction(user_id, model_name, prediction)

        return {
            'prediction': prediction,
            'model_version': model_name
        }

# Usage in FastAPI
ab_server = ABTestingServer()

@app.post('/predict')
def predict(request: PredictionRequest, user_id: str):
    """Serve a prediction from the A/B variant assigned to ``user_id``.

    Declared with plain ``def`` because inference is blocking work; FastAPI
    runs such handlers in a thread pool instead of on the event loop.

    Returns:
        ``{'prediction': list, 'model_version': str}``.
    """
    # The model expects a 2-D array: one row holding this request's feature vector
    features = np.array(request.features).reshape(1, -1)
    result = ab_server.predict(user_id, features)
    # NumPy arrays are not JSON-serializable; convert to plain Python values
    return {**result, 'prediction': np.asarray(result['prediction']).tolist()}

Deployment Checklist

  • Health endpoints: Expose /health (liveness) and /ready (readiness)
  • Logging: Log predictions, latencies, errors
  • Monitoring: Track latency, throughput, errors
  • Rate limiting: Protect against abuse
  • Authentication: Secure your endpoints
  • Versioning: Version your API paths (e.g. /v1/predict)
  • Documentation: OpenAPI/Swagger docs
  • Graceful shutdown: Handle SIGTERM properly

Deploy Models Like a Pro

Our Data Science program covers model serving, MLOps, and production deployment. Build scalable ML systems.

Explore Data Science Program

Related Articles