MLOps Complete Guide: From ML Pipeline to Production Deployment

Introduction

Modern machine learning projects don't end after building a model in a Jupyter Notebook. You need to reliably manage the entire lifecycle: data collection, preprocessing, training, evaluation, deployment, monitoring, and retraining. This is precisely why MLOps exists.

This guide takes you from the fundamentals of MLOps all the way through practical tool usage.


1. What is MLOps?

1.1 Defining MLOps

MLOps (Machine Learning Operations) is a methodology that integrates the development and operation of ML systems. It applies DevOps principles to ML workflows, automating continuous training (CT), continuous integration (CI), and continuous deployment (CD) of models.

1.2 MLOps vs DevOps vs DataOps

| Category        | DevOps              | DataOps         | MLOps                               |
| --------------- | ------------------- | --------------- | ----------------------------------- |
| Focus           | Software deployment | Data pipelines  | ML model lifecycle                  |
| Output          | Applications        | Data/Reports    | ML models                           |
| Reproducibility | Code versioning     | Data versioning | Code + Data + Model versioning      |
| Automation      | CI/CD               | Data testing    | Training + Evaluation + Deployment  |

The key difference between MLOps and DevOps is data dependency: even with identical code, training on different data produces an entirely different model, so model quality depends as much on the data as on the code.
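This is easy to demonstrate with a toy example: the same training code yields a different model for each version of the data, which is why MLOps must version code, data, and models together. A minimal sketch, with one-parameter least squares standing in for "training":

```python
def fit_slope(xs, ys):
    """One-parameter least squares: the 'model' is just the slope w in y ~ w*x."""
    return sum(x * y for x, y in zip(xs, ys)) / sum(x * x for x in xs)

# Identical training code, two versions of the data
w_v1 = fit_slope([1, 2, 3, 4], [2, 4, 6, 8])    # data v1 -> model w = 2.0
w_v2 = fit_slope([1, 2, 3, 4], [3, 6, 9, 12])   # data v2 -> model w = 3.0
print(w_v1, w_v2)  # different data, different model
```

Versioning the code alone would record these two runs as identical, even though they produce different models.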

1.3 Unique Characteristics of ML Systems

ML systems have several characteristics distinct from traditional software:

  1. Data dependency: Model behavior is determined by data, not just code
  2. Experimental nature: Requires dozens to hundreds of iterative experiments
  3. Model decay: Model performance degrades as data distributions shift over time
  4. Reproducibility challenges: Hard to guarantee identical results even in the same environment (random seeds, nondeterministic GPU operations, library versions)
  5. Multiple artifacts: Code, data, and models all require version control
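Model decay (point 3) is usually caught by scoring incoming data against the training-time distribution. As an illustration, here is a minimal pure-Python Population Stability Index (PSI), one common drift score; the binning strategy and the 0.25 threshold are conventions rather than part of any specific library:

```python
import math

def psi(expected, actual, bins=10):
    """Population Stability Index between a baseline sample and new data.
    Rule of thumb: < 0.1 stable, 0.1-0.25 moderate shift, > 0.25 significant
    shift that may warrant a retraining trigger."""
    lo, hi = min(expected), max(expected)
    width = (hi - lo) / bins or 1.0

    def dist(values):
        counts = [0] * bins
        for v in values:
            idx = min(max(int((v - lo) / width), 0), bins - 1)
            counts[idx] += 1
        # Smooth empty buckets so the log term stays defined
        return [(c or 0.5) / len(values) for c in counts]

    p, q = dist(expected), dist(actual)
    return sum((pi - qi) * math.log(pi / qi) for pi, qi in zip(p, q))

baseline = [i / 100 for i in range(100)]          # training-time distribution
shifted = [0.5 + i / 200 for i in range(100)]     # inference-time distribution
print(f"PSI: {psi(baseline, shifted):.3f}")       # large value => drift
```

In production this kind of check runs per feature on a schedule; tools like Evidently and WhyLogs (Section 1.5) package the same idea.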

1.4 MLOps Maturity Model

Based on Google's MLOps maturity levels:

Level 0 - Manual ML

  • All processes are manual
  • Script-based experimentation
  • Rare, manual deployments
  • No monitoring

Level 1 - ML Pipeline Automation

  • Automated training pipeline
  • Continuous training enabled
  • Experiment tracking begins
  • Feature store introduced

Level 2 - CI/CD Pipeline Automation

  • Fully automated CI/CD
  • Model registry in use
  • Automated retraining triggers
  • Complete monitoring

1.5 MLOps Tool Ecosystem

Data Version Control: DVC, LakeFS, Delta Lake
Experiment Tracking: MLflow, W&B, Neptune, Comet ML
Pipelines: Airflow, Prefect, Metaflow, Kubeflow Pipelines
Model Registry: MLflow Registry, W&B Artifacts, Vertex AI
Containerization: Docker, Podman
Orchestration: Kubernetes, ECS, GKE
Model Serving: Triton, TorchServe, BentoML, KServe
Monitoring: Evidently, WhyLogs, Arize, Fiddler
Feature Store: Feast, Tecton, Vertex AI Feature Store

2. Data Version Control (DVC)

2.1 Introduction to DVC

DVC (Data Version Control) is a version control tool for ML projects designed to work alongside Git. It allows you to version large datasets and ML models just like Git.

# Install DVC
pip install dvc

# Install with S3 support
pip install "dvc[s3]"

# Install with GCS support
pip install "dvc[gs]"

# Install with all remote support
pip install "dvc[all]"

2.2 DVC Initialization and Basic Usage

# Initialize Git repository (if needed)
git init

# Initialize DVC
dvc init

# Inspect created files
ls .dvc/
# config  .gitignore  tmp/

# Start tracking data
dvc add data/train.csv

# A .dvc file is created (tracked by Git)
git add data/train.csv.dvc data/.gitignore
git commit -m "Add training data"
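Under the hood, the .dvc stub stores a content hash of the tracked file (historically MD5), so an edited dataset automatically becomes a new version. A small illustration of that content-addressing idea (file name and contents are made up):

```python
import hashlib
import os
import tempfile

def file_md5(path, chunk_size=1 << 20):
    """Content hash of a file, the kind of ID DVC records in the .dvc stub."""
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()

with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "train.csv")
    with open(path, "w") as f:
        f.write("x,y\n1,0\n")
    v1 = file_md5(path)
    with open(path, "a") as f:  # edit the data
        f.write("2,1\n")
    v2 = file_md5(path)
    print(v1 != v2)  # different content => different version ID
```

Git then versions the tiny stub file, while the large data itself lives in the DVC cache and remote storage.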

2.3 Configuring Remote Storage

# Configure S3 remote storage
dvc remote add -d myremote s3://my-bucket/dvc-store

# Set AWS credentials (use --local so secrets stay out of Git;
# environment variables or ~/.aws/credentials also work)
dvc remote modify --local myremote access_key_id YOUR_ACCESS_KEY
dvc remote modify --local myremote secret_access_key YOUR_SECRET_KEY

# GCS remote storage
dvc remote add -d gcsstorage gs://my-bucket/dvc-store

# Local remote storage (for testing)
dvc remote add -d localremote /tmp/dvc-storage

# Push data
dvc push

# Pull data
dvc pull

2.4 DVC Pipelines

DVC pipelines track dependencies between stages and re-run only stages that have changed.

# dvc.yaml
stages:
  prepare:
    cmd: python src/prepare.py
    deps:
      - src/prepare.py
      - data/raw
    outs:
      - data/prepared

  featurize:
    cmd: python src/featurize.py
    deps:
      - src/featurize.py
      - data/prepared
    outs:
      - data/features

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/features
    params:
      - params.yaml:
          - train.lr
          - train.n_estimators
    outs:
      - models/model.pkl
    metrics:
      - metrics/scores.json:
          cache: false

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pkl
      - data/features
    metrics:
      - metrics/eval.json:
          cache: false

# Run the pipeline
dvc repro

# Run up to a specific stage
dvc repro train

# Visualize the pipeline DAG
dvc dag

# Compare experiment results
dvc metrics show
dvc metrics diff HEAD~1
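`dvc metrics show` and `dvc metrics diff` simply read the JSON files declared under `metrics:` in dvc.yaml, so the evaluate stage's only contract is to write its scores there. A minimal sketch of what src/evaluate.py might write (the score values are placeholders; real ones come from scoring models/model.pkl against data/features):

```python
import json
import os

def write_metrics(scores: dict, path: str = "metrics/eval.json") -> None:
    """Write a flat scores dict to the metrics file declared in dvc.yaml."""
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(path, "w") as f:
        json.dump(scores, f, indent=2)

if __name__ == "__main__":
    # Placeholder scores; a real stage computes these from the trained model
    write_metrics({"accuracy": 0.93, "f1": 0.91})
```

Because the file is declared with `cache: false`, it is committed to Git rather than the DVC cache, which is what lets `dvc metrics diff HEAD~1` compare revisions.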

2.5 Parameter Management

# params.yaml
train:
  lr: 0.001
  n_estimators: 100
  max_depth: 5
  batch_size: 32
  epochs: 10

data:
  test_size: 0.2
  random_state: 42

# src/train.py
import yaml

# Load parameters
with open("params.yaml") as f:
    params = yaml.safe_load(f)

lr = params["train"]["lr"]
n_estimators = params["train"]["n_estimators"]

# Train model
from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier(
    n_estimators=n_estimators,
    max_depth=params["train"]["max_depth"],
    random_state=params["data"]["random_state"]
)

2.6 Git + DVC Workflow

# Create a new experiment branch
git checkout -b experiment/increase-lr

# Modify parameters
# In params.yaml, change lr: 0.001 to lr: 0.01

# Re-run the pipeline
dvc repro

# Check results
dvc metrics show

# Commit
git add dvc.lock params.yaml
git commit -m "Experiment: increase learning rate to 0.01"

# Compare experiments
git checkout main
dvc metrics diff experiment/increase-lr

3. Experiment Tracking

3.1 MLflow Installation and Setup

# Install MLflow
pip install mlflow

# Start MLflow UI
mlflow ui

# Specify host and port
mlflow ui --host 0.0.0.0 --port 5001

# Set remote tracking server
export MLFLOW_TRACKING_URI=http://mlflow-server:5000

3.2 Basic MLflow Usage

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import numpy as np
from sklearn.datasets import load_breast_cancer

# Example dataset so the snippet runs end to end
X, y = load_breast_cancer(return_X_y=True)

# Set tracking URI (default: ./mlruns)
mlflow.set_tracking_uri("http://localhost:5000")

# Set experiment
mlflow.set_experiment("my-classification-experiment")

# Run experiment
with mlflow.start_run(run_name="random-forest-v1"):
    # Log parameters
    n_estimators = 100
    max_depth = 5
    lr = 0.001

    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)
    mlflow.log_param("learning_rate", lr)

    # Train model
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42
    )
    model.fit(X_train, y_train)

    # Log metrics
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')

    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)

    # Step-wise metrics (training curve)
    for epoch in range(10):
        train_loss = np.random.random() * 0.5 / (epoch + 1)
        mlflow.log_metric("train_loss", train_loss, step=epoch)

    # Log artifacts
    mlflow.log_artifact("data/train.csv")

    # Log model
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="RandomForestClassifier"
    )

    print(f"Accuracy: {accuracy:.4f}, F1: {f1:.4f}")
    print(f"Run ID: {mlflow.active_run().info.run_id}")

3.3 MLflow Autologging

# Enable autologging for sklearn
mlflow.sklearn.autolog()

with mlflow.start_run():
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)
    # Parameters, metrics, and model are logged automatically

# PyTorch autologging
mlflow.pytorch.autolog()

# XGBoost autologging
mlflow.xgboost.autolog()

3.4 Real-World: Tracking PyTorch Training

import torch
import torch.nn as nn
import torch.optim as optim
import mlflow
import mlflow.pytorch

class SimpleNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x)

def train_with_mlflow(
    model, train_loader, val_loader,
    epochs=10, lr=0.001, experiment_name="pytorch-training"
):
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run():
        # Log hyperparameters
        mlflow.log_params({
            "epochs": epochs,
            "learning_rate": lr,
            "model_type": "SimpleNet",
            "optimizer": "Adam",
            "batch_size": train_loader.batch_size,
        })

        optimizer = optim.Adam(model.parameters(), lr=lr)
        criterion = nn.CrossEntropyLoss()
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

        for epoch in range(epochs):
            # Training
            model.train()
            train_loss = 0.0
            for batch_X, batch_y in train_loader:
                optimizer.zero_grad()
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()

            train_loss /= len(train_loader)

            # Validation
            model.eval()
            val_loss = 0.0
            correct = 0
            total = 0

            with torch.no_grad():
                for batch_X, batch_y in val_loader:
                    outputs = model(batch_X)
                    loss = criterion(outputs, batch_y)
                    val_loss += loss.item()

                    _, predicted = outputs.max(1)
                    total += batch_y.size(0)
                    correct += predicted.eq(batch_y).sum().item()

            val_loss /= len(val_loader)
            val_accuracy = correct / total

            # Log step-wise metrics
            mlflow.log_metrics({
                "train_loss": train_loss,
                "val_loss": val_loss,
                "val_accuracy": val_accuracy,
                "learning_rate": scheduler.get_last_lr()[0],
            }, step=epoch)

            scheduler.step()

            print(f"Epoch {epoch+1}/{epochs} | "
                  f"Train Loss: {train_loss:.4f} | "
                  f"Val Loss: {val_loss:.4f} | "
                  f"Val Acc: {val_accuracy:.4f}")

        # Save final model
        mlflow.pytorch.log_model(model, "model")

        return mlflow.active_run().info.run_id

3.5 W&B (Weights & Biases) Usage

# Install W&B
pip install wandb

# Login
wandb login

import wandb
import torch

# Initialize W&B
wandb.init(
    project="my-ml-project",
    name="experiment-001",
    config={
        "learning_rate": 0.001,
        "epochs": 100,
        "batch_size": 64,
        "architecture": "ResNet50",
        "dataset": "ImageNet",
    }
)

# Access config values
lr = wandb.config.learning_rate

# Training loop
for epoch in range(wandb.config.epochs):
    train_loss, train_acc = train_epoch(model, train_loader)
    val_loss, val_acc = eval_epoch(model, val_loader)

    # Log metrics
    wandb.log({
        "epoch": epoch,
        "train_loss": train_loss,
        "train_accuracy": train_acc,
        "val_loss": val_loss,
        "val_accuracy": val_acc,
        "learning_rate": optimizer.param_groups[0]['lr'],
    })

# Save model
wandb.save("model.pth")

# Log images
wandb.log({
    "predictions": [
        wandb.Image(img, caption=f"Pred: {pred}, True: {true}")
        for img, pred, true in sample_predictions
    ]
})

# W&B Artifacts (dataset versioning)
artifact = wandb.Artifact("training-data", type="dataset")
artifact.add_dir("data/train")
wandb.log_artifact(artifact)

wandb.finish()

3.6 W&B Sweeps (Hyperparameter Optimization)

import wandb

# Sweep configuration
sweep_config = {
    "method": "bayes",  # random, grid, bayes
    "metric": {
        "name": "val_accuracy",
        "goal": "maximize"
    },
    "parameters": {
        "learning_rate": {
            "distribution": "log_uniform_values",
            "min": 1e-5,
            "max": 1e-1,
        },
        "batch_size": {
            "values": [16, 32, 64, 128]
        },
        "hidden_dim": {
            "values": [64, 128, 256, 512]
        },
        "dropout": {
            "distribution": "uniform",
            "min": 0.1,
            "max": 0.5,
        }
    }
}

def train_sweep():
    with wandb.init() as run:
        config = wandb.config

        model = SimpleNet(
            input_dim=784,
            hidden_dim=config.hidden_dim,
            output_dim=10
        )

        optimizer = torch.optim.Adam(
            model.parameters(),
            lr=config.learning_rate
        )

        for epoch in range(10):
            train_loss, val_loss, val_acc = run_epoch(
                model, optimizer, config.batch_size
            )
            wandb.log({
                "train_loss": train_loss,
                "val_loss": val_loss,
                "val_accuracy": val_acc,
            })

# Create and run sweep
sweep_id = wandb.sweep(sweep_config, project="my-project")
wandb.agent(sweep_id, function=train_sweep, count=50)

4. ML Pipeline Orchestration

4.1 ML Pipelines with Apache Airflow

# dags/ml_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'mlops-team',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='ML model training pipeline',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    catchup=False,
)

def extract_data(**kwargs):
    """Extract data"""
    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine('postgresql://user:pass@db:5432/mldb')
    df = pd.read_sql(
        "SELECT * FROM features WHERE date >= CURRENT_DATE - 7",
        engine
    )

    output_path = '/tmp/raw_data.parquet'
    df.to_parquet(output_path)
    kwargs['ti'].xcom_push(key='data_path', value=output_path)
    return output_path

def preprocess_data(**kwargs):
    """Preprocess data"""
    ti = kwargs['ti']
    data_path = ti.xcom_pull(task_ids='extract_data', key='data_path')

    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_parquet(data_path)
    scaler = StandardScaler()
    features = df.drop('target', axis=1)
    scaled = pd.DataFrame(
        scaler.fit_transform(features),
        columns=features.columns
    )
    scaled['target'] = df['target'].values  # keep the label for the train step

    processed_path = '/tmp/processed_data.parquet'
    scaled.to_parquet(processed_path)
    ti.xcom_push(key='processed_path', value=processed_path)

def train_model(**kwargs):
    """Train model"""
    ti = kwargs['ti']
    processed_path = ti.xcom_pull(
        task_ids='preprocess_data',
        key='processed_path'
    )

    import mlflow
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import train_test_split

    mlflow.set_experiment("production-training")

    with mlflow.start_run():
        data = pd.read_parquet(processed_path)
        X = data.drop('target', axis=1)
        y = data['target']
        X_train, X_test, y_train, y_test = train_test_split(X, y)

        model = GradientBoostingClassifier(n_estimators=200)
        model.fit(X_train, y_train)

        accuracy = model.score(X_test, y_test)
        mlflow.log_metric("accuracy", accuracy)
        mlflow.sklearn.log_model(model, "model")

        run_id = mlflow.active_run().info.run_id
        ti.xcom_push(key='run_id', value=run_id)

def evaluate_and_deploy(**kwargs):
    """Evaluate and deploy model"""
    ti = kwargs['ti']
    run_id = ti.xcom_pull(task_ids='train_model', key='run_id')

    import mlflow
    client = mlflow.tracking.MlflowClient()

    run = client.get_run(run_id)
    accuracy = run.data.metrics['accuracy']

    if accuracy >= 0.90:
        model_uri = f"runs:/{run_id}/model"
        mlflow.register_model(model_uri, "ProductionModel")
        print(f"Model deployed with accuracy: {accuracy:.4f}")
    else:
        raise ValueError(f"Model accuracy {accuracy:.4f} below threshold 0.90")

# Define tasks
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

preprocess_task = PythonOperator(
    task_id='preprocess_data',
    python_callable=preprocess_data,
    dag=dag,
)

train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)

evaluate_task = PythonOperator(
    task_id='evaluate_and_deploy',
    python_callable=evaluate_and_deploy,
    dag=dag,
)

# Set dependencies
extract_task >> preprocess_task >> train_task >> evaluate_task

4.2 ML Pipelines with Prefect

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def load_data(data_path: str) -> pd.DataFrame:
    """Load data (with caching)"""
    return pd.read_parquet(data_path)

@task(retries=3, retry_delay_seconds=10)
def preprocess(df: pd.DataFrame) -> pd.DataFrame:
    """Preprocess data"""
    df = df.dropna()
    df = df.drop_duplicates()
    return df

@task
def train(df: pd.DataFrame, params: dict) -> str:
    """Train model"""
    from sklearn.ensemble import RandomForestClassifier

    mlflow.set_experiment("prefect-ml-pipeline")

    with mlflow.start_run():
        model = RandomForestClassifier(**params)
        X = df.drop('target', axis=1)
        y = df['target']
        model.fit(X, y)

        mlflow.sklearn.log_model(model, "model")
        mlflow.log_params(params)

        return mlflow.active_run().info.run_id

@task
def evaluate(run_id: str) -> float:
    """Evaluate model"""
    client = mlflow.tracking.MlflowClient()
    run = client.get_run(run_id)
    return run.data.metrics.get('accuracy', 0.0)

@flow(name="ml-training-pipeline")
def ml_pipeline(data_path: str, params: dict = None):
    """Main pipeline"""
    if params is None:
        params = {"n_estimators": 100, "max_depth": 5}

    df = load_data(data_path)
    processed_df = preprocess(df)
    run_id = train(processed_df, params)
    accuracy = evaluate(run_id)

    print(f"Pipeline complete. Accuracy: {accuracy:.4f}")
    return accuracy

if __name__ == "__main__":
    ml_pipeline(
        data_path="data/train.parquet",
        params={"n_estimators": 200, "max_depth": 7}
    )

5. Model Registry

5.1 MLflow Model Registry

The model registry provides centralized management of model versions and tracks their stage (Staging/Production). Note that recent MLflow releases (2.9+) deprecate stages in favor of model version aliases, but the stage API below still works and remains common in existing deployments.

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register a model
model_uri = "runs:/abc123def456/model"
registered_model = mlflow.register_model(model_uri, "MyClassifier")

# Check version info
print(f"Version: {registered_model.version}")
print(f"Status: {registered_model.status}")

# Update model version metadata
client.update_model_version(
    name="MyClassifier",
    version=registered_model.version,
    description="Random Forest with 200 estimators, accuracy 0.94"
)

# Add tags
client.set_model_version_tag(
    name="MyClassifier",
    version=registered_model.version,
    key="validated_by",
    value="data-science-team"
)

5.2 Promoting from Staging to Production

# Transition to Staging
client.transition_model_version_stage(
    name="MyClassifier",
    version=1,
    stage="Staging",
    archive_existing_versions=False
)

# Promote to Production after validation
def promote_to_production(model_name: str, version: int, min_accuracy: float = 0.90):
    """Promote model to Production"""
    model_version = client.get_model_version(model_name, version)
    run_id = model_version.run_id
    run = client.get_run(run_id)

    accuracy = run.data.metrics.get('accuracy', 0)

    if accuracy < min_accuracy:
        raise ValueError(
            f"Model accuracy {accuracy:.4f} is below minimum {min_accuracy}"
        )

    # Promote to Production (archive existing Production versions)
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production",
        archive_existing_versions=True
    )

    print(f"Model {model_name} v{version} promoted to Production!")
    print(f"Accuracy: {accuracy:.4f}")

# Load Production model
production_model = mlflow.pyfunc.load_model(
    model_uri="models:/MyClassifier/Production"
)
predictions = production_model.predict(X_test)

5.3 Model Serving (MLflow built-in)

# Serve MLflow model
mlflow models serve \
  -m "models:/MyClassifier/Production" \
  --host 0.0.0.0 \
  --port 5001

# Make prediction request
curl -X POST http://localhost:5001/invocations \
  -H "Content-Type: application/json" \
  -d '{"dataframe_split": {"columns": ["feature1", "feature2"], "data": [[1.0, 2.0]]}}'
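The same request can be made from Python with only the standard library; the endpoint and payload shape mirror the curl example above:

```python
import json
from urllib import request

def build_payload(columns, rows):
    """MLflow scoring-server 'dataframe_split' payload, as in the curl call."""
    return {"dataframe_split": {"columns": columns, "data": rows}}

def predict(url, columns, rows, timeout=5):
    """POST a prediction request to an MLflow scoring server."""
    req = request.Request(
        url,
        data=json.dumps(build_payload(columns, rows)).encode(),
        headers={"Content-Type": "application/json"},
    )
    with request.urlopen(req, timeout=timeout) as resp:
        return json.load(resp)

# With the server above running locally:
# predict("http://localhost:5001/invocations", ["feature1", "feature2"], [[1.0, 2.0]])
```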

6. Containerization (Docker)

6.1 Dockerizing an ML Environment

# Dockerfile.train - Training image
FROM python:3.11-slim

WORKDIR /app

RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    git \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ ./src/
COPY params.yaml .
COPY dvc.yaml .

CMD ["python", "src/train.py"]

6.2 Multi-Stage Builds

# Dockerfile.serve - Multi-stage build for serving
# Stage 1: Builder
FROM python:3.11 AS builder

WORKDIR /build

COPY requirements.txt .
RUN pip install --no-cache-dir --target /install -r requirements.txt

# Stage 2: Final slim image
FROM python:3.11-slim AS runtime

WORKDIR /app

COPY --from=builder /install /usr/local/lib/python3.11/site-packages

COPY src/serve.py .
COPY models/ ./models/

EXPOSE 8080

RUN useradd -m -u 1000 mluser
USER mluser

CMD ["python", "serve.py"]

6.3 GPU Docker (nvidia-docker)

# Dockerfile.gpu - GPU-enabled image
FROM nvidia/cuda:12.1.0-cudnn8-devel-ubuntu22.04

ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt-get install -y \
    python3.11 \
    python3-pip \
    git \
    && rm -rf /var/lib/apt/lists/*

RUN pip3 install torch torchvision torchaudio \
    --index-url https://download.pytorch.org/whl/cu121

WORKDIR /app
COPY requirements-gpu.txt .
RUN pip3 install --no-cache-dir -r requirements-gpu.txt

COPY src/ ./src/

CMD ["python3", "src/train_gpu.py"]

# Run GPU container
docker run --gpus all \
  -v /data:/data \
  -v /models:/models \
  --shm-size=8gb \
  my-ml-gpu:latest

6.4 ML Stack with Docker Compose

# docker-compose.yml
version: '3.8'

services:
  mlflow:
    image: ghcr.io/mlflow/mlflow:latest
    command: >
      mlflow server
      --backend-store-uri postgresql://mlflow:password@postgres:5432/mlflow
      --default-artifact-root s3://my-bucket/mlflow
      --host 0.0.0.0
      --port 5000
    ports:
      - '5000:5000'
    depends_on:
      - postgres

  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: mlflow
      POSTGRES_USER: mlflow
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data

  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    ports:
      - '9000:9000'
      - '9001:9001'
    volumes:
      - minio_data:/data

  trainer:
    build:
      context: .
      dockerfile: Dockerfile.train
    environment:
      MLFLOW_TRACKING_URI: http://mlflow:5000
    volumes:
      - ./data:/app/data
      - ./models:/app/models
    depends_on:
      - mlflow
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

volumes:
  postgres_data:
  minio_data:

6.5 Real-World: PyTorch Model Server Dockerfile

# Dockerfile.pytorch-serve
FROM python:3.11-slim

WORKDIR /app

RUN pip install --no-cache-dir \
    torch==2.2.0+cpu \
    torchvision \
    fastapi \
    uvicorn \
    pydantic \
    numpy \
    pillow \
    --index-url https://download.pytorch.org/whl/cpu

COPY src/server.py .
COPY models/model.pt .

EXPOSE 8000

CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]

# src/server.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import torch
import numpy as np
from typing import List

app = FastAPI(title="ML Model Server")

# Load model once at server startup
model = torch.jit.load("model.pt")
model.eval()

class PredictionRequest(BaseModel):
    features: List[List[float]]

class PredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    try:
        tensor = torch.tensor(request.features, dtype=torch.float32)

        with torch.no_grad():
            outputs = model(tensor)
            probabilities = torch.softmax(outputs, dim=1)
            predictions = torch.argmax(probabilities, dim=1)

        return PredictionResponse(
            predictions=predictions.tolist(),
            probabilities=probabilities.tolist()
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

7. Kubernetes for ML

7.1 K8s Basics for ML Workloads

# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: ml-platform
  labels:
    app: ml-platform
    environment: production

7.2 Training Job

# k8s/training-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: ml-training-job
  namespace: ml-platform
spec:
  backoffLimit: 3
  template:
    spec:
      restartPolicy: OnFailure

      nodeSelector:
        accelerator: nvidia-t4

      tolerations:
        - key: nvidia.com/gpu
          operator: Exists
          effect: NoSchedule

      containers:
        - name: trainer
          image: my-registry/ml-trainer:v1.2.0

          resources:
            requests:
              memory: '8Gi'
              cpu: '4'
              nvidia.com/gpu: '1'
            limits:
              memory: '16Gi'
              cpu: '8'
              nvidia.com/gpu: '1'

          env:
            - name: MLFLOW_TRACKING_URI
              value: 'http://mlflow-service.ml-platform:5000'
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: aws-credentials
                  key: access_key_id

          volumeMounts:
            - name: training-data
              mountPath: /data
            - name: dshm
              mountPath: /dev/shm

      volumes:
        - name: training-data
          persistentVolumeClaim:
            claimName: training-data-pvc
        - name: dshm
          emptyDir:
            medium: Memory
            sizeLimit: 8Gi

7.3 Scheduled Retraining CronJob

# k8s/training-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: daily-retraining
  namespace: ml-platform
spec:
  schedule: '0 2 * * *' # Daily at 2 AM
  concurrencyPolicy: Forbid
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3

  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
            - name: retrainer
              image: my-registry/ml-trainer:latest
              command: ['python', 'src/retrain.py']
              resources:
                requests:
                  memory: '4Gi'
                  cpu: '2'
                limits:
                  memory: '8Gi'
                  cpu: '4'

7.4 Model Serving Deployment with HPA

# k8s/model-serving-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-server
  namespace: ml-platform
spec:
  replicas: 3
  selector:
    matchLabels:
      app: model-server

  template:
    metadata:
      labels:
        app: model-server
    spec:
      containers:
        - name: model-server
          image: my-registry/model-server:v1.0.0
          ports:
            - containerPort: 8000

          resources:
            requests:
              memory: '2Gi'
              cpu: '1'
            limits:
              memory: '4Gi'
              cpu: '2'

          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10

          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 15
            periodSeconds: 5

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: model-server-hpa
  namespace: ml-platform
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: model-server
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

8. Kubeflow

8.1 Introduction to Kubeflow

Kubeflow is an open-source platform for managing ML workflows on Kubernetes. It consists of:

  • Pipelines: ML workflow orchestration
  • Notebooks: Managed Jupyter notebooks
  • Katib: Hyperparameter tuning
  • KServe: Model serving
  • Training Operator: Distributed training

8.2 Kubeflow Pipelines

# kubeflow_pipeline.py
import kfp
from kfp import dsl

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["pandas", "scikit-learn", "pyarrow"]
)
def prepare_data(data_path: str, output_path: kfp.dsl.OutputPath(str)):
    import os
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_parquet(data_path)
    train, test = train_test_split(df, test_size=0.2)
    # OutputPath provides a path; create it as a directory before writing
    os.makedirs(output_path, exist_ok=True)
    train.to_parquet(f"{output_path}/train.parquet")
    test.to_parquet(f"{output_path}/test.parquet")

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["scikit-learn", "mlflow", "pandas", "pyarrow"]
)
def train_model(
    data_path: str,
    n_estimators: int,
    max_depth: int,
    model_output: kfp.dsl.OutputPath(str),
    mlflow_uri: str
):
    import mlflow
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier

    mlflow.set_tracking_uri(mlflow_uri)

    df = pd.read_parquet(f"{data_path}/train.parquet")
    X = df.drop('target', axis=1)
    y = df['target']

    with mlflow.start_run():
        model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth
        )
        model.fit(X, y)

        run_id = mlflow.active_run().info.run_id
        mlflow.sklearn.log_model(model, "model")

    with open(model_output, 'w') as f:
        f.write(run_id)

@dsl.pipeline(
    name="ML Training Pipeline",
    description="End-to-end ML training pipeline"
)
def ml_pipeline(
    data_path: str = "gs://my-bucket/data/train.parquet",
    n_estimators: int = 100,
    max_depth: int = 5,
    mlflow_uri: str = "http://mlflow:5000"
):
    prepare_task = prepare_data(data_path=data_path)

    train_task = train_model(
        data_path=prepare_task.outputs['output_path'],
        n_estimators=n_estimators,
        max_depth=max_depth,
        mlflow_uri=mlflow_uri
    )

if __name__ == "__main__":
    kfp.compiler.Compiler().compile(
        pipeline_func=ml_pipeline,
        package_path="ml_pipeline.yaml"
    )

    client = kfp.Client(host="http://kubeflow-host/pipeline")
    run = client.create_run_from_pipeline_package(
        pipeline_file="ml_pipeline.yaml",
        arguments={"n_estimators": 200, "max_depth": 7},
        run_name="training-run-001"
    )

9. CI/CD for ML

9.1 GitHub Actions for ML

# .github/workflows/ml-pipeline.yml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'src/**'
      - 'params.yaml'
      - 'requirements.txt'
  pull_request:
    branches: [main]

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}

jobs:
  test:
    name: Unit Tests
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run unit tests
        run: pytest tests/unit/ -v --cov=src --cov-report=xml

  train-and-evaluate:
    name: Train and Evaluate Model
    runs-on: ubuntu-latest
    needs: test
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        # requirements.txt is assumed to include dvc and mlflow for the steps below
        run: pip install -r requirements.txt
      - name: Run DVC pipeline
        run: dvc repro
      - name: Check model performance
        run: |
          python scripts/check_metrics.py \
            --min-accuracy 0.90 \
            --metrics-file metrics/scores.json
      - name: Register model
        run: python scripts/register_model.py

  build-and-push:
    name: Build Docker Image
    runs-on: ubuntu-latest
    needs: train-and-evaluate
    steps:
      - uses: actions/checkout@v4
      - name: Login to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - name: Build and push
        uses: docker/build-push-action@v5
        with:
          context: .
          file: Dockerfile.serve
          push: true
          tags: |
            ghcr.io/${{ github.repository }}/model-server:latest
            ghcr.io/${{ github.repository }}/model-server:${{ github.sha }}

  deploy:
    name: Deploy to Kubernetes
    runs-on: ubuntu-latest
    needs: build-and-push
    environment: production
    steps:
      - uses: actions/checkout@v4
      - name: Configure kubectl
        uses: azure/k8s-set-context@v3
        with:
          kubeconfig: ${{ secrets.KUBECONFIG }}
      - name: Deploy
        run: |
          kubectl set image deployment/model-server \
            model-server=ghcr.io/${{ github.repository }}/model-server:${{ github.sha }} \
            -n ml-platform
          kubectl rollout status deployment/model-server -n ml-platform
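The gate step above calls `scripts/check_metrics.py`, which is not shown in the workflow. A minimal sketch of such a script follows; the `accuracy` key and the metrics-file layout are assumptions here, so adapt them to whatever schema your DVC stage writes:

```python
# scripts/check_metrics.py -- CI quality gate (sketch)
# invoked in the workflow as:
#   python scripts/check_metrics.py --min-accuracy 0.90 --metrics-file metrics/scores.json
import argparse
import json
from pathlib import Path


def passes_gate(metrics: dict, min_accuracy: float) -> bool:
    """True when the tracked accuracy meets the required threshold."""
    return metrics.get("accuracy", 0.0) >= min_accuracy


def main(argv=None) -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--min-accuracy", type=float, required=True)
    parser.add_argument("--metrics-file", type=Path, required=True)
    args = parser.parse_args(argv)

    metrics = json.loads(args.metrics_file.read_text())
    if not passes_gate(metrics, args.min_accuracy):
        print(f"FAIL: accuracy below {args.min_accuracy}")
        return 1  # non-zero exit code fails the CI job
    print("Model meets the quality gate")
    return 0
```

Returning a non-zero exit code is what stops the pipeline: GitHub Actions marks the step failed, so `build-and-push` and `deploy` never run for an underperforming model.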

10. Model Monitoring

10.1 Data Drift Detection with Evidently AI

pip install evidently

# drift_detection.py
import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import ColumnDriftMetric

# Reference data (training-time data)
reference_data = pd.read_parquet("data/reference.parquet")

# Current data (production prediction data)
current_data = pd.read_parquet("data/current.parquet")

# Data drift report
drift_report = Report(metrics=[
    DataDriftPreset(),
    DataQualityPreset(),
    ColumnDriftMetric(column_name="age"),
    ColumnDriftMetric(column_name="income"),
])

drift_report.run(
    reference_data=reference_data,
    current_data=current_data
)

drift_report.save_html("drift_report.html")

result = drift_report.as_dict()
drift_detected = result['metrics'][0]['result']['dataset_drift']

if drift_detected:
    print("Data drift detected! Consider retraining.")
    # send_alert is a placeholder for your own notification hook (Slack, PagerDuty, ...)
    send_alert("Data drift detected in production model")

10.2 Model Performance Monitoring

from evidently.report import Report
from evidently.metric_preset import ClassificationPreset

# y_true, y_pred, y_proba, and X_test come from your evaluation step;
# reference_predictions is the equivalent frame captured at training time
prediction_data = pd.DataFrame({
    'target': y_true,
    'prediction': y_pred,
    'prediction_proba': y_proba,
    'feature1': X_test['feature1'],
    'feature2': X_test['feature2'],
})

performance_report = Report(metrics=[ClassificationPreset()])
performance_report.run(
    reference_data=reference_predictions,
    current_data=prediction_data
)

metrics = performance_report.as_dict()
current_accuracy = metrics['metrics'][0]['result']['current']['accuracy']
reference_accuracy = metrics['metrics'][0]['result']['reference']['accuracy']

degradation = reference_accuracy - current_accuracy
if degradation > 0.05:  # alert on a drop of more than 5 percentage points
    # trigger_retraining is a placeholder for your own retraining hook
    trigger_retraining(f"Model degraded by {degradation:.2%}")

10.3 Real-Time Monitoring with Prometheus

import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
from prometheus_client import Counter, Histogram, Gauge, start_http_server

prediction_counter = Counter('model_predictions_total', 'Total predictions')
prediction_latency = Histogram('model_prediction_duration_seconds', 'Prediction latency')
model_accuracy_gauge = Gauge('model_accuracy', 'Current model accuracy')
drift_score_gauge = Gauge('data_drift_score', 'Data drift score')

class MonitoredModelServer:
    def __init__(self, model, reference_data):
        self.model = model
        self.reference_data = reference_data  # training-time feature frame
        self.features_buffer = []
        start_http_server(8001)  # exposes /metrics for Prometheus to scrape

    def predict(self, features):
        with prediction_latency.time():
            prediction = self.model.predict(features)

        prediction_counter.inc()
        # buffer the *inputs*: data drift is measured on features, not predictions
        self.features_buffer.append(features)

        if len(self.features_buffer) >= 1000:
            self._run_monitoring()

        return prediction

    def _run_monitoring(self):
        current_df = pd.DataFrame(self.features_buffer)
        drift_report = Report(metrics=[DataDriftPreset()])
        drift_report.run(
            reference_data=self.reference_data,
            current_data=current_df
        )

        result = drift_report.as_dict()
        drift_share = result['metrics'][0]['result']['share_of_drifted_columns']
        drift_score_gauge.set(drift_share)

        self.features_buffer = []
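`start_http_server(8001)` serves these metrics in Prometheus' plain-text exposition format. You can sanity-check what a scrape would return with `generate_latest`; the sketch below uses its own registry so it stays independent of the global default:

```python
from prometheus_client import Counter, Gauge, CollectorRegistry, generate_latest

# isolated registry so the example does not pollute the process-wide default
registry = CollectorRegistry()
predictions = Counter('model_predictions_total', 'Total predictions', registry=registry)
drift = Gauge('data_drift_score', 'Data drift score', registry=registry)

predictions.inc(3)
drift.set(0.25)

# this is the payload Prometheus scrapes from /metrics
exposition = generate_latest(registry).decode()
print(exposition)
```

Point a Prometheus scrape job at port 8001 and these series become available for Grafana dashboards and alerting rules.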

11. Feature Store

11.1 Feature Store Concepts

A Feature Store is infrastructure that enables centralized management and reuse of ML features.

Core Concepts:

  • Online Store: Low-latency feature retrieval for real-time predictions (Redis, DynamoDB)
  • Offline Store: Historical features for batch training (S3, BigQuery)
  • Feature View: Mapping of feature definitions to data sources
  • Entity: Subject of features (User ID, Product ID, etc.)
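The offline store's defining operation is the point-in-time join: for each entity row, it fetches the latest feature values known at that row's timestamp, so training never leaks future data. Conceptually this behaves like pandas `merge_asof`; the sketch below illustrates the semantics (it is not Feast's actual implementation):

```python
import pandas as pd

# entity rows: which user we need features for, and as of when
entity_df = pd.DataFrame({
    "user_id": [1, 1],
    "event_timestamp": pd.to_datetime(["2026-01-05", "2026-01-20"]),
})

# feature log: values as they changed over time
features = pd.DataFrame({
    "user_id": [1, 1],
    "event_timestamp": pd.to_datetime(["2026-01-01", "2026-01-15"]),
    "purchase_count_7d": [3, 8],
})

# point-in-time join: each entity row gets the most recent feature value
# recorded at or before its own timestamp -- no future leakage
joined = pd.merge_asof(
    entity_df.sort_values("event_timestamp"),
    features.sort_values("event_timestamp"),
    on="event_timestamp",
    by="user_id",
)
print(joined["purchase_count_7d"].tolist())  # [3, 8]
```

The 2026-01-05 row sees the value logged on 2026-01-01, and the 2026-01-20 row sees the 2026-01-15 value; `get_historical_features` in the next section performs this join for you.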

11.2 Feast Setup and Usage

pip install feast
feast init my-feature-store
cd my-feature-store

# features.py
from feast import Entity, FeatureView, FileSource, ValueType, Field
from feast.types import Float64, Int64
from datetime import timedelta

user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="User ID"
)

user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
)

user_stats_fv = FeatureView(
    name="user_statistics",
    entities=[user],
    ttl=timedelta(days=30),
    schema=[
        Field(name="purchase_count_7d", dtype=Int64),
        Field(name="purchase_amount_7d", dtype=Float64),
        Field(name="avg_session_duration", dtype=Float64),
        Field(name="last_purchase_days_ago", dtype=Int64),
    ],
    online=True,
    source=user_stats_source,
)

# feast_usage.py
from feast import FeatureStore
import pandas as pd
from datetime import datetime

store = FeatureStore(repo_path=".")

# Offline feature retrieval (for training)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003],
    "event_timestamp": pd.to_datetime(["2026-01-01", "2026-01-02", "2026-01-03"])
})

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_statistics:purchase_count_7d",
        "user_statistics:purchase_amount_7d",
        "user_statistics:avg_session_duration",
    ]
).to_df()

# Materialize to online store
store.materialize_incremental(end_date=datetime.now())

# Online feature retrieval (for real-time inference)
online_features = store.get_online_features(
    features=[
        "user_statistics:purchase_count_7d",
        "user_statistics:purchase_amount_7d",
    ],
    entity_rows=[
        {"user_id": 1001},
        {"user_id": 1002},
    ]
).to_dict()

12. Real-World MLOps Project

12.1 Full Pipeline Architecture

Data Sources (DB, S3, API)
    ↓
Data Collection (Airflow DAG)
    ↓
Data Validation (Great Expectations)
    ↓
Feature Engineering (Feast)
    ↓
Model Training (MLflow tracking)
    ↓
Model Evaluation (Automated validation)
    ↓
Model Registry (MLflow Registry)
    ↓
CI/CD (GitHub Actions)
    ↓
Container Build (Docker)
    ↓
K8s Deployment (Kubernetes)
    ↓
Serving (FastAPI + Triton)
    ↓
Monitoring (Evidently + Prometheus + Grafana)
    ↓
Alerts (Trigger retraining on drift detection)
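The final alerting stage closes the loop from monitoring back to training. A minimal decision rule combining the drift share from section 10.1 with the degradation check from section 10.2 might look like this (the thresholds are illustrative defaults, not recommendations):

```python
def should_retrain(drift_share: float,
                   accuracy_drop: float,
                   drift_threshold: float = 0.3,
                   degradation_threshold: float = 0.05) -> bool:
    """Decide whether the alerting stage should kick off retraining.

    drift_share: fraction of drifted feature columns (from Evidently)
    accuracy_drop: reference accuracy minus current accuracy
    """
    return (drift_share >= drift_threshold
            or accuracy_drop >= degradation_threshold)


print(should_retrain(drift_share=0.4, accuracy_drop=0.01))  # True: heavy feature drift
print(should_retrain(drift_share=0.1, accuracy_drop=0.00))  # False: system is healthy
```

In practice this function would sit in the alert handler and, when it returns True, trigger the training DAG via the orchestrator's API rather than retraining inline.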

12.2 Real-World Project: Customer Churn Prediction System

# scripts/full_pipeline.py
import mlflow
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.preprocessing import StandardScaler
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ChurnPredictionPipeline:
    """Customer churn prediction pipeline"""

    def __init__(self, mlflow_uri: str = "http://localhost:5000"):
        mlflow.set_tracking_uri(mlflow_uri)
        mlflow.set_experiment("churn-prediction")
        self.scaler = StandardScaler()

    def load_data(self, data_path: str) -> pd.DataFrame:
        logger.info(f"Loading data from {data_path}")
        df = pd.read_parquet(data_path)

        assert df.shape[0] > 1000, "Too few samples"
        assert 'churn' in df.columns, "Target column 'churn' missing"

        missing_rate = df.isnull().mean()
        high_missing = missing_rate[missing_rate > 0.5].index.tolist()
        if high_missing:
            logger.warning(f"Columns with >50% missing: {high_missing}")
            df = df.drop(columns=high_missing)

        return df

    def engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        logger.info("Engineering features...")

        numeric_cols = df.select_dtypes(include=[np.number]).columns
        df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())

        if 'tenure_months' in df.columns and 'monthly_charges' in df.columns:
            df['total_value'] = df['tenure_months'] * df['monthly_charges']

        if 'num_support_tickets' in df.columns and 'tenure_months' in df.columns:
            df['tickets_per_month'] = (
                df['num_support_tickets'] / (df['tenure_months'] + 1)
            )

        return df

    def train(self, df: pd.DataFrame, params: dict | None = None) -> str:
        if params is None:
            params = {
                "n_estimators": 200,
                "max_depth": 5,
                "learning_rate": 0.05,
                "subsample": 0.8,
                "random_state": 42,
            }

        feature_cols = [c for c in df.columns if c != 'churn']
        X = df[feature_cols]
        y = df['churn']

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        with mlflow.start_run(run_name="churn-gbt"):
            mlflow.log_params(params)
            mlflow.log_param("train_size", len(X_train))
            mlflow.log_param("n_features", len(feature_cols))

            model = GradientBoostingClassifier(**params)
            model.fit(X_train_scaled, y_train)

            y_pred = model.predict(X_test_scaled)
            y_proba = model.predict_proba(X_test_scaled)[:, 1]

            accuracy = accuracy_score(y_test, y_pred)
            auc_roc = roc_auc_score(y_test, y_proba)

            mlflow.log_metrics({
                "accuracy": accuracy,
                "auc_roc": auc_roc,
            })

            cv_scores = cross_val_score(
                model, X_train_scaled, y_train, cv=5, scoring='roc_auc'
            )
            mlflow.log_metric("cv_auc_mean", cv_scores.mean())
            mlflow.log_metric("cv_auc_std", cv_scores.std())

            # log the model artifact only; registration happens once, for the
            # best run, in promote_best_model (avoids one registry version per run)
            mlflow.sklearn.log_model(model, "model")

            run_id = mlflow.active_run().info.run_id
            logger.info(f"Training complete. AUC-ROC: {auc_roc:.4f}")
            return run_id

    def promote_best_model(self, min_auc: float = 0.85):
        client = mlflow.tracking.MlflowClient()

        experiment = client.get_experiment_by_name("churn-prediction")
        runs = client.search_runs(
            experiment_ids=[experiment.experiment_id],
            filter_string="metrics.auc_roc > 0.80",
            order_by=["metrics.auc_roc DESC"],
            max_results=1
        )

        if not runs:
            raise ValueError("No runs found meeting criteria")

        best_run = runs[0]
        auc = best_run.data.metrics['auc_roc']

        if auc < min_auc:
            raise ValueError(f"Best AUC {auc:.4f} below minimum {min_auc}")

        model_uri = f"runs:/{best_run.info.run_id}/model"
        mv = mlflow.register_model(model_uri, "ChurnPredictor")

        client.transition_model_version_stage(
            name="ChurnPredictor",
            version=mv.version,
            stage="Production",
            archive_existing_versions=True
        )

        logger.info(f"Model v{mv.version} promoted to Production! AUC: {auc:.4f}")
        return mv.version

if __name__ == "__main__":
    pipeline = ChurnPredictionPipeline()
    df = pipeline.load_data("data/customers.parquet")
    df = pipeline.engineer_features(df)
    run_id = pipeline.train(df)
    pipeline.promote_best_model(min_auc=0.85)
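One subtlety in `ChurnPredictionPipeline`: the `StandardScaler` is fit inside `train()`, but only the bare classifier is logged to MLflow, so serving code would need the scaler shipped separately. Wrapping both in a scikit-learn `Pipeline` keeps preprocessing and model together as one artifact, which `mlflow.sklearn.log_model` can log as-is. A sketch on synthetic data:

```python
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# synthetic stand-in for the churn features
rng = np.random.default_rng(42)
X = rng.normal(size=(200, 4))
y = (X[:, 0] + X[:, 1] > 0).astype(int)

# scaler and classifier travel together: a single
# mlflow.sklearn.log_model(pipe, "model") captures both
pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", GradientBoostingClassifier(n_estimators=50, random_state=42)),
])
pipe.fit(X, y)
preds = pipe.predict(X[:5])
```

At inference time the pipeline applies the exact scaling it learned during training, eliminating a common source of train/serve skew.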

Conclusion

MLOps is not merely about learning tools — it is a culture and process for operating ML systems reliably, reproducibly, and at scale.

Keep these core principles in mind:

  1. Version everything: Code, data, models, and environments
  2. Automate first: Manual steps are sources of error
  3. Measure and monitor: You cannot improve what you cannot measure
  4. Detect failures fast: Immediately catch drift and performance degradation
  5. Guarantee reproducibility: Anyone, anywhere, anytime should be able to reproduce the same results

The MLOps journey is incremental. Start at Level 0 and mature at a pace that fits your organization's needs.


References