Author: Youngju Kim (@fjvbn20031)
Introduction
Modern machine learning projects don't end after building a model in a Jupyter Notebook. You need to reliably manage the entire lifecycle: data collection, preprocessing, training, evaluation, deployment, monitoring, and retraining. This is precisely why MLOps exists.
This guide takes you from the fundamentals of MLOps all the way through practical tool usage.
1. What is MLOps?
1.1 Defining MLOps
MLOps (Machine Learning Operations) is a methodology that integrates the development and operation of ML systems. It applies DevOps principles to ML workflows, automating continuous training (CT), continuous integration (CI), and continuous deployment (CD) of models.
1.2 MLOps vs DevOps vs DataOps
| Category | DevOps | DataOps | MLOps |
|---|---|---|---|
| Focus | Software deployment | Data pipelines | ML model lifecycle |
| Output | Applications | Data/Reports | ML models |
| Reproducibility | Code versioning | Data versioning | Code + Data + Model versioning |
| Automation | CI/CD | Data testing | Training + Evaluation + Deployment |
The key difference between MLOps and DevOps is data dependency. Even with identical code, training on different data produces a different model, so model performance depends on data quality at least as much as on code quality.
1.3 Unique Characteristics of ML Systems
ML systems have several characteristics distinct from traditional software:
- Data dependency: Model behavior is determined by data, not just code
- Experimental nature: Requires dozens to hundreds of iterative experiments
- Model decay: Model performance degrades as data distributions shift over time
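- Reproducibility challenges: Identical results are hard to guarantee even with the same code and environment, because of random initialization, data shuffling, and nondeterministic GPU operations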
- Multiple artifacts: Code, data, and models all require version control
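Much of the reproducibility problem comes from unpinned randomness. A minimal sketch of a seeding helper follows; `set_seed` is a hypothetical name, NumPy and PyTorch are seeded only if they happen to be installed, and seeding alone does not make every GPU kernel deterministic (that needs framework-specific settings).

```python
import os
import random


def set_seed(seed: int) -> None:
    """Seed the common sources of randomness (hypothetical helper)."""
    # Only affects subprocesses started after this point; the current
    # interpreter's hash seed is fixed at startup
    os.environ["PYTHONHASHSEED"] = str(seed)
    random.seed(seed)
    try:
        import numpy as np
        np.random.seed(seed)
    except ImportError:
        pass  # NumPy not installed
    try:
        import torch
        torch.manual_seed(seed)  # seeds the RNG on all devices
    except ImportError:
        pass  # PyTorch not installed


set_seed(42)
first = [random.random() for _ in range(3)]
set_seed(42)
second = [random.random() for _ in range(3)]
print(first == second)  # True
```

Calling the helper once at the top of every training entry point, with the seed itself stored in `params.yaml`, keeps the seed under version control along with the other hyperparameters.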
1.4 MLOps Maturity Model
Based on Google's MLOps maturity levels:
Level 0 - Manual ML
- All processes are manual
- Script-based experimentation
- Rare, manual deployments
- No monitoring
Level 1 - ML Pipeline Automation
- Automated training pipeline
- Continuous training enabled
- Experiment tracking begins
- Feature store introduced
Level 2 - CI/CD Pipeline Automation
- Fully automated CI/CD
- Model registry in use
- Automated retraining triggers
- Complete monitoring
1.5 MLOps Tool Ecosystem
Data Version Control: DVC, LakeFS, Delta Lake
Experiment Tracking: MLflow, W&B, Neptune, Comet ML
Pipelines: Airflow, Prefect, Metaflow, Kubeflow Pipelines
Model Registry: MLflow Registry, W&B Artifacts, Vertex AI
Containerization: Docker, Podman
Orchestration: Kubernetes, ECS, GKE
Model Serving: Triton, TorchServe, BentoML, KServe
Monitoring: Evidently, WhyLogs, Arize, Fiddler
Feature Store: Feast, Tecton, Vertex AI Feature Store
2. Data Version Control (DVC)
2.1 Introduction to DVC
DVC (Data Version Control) is a version control tool for ML projects designed to work alongside Git. It lets you version large datasets and ML models the way Git versions source code, without storing the large files in the Git repository itself.
```bash
# Install DVC
pip install dvc
# Install with S3 support
pip install "dvc[s3]"
# Install with GCS support
pip install "dvc[gs]"
# Install with all remote support
pip install "dvc[all]"
```
2.2 DVC Initialization and Basic Usage
```bash
# Initialize Git repository (if needed)
git init
# Initialize DVC
dvc init
# Inspect created files
ls .dvc/
# Output: config  .gitignore  tmp/
# Start tracking data
dvc add data/train.csv
# A small .dvc pointer file is created; Git tracks it instead of the data
git add data/train.csv.dvc data/.gitignore
git commit -m "Add training data"
```
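`dvc add` keeps data out of Git through content addressing: it hashes the file, records the hash in the small `.dvc` file that Git does track, and stores the bytes in a cache keyed by that hash (which `dvc push` uploads to remote storage). The sketch below illustrates the idea only and is not DVC's code; the cache layout `.dvc/cache/files/md5/<first two hex chars>/<rest>` follows DVC 3.x and is shown as an assumption for illustration.

```python
import hashlib
import tempfile
from pathlib import Path


def file_md5(path: Path, chunk_size: int = 1 << 20) -> str:
    """MD5 of a file's bytes, read in chunks so large datasets never sit fully in memory."""
    digest = hashlib.md5()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def cache_path(cache_root: Path, md5: str) -> Path:
    """Content-addressed location: the first two hex chars become a subdirectory."""
    return cache_root / md5[:2] / md5[2:]


# Demo on a throwaway file
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "train.csv"
    sample.write_bytes(b"x,y\n1,0\n")
    digest = file_md5(sample)

location = cache_path(Path(".dvc/cache/files/md5"), digest)
print(digest)
print(location)
```

Because the location depends only on the content, two commits that reference the same dataset share one cached copy, and changing a single byte produces a new hash and therefore a new version.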
2.3 Configuring Remote Storage
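```bash
# Configure S3 remote storage (-d makes it the default remote)
dvc remote add -d myremote s3://my-bucket/dvc-store
# Credentials: prefer environment variables or ~/.aws/credentials.
# If you must store keys in DVC config, use --local so they are written to
# .dvc/config.local, which is not committed to Git
dvc remote modify --local myremote access_key_id YOUR_ACCESS_KEY
dvc remote modify --local myremote secret_access_key YOUR_SECRET_KEY
# Alternative: GCS remote storage
dvc remote add -d gcsstorage gs://my-bucket/dvc-store
# Alternative: local remote storage (for testing)
dvc remote add -d localremote /tmp/dvc-storage
# Push data to the default remote
dvc push
# Pull data from the default remote
dvc pull
```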
2.4 DVC Pipelines
DVC pipelines track dependencies between stages and re-run only stages that have changed.
```yaml
# dvc.yaml
stages:
  prepare:
    cmd: python src/prepare.py
    deps:
      - src/prepare.py
      - data/raw
    outs:
      - data/prepared
  featurize:
    cmd: python src/featurize.py
    deps:
      - src/featurize.py
      - data/prepared
    outs:
      - data/features
  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/features
    # List every parameter train.py reads, so changing any of them triggers a re-run
    params:
      - params.yaml:
          - train.lr
          - train.n_estimators
          - train.max_depth
          - data.random_state
    outs:
      - models/model.pkl
    metrics:
      - metrics/scores.json:
          cache: false
  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pkl
      - data/features
    metrics:
      - metrics/eval.json:
          cache: false
```
```bash
# Run the pipeline
dvc repro
# Run up to a specific stage (the stage plus any out-of-date upstream stages)
dvc repro train
# Visualize the pipeline DAG
dvc dag
# Compare experiment results
dvc metrics show
dvc metrics diff HEAD~1
```
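DVC decides what to re-run by comparing hashes: each stage's dependencies are hashed and compared with the values recorded in `dvc.lock` by the previous run, and a stage whose inputs changed invalidates everything downstream of it. The sketch below models only that rule, with made-up stage data and short strings standing in for real MD5 hashes; actual DVC also compares params and output hashes.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: stage -> (upstream stages, {dependency: current hash})
stages = {
    "prepare": ([], {"src/prepare.py": "a1", "data/raw": "b2"}),
    "featurize": (["prepare"], {"src/featurize.py": "c3"}),
    "train": (["featurize"], {"src/train.py": "d4-new"}),
    "evaluate": (["train"], {"src/evaluate.py": "e5"}),
}
# Hashes recorded by the previous run (the role dvc.lock plays)
locked = {
    "prepare": {"src/prepare.py": "a1", "data/raw": "b2"},
    "featurize": {"src/featurize.py": "c3"},
    "train": {"src/train.py": "d4"},
    "evaluate": {"src/evaluate.py": "e5"},
}


def stale_stages(stages, locked):
    """Return stages to re-run, in dependency order."""
    # TopologicalSorter expects node -> predecessors
    graph = {name: set(upstream) for name, (upstream, _) in stages.items()}
    stale = []
    for name in TopologicalSorter(graph).static_order():
        upstream, deps = stages[name]
        inputs_changed = deps != locked.get(name)
        upstream_stale = any(u in stale for u in upstream)
        if inputs_changed or upstream_stale:
            stale.append(name)
    return stale


print(stale_stages(stages, locked))  # ['train', 'evaluate']
```

Only `train` has a changed dependency, yet `evaluate` is also scheduled because its input (the model) will change; `prepare` and `featurize` are skipped.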
2.5 Parameter Management
```yaml
# params.yaml
train:
  lr: 0.001
  n_estimators: 100
  max_depth: 5
  batch_size: 32
  epochs: 10
data:
  test_size: 0.2
  random_state: 42
```
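```python
# src/train.py
import yaml
from sklearn.ensemble import GradientBoostingClassifier

# Load parameters
with open("params.yaml") as f:
    params = yaml.safe_load(f)

lr = params["train"]["lr"]
n_estimators = params["train"]["n_estimators"]

# Train model. Gradient boosting is used here because, unlike a random forest,
# it actually consumes the learning rate (train.lr)
model = GradientBoostingClassifier(
    learning_rate=lr,
    n_estimators=n_estimators,
    max_depth=params["train"]["max_depth"],
    random_state=params["data"]["random_state"],
)
# ...then fit on data/features and write models/model.pkl and
# metrics/scores.json, the outputs this stage declares in dvc.yaml
```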
2.6 Git + DVC Workflow
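```bash
# Create a new experiment branch
git checkout -b experiment/increase-lr
# Modify parameters: in params.yaml, change lr: 0.001 to lr: 0.01
# Re-run the pipeline (only train and the stages downstream of it re-execute)
dvc repro
# Check results
dvc metrics show
# Commit. The metrics files use cache: false, so Git tracks them and
# dvc metrics diff can read them from other branches
git add dvc.lock params.yaml metrics/
git commit -m "Experiment: increase learning rate to 0.01"
# Compare experiments
git checkout main
dvc checkout  # sync DVC-tracked data and models with the checked-out commit
dvc metrics diff experiment/increase-lr
```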
3. Experiment Tracking
3.1 MLflow Installation and Setup
```bash
# Install MLflow
pip install mlflow
# Start MLflow UI (http://localhost:5000 by default)
mlflow ui
# Specify host and port
mlflow ui --host 0.0.0.0 --port 5001
# Point clients at a remote tracking server
export MLFLOW_TRACKING_URI=http://mlflow-server:5000
```
3.2 Basic MLflow Usage
```python
import mlflow
import mlflow.sklearn
import numpy as np
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split

# Example dataset so the snippet runs end to end; replace with your own X, y
X, y = load_iris(return_X_y=True)

# Set tracking URI (default: ./mlruns)
mlflow.set_tracking_uri("http://localhost:5000")
# Set experiment
mlflow.set_experiment("my-classification-experiment")

# Run experiment
with mlflow.start_run(run_name="random-forest-v1"):
    # Log parameters
    n_estimators = 100
    max_depth = 5
    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)

    # Train model
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
    )
    model.fit(X_train, y_train)

    # Log metrics
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average="weighted")
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)

    # Step-wise metrics (training curve). A random forest has no epochs, so
    # these are dummy values that only demonstrate the step argument
    for epoch in range(10):
        train_loss = np.random.random() * 0.5 / (epoch + 1)
        mlflow.log_metric("train_loss", train_loss, step=epoch)

    # Log artifacts (the file must exist locally)
    mlflow.log_artifact("data/train.csv")

    # Log model and register it in the Model Registry
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="RandomForestClassifier",
    )

    print(f"Accuracy: {accuracy:.4f}, F1: {f1:.4f}")
    print(f"Run ID: {mlflow.active_run().info.run_id}")
```
3.3 MLflow Autologging
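```python
import mlflow
from sklearn.ensemble import RandomForestClassifier

# Enable autologging for sklearn
mlflow.sklearn.autolog()

with mlflow.start_run():
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)  # X_train, y_train from the previous example
    # Parameters, metrics, and model are logged automatically

# PyTorch autologging (covers PyTorch Lightning training)
mlflow.pytorch.autolog()

# XGBoost autologging
mlflow.xgboost.autolog()

# Or enable autologging for every supported library that is installed
mlflow.autolog()
```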
3.4 Real-World: Tracking PyTorch Training
```python
import torch
import torch.nn as nn
import torch.optim as optim
import mlflow
import mlflow.pytorch


class SimpleNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x)


def train_with_mlflow(
    model, train_loader, val_loader,
    epochs=10, lr=0.001, experiment_name="pytorch-training"
):
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run():
        # Log hyperparameters
        mlflow.log_params({
            "epochs": epochs,
            "learning_rate": lr,
            "model_type": "SimpleNet",
            "optimizer": "Adam",
            "batch_size": train_loader.batch_size,
        })
        optimizer = optim.Adam(model.parameters(), lr=lr)
        criterion = nn.CrossEntropyLoss()
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

        for epoch in range(epochs):
            # Training
            model.train()
            train_loss = 0.0
            for batch_X, batch_y in train_loader:
                optimizer.zero_grad()
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
            train_loss /= len(train_loader)

            # Validation
            model.eval()
            val_loss = 0.0
            correct = 0
            total = 0
            with torch.no_grad():
                for batch_X, batch_y in val_loader:
                    outputs = model(batch_X)
                    loss = criterion(outputs, batch_y)
                    val_loss += loss.item()
                    _, predicted = outputs.max(1)
                    total += batch_y.size(0)
                    correct += predicted.eq(batch_y).sum().item()
            val_loss /= len(val_loader)
            val_accuracy = correct / total

            # Log step-wise metrics
            mlflow.log_metrics({
                "train_loss": train_loss,
                "val_loss": val_loss,
                "val_accuracy": val_accuracy,
                "learning_rate": scheduler.get_last_lr()[0],
            }, step=epoch)
            scheduler.step()

            print(f"Epoch {epoch+1}/{epochs} | "
                  f"Train Loss: {train_loss:.4f} | "
                  f"Val Loss: {val_loss:.4f} | "
                  f"Val Acc: {val_accuracy:.4f}")

        # Save final model
        mlflow.pytorch.log_model(model, "model")
        return mlflow.active_run().info.run_id
```
3.5 W&B (Weights & Biases) Usage
```bash
# Install W&B
pip install wandb
# Login
wandb login
```
```python
import torch
import wandb

# Initialize W&B
wandb.init(
    project="my-ml-project",
    name="experiment-001",
    config={
        "learning_rate": 0.001,
        "epochs": 100,
        "batch_size": 64,
        "architecture": "ResNet50",
        "dataset": "ImageNet",
    },
)
# Access config values
lr = wandb.config.learning_rate

# Training loop. model, optimizer, train_loader, val_loader, train_epoch and
# eval_epoch are placeholders for objects defined in your own training code
for epoch in range(wandb.config.epochs):
    train_loss, train_acc = train_epoch(model, train_loader)
    val_loss, val_acc = eval_epoch(model, val_loader)
    # Log metrics
    wandb.log({
        "epoch": epoch,
        "train_loss": train_loss,
        "train_accuracy": train_acc,
        "val_loss": val_loss,
        "val_accuracy": val_acc,
        "learning_rate": optimizer.param_groups[0]["lr"],
    })

# Save model: write the file first, then ask W&B to sync it
torch.save(model.state_dict(), "model.pth")
wandb.save("model.pth")

# Log images (sample_predictions is assumed to yield (image, pred, label) tuples)
wandb.log({
    "predictions": [
        wandb.Image(img, caption=f"Pred: {pred}, True: {true}")
        for img, pred, true in sample_predictions
    ]
})

# W&B Artifacts (dataset versioning)
artifact = wandb.Artifact("training-data", type="dataset")
artifact.add_dir("data/train")
wandb.log_artifact(artifact)

wandb.finish()
```
3.6 W&B Sweeps (Hyperparameter Optimization)
```python
import torch
import wandb

# Sweep configuration
sweep_config = {
    "method": "bayes",  # random, grid, bayes
    "metric": {
        "name": "val_accuracy",
        "goal": "maximize"
    },
    "parameters": {
        "learning_rate": {
            "distribution": "log_uniform_values",
            "min": 1e-5,
            "max": 1e-1,
        },
        "batch_size": {
            "values": [16, 32, 64, 128]
        },
        "hidden_dim": {
            "values": [64, 128, 256, 512]
        },
        "dropout": {
            "distribution": "uniform",
            "min": 0.1,
            "max": 0.5,
        }
    }
}


def train_sweep():
    with wandb.init() as run:
        config = wandb.config
        # SimpleNet (section 3.4) uses a fixed dropout of 0.3; pass
        # config.dropout through if you want the sweep to tune it
        model = SimpleNet(
            input_dim=784,
            hidden_dim=config.hidden_dim,
            output_dim=10
        )
        optimizer = torch.optim.Adam(
            model.parameters(),
            lr=config.learning_rate
        )
        for epoch in range(10):
            # run_epoch is a placeholder for your own train/validate step
            train_loss, val_loss, val_acc = run_epoch(
                model, optimizer, config.batch_size
            )
            wandb.log({
                "train_loss": train_loss,
                "val_loss": val_loss,
                "val_accuracy": val_acc,
            })


# Create and run sweep (50 trials)
sweep_id = wandb.sweep(sweep_config, project="my-project")
wandb.agent(sweep_id, function=train_sweep, count=50)
```
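The `log_uniform_values` distribution for `learning_rate` samples uniformly in log space: a value is drawn as exp(U(ln min, ln max)), so the range 1e-5 to 1e-4 is as likely as 1e-2 to 1e-1. The plain-Python random search below illustrates that sampling over the same search space; it is not W&B's implementation, and `fake_val_accuracy` is a made-up objective standing in for a real training run.

```python
import math
import random


def sample_log_uniform(rng: random.Random, low: float, high: float) -> float:
    """Sample x so that ln(x) is uniform on [ln(low), ln(high)]."""
    return math.exp(rng.uniform(math.log(low), math.log(high)))


def sample_config(rng: random.Random) -> dict:
    # Mirrors the search space in sweep_config above
    return {
        "learning_rate": sample_log_uniform(rng, 1e-5, 1e-1),
        "batch_size": rng.choice([16, 32, 64, 128]),
        "hidden_dim": rng.choice([64, 128, 256, 512]),
        "dropout": rng.uniform(0.1, 0.5),
    }


def fake_val_accuracy(cfg: dict) -> float:
    # Made-up objective for illustration: peaks at learning_rate = 1e-3
    return 1.0 - abs(math.log10(cfg["learning_rate"]) + 3) / 10


rng = random.Random(0)
trials = [sample_config(rng) for _ in range(50)]
best = max(trials, key=fake_val_accuracy)

# Learning rates spread roughly evenly across the four decades
decades = [math.floor(math.log10(t["learning_rate"])) for t in trials]
print(best["learning_rate"], {d: decades.count(d) for d in sorted(set(decades))})
```

Sampling uniformly on a linear scale instead would put about 90% of the draws between 0.01 and 0.1, which is why log-scale distributions are the usual choice for learning rates. The `bayes` method replaces the independent draws with a model that proposes promising configurations.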
4. ML Pipeline Orchestration
4.1 ML Pipelines with Apache Airflow
```python
# dags/ml_pipeline.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'mlops-team',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='ML model training pipeline',
    schedule='0 2 * * *',  # Daily at 2 AM (Airflow < 2.4 uses schedule_interval)
    catchup=False,
)


def extract_data(**kwargs):
    """Extract the last 7 days of feature rows"""
    import pandas as pd
    from sqlalchemy import create_engine

    # Prefer an Airflow Connection over hard-coded credentials in production
    engine = create_engine('postgresql://user:pass@db:5432/mldb')
    df = pd.read_sql(
        "SELECT * FROM features WHERE date >= CURRENT_DATE - 7",
        engine
    )
    # Local /tmp paths only work if all tasks run on the same worker;
    # with Celery or Kubernetes executors, write to shared storage (e.g. S3)
    output_path = '/tmp/raw_data.parquet'
    df.to_parquet(output_path)
    kwargs['ti'].xcom_push(key='data_path', value=output_path)
    return output_path


def preprocess_data(**kwargs):
    """Scale the feature columns and keep the target for training"""
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    ti = kwargs['ti']
    data_path = ti.xcom_pull(task_ids='extract_data', key='data_path')
    df = pd.read_parquet(data_path)

    # Assumes every non-target column is numeric
    features = df.drop('target', axis=1)
    scaler = StandardScaler()
    processed = pd.DataFrame(
        scaler.fit_transform(features),
        columns=features.columns,  # keep the original column names
        index=features.index,
    )
    # train_model() reads 'target', so it must survive preprocessing
    processed['target'] = df['target']

    processed_path = '/tmp/processed_data.parquet'
    processed.to_parquet(processed_path)
    ti.xcom_push(key='processed_path', value=processed_path)


def train_model(**kwargs):
    """Train model"""
    import mlflow
    import mlflow.sklearn
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import train_test_split

    ti = kwargs['ti']
    processed_path = ti.xcom_pull(
        task_ids='preprocess_data',
        key='processed_path'
    )

    mlflow.set_experiment("production-training")
    with mlflow.start_run():
        data = pd.read_parquet(processed_path)
        X = data.drop('target', axis=1)
        y = data['target']
        X_train, X_test, y_train, y_test = train_test_split(X, y)
        model = GradientBoostingClassifier(n_estimators=200)
        model.fit(X_train, y_train)
        accuracy = model.score(X_test, y_test)
        mlflow.log_metric("accuracy", accuracy)
        mlflow.sklearn.log_model(model, "model")
        run_id = mlflow.active_run().info.run_id
        ti.xcom_push(key='run_id', value=run_id)


def evaluate_and_deploy(**kwargs):
    """Register the model if it clears the accuracy gate"""
    import mlflow

    ti = kwargs['ti']
    run_id = ti.xcom_pull(task_ids='train_model', key='run_id')
    client = mlflow.tracking.MlflowClient()
    run = client.get_run(run_id)
    accuracy = run.data.metrics['accuracy']
    if accuracy >= 0.90:
        model_uri = f"runs:/{run_id}/model"
        mlflow.register_model(model_uri, "ProductionModel")
        print(f"Model registered with accuracy: {accuracy:.4f}")
    else:
        # Raising fails the task, which blocks the release
        raise ValueError(f"Model accuracy {accuracy:.4f} below threshold 0.90")


# Define tasks
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)
preprocess_task = PythonOperator(
    task_id='preprocess_data',
    python_callable=preprocess_data,
    dag=dag,
)
train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)
evaluate_task = PythonOperator(
    task_id='evaluate_and_deploy',
    python_callable=evaluate_and_deploy,
    dag=dag,
)

# Set dependencies
extract_task >> preprocess_task >> train_task >> evaluate_task
```
4.2 ML Pipelines with Prefect
```python
from datetime import timedelta

import mlflow
import mlflow.sklearn
import pandas as pd
from prefect import flow, task
from prefect.tasks import task_input_hash


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def load_data(data_path: str) -> pd.DataFrame:
    """Load data (cached for one hour per data_path)"""
    return pd.read_parquet(data_path)


@task(retries=3, retry_delay_seconds=10)
def preprocess(df: pd.DataFrame) -> pd.DataFrame:
    """Preprocess data"""
    df = df.dropna()
    df = df.drop_duplicates()
    return df


@task
def train(df: pd.DataFrame, params: dict) -> str:
    """Train model and log holdout accuracy"""
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split

    mlflow.set_experiment("prefect-ml-pipeline")
    with mlflow.start_run():
        X = df.drop('target', axis=1)
        y = df['target']
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)
        # evaluate() reads this metric back, so it must be logged here
        mlflow.log_metric('accuracy', model.score(X_test, y_test))
        mlflow.log_params(params)
        mlflow.sklearn.log_model(model, "model")
        return mlflow.active_run().info.run_id


@task
def evaluate(run_id: str) -> float:
    """Fetch the logged accuracy for a run"""
    client = mlflow.tracking.MlflowClient()
    run = client.get_run(run_id)
    return run.data.metrics.get('accuracy', 0.0)


@flow(name="ml-training-pipeline")
def ml_pipeline(data_path: str, params: dict = None):
    """Main pipeline"""
    if params is None:
        params = {"n_estimators": 100, "max_depth": 5}
    df = load_data(data_path)
    processed_df = preprocess(df)
    run_id = train(processed_df, params)
    accuracy = evaluate(run_id)
    print(f"Pipeline complete. Accuracy: {accuracy:.4f}")
    return accuracy


if __name__ == "__main__":
    ml_pipeline(
        data_path="data/train.parquet",
        params={"n_estimators": 200, "max_depth": 7}
    )
```
5. Model Registry
5.1 MLflow Model Registry
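The model registry provides centralized management of model versions and tracks which version is in the Staging or Production stage. Note that MLflow 2.9 deprecated stages in favor of model version aliases (loaded with URIs such as `models:/MyClassifier@champion`); the stage-based calls below still work but may emit deprecation warnings.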
```python
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register a model (replace abc123def456 with a real run ID)
model_uri = "runs:/abc123def456/model"
registered_model = mlflow.register_model(model_uri, "MyClassifier")

# Check version info
print(f"Version: {registered_model.version}")
print(f"Status: {registered_model.status}")

# Update model version metadata
client.update_model_version(
    name="MyClassifier",
    version=registered_model.version,
    description="Random Forest with 200 estimators, accuracy 0.94"
)

# Add tags
client.set_model_version_tag(
    name="MyClassifier",
    version=registered_model.version,
    key="validated_by",
    value="data-science-team"
)
```
5.2 Promoting from Staging to Production
```python
# Transition to Staging
client.transition_model_version_stage(
    name="MyClassifier",
    version=1,
    stage="Staging",
    archive_existing_versions=False
)


# Promote to Production after validation
def promote_to_production(model_name: str, version: int, min_accuracy: float = 0.90):
    """Promote model to Production"""
    model_version = client.get_model_version(model_name, version)
    run_id = model_version.run_id
    run = client.get_run(run_id)
    accuracy = run.data.metrics.get('accuracy', 0)
    if accuracy < min_accuracy:
        raise ValueError(
            f"Model accuracy {accuracy:.4f} is below minimum {min_accuracy}"
        )
    # Promote to Production (archive existing Production versions)
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production",
        archive_existing_versions=True
    )
    print(f"Model {model_name} v{version} promoted to Production!")
    print(f"Accuracy: {accuracy:.4f}")


# Load the latest Production model (X_test: features from an earlier example)
production_model = mlflow.pyfunc.load_model(
    model_uri="models:/MyClassifier/Production"
)
predictions = production_model.predict(X_test)
```
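To make the stage semantics concrete, here is a tiny in-memory model of what `transition_model_version_stage` does with `archive_existing_versions=True`: moving a version into Staging or Production archives whichever versions currently occupy that stage, and a `models:/Name/Production` URI resolves to the latest version in that stage. This is an illustrative sketch with a hypothetical `ToyRegistry` class, not MLflow's implementation.

```python
from dataclasses import dataclass, field

STAGES = {"None", "Staging", "Production", "Archived"}


@dataclass
class ToyRegistry:
    """Hypothetical in-memory stand-in for one registered model."""
    stages: dict = field(default_factory=dict)  # version -> stage

    def register(self) -> int:
        version = len(self.stages) + 1  # versions are numbered 1, 2, 3, ...
        self.stages[version] = "None"
        return version

    def transition(self, version: int, stage: str, archive_existing: bool = False) -> None:
        if stage not in STAGES:
            raise ValueError(f"unknown stage {stage!r}")
        if archive_existing and stage in ("Staging", "Production"):
            # Archive every other version currently in the target stage
            for other, current in self.stages.items():
                if other != version and current == stage:
                    self.stages[other] = "Archived"
        self.stages[version] = stage

    def latest(self, stage: str):
        """Latest version in a stage, like models:/Name/<Stage> resolves."""
        matches = [v for v, s in self.stages.items() if s == stage]
        return max(matches) if matches else None


reg = ToyRegistry()
v1, v2 = reg.register(), reg.register()
reg.transition(v1, "Production")
reg.transition(v2, "Staging")
reg.transition(v2, "Production", archive_existing=True)
print(reg.stages)  # {1: 'Archived', 2: 'Production'}
```

With `archive_existing=False`, both versions would sit in Production and the URI would silently pick the higher version number, which is why archiving on promotion keeps the serving target unambiguous.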
5.3 Model Serving (MLflow built-in)
```bash
# Serve MLflow model
mlflow models serve \
    -m "models:/MyClassifier/Production" \
    --host 0.0.0.0 \
    --port 5001
# Make prediction request
curl -X POST http://localhost:5001/invocations \
    -H "Content-Type: application/json" \
    -d '{"dataframe_split": {"columns": ["feature1", "feature2"], "data": [[1.0, 2.0]]}}'
```
6. Containerization (Docker)
6.1 Dockerizing an ML Environment
```dockerfile
# Dockerfile.train - Training image
FROM python:3.11-slim

WORKDIR /app

RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    git \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ ./src/
COPY params.yaml .
COPY dvc.yaml .

CMD ["python", "src/train.py"]
```
6.2 Multi-Stage Builds
```dockerfile
# Dockerfile.serve - Multi-stage build for serving
# Stage 1: Builder (full image, with compilers for building wheels)
FROM python:3.11 AS builder
WORKDIR /build
COPY requirements.txt .
RUN pip install --no-cache-dir --target /install -r requirements.txt

# Stage 2: Final slim image
FROM python:3.11-slim AS runtime
WORKDIR /app
# --from=builder copies from the builder stage, not from the build context
COPY --from=builder /install /usr/local/lib/python3.11/site-packages
COPY src/serve.py .
COPY models/ ./models/
EXPOSE 8080
# Run as an unprivileged user
RUN useradd -m -u 1000 mluser
USER mluser
CMD ["python", "serve.py"]
```
6.3 GPU Docker (nvidia-docker)
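```dockerfile
# Dockerfile.gpu - GPU-enabled image
FROM nvidia/cuda:12.1.0-cudnn8-devel-ubuntu22.04

ENV DEBIAN_FRONTEND=noninteractive

# On Ubuntu 22.04, python3 and pip3 are the distro's Python 3.10; installing a
# separate python3.11 package would not make python3 or pip3 use it
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    git \
    && rm -rf /var/lib/apt/lists/*

# PyTorch wheels built for CUDA 12.1, matching the base image
RUN pip3 install --no-cache-dir torch torchvision torchaudio \
    --index-url https://download.pytorch.org/whl/cu121

WORKDIR /app
COPY requirements-gpu.txt .
RUN pip3 install --no-cache-dir -r requirements-gpu.txt
COPY src/ ./src/

CMD ["python3", "src/train_gpu.py"]
```

```bash
# Run GPU container (requires the NVIDIA Container Toolkit on the host).
# --shm-size enlarges /dev/shm, which PyTorch DataLoader workers rely on
docker run --gpus all \
    -v /data:/data \
    -v /models:/models \
    --shm-size=8gb \
    my-ml-gpu:latest
```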
6.4 ML Stack with Docker Compose
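```yaml
# docker-compose.yml
version: '3.8'

services:
  mlflow:
    image: ghcr.io/mlflow/mlflow:latest
    # The Postgres backend store and S3 artifact store need psycopg2 and boto3;
    # if the image lacks them, build a small image on top that pip-installs them
    command: >
      mlflow server
      --backend-store-uri postgresql://mlflow:password@postgres:5432/mlflow
      --default-artifact-root s3://my-bucket/mlflow
      --host 0.0.0.0
      --port 5000
    environment:
      # Send S3 calls to MinIO instead of AWS (create my-bucket in MinIO first)
      MLFLOW_S3_ENDPOINT_URL: http://minio:9000
      AWS_ACCESS_KEY_ID: minioadmin
      AWS_SECRET_ACCESS_KEY: minioadmin
    ports:
      - '5000:5000'
    depends_on:
      - postgres
      - minio

  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: mlflow
      POSTGRES_USER: mlflow
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data

  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    ports:
      - '9000:9000'
      - '9001:9001'
    volumes:
      - minio_data:/data

  trainer:
    build:
      context: .
      dockerfile: Dockerfile.train
    environment:
      MLFLOW_TRACKING_URI: http://mlflow:5000
      # Clients upload artifacts directly to the artifact store, so they need MinIO access too
      MLFLOW_S3_ENDPOINT_URL: http://minio:9000
      AWS_ACCESS_KEY_ID: minioadmin
      AWS_SECRET_ACCESS_KEY: minioadmin
    volumes:
      - ./data:/app/data
      - ./models:/app/models
    depends_on:
      - mlflow
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

volumes:
  postgres_data:
  minio_data:
```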
6.5 Real-World: PyTorch Model Server Dockerfile
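```dockerfile
# Dockerfile.pytorch-serve
FROM python:3.11-slim

WORKDIR /app

# CPU-only torch comes from PyTorch's index; --extra-index-url keeps PyPI
# available for fastapi, uvicorn and the rest (--index-url would replace PyPI).
# torch 2.2 wheels were built against NumPy 1.x, hence numpy<2
RUN pip install --no-cache-dir \
    torch==2.2.0+cpu \
    torchvision==0.17.0+cpu \
    fastapi \
    uvicorn \
    pydantic \
    "numpy<2" \
    pillow \
    --extra-index-url https://download.pytorch.org/whl/cpu

COPY src/server.py .
COPY models/model.pt .

EXPOSE 8000
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
```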
```python
# src/server.py
from typing import List

import numpy as np
import torch
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="ML Model Server")

# Load model once at server startup (model.pt must be a TorchScript file)
model = torch.jit.load("model.pt")
model.eval()


class PredictionRequest(BaseModel):
    features: List[List[float]]


class PredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]


@app.get("/health")
async def health_check():
    return {"status": "healthy"}


@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    try:
        tensor = torch.tensor(request.features, dtype=torch.float32)
        with torch.no_grad():
            outputs = model(tensor)
            probabilities = torch.softmax(outputs, dim=1)
            predictions = torch.argmax(probabilities, dim=1)
        return PredictionResponse(
            predictions=predictions.tolist(),
            probabilities=probabilities.tolist()
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
```
7. Kubernetes for ML
7.1 K8s Basics for ML Workloads
```yaml
# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: ml-platform
  labels:
    app: ml-platform
    environment: production
```
7.2 Training Job
```yaml
# k8s/training-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: ml-training-job
  namespace: ml-platform
spec:
  backoffLimit: 3
  template:
    spec:
      restartPolicy: OnFailure
      nodeSelector:
        accelerator: nvidia-t4
      tolerations:
        - key: nvidia.com/gpu
          operator: Exists
          effect: NoSchedule
      containers:
        - name: trainer
          image: my-registry/ml-trainer:v1.2.0
          resources:
            requests:
              memory: '8Gi'
              cpu: '4'
              nvidia.com/gpu: '1'
            limits:
              memory: '16Gi'
              cpu: '8'
              nvidia.com/gpu: '1'  # GPU requests and limits must be equal
          env:
            - name: MLFLOW_TRACKING_URI
              value: 'http://mlflow-service.ml-platform:5000'
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: aws-credentials
                  key: access_key_id
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: aws-credentials
                  key: secret_access_key  # assumed key name in the same Secret
          volumeMounts:
            - name: training-data
              mountPath: /data
            - name: dshm  # larger /dev/shm for PyTorch DataLoader workers
              mountPath: /dev/shm
      volumes:
        - name: training-data
          persistentVolumeClaim:
            claimName: training-data-pvc
        - name: dshm
          emptyDir:
            medium: Memory
            sizeLimit: 8Gi
```
7.3 Scheduled Retraining CronJob
```yaml
# k8s/training-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: daily-retraining
  namespace: ml-platform
spec:
  schedule: '0 2 * * *'  # Daily at 2 AM (controller's time zone unless spec.timeZone is set)
  concurrencyPolicy: Forbid  # skip a run if the previous one is still going
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
            - name: retrainer
              # Pin a version tag in production; :latest makes runs hard to reproduce
              image: my-registry/ml-trainer:latest
              command: ['python', 'src/retrain.py']
              resources:
                requests:
                  memory: '4Gi'
                  cpu: '2'
                limits:
                  memory: '8Gi'
                  cpu: '4'
```
7.4 Model Serving Deployment with HPA
```yaml
# k8s/model-serving-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-server
  namespace: ml-platform
spec:
  replicas: 3  # initial count; the HPA below then manages it between 2 and 20
  selector:
    matchLabels:
      app: model-server
  template:
    metadata:
      labels:
        app: model-server
    spec:
      containers:
        - name: model-server
          image: my-registry/model-server:v1.0.0
          ports:
            - containerPort: 8000
          resources:
            requests:
              memory: '2Gi'
              cpu: '1'  # HPA CPU utilization is measured against this request
            limits:
              memory: '4Gi'
              cpu: '2'
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 15
            periodSeconds: 5
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: model-server-hpa
  namespace: ml-platform
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: model-server
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
```
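The HPA's core scaling rule, as documented by Kubernetes, is desiredReplicas = ceil(currentReplicas * currentMetricValue / desiredMetricValue), skipped when the ratio is within a tolerance of 1.0 (0.1 by default) and clamped to minReplicas/maxReplicas. The sketch below reproduces only that arithmetic for the 70% CPU target above; the real controller also applies stabilization windows and treats not-yet-ready pods specially.

```python
import math


def desired_replicas(current: int, current_util: float, target_util: float,
                     min_replicas: int = 2, max_replicas: int = 20,
                     tolerance: float = 0.1) -> int:
    """Core HPA arithmetic for a single utilization metric."""
    ratio = current_util / target_util
    if abs(ratio - 1.0) <= tolerance:
        desired = current  # close enough to target: no scaling action
    else:
        desired = math.ceil(current * ratio)
    return max(min_replicas, min(max_replicas, desired))


print(desired_replicas(3, 90, 70))    # 4: ceil(3 * 90/70) = ceil(3.86)
print(desired_replicas(3, 74, 70))    # 3: ratio 1.06 is within tolerance
print(desired_replicas(10, 20, 70))   # 3: ceil(10 * 20/70) = ceil(2.86)
print(desired_replicas(15, 200, 70))  # 20: ceil(42.9) = 43, clamped to max
```

Because utilization is computed relative to the container's CPU request, a pod that requests too little CPU will look "hot" and trigger scale-ups early.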
8. Kubeflow
8.1 Introduction to Kubeflow
Kubeflow is an open-source platform for managing ML workflows on Kubernetes. It consists of:
- Pipelines: ML workflow orchestration
- Notebooks: Managed Jupyter notebooks
- Katib: Hyperparameter tuning
- KServe: Model serving
- Training Operator: Distributed training
8.2 Kubeflow Pipelines
```python
# kubeflow_pipeline.py (KFP v2 SDK)
import kfp
from kfp import compiler, dsl
from kfp.dsl import Dataset, Input, Output, OutputPath


@dsl.component(
    base_image="python:3.11",
    packages_to_install=["pandas", "pyarrow", "gcsfs", "scikit-learn"],
)
def prepare_data(data_path: str, split_data: Output[Dataset]):
    import os

    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_parquet(data_path)  # gcsfs lets pandas read gs:// URIs
    train, test = train_test_split(df, test_size=0.2)
    # Use the Dataset artifact's path as a directory holding both splits
    os.makedirs(split_data.path, exist_ok=True)
    train.to_parquet(os.path.join(split_data.path, "train.parquet"))
    test.to_parquet(os.path.join(split_data.path, "test.parquet"))


@dsl.component(
    base_image="python:3.11",
    packages_to_install=["scikit-learn", "mlflow", "pandas", "pyarrow"],
)
def train_model(
    split_data: Input[Dataset],
    n_estimators: int,
    max_depth: int,
    mlflow_uri: str,
    run_id_output: OutputPath(str),
):
    import os

    import mlflow
    import mlflow.sklearn
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier

    mlflow.set_tracking_uri(mlflow_uri)
    df = pd.read_parquet(os.path.join(split_data.path, "train.parquet"))
    X = df.drop('target', axis=1)
    y = df['target']

    with mlflow.start_run():
        model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth
        )
        model.fit(X, y)
        run_id = mlflow.active_run().info.run_id
        mlflow.sklearn.log_model(model, "model")

    # A string output parameter: the file's contents become its value
    with open(run_id_output, 'w') as f:
        f.write(run_id)


@dsl.pipeline(
    name="ml-training-pipeline",  # KFP v2 expects lowercase letters, digits, and hyphens
    description="End-to-end ML training pipeline"
)
def ml_pipeline(
    data_path: str = "gs://my-bucket/data/train.parquet",
    n_estimators: int = 100,
    max_depth: int = 5,
    mlflow_uri: str = "http://mlflow:5000"
):
    prepare_task = prepare_data(data_path=data_path)
    train_task = train_model(
        split_data=prepare_task.outputs["split_data"],
        n_estimators=n_estimators,
        max_depth=max_depth,
        mlflow_uri=mlflow_uri
    )


if __name__ == "__main__":
    compiler.Compiler().compile(
        pipeline_func=ml_pipeline,
        package_path="ml_pipeline.yaml"
    )
    client = kfp.Client(host="http://kubeflow-host/pipeline")
    run = client.create_run_from_pipeline_package(
        pipeline_file="ml_pipeline.yaml",
        arguments={"n_estimators": 200, "max_depth": 7},
        run_name="training-run-001"
    )
```
9. CI/CD for ML
9.1 GitHub Actions for ML
```yaml
# .github/workflows/ml-pipeline.yml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'src/**'
      - 'params.yaml'
      - 'requirements.txt'
  pull_request:
    branches: [main]

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}

jobs:
  test:
    name: Unit Tests
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run unit tests
        run: pytest tests/unit/ -v --cov=src --cov-report=xml

  train-and-evaluate:
    name: Train and Evaluate Model
    runs-on: ubuntu-latest
    needs: test
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt  # assumed to include dvc
      - name: Pull DVC-tracked data
        # Needs read access to the DVC remote (e.g. cloud credentials from secrets)
        run: dvc pull
      - name: Run DVC pipeline
        run: dvc repro
      - name: Check model performance
        run: |
          python scripts/check_metrics.py \
            --min-accuracy 0.90 \
            --metrics-file metrics/scores.json
      - name: Register model
        run: python scripts/register_model.py

  build-and-push:
    name: Build Docker Image
    runs-on: ubuntu-latest
    needs: train-and-evaluate
    permissions:
      contents: read
      packages: write  # lets GITHUB_TOKEN push to ghcr.io
    steps:
      - uses: actions/checkout@v4
      # Dockerfile.serve copies models/, which DVC (not Git) tracks; make sure
      # the model is present in the build context before building
      - name: Login to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - name: Build and push
        uses: docker/build-push-action@v5
        with:
          context: .
          file: Dockerfile.serve
          push: true
          tags: |
            ghcr.io/${{ github.repository }}/model-server:latest
            ghcr.io/${{ github.repository }}/model-server:${{ github.sha }}

  deploy:
    name: Deploy to Kubernetes
    runs-on: ubuntu-latest
    needs: build-and-push
    environment: production
    steps:
      - uses: actions/checkout@v4
      - name: Configure kubectl
        uses: azure/k8s-set-context@v3
        with:
          kubeconfig: ${{ secrets.KUBECONFIG }}
      - name: Deploy
        run: |
          kubectl set image deployment/model-server \
            model-server=ghcr.io/${{ github.repository }}/model-server:${{ github.sha }} \
            -n ml-platform
          kubectl rollout status deployment/model-server -n ml-platform
```
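The workflow calls `scripts/check_metrics.py`, which the guide does not show. A minimal version might look like the sketch below; it is a hypothetical implementation that assumes `metrics/scores.json` holds a top-level `accuracy` key, and it signals failure through a non-zero return code.

```python
# scripts/check_metrics.py (hypothetical implementation)
import argparse
import json
import os
import sys
import tempfile


def main(argv=None) -> int:
    """Return 0 if accuracy meets the threshold, 1 if not, 2 if it is missing."""
    parser = argparse.ArgumentParser(description="Fail CI when a metric is below threshold")
    parser.add_argument("--min-accuracy", type=float, required=True)
    parser.add_argument("--metrics-file", required=True)
    args = parser.parse_args(argv)

    with open(args.metrics_file) as f:
        metrics = json.load(f)

    accuracy = metrics.get("accuracy")
    if accuracy is None:
        print(f"'accuracy' not found in {args.metrics_file}", file=sys.stderr)
        return 2
    if accuracy < args.min_accuracy:
        print(f"FAIL: accuracy {accuracy:.4f} < {args.min_accuracy:.4f}", file=sys.stderr)
        return 1
    print(f"OK: accuracy {accuracy:.4f} >= {args.min_accuracy:.4f}")
    return 0


# Self-check with a throwaway metrics file (drop this part in the real script)
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "scores.json")
    with open(path, "w") as f:
        json.dump({"accuracy": 0.93}, f)
    passed = main(["--min-accuracy", "0.90", "--metrics-file", path])
    failed = main(["--min-accuracy", "0.95", "--metrics-file", path])
    with open(path, "w") as f:
        json.dump({"f1": 0.8}, f)
    missing = main(["--min-accuracy", "0.90", "--metrics-file", path])
```

In the real script, end the file with `if __name__ == "__main__": sys.exit(main())`; GitHub Actions marks a step as failed when its command exits non-zero, so a model below the threshold stops the `build-and-push` and `deploy` jobs that depend on this one.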
10. Model Monitoring
10.1 Data Drift Detection with Evidently AI
```bash
# The examples below use Evidently's legacy Report API (evidently.report,
# evidently.metric_preset); newer releases reorganized these modules
pip install "evidently<0.7"
```

```python
import pandas as pd
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import ColumnDriftMetric
from evidently.report import Report

# Reference data (training-time data)
reference_data = pd.read_parquet("data/reference.parquet")
# Current data (recent production inputs)
current_data = pd.read_parquet("data/current.parquet")

# Data drift report
drift_report = Report(metrics=[
    DataDriftPreset(),
    DataQualityPreset(),
    ColumnDriftMetric(column_name="age"),
    ColumnDriftMetric(column_name="income"),
])
drift_report.run(
    reference_data=reference_data,
    current_data=current_data
)
drift_report.save_html("drift_report.html")

# The first metric generated by DataDriftPreset is the dataset-level drift summary
result = drift_report.as_dict()
drift_detected = result['metrics'][0]['result']['dataset_drift']
if drift_detected:
    print("Data drift detected! Consider retraining.")
    send_alert("Data drift detected in production model")  # placeholder for your alerting hook
```
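Under the hood, drift detection compares each feature's distribution in the reference window with the current window; Evidently picks a statistical test per column based on its type and sample size. As a self-contained illustration of the same idea, here is the Population Stability Index (PSI), a different but widely used drift score, in plain Python. The bin scheme (reference deciles) and the 0.2 alert threshold are common rules of thumb, not Evidently defaults.

```python
import bisect
import math
import random


def psi(reference, current, n_bins: int = 10, eps: float = 1e-6) -> float:
    """Population Stability Index with bin edges at reference quantiles."""
    ref_sorted = sorted(reference)
    # Interior cut points, e.g. the 9 deciles when n_bins=10
    edges = [ref_sorted[int(len(ref_sorted) * i / n_bins)] for i in range(1, n_bins)]

    def shares(values):
        counts = [0] * n_bins
        for v in values:
            counts[bisect.bisect_right(edges, v)] += 1
        # eps avoids log(0) for empty bins
        return [max(c / len(values), eps) for c in counts]

    ref_share, cur_share = shares(reference), shares(current)
    # PSI = sum over bins of (cur% - ref%) * ln(cur% / ref%)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_share, cur_share))


rng = random.Random(7)
reference = [rng.gauss(0, 1) for _ in range(5000)]
same = [rng.gauss(0, 1) for _ in range(5000)]     # same distribution
shifted = [rng.gauss(1, 1) for _ in range(5000)]  # mean shifted by one std dev

print(f"PSI same dist: {psi(reference, same):.3f}")
print(f"PSI shifted:   {psi(reference, shifted):.3f}")
if psi(reference, shifted) > 0.2:
    print("Drift detected (PSI > 0.2)")
```

Each bin contributes a non-negative term, so PSI is zero only when the binned distributions match; a fresh sample from the same distribution scores near zero, while a one-standard-deviation shift lands well above 0.2.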
10.2 Model Performance Monitoring
```python
import pandas as pd
from evidently.metric_preset import ClassificationPreset
from evidently.report import Report

# y_true, y_pred, y_proba and X_test come from your labeled production data;
# reference_predictions is a DataFrame with the same columns from validation time
prediction_data = pd.DataFrame({
    'target': y_true,
    'prediction': y_pred,
    'prediction_proba': y_proba,
    'feature1': X_test['feature1'],
    'feature2': X_test['feature2'],
})

performance_report = Report(metrics=[ClassificationPreset()])
performance_report.run(
    reference_data=reference_predictions,
    current_data=prediction_data
)

metrics = performance_report.as_dict()
current_accuracy = metrics['metrics'][0]['result']['current']['accuracy']
reference_accuracy = metrics['metrics'][0]['result']['reference']['accuracy']

# Retrain if accuracy dropped by more than 5 percentage points
degradation = reference_accuracy - current_accuracy
if degradation > 0.05:
    trigger_retraining(f"Model degraded by {degradation:.2%}")  # placeholder hook
```
10.3 Real-Time Monitoring with Prometheus
import pandas as pd
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
prediction_counter = Counter('model_predictions_total', 'Total predictions')
prediction_latency = Histogram('model_prediction_duration_seconds', 'Prediction latency')
model_accuracy_gauge = Gauge('model_accuracy', 'Current model accuracy')  # set once labels arrive (see 10.2)
drift_score_gauge = Gauge('data_drift_score', 'Data drift score')
class MonitoredModelServer:
    def __init__(self, model, reference_data: pd.DataFrame, window_size: int = 1000):
        self.model = model
        self.reference_data = reference_data  # training-time features
        self.window_size = window_size
        self.features_buffer = []  # recent input batches, compared against reference_data
        self.buffered_rows = 0
        start_http_server(8001)  # exposes the metrics endpoint on port 8001 for Prometheus to scrape
    def predict(self, features: pd.DataFrame):
        with prediction_latency.time():
            prediction = self.model.predict(features)
        prediction_counter.inc(len(features))
        # Drift is measured on model inputs, so buffer the features rather than the predictions
        self.features_buffer.append(features)
        self.buffered_rows += len(features)
        if self.buffered_rows >= self.window_size:
            self._run_monitoring()
        return prediction
    def _run_monitoring(self):
        current_df = pd.concat(self.features_buffer, ignore_index=True)
        drift_report = Report(metrics=[DataDriftPreset()])
        drift_report.run(
            reference_data=self.reference_data,
            current_data=current_df
        )
        result = drift_report.as_dict()
        drift_share = result['metrics'][0]['result']['share_of_drifted_columns']
        drift_score_gauge.set(drift_share)
        self.features_buffer = []
        self.buffered_rows = 0
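The server above runs the drift report inline, so the request that fills the window pays for the whole report, and concurrent requests could append to the buffer while it is being reset. One alternative, sketched here with the standard library only, is a lock-protected window that returns each complete batch to the caller, which can then hand it to a background worker. The class name is hypothetical.

```python
import threading
from typing import Any, List, Optional


class MonitoringWindow:
    """Thread-safe buffer that returns a full batch once `size` rows have been added."""

    def __init__(self, size: int = 1000):
        self.size = size
        self._rows: List[Any] = []
        self._lock = threading.Lock()

    def add(self, rows: List[Any]) -> Optional[List[Any]]:
        """Append rows; return exactly `size` rows when the window fills, else None."""
        with self._lock:
            self._rows.extend(rows)
            if len(self._rows) < self.size:
                return None
            batch = self._rows[:self.size]
            self._rows = self._rows[self.size:]  # overflow carries over to the next window
            return batch
```

A serving handler could call `batch = window.add(rows)` and, when `batch` is not `None`, pass it to something like `concurrent.futures.ThreadPoolExecutor.submit` so the drift report runs off the request path.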
11. Feature Store
11.1 Feature Store Concepts
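A Feature Store is infrastructure for managing ML features centrally, so that a feature defined once can be reused across models and computed with the same definition at training time and at inference time.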
Core Concepts:
- Online Store: Low-latency feature retrieval for real-time predictions (Redis, DynamoDB)
- Offline Store: Historical features for batch training (S3, BigQuery)
- Feature View: Mapping of feature definitions to data sources
- Entity: Subject of features (User ID, Product ID, etc.)
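The split between the two stores can be pictured with plain pandas: the offline store keeps every timestamped row per entity, while the online store keeps only the latest values, keyed by entity for fast lookup. This is a toy sketch; the column names are borrowed from the Feast example in 11.2, `materialize_latest` is a hypothetical helper, and a real online store would be Redis or DynamoDB rather than a Python dict.

```python
import pandas as pd


def materialize_latest(offline: pd.DataFrame, entity_key: str = "user_id",
                       ts_col: str = "event_timestamp") -> dict:
    """Build a toy 'online store': entity id -> latest feature values from offline history."""
    # Keep the most recent row for each entity
    latest = offline.sort_values(ts_col).groupby(entity_key).tail(1)
    # Drop the timestamp and key the remaining feature columns by entity id
    features = latest.drop(columns=[ts_col]).set_index(entity_key)
    return features.to_dict(orient="index")
```

Looking up `online[1001]` then returns a small dict of that user's newest feature values, which is the shape of data a real-time model needs at request time.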
11.2 Feast Setup and Usage
pip install feast
feast init my-feature-store
cd my-feature-store
# features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource, ValueType
from feast.types import Float64, Int64
# Entity: the key that features are attached to
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="User ID"
)
# Offline source: a Parquet file with an event_timestamp column
user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
)
user_stats_fv = FeatureView(
    name="user_statistics",
    entities=[user],
    ttl=timedelta(days=30),  # how far back from each entity timestamp to look for feature values
    schema=[
        Field(name="purchase_count_7d", dtype=Int64),
        Field(name="purchase_amount_7d", dtype=Float64),
        Field(name="avg_session_duration", dtype=Float64),
        Field(name="last_purchase_days_ago", dtype=Int64),
    ],
    online=True,  # allow materialization into the online store
    source=user_stats_source,
)
# feast_usage.py (run `feast apply` in the repository first so the definitions above are registered)
from datetime import datetime
import pandas as pd
from feast import FeatureStore
store = FeatureStore(repo_path=".")
# Offline feature retrieval (for training): point-in-time join against entity_df
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003],
    "event_timestamp": pd.to_datetime(["2026-01-01", "2026-01-02", "2026-01-03"])
})
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_statistics:purchase_count_7d",
        "user_statistics:purchase_amount_7d",
        "user_statistics:avg_session_duration",
    ]
).to_df()
# Materialize the latest feature values into the online store
store.materialize_incremental(end_date=datetime.now())
# Online feature retrieval (for real-time inference)
online_features = store.get_online_features(
    features=[
        "user_statistics:purchase_count_7d",
        "user_statistics:purchase_amount_7d",
    ],
    entity_rows=[
        {"user_id": 1001},
        {"user_id": 1002},
    ]
).to_dict()
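`get_historical_features` performs a point-in-time join: for each row of `entity_df`, it attaches the most recent feature values recorded at or before that row's `event_timestamp`, ignoring values older than the feature view's TTL. This prevents future information from leaking into training data. The pandas sketch below illustrates those semantics with `pd.merge_asof`; it is a conceptual model, not how Feast implements the join, and `point_in_time_join` is a hypothetical helper.

```python
import pandas as pd


def point_in_time_join(entity_df: pd.DataFrame, feature_df: pd.DataFrame,
                       ttl: pd.Timedelta) -> pd.DataFrame:
    """For each (user_id, event_timestamp), attach the latest feature row at or before that time."""
    return pd.merge_asof(
        entity_df.sort_values("event_timestamp"),   # merge_asof requires both sides sorted on `on`
        feature_df.sort_values("event_timestamp"),
        on="event_timestamp",
        by="user_id",
        direction="backward",  # only look at feature values from the past
        tolerance=ttl,         # values older than the TTL become NaN
    )
```

For a user whose only feature row is 59 days older than the entity timestamp, a 30-day TTL yields `NaN`, matching the idea that stale features should not silently reach the model.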
12. Real-World MLOps Project
12.1 Full Pipeline Architecture
Data Sources (DB, S3, API)
↓
Data Collection (Airflow DAG)
↓
Data Validation (Great Expectations)
↓
Feature Engineering (Feast)
↓
Model Training (MLflow tracking)
↓
Model Evaluation (Automated validation)
↓
Model Registry (MLflow Registry)
↓
CI/CD (GitHub Actions)
↓
Container Build (Docker)
↓
K8s Deployment (Kubernetes)
↓
Serving (FastAPI + Triton)
↓
Monitoring (Evidently + Prometheus + Grafana)
↓
Alerts (Trigger retraining on drift detection)
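The diagram implies a simple control flow: each stage consumes what earlier stages produced, and a failed quality gate (validation, evaluation) stops everything downstream so a bad model never reaches the registry. Orchestrators such as Airflow provide this for real; the sketch below only shows the control flow, and `run_pipeline` and the stage functions are hypothetical.

```python
from typing import Callable, Dict, List, Tuple

# A stage is a (name, function) pair; the function reads the shared context
# and returns a dict of new outputs, or raises to fail its gate.
Stage = Tuple[str, Callable[[Dict], Dict]]


def run_pipeline(stages: List[Stage], context: Dict) -> Dict:
    """Run stages in order; stop at the first stage that raises."""
    context = dict(context)  # do not mutate the caller's dict
    completed = []
    for name, stage in stages:
        try:
            context.update(stage(context))
        except Exception as exc:
            raise RuntimeError(f"Pipeline stopped at stage '{name}': {exc}") from exc
        completed.append(name)
    return {**context, "completed": completed}
```

For example, an `evaluate` stage that raises when AUC-ROC is below 0.85 makes `run_pipeline` fail with a message naming `evaluate`, and the registry and deployment stages after it never run.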
12.2 Real-World Project: Customer Churn Prediction System
# scripts/full_pipeline.py
import logging
from typing import Optional
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from mlflow.tracking import MlflowClient
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ChurnPredictionPipeline:
    """Customer churn prediction pipeline"""
    def __init__(self, mlflow_uri: str = "http://localhost:5000"):
        mlflow.set_tracking_uri(mlflow_uri)
        mlflow.set_experiment("churn-prediction")
    def load_data(self, data_path: str) -> pd.DataFrame:
        logger.info(f"Loading data from {data_path}")
        df = pd.read_parquet(data_path)
        # Basic validation: enough rows and a target column
        assert df.shape[0] > 1000, "Too few samples"
        assert 'churn' in df.columns, "Target column 'churn' missing"
        # Drop columns that are mostly empty
        missing_rate = df.isnull().mean()
        high_missing = missing_rate[missing_rate > 0.5].index.tolist()
        if high_missing:
            logger.warning(f"Columns with >50% missing: {high_missing}")
            df = df.drop(columns=high_missing)
        return df
    def engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        logger.info("Engineering features...")
        df = df.copy()
        # Impute numeric features with the median (the target column is left untouched)
        numeric_cols = df.select_dtypes(include=[np.number]).columns.drop('churn', errors='ignore')
        df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())
        if 'tenure_months' in df.columns and 'monthly_charges' in df.columns:
            df['total_value'] = df['tenure_months'] * df['monthly_charges']
        if 'num_support_tickets' in df.columns and 'tenure_months' in df.columns:
            # +1 avoids division by zero for brand-new customers
            df['tickets_per_month'] = (
                df['num_support_tickets'] / (df['tenure_months'] + 1)
            )
        return df
    def train(self, df: pd.DataFrame, params: Optional[dict] = None) -> str:
        if params is None:
            params = {
                "n_estimators": 200,
                "max_depth": 5,
                "learning_rate": 0.05,
                "subsample": 0.8,
                "random_state": 42,
            }
        # Numeric columns only; categorical columns would need encoding first
        feature_cols = [
            c for c in df.select_dtypes(include=[np.number]).columns if c != 'churn'
        ]
        X = df[feature_cols]
        y = df['churn']
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
        # Bundle the scaler with the model so the logged artifact accepts raw features at serving time
        model = Pipeline([
            ("scaler", StandardScaler()),
            ("gbt", GradientBoostingClassifier(**params)),
        ])
        with mlflow.start_run(run_name="churn-gbt") as run:
            mlflow.log_params(params)
            mlflow.log_param("train_size", len(X_train))
            mlflow.log_param("n_features", len(feature_cols))
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            y_proba = model.predict_proba(X_test)[:, 1]
            accuracy = accuracy_score(y_test, y_pred)
            auc_roc = roc_auc_score(y_test, y_proba)
            mlflow.log_metrics({
                "accuracy": accuracy,
                "auc_roc": auc_roc,
            })
            # 5-fold CV on the training split (cross_val_score clones the pipeline for each fold)
            cv_scores = cross_val_score(
                model, X_train, y_train, cv=5, scoring='roc_auc'
            )
            mlflow.log_metric("cv_auc_mean", cv_scores.mean())
            mlflow.log_metric("cv_auc_std", cv_scores.std())
            # Log without registering; promote_best_model() registers only the winning run
            mlflow.sklearn.log_model(model, "model")
            run_id = run.info.run_id
        logger.info(f"Training complete. AUC-ROC: {auc_roc:.4f}")
        return run_id
    def promote_best_model(self, min_auc: float = 0.85):
        client = MlflowClient()
        experiment = client.get_experiment_by_name("churn-prediction")
        # Best run by held-out AUC-ROC among runs that clear the promotion threshold
        runs = client.search_runs(
            experiment_ids=[experiment.experiment_id],
            filter_string=f"metrics.auc_roc >= {min_auc}",
            order_by=["metrics.auc_roc DESC"],
            max_results=1
        )
        if not runs:
            raise ValueError(f"No run reached the minimum AUC-ROC of {min_auc}")
        best_run = runs[0]
        auc = best_run.data.metrics['auc_roc']
        model_uri = f"runs:/{best_run.info.run_id}/model"
        mv = mlflow.register_model(model_uri, "ChurnPredictor")
        # Stages are deprecated since MLflow 2.9; newer setups can use an alias instead:
        # client.set_registered_model_alias("ChurnPredictor", "production", mv.version)
        client.transition_model_version_stage(
            name="ChurnPredictor",
            version=mv.version,
            stage="Production",
            archive_existing_versions=True
        )
        logger.info(f"Model v{mv.version} promoted to Production! AUC: {auc:.4f}")
        return mv.version
if __name__ == "__main__":
    pipeline = ChurnPredictionPipeline()
    df = pipeline.load_data("data/customers.parquet")
    df = pipeline.engineer_features(df)
    run_id = pipeline.train(df)
    pipeline.promote_best_model(min_auc=0.85)
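The promotion rule in `promote_best_model` (take the highest AUC-ROC, but only if it reaches `min_auc`) can be unit-tested without an MLflow server by pulling the decision into a pure function. The sketch below is a hypothetical refactor: run IDs and AUC values are passed as a plain dict instead of coming from `search_runs`.

```python
from typing import Dict, Optional


def select_for_promotion(run_aucs: Dict[str, float], min_auc: float = 0.85) -> Optional[str]:
    """Return the run id with the highest AUC-ROC if it reaches min_auc, else None."""
    eligible = {run_id: auc for run_id, auc in run_aucs.items() if auc >= min_auc}
    if not eligible:
        return None  # nothing good enough: keep the current Production model
    return max(eligible, key=eligible.get)
```

Keeping the decision separate from the MLflow calls means the gate itself can run in CI on every commit, while the registry interaction is exercised only in integration tests.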
Conclusion
MLOps is not merely about learning tools; it is a culture and process for operating ML systems reliably, reproducibly, and at scale.
Keep these core principles in mind:
- Version everything: Code, data, models, and environments
- Automate first: Manual steps are sources of error
- Measure and monitor: You cannot improve what you cannot measure
- Detect failures fast: Immediately catch drift and performance degradation
- Guarantee reproducibility: Anyone, anywhere, anytime should be able to reproduce the same results
The MLOps journey is incremental. Start at Level 0 and mature at a pace that fits your organization's needs.