The Complete MLOps & AI Model Deployment Guide — From Training to Serving and Monitoring

Table of Contents

  1. What Is MLOps
  2. Experiment Management
  3. Data Pipelines
  4. Model Training Infrastructure
  5. Model Serving
  6. LLM Serving Specifics
  7. CI/CD for ML
  8. Monitoring
  9. A/B Testing and Deployment Strategies
  10. Cost Optimization
  11. Practical: Building an LLM RAG Pipeline

1. What Is MLOps

When DevOps Meets ML

MLOps is a portmanteau of Machine Learning and Operations. Think of it as applying DevOps principles from software engineering to the machine learning lifecycle.

In traditional software development, code is the primary artifact you manage. In ML systems, you must manage code, data, and models together: when the data changes, the model changes, and when the model changes, the serving approach may need to change as well.

The ML Lifecycle

The overall flow of an ML project looks like this:

  1. Problem Definition - Translate business requirements into an ML problem
  2. Data Collection and Preprocessing - Build data pipelines
  3. Feature Engineering - Transform raw data into model-ready features
  4. Model Training - Experiment with algorithms and hyperparameters
  5. Model Evaluation - Validate performance with offline metrics
  6. Model Deployment - Serve in production
  7. Monitoring - Track performance and detect drift
  8. Retraining - Update the model when performance degrades

This is not a one-time process but a continuous cycle.
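The continuous cycle can be sketched as a control loop. This is a minimal illustration, not a real orchestration API: `train`, `evaluate`, and `detect_drift` are hypothetical stubs standing in for the pipeline stages above.

```python
# Minimal sketch of the ML lifecycle as a continuous loop.
# Every function here is a hypothetical placeholder.

def train(data):
    return {"version": data["version"]}           # stand-in for model training

def evaluate(model):
    return 0.95                                   # stand-in for offline metrics

def detect_drift(model):
    return model["version"] < 3                   # pretend older data drifts

def run_lifecycle(max_iterations=3):
    deployed_versions = []
    data = {"version": 1}
    while len(deployed_versions) < max_iterations:
        model = train(data)                        # 4. training
        if evaluate(model) < 0.9:                  # 5. evaluation gate
            continue
        deployed_versions.append(model["version"]) # 6. deployment
        if detect_drift(model):                    # 7. monitoring
            data = {"version": data["version"] + 1}  # 8. retraining trigger
    return deployed_versions

print(run_lifecycle())  # → [1, 2, 3]
```

Each deployed version here came out of a fresh retraining pass triggered by monitoring, which is the loop a real orchestrator (Airflow, Kubeflow Pipelines, etc.) automates.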

Google's MLOps Maturity Model

Google defines three levels of MLOps maturity.

Level 0 - Manual Process

  • Data scientists train models manually in notebooks
  • Model deployment is manual and irregular
  • No monitoring
  • Most teams are at this level

Level 1 - ML Pipeline Automation

  • Training pipeline is automated
  • Continuous Training implemented
  • Experiment tracking system in place
  • Model registry in use

Level 2 - CI/CD Pipeline Automation

  • CI/CD for the pipeline itself
  • Automated testing and validation
  • Feature store utilization
  • Complete monitoring and alerting

Most organizations can gain tremendous value just by moving from Level 0 to Level 1.


2. Experiment Management

MLflow - The Open-Source ML Platform

MLflow is the de facto standard for ML experiment management. It consists of four core components.

MLflow Tracking - Experiment Logging

import mlflow

mlflow.set_experiment("recommendation-model-v2")

with mlflow.start_run():
    # Log hyperparameters
    mlflow.log_param("learning_rate", 0.001)
    mlflow.log_param("batch_size", 64)
    mlflow.log_param("epochs", 50)

    # Run training
    model = train_model(lr=0.001, batch_size=64, epochs=50)

    # Log metrics
    mlflow.log_metric("accuracy", 0.923)
    mlflow.log_metric("f1_score", 0.891)
    mlflow.log_metric("auc_roc", 0.956)

    # Save model
    mlflow.pytorch.log_model(model, "model")

MLflow Models - Model Packaging

# Register in model registry
mlflow.register_model(
    "runs:/abc123/model",
    "recommendation-model"
)

# Transition stage
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="recommendation-model",
    version=3,
    stage="Production"
)

Weights and Biases

W&B is a commercial experiment management platform with excellent visualization capabilities.

import wandb

wandb.init(project="image-classification", config={
    "learning_rate": 0.001,
    "architecture": "ResNet50",
    "dataset": "imagenet-subset",
    "epochs": 100,
})

for epoch in range(100):
    train_loss, val_loss = train_one_epoch(model)
    wandb.log({
        "train_loss": train_loss,
        "val_loss": val_loss,
        "epoch": epoch
    })

Optuna - Hyperparameter Optimization

Manually tuning hyperparameters is inefficient. Optuna performs automated search based on Bayesian optimization.

import optuna

def objective(trial):
    lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])
    n_layers = trial.suggest_int("n_layers", 1, 5)
    dropout = trial.suggest_float("dropout", 0.1, 0.5)

    model = create_model(n_layers=n_layers, dropout=dropout)
    accuracy = train_and_evaluate(model, lr=lr, batch_size=batch_size)

    return accuracy

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100)

print(f"Best accuracy: {study.best_trial.value}")
print(f"Best params: {study.best_trial.params}")

3. Data Pipelines

Feature Store

A feature store is a centralized repository for ML features. It ensures the same features are used for both training and serving.

Feast Example

from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

# Feature definition
from feast import Entity, FeatureView, Field
from feast.types import Float32, Int64

user = Entity(name="user_id", join_keys=["user_id"])

user_features = FeatureView(
    name="user_features",
    entities=[user],
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="days_since_last_order", dtype=Int64),
    ],
    source=user_source,  # user_source: a batch source (e.g. FileSource) defined in the repo
)

# Retrieve training data
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:total_purchases",
        "user_features:avg_order_value",
        "user_features:days_since_last_order",
    ],
).to_df()

# Real-time serving query
online_features = store.get_online_features(
    features=[
        "user_features:total_purchases",
        "user_features:avg_order_value",
    ],
    entity_rows=[{"user_id": 12345}],
).to_dict()

DVC - Data Version Control

DVC enables Git-based version control for data.

# Initialize DVC
dvc init

# Track data file
dvc add data/training_data.csv

# Configure remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage

# Push data
dvc push

# Retrieve a specific version
git checkout v1.0
dvc pull

Data Validation - Great Expectations

Data quality validation is a cornerstone of the pipeline.

import great_expectations as gx

context = gx.get_context()

# Define expectations
validator = context.sources.pandas_default.read_csv(
    "data/training_data.csv"
)
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=150)
validator.expect_column_values_to_be_in_set("country", ["KR", "US", "JP", "UK"])

# Run validation
results = validator.validate()
if not results.success:
    raise ValueError("Data validation failed!")

4. Model Training Infrastructure

GPU Cluster Setup

GPU clusters are essential for training large-scale models.

Kubernetes + GPU Node Pool

apiVersion: v1
kind: Pod
metadata:
  name: training-job
spec:
  containers:
    - name: trainer
      image: training-image:latest
      resources:
        limits:
          nvidia.com/gpu: 4
      volumeMounts:
        - name: dataset
          mountPath: /data
  nodeSelector:
    gpu-type: a100
  tolerations:
    - key: "nvidia.com/gpu"
      operator: "Exists"
      effect: "NoSchedule"

Distributed Training

Distributed training is needed when a single GPU is not enough. Data parallelism (DDP) replicates the model on every GPU and splits each batch across them; models too large to fit on one GPU additionally require model or tensor parallelism.

PyTorch DDP (Distributed Data Parallel)

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

def setup(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

def train(rank, world_size, num_epochs=10):
    setup(rank, world_size)

    # MyModel and dataset are placeholders for your own model and Dataset
    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])

    # DistributedSampler gives each process a disjoint shard of the data
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, batch_size=64, sampler=sampler)

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = torch.nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)  # reshuffle the shards each epoch
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(rank), targets.to(rank)
            loss = criterion(model(inputs), targets)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

    dist.destroy_process_group()

# Launch one process per GPU, e.g. via torch.multiprocessing.spawn or torchrun

Kubeflow Training Operator

Kubeflow lets you declaratively manage distributed training on Kubernetes.

apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-training
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
            - name: pytorch
              image: my-training:latest
              resources:
                limits:
                  nvidia.com/gpu: 1
    Worker:
      replicas: 3
      template:
        spec:
          containers:
            - name: pytorch
              image: my-training:latest
              resources:
                limits:
                  nvidia.com/gpu: 1

5. Model Serving

Serving Framework Comparison

| Framework  | Strengths                     | Weaknesses            | Best For               |
|------------|-------------------------------|-----------------------|------------------------|
| TorchServe | Native PyTorch, easy setup    | Limited performance   | Small PyTorch models   |
| Triton     | Multi-framework, high performance | Complex config    | Large-scale production |
| vLLM       | LLM-optimized, PagedAttention | LLM only              | LLM serving            |
| FastAPI    | Maximum flexibility           | Manual implementation | Custom logic needed    |

TorchServe

# Create model archive
torch-model-archiver \
  --model-name resnet50 \
  --version 1.0 \
  --model-file model.py \
  --serialized-file model.pth \
  --handler image_classifier

# Start server
torchserve --start \
  --model-store model_store \
  --models resnet50=resnet50.mar

NVIDIA Triton Inference Server

Triton can serve models from multiple frameworks simultaneously.

# Model repository structure
model_repository/
  resnet50/
    config.pbtxt
    1/
      model.onnx
  bert_base/
    config.pbtxt
    1/
      model.plan

config.pbtxt Configuration

name: "resnet50"
platform: "onnxruntime_onnx"
max_batch_size: 32
input [
  {
    name: "input"
    data_type: TYPE_FP32
    dims: [3, 224, 224]
  }
]
output [
  {
    name: "output"
    data_type: TYPE_FP32
    dims: [1000]
  }
]
dynamic_batching {
  preferred_batch_size: [8, 16, 32]
  max_queue_delay_microseconds: 100
}
instance_group [
  {
    count: 2
    kind: KIND_GPU
  }
]

FastAPI Wrapping

FastAPI is well-suited for simple model serving scenarios.

from fastapi import FastAPI
import torch
from pydantic import BaseModel

app = FastAPI()

# Load model once at startup
model = torch.jit.load("model.pt")
model.eval()

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: float
    confidence: float

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    tensor = torch.tensor([request.features])
    with torch.no_grad():
        output = model(tensor)
    prob = torch.softmax(output, dim=1)
    prediction = torch.argmax(prob, dim=1).item()
    confidence = prob[0][prediction].item()

    return PredictionResponse(
        prediction=prediction,
        confidence=confidence
    )

@app.get("/health")
async def health():
    return {"status": "healthy"}

6. LLM Serving Specifics

vLLM - High-Performance LLM Serving

vLLM uses PagedAttention technology to maximize LLM serving performance.

from vllm import LLM, SamplingParams

# Load model
llm = LLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    tensor_parallel_size=2,
    gpu_memory_utilization=0.9,
    max_model_len=8192,
)

# Inference
sampling_params = SamplingParams(
    temperature=0.7,
    top_p=0.9,
    max_tokens=512,
)

outputs = llm.generate(
    ["Explain MLOps in simple terms."],
    sampling_params,
)
print(outputs[0].outputs[0].text)

vLLM OpenAI-Compatible Server

python -m vllm.entrypoints.openai.api_server \
  --model meta-llama/Llama-3.1-8B-Instruct \
  --tensor-parallel-size 2 \
  --port 8000

The server can then be called with the OpenAI SDK:

from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")

response = client.chat.completions.create(
    model="meta-llama/Llama-3.1-8B-Instruct",
    messages=[
        {"role": "user", "content": "What is MLOps?"}
    ],
    max_tokens=256,
)

Text Generation Inference (TGI)

HuggingFace's TGI is Rust-based and reliable.

docker run --gpus all \
  -p 8080:80 \
  -v /data:/data \
  ghcr.io/huggingface/text-generation-inference:latest \
  --model-id meta-llama/Llama-3.1-8B-Instruct \
  --quantize gptq \
  --max-input-length 4096 \
  --max-total-tokens 8192

Quantization Techniques Compared

Quantization is essential for reducing model size when deploying LLMs.

| Technique    | Size Reduction | Quality Loss | Speed Gain   | Notes                              |
|--------------|----------------|--------------|--------------|------------------------------------|
| GPTQ         | 4x             | Low          | 2-3x         | GPU-optimized, good accuracy       |
| AWQ          | 4x             | Very low     | 2-3x         | Activation-aware, better than GPTQ |
| GGUF         | 2-8x           | Variable     | CPU possible | For llama.cpp, CPU/GPU hybrid      |
| BitsAndBytes | 2-4x           | Low          | 1.5x         | Easy to apply, QLoRA training      |

# BitsAndBytes 4-bit quantization
import torch
from transformers import AutoModelForCausalLM, BitsAndBytesConfig

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
)

model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-3.1-70B-Instruct",
    quantization_config=quantization_config,
    device_map="auto",
)

Ollama - Local LLM

Ollama is convenient for development environments or small-scale deployments.

# Download and run a model
ollama pull llama3.1
ollama run llama3.1

# API server mode
ollama serve

Calling the API from Python:

import requests

response = requests.post("http://localhost:11434/api/generate", json={
    "model": "llama3.1",
    "prompt": "Explain MLOps briefly.",
    "stream": False,
})
print(response.json()["response"])

7. CI/CD for ML

Model Testing Strategy

ML models require different testing approaches compared to traditional software.

import pytest

class TestModelQuality:
    """Model quality tests"""

    def test_accuracy_threshold(self, model, test_data):
        """Verify accuracy meets threshold"""
        accuracy = evaluate(model, test_data)
        assert accuracy >= 0.90, f"Accuracy {accuracy} below threshold 0.90"

    def test_latency(self, model, sample_input):
        """Check inference latency"""
        import time
        start = time.time()
        model.predict(sample_input)
        latency = time.time() - start
        assert latency < 0.1, f"Latency {latency}s exceeds 100ms"

    def test_no_bias(self, model, fairness_data):
        """Fairness verification"""
        results_group_a = model.predict(fairness_data["group_a"])
        results_group_b = model.predict(fairness_data["group_b"])
        disparity = abs(results_group_a.mean() - results_group_b.mean())
        assert disparity < 0.05, f"Bias detected: disparity={disparity}"

    def test_model_size(self, model_path):
        """Model size limit"""
        import os
        size_mb = os.path.getsize(model_path) / (1024 * 1024)
        assert size_mb < 500, f"Model size {size_mb}MB exceeds 500MB limit"

ML Pipeline with GitHub Actions

name: ML Pipeline

on:
  push:
    paths:
      - "src/**"
      - "data/**"
      - "configs/**"

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate data
        run: python scripts/validate_data.py

  train:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train model
        run: python scripts/train.py --config configs/production.yaml
      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: model
          path: outputs/model/

  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Download model
        uses: actions/download-artifact@v4
        with:
          name: model
      - name: Run evaluation
        run: python scripts/evaluate.py
      - name: Check quality gates
        run: python scripts/quality_gate.py

  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to staging
        run: ./scripts/deploy.sh staging
      - name: Smoke test
        run: python scripts/smoke_test.py
      - name: Deploy to production
        run: ./scripts/deploy.sh production

Data Validation Pipeline

import pandas as pd

def validate_training_data(data_path: str) -> bool:
    """Pipeline to validate training data quality"""

    df = pd.read_parquet(data_path)

    checks = [
        # Check data size
        len(df) >= 10000,
        # Required columns exist
        all(col in df.columns for col in REQUIRED_COLUMNS),
        # Null ratio check
        df.isnull().mean().max() < 0.05,
        # Label distribution check
        df["label"].value_counts(normalize=True).min() > 0.1,
        # Duplicate check
        df.duplicated().mean() < 0.01,
    ]

    if not all(checks):
        failed = [i for i, c in enumerate(checks) if not c]
        raise ValueError(f"Data validation failed at checks: {failed}")

    return True

8. Monitoring

Model Performance Monitoring

Model performance can degrade over time after deployment.

from prometheus_client import Gauge, Histogram, Counter

# Define metrics
prediction_latency = Histogram(
    "model_prediction_latency_seconds",
    "Prediction latency in seconds",
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)

prediction_count = Counter(
    "model_prediction_total",
    "Total number of predictions",
    ["model_version", "status"]
)

model_accuracy = Gauge(
    "model_accuracy",
    "Current model accuracy",
    ["model_name"]
)

# Usage
import time

def predict_with_monitoring(model, input_data):
    start = time.time()
    try:
        result = model.predict(input_data)
        prediction_count.labels(
            model_version="v2.1",
            status="success"
        ).inc()
        return result
    except Exception as e:
        prediction_count.labels(
            model_version="v2.1",
            status="error"
        ).inc()
        raise
    finally:
        latency = time.time() - start
        prediction_latency.observe(latency)

Data Drift Detection

When the distribution of input data changes, model performance degrades.

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

# Generate drift report
report = Report(metrics=[
    DataDriftPreset(),
    TargetDriftPreset(),
])

column_mapping = ColumnMapping(
    target="label",
    prediction="prediction",
    numerical_features=["feature_1", "feature_2", "feature_3"],
    categorical_features=["category", "region"],
)

report.run(
    reference_data=training_data,
    current_data=production_data,
    column_mapping=column_mapping,
)

# Check drift detection results
drift_results = report.as_dict()
if drift_results["metrics"][0]["result"]["dataset_drift"]:
    trigger_retraining_pipeline()

Concept Drift

Not only can data distributions change, but the relationship between inputs and outputs itself can shift. For example, consumer behavior patterns changed dramatically before and after COVID.

import numpy as np

class ConceptDriftDetector:
    """PSI (Population Stability Index) based drift detection"""

    def __init__(self, threshold=0.2):
        self.threshold = threshold

    def calculate_psi(self, expected, actual, bins=10):
        """Calculate PSI"""
        breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
        expected_counts = np.histogram(expected, breakpoints)[0] / len(expected)
        actual_counts = np.histogram(actual, breakpoints)[0] / len(actual)

        # Prevent division by zero
        expected_counts = np.clip(expected_counts, 1e-6, None)
        actual_counts = np.clip(actual_counts, 1e-6, None)

        psi = np.sum(
            (actual_counts - expected_counts) *
            np.log(actual_counts / expected_counts)
        )
        return psi

    def check_drift(self, reference_predictions, current_predictions):
        psi = self.calculate_psi(reference_predictions, current_predictions)

        if psi > self.threshold:
            return {"drift_detected": True, "psi": psi, "action": "retrain"}
        elif psi > self.threshold / 2:
            return {"drift_detected": False, "psi": psi, "action": "monitor"}
        else:
            return {"drift_detected": False, "psi": psi, "action": "none"}

Grafana Dashboard Setup

Grafana is ideal for visualizing monitoring metrics. You can create dashboards from metrics collected by Prometheus.

Key monitoring panels:

  • Request Throughput: Prediction requests per second
  • Latency Distribution: p50, p95, p99 latency
  • Error Rate: Prediction failure ratio
  • Model Accuracy: Real-time performance metrics
  • Data Drift Score: PSI, KL Divergence
  • Resource Usage: GPU memory, CPU, network
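Each of these panels boils down to a PromQL query over the metrics defined in the Prometheus example above. A sketch of the query strings for three of them, kept as plain strings so they can be pasted into Grafana panel definitions (the 5m windows and label names are illustrative assumptions):

```python
# PromQL queries behind the panels, based on the metric names
# (model_prediction_latency_seconds, model_prediction_total) defined earlier.
PANEL_QUERIES = {
    # Prediction requests per second, averaged over 5 minutes
    "request_throughput": "rate(model_prediction_total[5m])",
    # p95 latency reconstructed from the histogram buckets
    "p95_latency": (
        "histogram_quantile(0.95, "
        "rate(model_prediction_latency_seconds_bucket[5m]))"
    ),
    # Fraction of predictions that ended in an error
    "error_rate": (
        'rate(model_prediction_total{status="error"}[5m]) '
        "/ rate(model_prediction_total[5m])"
    ),
}

for name, query in PANEL_QUERIES.items():
    print(f"{name}: {query}")
```

The same strings work against the Prometheus HTTP API (`/api/v1/query`) if you want to alert from your own code instead of Grafana.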

9. A/B Testing and Deployment Strategies

Canary Deployment

A canary deployment routes only a small slice of traffic to the new model at first, expanding its share once it proves stable.

# Traffic splitting with Istio VirtualService
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: model-serving
spec:
  hosts:
    - model-service
  http:
    - match:
        - headers:
            canary:
              exact: "true"
      route:
        - destination:
            host: model-service
            subset: v2
    - route:
        - destination:
            host: model-service
            subset: v1
          weight: 90
        - destination:
            host: model-service
            subset: v2
          weight: 10

Shadow Mode

The new model runs inference on real traffic, but results are only stored and not returned to users. This allows safe validation by comparing against the existing model's results.

from datetime import datetime

class ShadowModelRouter:
    def __init__(self, primary_model, shadow_model, metrics_store):
        self.primary = primary_model
        self.shadow = shadow_model
        self.metrics_store = metrics_store

    async def predict(self, input_data):
        # Primary model result (returned to user)
        primary_result = await self.primary.predict(input_data)

        # Shadow model result (stored for comparison)
        try:
            shadow_result = await self.shadow.predict(input_data)
            await self.log_comparison(
                input_data, primary_result, shadow_result
            )
        except Exception:
            pass  # Shadow model errors are ignored

        return primary_result

    async def log_comparison(self, input_data, primary, shadow):
        comparison = {
            "timestamp": datetime.utcnow().isoformat(),
            "input_hash": hash(str(input_data)),
            "primary_prediction": primary,
            "shadow_prediction": shadow,
            "agreement": primary == shadow,
        }
        await self.metrics_store.insert(comparison)

Multi-Armed Bandit

A multi-armed bandit allocates traffic dynamically, which is often more sample-efficient than a fixed A/B split: it automatically routes more traffic to better-performing models while still exploring the alternatives.

import numpy as np

class ThompsonSamplingRouter:
    """Thompson Sampling based model router"""

    def __init__(self, model_names):
        self.models = model_names
        # Beta distribution parameters (alpha, beta)
        self.successes = {name: 1 for name in model_names}
        self.failures = {name: 1 for name in model_names}

    def select_model(self):
        """Select a model via Thompson Sampling"""
        samples = {}
        for name in self.models:
            samples[name] = np.random.beta(
                self.successes[name],
                self.failures[name]
            )
        return max(samples, key=samples.get)

    def update(self, model_name, success: bool):
        """Update the distribution based on results"""
        if success:
            self.successes[model_name] += 1
        else:
            self.failures[model_name] += 1

    def get_allocation(self):
        """Check current traffic ratios"""
        total = sum(self.successes[m] + self.failures[m] for m in self.models)
        return {
            m: (self.successes[m] + self.failures[m]) / total
            for m in self.models
        }
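A quick simulation shows why this converges. Here is a self-contained sketch of the same Thompson Sampling loop using only the standard library; the two success rates (0.8 vs 0.4) are made-up stand-ins for online feedback such as click-through:

```python
import random

random.seed(42)

# Hypothetical true success rates of two competing models
true_rates = {"model_a": 0.8, "model_b": 0.4}

# Beta(1, 1) priors, as in the router above
successes = {m: 1 for m in true_rates}
failures = {m: 1 for m in true_rates}

traffic = {m: 0 for m in true_rates}
for _ in range(2000):
    # Thompson Sampling: draw from each model's Beta posterior
    sampled = {m: random.betavariate(successes[m], failures[m])
               for m in true_rates}
    chosen = max(sampled, key=sampled.get)
    traffic[chosen] += 1
    # Simulate user feedback and update the posterior
    if random.random() < true_rates[chosen]:
        successes[chosen] += 1
    else:
        failures[chosen] += 1

print(traffic)  # the stronger model ends up with the vast majority of traffic
```

After a short exploration phase, nearly all requests flow to `model_a`, without anyone manually adjusting traffic weights.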

10. Cost Optimization

Leveraging Spot Instances

Training jobs can tolerate interruption as long as they checkpoint, so spot instances can cut training costs by 70% or more.

# AWS SageMaker spot training
import sagemaker

estimator = sagemaker.estimator.Estimator(
    image_uri="my-training-image:latest",
    role="arn:aws:iam::ACCOUNT:role/SageMakerRole",
    instance_count=4,
    instance_type="ml.p4d.24xlarge",
    use_spot_instances=True,
    use_spot_instances=True,
    max_wait=7200,  # Maximum total wait time in seconds (including spot delays)
    max_run=3600,   # Maximum training run time in seconds
    checkpoint_s3_uri="s3://bucket/checkpoints/",
)

estimator.fit({"training": "s3://bucket/data/"})

The key is checkpoint saving. You must be able to resume from a checkpoint when a spot instance is interrupted.
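The resume logic itself is simple. Here is a framework-agnostic sketch (JSON in place of real weights; with PyTorch you would persist `model.state_dict()` and `optimizer.state_dict()` via `torch.save` to the checkpoint path instead):

```python
import json
import os
import tempfile

def train_with_checkpoints(ckpt_path, total_epochs=10):
    """Train, checkpointing every epoch; resume from the checkpoint if present."""
    start_epoch, state = 0, {"loss": None}
    if os.path.exists(ckpt_path):          # spot instance was interrupted: resume
        with open(ckpt_path) as f:
            ckpt = json.load(f)
        start_epoch, state = ckpt["epoch"] + 1, ckpt["state"]

    for epoch in range(start_epoch, total_epochs):
        state["loss"] = 1.0 / (epoch + 1)  # stand-in for one epoch of training
        with open(ckpt_path, "w") as f:    # checkpoint after every epoch
            json.dump({"epoch": epoch, "state": state}, f)
    return start_epoch

ckpt = os.path.join(tempfile.mkdtemp(), "ckpt.json")
train_with_checkpoints(ckpt, total_epochs=4)          # run 1: "interrupted" after epoch 3
resumed_at = train_with_checkpoints(ckpt, total_epochs=10)
print(f"Resumed at epoch {resumed_at}")  # → Resumed at epoch 4
```

On SageMaker, anything written under the checkpoint directory is synced to `checkpoint_s3_uri`, so the second invocation after an interruption finds the file and picks up where the first left off.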

Model Compression

Methods to reduce production serving costs:

Knowledge Distillation

import torch.nn.functional as F

class DistillationTrainer:
    def __init__(self, teacher, student, temperature=3.0, alpha=0.5):
        self.teacher = teacher
        self.student = student
        self.temperature = temperature
        self.alpha = alpha

    def distillation_loss(self, student_logits, teacher_logits, labels):
        # Soft target loss (KD loss)
        soft_targets = F.softmax(teacher_logits / self.temperature, dim=1)
        soft_loss = F.kl_div(
            F.log_softmax(student_logits / self.temperature, dim=1),
            soft_targets,
            reduction="batchmean"
        ) * (self.temperature ** 2)

        # Hard target loss
        hard_loss = F.cross_entropy(student_logits, labels)

        return self.alpha * soft_loss + (1 - self.alpha) * hard_loss

ONNX Conversion for Inference Optimization

import torch
import onnx

# PyTorch -> ONNX conversion
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
    opset_version=17,
)

# Inference with ONNX Runtime
import onnxruntime as ort

session = ort.InferenceSession("model.onnx", providers=["CUDAExecutionProvider"])
result = session.run(None, {"input": input_array})

Batch Inference vs Real-Time Inference

| Aspect     | Batch Inference                 | Real-Time Inference     |
|------------|---------------------------------|-------------------------|
| Latency    | Minutes to hours                | Milliseconds to seconds |
| Throughput | Very high                       | Medium                  |
| Cost       | Low (spot instances)            | High (always on)        |
| Best For   | Recommendation systems, reports | Chatbots, search        |

# Batch inference example (Spark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()

# Register model inference as a UDF
@udf("float")
def predict_udf(features):
    return float(model.predict([features])[0])

# Run batch inference on large dataset
df = spark.read.parquet("s3://bucket/daily-data/")
predictions = df.withColumn("prediction", predict_udf("features"))
predictions.write.parquet("s3://bucket/predictions/")

11. Practical: Building an LLM RAG Pipeline

RAG (Retrieval-Augmented Generation) is a key pattern for overcoming LLM limitations. It searches an external knowledge base and provides context to the LLM.

Overall Architecture

  1. Document Collection - Gather data from various sources
  2. Chunking - Split documents into appropriate sizes
  3. Embedding - Convert text to vectors
  4. Vector DB Storage - Store in a vector index
  5. Retrieval - Search for similar documents for a query
  6. Generation - Pass search results as context to the LLM

Document Collection and Chunking

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import (
    PyPDFLoader,
    WebBaseLoader,
    NotionDirectoryLoader,
)

# Load documents from various sources
pdf_docs = PyPDFLoader("docs/manual.pdf").load()
web_docs = WebBaseLoader("https://docs.example.com").load()
notion_docs = NotionDirectoryLoader("notion_export/").load()

all_docs = pdf_docs + web_docs + notion_docs

# Chunking
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ".", " "],
)

chunks = text_splitter.split_documents(all_docs)
print(f"Total chunks: {len(chunks)}")

Embedding and Vector DB

from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma

# Embedding model
embedding_model = HuggingFaceEmbeddings(
    model_name="BAAI/bge-large-en-v1.5",
    model_kwargs={"device": "cuda"},
    encode_kwargs={"normalize_embeddings": True},
)

# Store in Chroma vector DB
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embedding_model,
    persist_directory="./chroma_db",
    collection_name="knowledge_base",
)

# Similarity search
results = vectorstore.similarity_search(
    "How to deploy a model to production?",
    k=5,
)

Assembling the RAG Pipeline

from langchain.chains import RetrievalQA
from langchain.llms import VLLM
from langchain.prompts import PromptTemplate

# LLM setup
llm = VLLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    tensor_parallel_size=1,
    max_new_tokens=512,
    temperature=0.1,
)

# Prompt template
prompt_template = PromptTemplate(
    template="""Answer the question using the following context.
If the information is not in the context, say "I don't know."

Context:
{context}

Question: {question}

Answer:""",
    input_variables=["context", "question"],
)

# RAG chain
rag_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(
        search_type="mmr",
        search_kwargs={"k": 5, "fetch_k": 20},
    ),
    chain_type_kwargs={"prompt": prompt_template},
    return_source_documents=True,
)

# Query
result = rag_chain.invoke("What is the production deployment procedure?")
print(result["result"])
for doc in result["source_documents"]:
    print(f"  Source: {doc.metadata['source']}")

Production RAG Serving

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):
    question: str
    top_k: int = 5

class QueryResponse(BaseModel):
    answer: str
    sources: list[str]
    latency_ms: float

@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    import time
    start = time.time()

    result = rag_chain.invoke(request.question)

    sources = list(set(
        doc.metadata.get("source", "unknown")
        for doc in result["source_documents"]
    ))

    latency = (time.time() - start) * 1000

    return QueryResponse(
        answer=result["result"],
        sources=sources,
        latency_ms=round(latency, 2),
    )

Conclusion

MLOps is an essential discipline for reliably operating ML models in production.

Here are the key takeaways:

  1. Start with experiment management - Even adopting MLflow alone makes a huge difference.
  2. Invest in automation - Manual processes will inevitably fail.
  3. Monitoring is non-negotiable - The real work begins after deployment.
  4. Start small - Moving from Level 0 to Level 1 of Google's maturity model is the most important step.
  5. Evolve for the LLM era - Actively adopt new tools like vLLM and RAG.

Training a model accounts for only 20% of the total effort. The remaining 80% is data pipelines, serving, monitoring, and operations. MLOps is the methodology for systematically managing that 80%.

MLOps Self-Check Checklist

Experiment Management

  • Are you tracking all experiment hyperparameters and metrics?
  • Can you reproduce experiment results?

Data Pipelines

  • Are you version-controlling your data?
  • Is data quality validation automated?

Training Infrastructure

  • Is your training pipeline automated?
  • Are you saving checkpoints?

Serving

  • Are you monitoring model serving latency?
  • Are you using a deployment strategy that supports rollback?

Monitoring

  • Are you detecting data drift?
  • Do you get alerts when model performance degrades?

Cost

  • Are you leveraging spot instances?
  • Have you considered model compression?