The Complete MLOps & AI Model Deployment Guide — From Training to Serving and Monitoring

Author: Youngju Kim (@fjvbn20031)
Table of Contents
- What Is MLOps
- Experiment Management
- Data Pipelines
- Model Training Infrastructure
- Model Serving
- LLM Serving Special
- CI/CD for ML
- Monitoring
- A/B Testing and Deployment Strategies
- Cost Optimization
- Practical: Building an LLM RAG Pipeline
1. What Is MLOps
When DevOps Meets ML
MLOps is a portmanteau of Machine Learning and Operations. Think of it as applying DevOps principles from software engineering to the machine learning lifecycle.
In traditional software development, you only had to manage code. In ML systems, however, you must manage code, data, and models simultaneously. When data changes, the model changes. When the model changes, the serving approach may need to change too.
The ML Lifecycle
The overall flow of an ML project looks like this:
- Problem Definition - Translate business requirements into an ML problem
- Data Collection and Preprocessing - Build data pipelines
- Feature Engineering - Transform raw data into model-ready features
- Model Training - Experiment with algorithms and hyperparameters
- Model Evaluation - Validate performance with offline metrics
- Model Deployment - Serve in production
- Monitoring - Track performance and detect drift
- Retraining - Update the model when performance degrades
This is not a one-time process but a continuous cycle.
Google's MLOps Maturity Model
Google defines three levels of MLOps maturity.
Level 0 - Manual Process
- Data scientists train models manually in notebooks
- Model deployment is manual and irregular
- No monitoring
- Most teams are at this level
Level 1 - ML Pipeline Automation
- Training pipeline is automated
- Continuous Training implemented
- Experiment tracking system in place
- Model registry in use
Level 2 - CI/CD Pipeline Automation
- CI/CD for the pipeline itself
- Automated testing and validation
- Feature store utilization
- Complete monitoring and alerting
Most organizations can gain tremendous value just by moving from Level 0 to Level 1.
2. Experiment Management
MLflow - The Open-Source ML Platform
MLflow is the de facto standard for open-source ML experiment management. It consists of four core components: Tracking, Projects, Models, and the Model Registry.
MLflow Tracking - Experiment Logging
```python
import mlflow

mlflow.set_experiment("recommendation-model-v2")

with mlflow.start_run():
    # Log hyperparameters
    mlflow.log_param("learning_rate", 0.001)
    mlflow.log_param("batch_size", 64)
    mlflow.log_param("epochs", 50)

    # Run training
    model = train_model(lr=0.001, batch_size=64, epochs=50)

    # Log metrics
    mlflow.log_metric("accuracy", 0.923)
    mlflow.log_metric("f1_score", 0.891)
    mlflow.log_metric("auc_roc", 0.956)

    # Save model
    mlflow.pytorch.log_model(model, "model")
```
MLflow Model Registry - Versioning and Stages
```python
# Register in the model registry
mlflow.register_model(
    "runs:/abc123/model",
    "recommendation-model",
)

# Transition stage
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="recommendation-model",
    version=3,
    stage="Production",
)
```
Weights and Biases
W&B is a commercial experiment management platform with excellent visualization capabilities.
```python
import wandb

wandb.init(project="image-classification", config={
    "learning_rate": 0.001,
    "architecture": "ResNet50",
    "dataset": "imagenet-subset",
    "epochs": 100,
})

for epoch in range(100):
    train_loss, val_loss = train_one_epoch(model)
    wandb.log({
        "train_loss": train_loss,
        "val_loss": val_loss,
        "epoch": epoch,
    })
```
Optuna - Hyperparameter Optimization
Manually tuning hyperparameters is inefficient. Optuna performs automated search based on Bayesian optimization.
```python
import optuna

def objective(trial):
    lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])
    n_layers = trial.suggest_int("n_layers", 1, 5)
    dropout = trial.suggest_float("dropout", 0.1, 0.5)

    model = create_model(n_layers=n_layers, dropout=dropout)
    accuracy = train_and_evaluate(model, lr=lr, batch_size=batch_size)
    return accuracy

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100)

print(f"Best accuracy: {study.best_trial.value}")
print(f"Best params: {study.best_trial.params}")
```
3. Data Pipelines
Feature Store
A feature store is a centralized repository for ML features. It ensures the same features are used for both training and serving.
Feast Example
```python
from feast import Entity, FeatureStore, FeatureView, Field
from feast.types import Float32, Int64

store = FeatureStore(repo_path="feature_repo/")

# Feature definition
user = Entity(name="user_id", join_keys=["user_id"])

user_features = FeatureView(
    name="user_features",
    entities=[user],
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="days_since_last_order", dtype=Int64),
    ],
    source=user_source,
)

# Retrieve training data
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:total_purchases",
        "user_features:avg_order_value",
        "user_features:days_since_last_order",
    ],
).to_df()

# Real-time serving query
online_features = store.get_online_features(
    features=[
        "user_features:total_purchases",
        "user_features:avg_order_value",
    ],
    entity_rows=[{"user_id": 12345}],
).to_dict()
```
DVC - Data Version Control
DVC enables Git-based version control for data.
```bash
# Initialize DVC
dvc init

# Track a data file
dvc add data/training_data.csv

# Configure remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage

# Push data
dvc push

# Retrieve a specific version
git checkout v1.0
dvc pull
```
Data Validation - Great Expectations
Data quality validation is a cornerstone of the pipeline.
```python
import great_expectations as gx

context = gx.get_context()

# Define expectations
validator = context.sources.pandas_default.read_csv(
    "data/training_data.csv"
)
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=150)
validator.expect_column_values_to_be_in_set("country", ["KR", "US", "JP", "UK"])

# Run validation
results = validator.validate()
if not results.success:
    raise ValueError("Data validation failed!")
```
4. Model Training Infrastructure
GPU Cluster Setup
GPU clusters are essential for training large-scale models.
Kubernetes + GPU Node Pool
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: training-job
spec:
  containers:
    - name: trainer
      image: training-image:latest
      resources:
        limits:
          nvidia.com/gpu: 4
      volumeMounts:
        - name: dataset
          mountPath: /data
  volumes:
    - name: dataset
      persistentVolumeClaim:
        claimName: dataset-pvc  # hypothetical claim holding the training data
  nodeSelector:
    gpu-type: a100
  tolerations:
    - key: "nvidia.com/gpu"
      operator: "Exists"
      effect: "NoSchedule"
```
Distributed Training
Distributed training becomes necessary when a single GPU is too slow for the dataset, or when the model no longer fits in one GPU's memory.
PyTorch DDP (Distributed Data Parallel)
```python
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

def setup(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

def train(rank, world_size):
    setup(rank, world_size)

    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # Each process trains on its own shard of the dataset
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, batch_size=64, sampler=sampler)

    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)  # reshuffle shards each epoch
        for batch in dataloader:
            loss = model(batch)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

    dist.destroy_process_group()
```
Kubeflow Training Operator
Kubeflow lets you declaratively manage distributed training on Kubernetes.
```yaml
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-training
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
            - name: pytorch
              image: my-training:latest
              resources:
                limits:
                  nvidia.com/gpu: 1
    Worker:
      replicas: 3
      template:
        spec:
          containers:
            - name: pytorch
              image: my-training:latest
              resources:
                limits:
                  nvidia.com/gpu: 1
```
5. Model Serving
Serving Framework Comparison
| Framework | Strengths | Weaknesses | Best For |
|---|---|---|---|
| TorchServe | Native PyTorch, easy setup | Limited performance | Small PyTorch models |
| Triton | Multi-framework, high perf | Complex config | Large-scale production |
| vLLM | LLM-optimized, PagedAttention | LLM only | LLM serving |
| FastAPI | Maximum flexibility | Manual implementation | Custom logic needed |
TorchServe
```bash
# Create a model archive
torch-model-archiver \
  --model-name resnet50 \
  --version 1.0 \
  --model-file model.py \
  --serialized-file model.pth \
  --handler image_classifier

# Start the server
torchserve --start \
  --model-store model_store \
  --models resnet50=resnet50.mar
```
NVIDIA Triton Inference Server
Triton can serve models from multiple frameworks simultaneously.
```text
# Model repository structure
model_repository/
  resnet50/
    config.pbtxt
    1/
      model.onnx
  bert_base/
    config.pbtxt
    1/
      model.plan
```
config.pbtxt Configuration
```protobuf
name: "resnet50"
platform: "onnxruntime_onnx"
max_batch_size: 32

input [
  {
    name: "input"
    data_type: TYPE_FP32
    dims: [ 3, 224, 224 ]
  }
]
output [
  {
    name: "output"
    data_type: TYPE_FP32
    dims: [ 1000 ]
  }
]

dynamic_batching {
  preferred_batch_size: [ 8, 16, 32 ]
  max_queue_delay_microseconds: 100
}

instance_group [
  {
    count: 2
    kind: KIND_GPU
  }
]
```
FastAPI Wrapping
FastAPI is well-suited for simple model serving scenarios.
```python
from fastapi import FastAPI
from pydantic import BaseModel
import torch

app = FastAPI()

# Load the model once at startup
model = torch.jit.load("model.pt")
model.eval()

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: float
    confidence: float

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    tensor = torch.tensor([request.features])
    with torch.no_grad():
        output = model(tensor)
        prob = torch.softmax(output, dim=1)
    prediction = torch.argmax(prob, dim=1).item()
    confidence = prob[0][prediction].item()
    return PredictionResponse(
        prediction=prediction,
        confidence=confidence,
    )

@app.get("/health")
async def health():
    return {"status": "healthy"}
```
6. LLM Serving Special
vLLM - High-Performance LLM Serving
vLLM uses PagedAttention technology to maximize LLM serving performance.
```python
from vllm import LLM, SamplingParams

# Load the model
llm = LLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    tensor_parallel_size=2,
    gpu_memory_utilization=0.9,
    max_model_len=8192,
)

# Inference
sampling_params = SamplingParams(
    temperature=0.7,
    top_p=0.9,
    max_tokens=512,
)
outputs = llm.generate(
    ["Explain MLOps in simple terms."],
    sampling_params,
)
print(outputs[0].outputs[0].text)
```
vLLM OpenAI-Compatible Server
```bash
python -m vllm.entrypoints.openai.api_server \
  --model meta-llama/Llama-3.1-8B-Instruct \
  --tensor-parallel-size 2 \
  --port 8000
```

Because the server is OpenAI-compatible, you can call it with the standard OpenAI SDK:

```python
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")

response = client.chat.completions.create(
    model="meta-llama/Llama-3.1-8B-Instruct",
    messages=[
        {"role": "user", "content": "What is MLOps?"}
    ],
    max_tokens=256,
)
```
Text Generation Inference (TGI)
Hugging Face's TGI is a Rust-based, production-proven serving stack.
```bash
docker run --gpus all \
  -p 8080:80 \
  -v /data:/data \
  ghcr.io/huggingface/text-generation-inference:latest \
  --model-id meta-llama/Llama-3.1-8B-Instruct \
  --quantize gptq \
  --max-input-length 4096 \
  --max-total-tokens 8192
```
Quantization Techniques Compared
Quantization is essential for reducing model size when deploying LLMs.
| Technique | Size Reduction | Quality Loss | Speed Gain | Notes |
|---|---|---|---|---|
| GPTQ | 4x | Low | 2-3x | GPU-optimized, good accuracy |
| AWQ | 4x | Very low | 2-3x | Activation-aware, better than GPTQ |
| GGUF | 2-8x | Variable | CPU possible | For llama.cpp, CPU/GPU hybrid |
| BitsAndBytes | 2-4x | Low | 1.5x | Easy to apply, QLoRA training |
```python
# BitsAndBytes 4-bit quantization
import torch
from transformers import AutoModelForCausalLM, BitsAndBytesConfig

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
)

model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-3.1-70B-Instruct",
    quantization_config=quantization_config,
    device_map="auto",
)
```
Ollama - Local LLM
Ollama is convenient for development environments or small-scale deployments.
```bash
# Download and run a model
ollama pull llama3.1
ollama run llama3.1

# API server mode
ollama serve
```

The local server exposes a simple HTTP API:

```python
import requests

response = requests.post("http://localhost:11434/api/generate", json={
    "model": "llama3.1",
    "prompt": "Explain MLOps briefly.",
    "stream": False,
})
print(response.json()["response"])
```
7. CI/CD for ML
Model Testing Strategy
ML models require different testing approaches compared to traditional software.
```python
import os
import time

import pytest

class TestModelQuality:
    """Model quality tests"""

    def test_accuracy_threshold(self, model, test_data):
        """Verify accuracy meets the threshold"""
        accuracy = evaluate(model, test_data)
        assert accuracy >= 0.90, f"Accuracy {accuracy} below threshold 0.90"

    def test_latency(self, model, sample_input):
        """Check inference latency"""
        start = time.time()
        model.predict(sample_input)
        latency = time.time() - start
        assert latency < 0.1, f"Latency {latency}s exceeds 100ms"

    def test_no_bias(self, model, fairness_data):
        """Fairness verification"""
        results_group_a = model.predict(fairness_data["group_a"])
        results_group_b = model.predict(fairness_data["group_b"])
        disparity = abs(results_group_a.mean() - results_group_b.mean())
        assert disparity < 0.05, f"Bias detected: disparity={disparity}"

    def test_model_size(self, model_path):
        """Model size limit"""
        size_mb = os.path.getsize(model_path) / (1024 * 1024)
        assert size_mb < 500, f"Model size {size_mb}MB exceeds 500MB limit"
```
ML Pipeline with GitHub Actions
```yaml
name: ML Pipeline

on:
  push:
    paths:
      - "src/**"
      - "data/**"
      - "configs/**"

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate data
        run: python scripts/validate_data.py

  train:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train model
        run: python scripts/train.py --config configs/production.yaml
      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: model
          path: outputs/model/

  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Download model
        uses: actions/download-artifact@v4
        with:
          name: model
      - name: Run evaluation
        run: python scripts/evaluate.py
      - name: Check quality gates
        run: python scripts/quality_gate.py

  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to staging
        run: ./scripts/deploy.sh staging
      - name: Smoke test
        run: python scripts/smoke_test.py
      - name: Deploy to production
        run: ./scripts/deploy.sh production
```
Data Validation Pipeline
```python
import pandas as pd

REQUIRED_COLUMNS = ["user_id", "features", "label"]  # example; adjust to your schema

def validate_training_data(data_path: str) -> bool:
    """Pipeline to validate training data quality"""
    df = pd.read_parquet(data_path)

    checks = [
        # Minimum data size
        len(df) >= 10000,
        # Required columns exist
        all(col in df.columns for col in REQUIRED_COLUMNS),
        # Null ratio check
        df.isnull().mean().max() < 0.05,
        # Label distribution check
        df["label"].value_counts(normalize=True).min() > 0.1,
        # Duplicate check
        df.duplicated().mean() < 0.01,
    ]

    if not all(checks):
        failed = [i for i, c in enumerate(checks) if not c]
        raise ValueError(f"Data validation failed at checks: {failed}")
    return True
```
8. Monitoring
Model Performance Monitoring
Model performance can degrade over time after deployment.
```python
import time

from prometheus_client import Counter, Gauge, Histogram

# Define metrics
prediction_latency = Histogram(
    "model_prediction_latency_seconds",
    "Prediction latency in seconds",
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0],
)
prediction_count = Counter(
    "model_prediction_total",
    "Total number of predictions",
    ["model_version", "status"],
)
model_accuracy = Gauge(
    "model_accuracy",
    "Current model accuracy",
    ["model_name"],
)

# Usage
def predict_with_monitoring(model, input_data):
    start = time.time()
    try:
        result = model.predict(input_data)
        prediction_count.labels(
            model_version="v2.1",
            status="success",
        ).inc()
        return result
    except Exception:
        prediction_count.labels(
            model_version="v2.1",
            status="error",
        ).inc()
        raise
    finally:
        latency = time.time() - start
        prediction_latency.observe(latency)
```
Data Drift Detection
When the distribution of input data changes, model performance degrades.
```python
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

# Generate a drift report
report = Report(metrics=[
    DataDriftPreset(),
    TargetDriftPreset(),
])

column_mapping = ColumnMapping(
    target="label",
    prediction="prediction",
    numerical_features=["feature_1", "feature_2", "feature_3"],
    categorical_features=["category", "region"],
)

report.run(
    reference_data=training_data,
    current_data=production_data,
    column_mapping=column_mapping,
)

# Check drift detection results
drift_results = report.as_dict()
if drift_results["metrics"][0]["result"]["dataset_drift"]:
    trigger_retraining_pipeline()
```
Concept Drift
Not only can data distributions change, but the relationship between inputs and outputs itself can shift. For example, consumer behavior patterns changed dramatically before and after COVID.
```python
import numpy as np

class ConceptDriftDetector:
    """PSI (Population Stability Index) based drift detection"""

    def __init__(self, threshold=0.2):
        self.threshold = threshold

    def calculate_psi(self, expected, actual, bins=10):
        """Calculate PSI between two distributions"""
        breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
        expected_counts = np.histogram(expected, breakpoints)[0] / len(expected)
        actual_counts = np.histogram(actual, breakpoints)[0] / len(actual)

        # Prevent division by zero
        expected_counts = np.clip(expected_counts, 1e-6, None)
        actual_counts = np.clip(actual_counts, 1e-6, None)

        psi = np.sum(
            (actual_counts - expected_counts) *
            np.log(actual_counts / expected_counts)
        )
        return psi

    def check_drift(self, reference_predictions, current_predictions):
        psi = self.calculate_psi(reference_predictions, current_predictions)
        if psi > self.threshold:
            return {"drift_detected": True, "psi": psi, "action": "retrain"}
        elif psi > self.threshold / 2:
            return {"drift_detected": False, "psi": psi, "action": "monitor"}
        else:
            return {"drift_detected": False, "psi": psi, "action": "none"}
```
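As a sanity check on the PSI formula, here is a self-contained numeric example on synthetic data. The helper mirrors `calculate_psi`, and the thresholds follow the common rule of thumb that PSI above 0.2 signals a significant shift; all data here is made up for illustration:

```python
import numpy as np

def population_stability_index(expected, actual, bins=10):
    # Same formula as calculate_psi above: sum((a - e) * ln(a / e))
    breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
    e = np.histogram(expected, breakpoints)[0] / len(expected)
    a = np.histogram(actual, breakpoints)[0] / len(actual)
    e = np.clip(e, 1e-6, None)
    a = np.clip(a, 1e-6, None)
    return float(np.sum((a - e) * np.log(a / e)))

rng = np.random.default_rng(0)
reference = rng.normal(0.0, 1.0, 50_000)   # training-time predictions
same = rng.normal(0.0, 1.0, 50_000)        # stable production traffic
shifted = rng.normal(0.8, 1.0, 50_000)     # mean-shifted production traffic

print(population_stability_index(reference, same))     # near zero
print(population_stability_index(reference, shifted))  # well above 0.2
```

An unchanged distribution yields a PSI close to zero, while a 0.8-sigma mean shift lands far beyond the retraining threshold.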
Grafana Dashboard Setup
Grafana is ideal for visualizing monitoring metrics. You can create dashboards from metrics collected by Prometheus.
Key monitoring panels:
- Request Throughput: Prediction requests per second
- Latency Distribution: p50, p95, p99 latency
- Error Rate: Prediction failure ratio
- Model Accuracy: Real-time performance metrics
- Data Drift Score: PSI, KL Divergence
- Resource Usage: GPU memory, CPU, network
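The latency panels are usually built with PromQL's `histogram_quantile()` over the cumulative buckets a `Histogram` metric exports. As a rough illustration of what that function computes, here is a self-contained sketch of the interpolation; the cumulative counts are synthetic, not real traffic, and some PromQL edge cases are simplified:

```python
def histogram_quantile(q, buckets):
    """Approximate a quantile from cumulative histogram buckets,
    roughly mirroring PromQL's histogram_quantile() interpolation.

    buckets: ascending (upper_bound, cumulative_count) pairs,
    ending with (float("inf"), total_count).
    """
    total = buckets[-1][1]
    rank = q * total
    prev_bound, prev_count = 0.0, 0
    for bound, count in buckets:
        if count >= rank:
            if bound == float("inf"):
                return prev_bound  # cannot interpolate into the +Inf bucket
            # Linear interpolation within the bucket containing the rank
            return prev_bound + (bound - prev_bound) * (rank - prev_count) / (count - prev_count)
        prev_bound, prev_count = bound, count
    return prev_bound

# Synthetic cumulative counts for 100 requests, reusing the bucket
# boundaries of the prediction_latency Histogram defined earlier
buckets = [(0.01, 10), (0.025, 40), (0.05, 80), (0.1, 95),
           (0.25, 99), (0.5, 100), (1.0, 100), (float("inf"), 100)]

print(histogram_quantile(0.50, buckets))  # p50
print(histogram_quantile(0.95, buckets))  # p95
```

Because only bucket counts are stored, the quantiles are estimates whose precision depends on how well the bucket boundaries match the actual latency range.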
9. A/B Testing and Deployment Strategies
Canary Deployment
A strategy that first applies a new model to only a small portion of traffic.
# Traffic splitting with Istio VirtualService
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: model-serving
spec:
hosts:
- model-service
http:
- match:
- headers:
canary:
exact: "true"
route:
- destination:
host: model-service
subset: v2
- route:
- destination:
host: model-service
subset: v1
weight: 90
- destination:
host: model-service
subset: v2
weight: 10
Shadow Mode
The new model runs inference on real traffic, but results are only stored and not returned to users. This allows safe validation by comparing against the existing model's results.
```python
from datetime import datetime, timezone

class ShadowModelRouter:
    def __init__(self, primary_model, shadow_model, metrics_store):
        self.primary = primary_model
        self.shadow = shadow_model
        self.metrics_store = metrics_store

    async def predict(self, input_data):
        # Primary model result (returned to the user)
        primary_result = await self.primary.predict(input_data)

        # Shadow model result (stored for comparison only)
        try:
            shadow_result = await self.shadow.predict(input_data)
            await self.log_comparison(
                input_data, primary_result, shadow_result
            )
        except Exception:
            pass  # Shadow model errors are ignored

        return primary_result

    async def log_comparison(self, input_data, primary, shadow):
        comparison = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "input_hash": hash(str(input_data)),
            "primary_prediction": primary,
            "shadow_prediction": shadow,
            "agreement": primary == shadow,
        }
        await self.metrics_store.insert(comparison)
```
Multi-Armed Bandit
Multi-armed bandits allocate traffic dynamically, making them more sample-efficient than a fixed A/B split: better-performing models automatically receive more traffic.
```python
import numpy as np

class ThompsonSamplingRouter:
    """Thompson Sampling based model router"""

    def __init__(self, model_names):
        self.models = model_names
        # Beta distribution parameters (alpha = successes, beta = failures)
        self.successes = {name: 1 for name in model_names}
        self.failures = {name: 1 for name in model_names}

    def select_model(self):
        """Select a model via Thompson Sampling"""
        samples = {}
        for name in self.models:
            samples[name] = np.random.beta(
                self.successes[name],
                self.failures[name],
            )
        return max(samples, key=samples.get)

    def update(self, model_name, success: bool):
        """Update the posterior based on the observed outcome"""
        if success:
            self.successes[model_name] += 1
        else:
            self.failures[model_name] += 1

    def get_allocation(self):
        """Check current traffic ratios"""
        total = sum(self.successes[m] + self.failures[m] for m in self.models)
        return {
            m: (self.successes[m] + self.failures[m]) / total
            for m in self.models
        }
```
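To see the allocation shift happen, here is a self-contained simulation of the same Thompson Sampling update loop. The two variants and their true success rates are made up for illustration, and the generator is seeded for reproducibility:

```python
import numpy as np

rng = np.random.default_rng(7)

# Hypothetical variants with made-up true success rates
true_rates = {"model_a": 0.60, "model_b": 0.75}
successes = {m: 1 for m in true_rates}  # Beta prior alpha
failures = {m: 1 for m in true_rates}   # Beta prior beta
picks = {m: 0 for m in true_rates}

for _ in range(5_000):
    # Sample one draw from each posterior and route to the largest
    sampled = {m: rng.beta(successes[m], failures[m]) for m in true_rates}
    chosen = max(sampled, key=sampled.get)
    picks[chosen] += 1
    # Simulate the outcome and update the chosen arm's posterior
    if rng.random() < true_rates[chosen]:
        successes[chosen] += 1
    else:
        failures[chosen] += 1

print(picks)  # the better variant ends up with most of the traffic
```

Unlike a fixed 50/50 A/B split, the loop sends the bulk of requests to the stronger variant while still occasionally exploring the weaker one.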
10. Cost Optimization
Leveraging Spot Instances
Training jobs can tolerate interruption as long as they checkpoint, so spot instances can cut training costs by 70% or more.
```python
# AWS SageMaker spot training
import sagemaker

estimator = sagemaker.estimator.Estimator(
    image_uri="my-training-image:latest",
    role="arn:aws:iam::ACCOUNT:role/SageMakerRole",
    instance_count=4,
    instance_type="ml.p4d.24xlarge",
    use_spot_instances=True,
    max_wait=7200,  # Maximum wait time (seconds)
    max_run=3600,   # Maximum run time (seconds)
    checkpoint_s3_uri="s3://bucket/checkpoints/",
)
estimator.fit({"training": "s3://bucket/data/"})
```
The key is checkpoint saving. You must be able to resume from a checkpoint when a spot instance is interrupted.
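The pattern can be sketched framework-agnostically. JSON stands in for `torch.save` here, and every name is a hypothetical placeholder; the two pieces that matter are the atomic write (never leave a half-written checkpoint behind) and resuming from the last completed epoch:

```python
import json
import os

def save_checkpoint(path, epoch, state):
    """Write a checkpoint atomically: temp file, then rename."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"epoch": epoch, "state": state}, f)
    os.replace(tmp, path)  # atomic rename: readers never see a partial file

def load_checkpoint(path):
    if not os.path.exists(path):
        return {"epoch": -1, "state": {}}  # fresh start
    with open(path) as f:
        return json.load(f)

def train_with_resume(path, total_epochs):
    """Returns the epoch it started from, making interruptions visible."""
    start = load_checkpoint(path)["epoch"] + 1
    for epoch in range(start, total_epochs):
        # ... one epoch of real training would run here ...
        save_checkpoint(path, epoch, {"loss": 1.0 / (epoch + 1)})
    return start
```

If a spot interruption kills the job after epoch 2, the next run picks up at epoch 3 instead of epoch 0, so the already-paid GPU hours are not wasted.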
Model Compression
Methods to reduce production serving costs:
Knowledge Distillation
```python
import torch.nn.functional as F

class DistillationTrainer:
    def __init__(self, teacher, student, temperature=3.0, alpha=0.5):
        self.teacher = teacher
        self.student = student
        self.temperature = temperature
        self.alpha = alpha

    def distillation_loss(self, student_logits, teacher_logits, labels):
        # Soft target loss (KD loss)
        soft_targets = F.softmax(teacher_logits / self.temperature, dim=1)
        soft_loss = F.kl_div(
            F.log_softmax(student_logits / self.temperature, dim=1),
            soft_targets,
            reduction="batchmean",
        ) * (self.temperature ** 2)

        # Hard target loss
        hard_loss = F.cross_entropy(student_logits, labels)

        return self.alpha * soft_loss + (1 - self.alpha) * hard_loss
```
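As a numeric sanity check on the soft-target term, here is a NumPy re-statement of the same formula with hypothetical logits: identical student and teacher logits must give zero KD loss, and any divergence must give a positive loss:

```python
import numpy as np

def softmax(z, t=1.0):
    z = np.asarray(z, dtype=float) / t
    e = np.exp(z - z.max(axis=-1, keepdims=True))
    return e / e.sum(axis=-1, keepdims=True)

def kd_soft_loss(student_logits, teacher_logits, temperature=3.0):
    """T^2 * KL(softmax(teacher/T) || softmax(student/T)), batch-averaged --
    the same soft-target term computed with F.kl_div above."""
    p_t = softmax(teacher_logits, temperature)
    p_s = softmax(student_logits, temperature)
    kl = np.sum(p_t * (np.log(p_t) - np.log(p_s)), axis=-1)
    return float(temperature ** 2 * kl.mean())

teacher = [[2.0, 0.5, -1.0]]  # hypothetical teacher logits
print(kd_soft_loss(teacher, teacher))            # identical logits -> 0.0
print(kd_soft_loss([[0.0, 0.0, 2.0]], teacher))  # diverging student -> positive
```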
ONNX Conversion for Inference Optimization
```python
import torch

# PyTorch -> ONNX conversion
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
    opset_version=17,
)

# Inference with ONNX Runtime
import onnxruntime as ort

session = ort.InferenceSession("model.onnx", providers=["CUDAExecutionProvider"])
result = session.run(None, {"input": input_array})
```
Batch Inference vs Real-Time Inference
| Aspect | Batch Inference | Real-Time Inference |
|---|---|---|
| Latency | Minutes to hours | Milliseconds to seconds |
| Throughput | Very high | Medium |
| Cost | Low (spot instances) | High (always on) |
| Best For | Recommendation systems, reports | Chatbots, search |
```python
# Batch inference example (Spark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()

# Register model inference as a UDF
@udf("float")
def predict_udf(features):
    return float(model.predict([features])[0])

# Run batch inference on a large dataset
df = spark.read.parquet("s3://bucket/daily-data/")
predictions = df.withColumn("prediction", predict_udf("features"))
predictions.write.parquet("s3://bucket/predictions/")
```
11. Practical: Building an LLM RAG Pipeline
RAG (Retrieval-Augmented Generation) is a key pattern for overcoming LLM limitations. It searches an external knowledge base and provides context to the LLM.
Overall Architecture
- Document Collection - Gather data from various sources
- Chunking - Split documents into appropriate sizes
- Embedding - Convert text to vectors
- Vector DB Storage - Store in a vector index
- Retrieval - Search for similar documents for a query
- Generation - Pass search results as context to the LLM
Document Collection and Chunking
```python
from langchain.document_loaders import (
    NotionDirectoryLoader,
    PyPDFLoader,
    WebBaseLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Load documents from various sources
pdf_docs = PyPDFLoader("docs/manual.pdf").load()
web_docs = WebBaseLoader("https://docs.example.com").load()
notion_docs = NotionDirectoryLoader("notion_export/").load()
all_docs = pdf_docs + web_docs + notion_docs

# Chunking
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ".", " "],
)
chunks = text_splitter.split_documents(all_docs)
print(f"Total chunks: {len(chunks)}")
```
Embedding and Vector DB
```python
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma

# Embedding model
embedding_model = HuggingFaceEmbeddings(
    model_name="BAAI/bge-large-en-v1.5",
    model_kwargs={"device": "cuda"},
    encode_kwargs={"normalize_embeddings": True},
)

# Store in the Chroma vector DB
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embedding_model,
    persist_directory="./chroma_db",
    collection_name="knowledge_base",
)

# Similarity search
results = vectorstore.similarity_search(
    "How to deploy a model to production?",
    k=5,
)
```
Assembling the RAG Pipeline
```python
from langchain.chains import RetrievalQA
from langchain.llms import VLLM
from langchain.prompts import PromptTemplate

# LLM setup
llm = VLLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    tensor_parallel_size=1,
    max_new_tokens=512,
    temperature=0.1,
)

# Prompt template
prompt_template = PromptTemplate(
    template="""Answer the question using the following context.
If the information is not in the context, say "I don't know."

Context:
{context}

Question: {question}

Answer:""",
    input_variables=["context", "question"],
)

# RAG chain
rag_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(
        search_type="mmr",
        search_kwargs={"k": 5, "fetch_k": 20},
    ),
    chain_type_kwargs={"prompt": prompt_template},
    return_source_documents=True,
)

# Query
result = rag_chain.invoke("What is the production deployment procedure?")
print(result["result"])
for doc in result["source_documents"]:
    print(f"  Source: {doc.metadata['source']}")
```
Production RAG Serving
```python
import time

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):
    question: str
    top_k: int = 5

class QueryResponse(BaseModel):
    answer: str
    sources: list[str]
    latency_ms: float

@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    start = time.time()
    result = rag_chain.invoke(request.question)

    sources = list(set(
        doc.metadata.get("source", "unknown")
        for doc in result["source_documents"]
    ))
    latency = (time.time() - start) * 1000

    return QueryResponse(
        answer=result["result"],
        sources=sources,
        latency_ms=round(latency, 2),
    )
```
Conclusion
MLOps is an essential discipline for reliably operating ML models in production.
Here are the key takeaways:
- Start with experiment management - Even adopting MLflow alone makes a huge difference.
- Invest in automation - Manual processes will inevitably fail.
- Monitoring is non-negotiable - The real work begins after deployment.
- Start small - Going from Google Level 0 to Level 1 is the most important step.
- Evolve for the LLM era - Actively adopt new tools like vLLM and RAG.
Training a model accounts for only 20% of the total effort. The remaining 80% is data pipelines, serving, monitoring, and operations. MLOps is the methodology for systematically managing that 80%.
MLOps Self-Check Checklist
Experiment Management
- Are you tracking all experiment hyperparameters and metrics?
- Can you reproduce experiment results?
Data Pipelines
- Are you version-controlling your data?
- Is data quality validation automated?
Training Infrastructure
- Is your training pipeline automated?
- Are you saving checkpoints?
Serving
- Are you monitoring model serving latency?
- Are you using a deployment strategy that supports rollback?
Monitoring
- Are you detecting data drift?
- Do you get alerts when model performance degrades?
Cost
- Are you leveraging spot instances?
- Have you considered model compression?