MLOps & Model Lifecycle Management: MLflow, DVC, and LLMOps Complete Guide

Table of Contents

  1. MLOps Overview and Maturity Model
  2. Experiment Tracking: MLflow & Weights and Biases
  3. Data Version Control: DVC
  4. Feature Store
  5. Model Registry
  6. CI/CD for ML
  7. Model Monitoring & Drift Detection
  8. LLMOps
  9. Quiz

MLOps Overview and Maturity Model

MLOps (Machine Learning Operations) is a set of practices, tools, and culture for reliably operating ML systems in production. It applies DevOps principles to ML workflows, automating the full lifecycle from model development through deployment, monitoring, and retraining.

Why MLOps Matters

Industry surveys consistently report that the large majority of ML projects never reach production deployment. The root causes include:

  • Irreproducible experiments: Code, data, and environments are not version-controlled
  • Manual deployment processes: Slow and error-prone
  • Absent monitoring: Model performance degradation is detected too late
  • Team silos: Disconnect between data science and engineering teams

MLOps Maturity Levels

Google's MLOps maturity model defines three stages of automation.

Level 0: Manual Process

Everything is done manually. Data scientists experiment in Jupyter Notebooks and deploy results by hand.

| Characteristic | Description |
| --- | --- |
| Deployment frequency | Every few months |
| Automation level | None |
| Reproducibility | Low |
| Monitoring | Absent or manual |

Limitations: No experiment tracking, code/data version mismatches, deployment errors, inability to detect model degradation.

Level 1: ML Pipeline Automation

CT (Continuous Training) is introduced. Data pipelines and model training are automated, but CI/CD remains manual.

Key components:

  • Automated data validation pipeline
  • Feature engineering pipeline
  • Model training pipeline (Kubeflow Pipelines, Apache Airflow, etc.)
  • Automated model performance evaluation
  • Introduction of feature stores
# Kubeflow Pipeline example - Level 1 CT pipeline
import kfp
from kfp import dsl

@dsl.component
def data_validation_op(data_path: str) -> bool:
    import great_expectations as ge
    ds = ge.read_csv(data_path)
    results = ds.expect_column_values_to_not_be_null("target")
    return results["success"]

@dsl.component
def train_model_op(data_path: str, model_output: str):
    import mlflow
    # Training logic here
    pass

@dsl.pipeline(name="CT Pipeline")
def ct_pipeline(data_path: str):
    validation = data_validation_op(data_path=data_path)
    with dsl.Condition(validation.output == True):
        train_model_op(data_path=data_path, model_output="/models/")

Level 2: CI/CD Pipeline Automation

Full MLOps automation. Code, data, and models are all version-controlled, and CI/CD/CT are completely automated.

Automated trigger conditions:

  • New training data arriving (schedule or data volume threshold)
  • Model performance metric degradation detected
  • Data drift detected
  • Code changes (new features, algorithm improvements)

Level 2 Architecture:

Source code change or data trigger
  → CI Pipeline (test, build)
  → CD Pipeline (deploy pipeline)
  → CT Pipeline (automated retraining)
  → Model evaluation (pass/fail gate)
  → Model registry registration
  → Staging → Production promotion
  → Monitoring & alerting
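The "pass/fail gate" step can be made concrete with a small sketch. The metric names, thresholds, and the no-regression rule below are illustrative assumptions, not a fixed standard:

```python
def passes_gate(candidate: dict, production: dict, thresholds: dict) -> bool:
    """Return True only when the candidate model clears every gate.

    candidate / production: metric name -> measured value for each model.
    thresholds: metric name -> absolute floor the candidate must reach.
    """
    for metric, floor in thresholds.items():
        # Gate 1: absolute floor per metric
        if candidate.get(metric, 0.0) < floor:
            return False
        # Gate 2: no regression against the current production model
        if metric in production and candidate[metric] < production[metric]:
            return False
    return True

candidate = {"accuracy": 0.958, "f1": 0.941}
production = {"accuracy": 0.951, "f1": 0.936}
print(passes_gate(candidate, production, {"accuracy": 0.95, "f1": 0.93}))  # True
```

Only models passing both conditions move on to registry registration; everything else stops the pipeline before deployment.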

Experiment Tracking: MLflow & Weights and Biases

MLflow Complete Guide

MLflow is an open-source platform for managing the ML lifecycle. It consists of four core components: Tracking, Projects, Models, and the Model Registry.

MLflow Tracking

Tracks experiment parameters, metrics, and artifacts.

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score

# Configure MLflow Tracking server
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("fraud-detection-v2")

with mlflow.start_run(run_name="rf-baseline") as run:
    # Log hyperparameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5,
        "random_state": 42
    }
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Log metrics
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred, average="weighted"),
    }
    mlflow.log_metrics(metrics)

    # Save model with signature
    mlflow.sklearn.log_model(
        sk_model=model,
        artifact_path="model",
        registered_model_name="fraud-detection",
        input_example=X_test[:5],
        signature=mlflow.models.infer_signature(X_test, y_pred)
    )

    # Log custom artifacts
    import matplotlib.pyplot as plt
    from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
    cm = confusion_matrix(y_test, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot()
    plt.savefig("confusion_matrix.png")
    mlflow.log_artifact("confusion_matrix.png")

    print(f"Run ID: {run.info.run_id}")
    print(f"Accuracy: {metrics['accuracy']:.4f}")

MLflow Autolog

Framework-specific automatic logging to minimize boilerplate:

import mlflow

# Auto-detect framework and log
mlflow.autolog()

# PyTorch-specific autolog
mlflow.pytorch.autolog(
    log_every_n_epoch=1,
    log_models=True,
    disable=False,
    log_datasets=True
)

# XGBoost-specific autolog
mlflow.xgboost.autolog(
    log_input_examples=True,
    log_model_signatures=True,
    log_models=True,
    log_datasets=True
)

MLflow Projects

Packaging reproducible ML projects:

# MLproject file
name: fraud-detection

conda_env: conda.yaml

entry_points:
  main:
    parameters:
      n_estimators: { type: int, default: 100 }
      max_depth: { type: int, default: 10 }
      data_path: { type: str, default: 'data/train.csv' }
    command: 'python train.py --n_estimators {n_estimators} --max_depth {max_depth} --data_path {data_path}'
  evaluate:
    parameters:
      model_uri: { type: str }
      test_data: { type: str }
    command: 'python evaluate.py --model_uri {model_uri} --test_data {test_data}'

Weights & Biases (W&B)

W&B is an MLOps platform providing experiment tracking, visualization, and hyperparameter optimization.

import wandb

# Initialize W&B run
run = wandb.init(
    project="image-classification",
    config={
        "learning_rate": 0.001,
        "epochs": 50,
        "batch_size": 32,
        "architecture": "ResNet50"
    }
)

# W&B Sweep for hyperparameter optimization
sweep_config = {
    "method": "bayes",
    "metric": {"name": "val_accuracy", "goal": "maximize"},
    "parameters": {
        "learning_rate": {"min": 1e-5, "max": 1e-2},
        "batch_size": {"values": [16, 32, 64]},
        "dropout": {"min": 0.1, "max": 0.5}
    }
}
sweep_id = wandb.sweep(sweep_config, project="image-classification")
wandb.agent(sweep_id, function=train_fn, count=50)

Data Version Control: DVC

DVC (Data Version Control) works alongside Git to version-control large datasets and ML pipelines.

How DVC Works

Instead of storing large files directly in Git, DVC creates .dvc metadata files (pointers) that are committed to Git. The actual data is stored in remote storage such as S3, GCS, Azure Blob, or SSH servers.

# Initialize DVC
git init
dvc init

# Configure remote storage (S3)
dvc remote add -d myremote s3://my-bucket/dvc-store
dvc remote modify myremote region us-east-1

# Add data files
dvc add data/train.csv
git add data/train.csv.dvc .gitignore
git commit -m "Add training data v1"
dvc push

# Pull data in another environment
git pull
dvc pull
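After `dvc add`, Git tracks only a small pointer file rather than the data itself. A sketch of roughly what `data/train.csv.dvc` contains (the hash and size below are made-up placeholders):

```yaml
outs:
  - md5: 1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d  # content hash of the tracked file (placeholder)
    size: 52428800                         # file size in bytes (placeholder)
    path: train.csv                        # location relative to the .dvc file
```

DVC uses the hash to locate the file in the remote cache, so `dvc pull` on any commit restores exactly the data version that commit pointed to.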

DVC Pipeline (dvc.yaml)

Declarative definition of reproducible ML pipelines:

# dvc.yaml
stages:
  prepare:
    cmd: python src/prepare.py --input data/raw.csv --output data/processed/
    deps:
      - src/prepare.py
      - data/raw.csv
    outs:
      - data/processed/train.csv
      - data/processed/test.csv
    params:
      - prepare:
          - split_ratio
          - random_seed

  featurize:
    cmd: python src/featurize.py
    deps:
      - src/featurize.py
      - data/processed/train.csv
    outs:
      - data/features/train_features.pkl
    params:
      - featurize:
          - max_features
          - ngrams

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/features/train_features.pkl
    outs:
      - models/model.pkl
    metrics:
      - reports/metrics.json:
          cache: false
    params:
      - train:
          - n_estimators
          - max_depth
          - random_seed

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pkl
      - data/processed/test.csv
    metrics:
      - reports/eval_metrics.json:
          cache: false
    plots:
      - reports/plots/confusion_matrix.csv:
          cache: false

DVC Experiment Management

# Run the pipeline
dvc repro

# Create an experiment branch
dvc exp run --set-param train.n_estimators=200 --name exp-200-trees

# Compare experiments
dvc exp show

# Show metrics table
dvc metrics show
dvc metrics diff
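The `params` entries in `dvc.yaml` resolve against a `params.yaml` file at the repo root; DVC re-runs a stage when one of its declared parameters changes. A sketch matching the stages shown above (all values are illustrative):

```yaml
prepare:
  split_ratio: 0.2
  random_seed: 42
featurize:
  max_features: 5000
  ngrams: 2
train:
  n_estimators: 100
  max_depth: 10
  random_seed: 42
```

`dvc exp run --set-param train.n_estimators=200` rewrites this file for the experiment, and `dvc params diff` shows what changed between runs.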

Feature Store

A feature store is a centralized data layer for storing, sharing, and serving ML features.

Why Feature Stores Are Necessary

  • Eliminate training/serving skew: Guarantee identical feature transformations at training and inference
  • Feature reuse: Share features across teams to eliminate redundant work
  • Low-latency serving: Real-time feature lookup for online predictions
  • Feature consistency: Maintain consistency between batch and streaming pipelines
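The first bullet is worth making concrete: skew arises when the training pipeline and the serving path each implement the "same" transformation separately and the two copies drift apart. A minimal sketch of the remedy, one shared function imported by both paths (function and field names are illustrative):

```python
import math

def compute_features(raw: dict) -> dict:
    """Turn one raw transaction record into model features.

    Imported by BOTH the batch training job and the online service,
    so the two code paths cannot drift apart.
    """
    amounts = raw.get("recent_amounts", [])
    return {
        "log_amount": math.log1p(float(raw["amount"])),
        "txn_count_7d": len(amounts),
        "avg_amount_7d": sum(amounts) / len(amounts) if amounts else 0.0,
    }

record = {"amount": 120.0, "recent_amounts": [100.0, 140.0]}

# Training path: applied over historical records in batch.
train_row = compute_features(record)

# Serving path: applied to the live request - same function, same output.
serve_row = compute_features(record)

print(train_row == serve_row)  # True - identical features at train and serve time
```

A feature store generalizes this idea: the transformation is registered once and both the offline and online stores are populated from it.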

Online vs Offline Store

| Aspect | Online Store | Offline Store |
| --- | --- | --- |
| Purpose | Real-time inference serving | Model training |
| Latency | Milliseconds | Seconds to minutes |
| Storage | Redis, DynamoDB, Cassandra | S3, BigQuery, Hive |
| Data volume | Latest state (current values) | Full history |
| Query pattern | Single-key lookup | Batch scan |
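Offline retrieval hides one subtlety: when building training data, each example must receive the feature values as they were at that example's timestamp (point-in-time correctness), never later ones. A toy sketch of that lookup, with an illustrative `point_in_time_lookup` helper and made-up data:

```python
from datetime import datetime, timedelta

def point_in_time_lookup(rows, entity_id, as_of, ttl=timedelta(days=7)):
    """Latest feature value for an entity at or before `as_of`, within the TTL.

    rows: iterable of (entity_id, event_timestamp, feature_value), unsorted.
    Returns None when no row qualifies - treat as missing, never leak the future.
    """
    candidates = [
        (ts, value)
        for eid, ts, value in rows
        if eid == entity_id and ts <= as_of and as_of - ts <= ttl
    ]
    return max(candidates)[1] if candidates else None

rows = [
    (1001, datetime(2026, 2, 20), 3),  # too old: outside the 7-day TTL on Mar 1
    (1001, datetime(2026, 2, 27), 5),  # most recent valid row as of Mar 1
    (1001, datetime(2026, 3, 2), 9),   # after the as-of time: must be ignored
]
print(point_in_time_lookup(rows, 1001, datetime(2026, 3, 1)))  # 5
```

This is what a feature store's historical retrieval does at scale; doing the join naively (e.g., always taking the latest value) silently leaks future information into training data.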

Feast Feature Store

# feature_repo/feature_store.yaml
project: fraud_detection
registry: data/registry.db
provider: local
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: bigquery
  dataset: feast_dev

# feature_repo/features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64

# Define entity
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="User identifier"
)

# Define data source
user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created"
)

# Define feature view
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=7),
    schema=[
        Field(name="transaction_count_7d", dtype=Float32),
        Field(name="avg_transaction_amount", dtype=Float32),
        Field(name="days_since_last_login", dtype=Int64),
        Field(name="account_age_days", dtype=Int64),
    ],
    online=True,
    source=user_stats_source,
    tags={"team": "fraud", "version": "v2"},
)

# Using the feature store
from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path="feature_repo/")

# Training data retrieval (offline)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003],
    "event_timestamp": pd.to_datetime(["2026-03-01", "2026-03-01", "2026-03-01"])
})
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:transaction_count_7d",
        "user_stats:avg_transaction_amount",
        "user_stats:days_since_last_login",
    ]
).to_df()

# Online serving - real-time feature retrieval
feature_vector = store.get_online_features(
    features=[
        "user_stats:transaction_count_7d",
        "user_stats:avg_transaction_amount",
    ],
    entity_rows=[{"user_id": 1001}]
).to_dict()

Feature Drift Detection

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

# Generate feature drift report
report = Report(metrics=[DataDriftPreset()])
report.run(
    reference_data=reference_features,
    current_data=current_features,
    column_mapping=ColumnMapping(target="label")
)
report.save_html("feature_drift_report.html")

# Check drift results
results = report.as_dict()
drifted_features = [
    col for col, info in results["metrics"][0]["result"]["drift_by_columns"].items()
    if info["drift_detected"]
]
print(f"Drifted features: {drifted_features}")

Model Registry

MLflow Model Registry

The MLflow Model Registry is a central repository for model version management, stage transitions, and team collaboration.

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register a new model
model_uri = f"runs:/{run_id}/model"
model_version = mlflow.register_model(
    model_uri=model_uri,
    name="fraud-detection"
)

# Add model description
client.update_registered_model(
    name="fraud-detection",
    description="Payment fraud detection model - RandomForest based"
)
client.update_model_version(
    name="fraud-detection",
    version=model_version.version,
    description="Accuracy: 0.956, F1: 0.943 on test set"
)

# Transition to Staging
client.transition_model_version_stage(
    name="fraud-detection",
    version=model_version.version,
    stage="Staging",
    archive_existing_versions=False
)

# Load and validate Staging model
staging_model = mlflow.pyfunc.load_model(
    model_uri="models:/fraud-detection/Staging"
)
staging_preds = staging_model.predict(X_val)
staging_accuracy = accuracy_score(y_val, staging_preds)

# Promote to Production if validation passes
if staging_accuracy > 0.95:
    client.transition_model_version_stage(
        name="fraud-detection",
        version=model_version.version,
        stage="Production",
        archive_existing_versions=True
    )
    print(f"Model v{model_version.version} promoted to Production")

Hugging Face Hub Model Registry

from huggingface_hub import HfApi

api = HfApi()

# Upload model
api.upload_folder(
    folder_path="./fine-tuned-model",
    repo_id="myorg/sentiment-classifier-v2",
    repo_type="model",
)

# Tag a specific version
api.create_tag(
    repo_id="myorg/sentiment-classifier-v2",
    tag="v2.1.0",
    tag_message="Improved accuracy on edge cases"
)

CI/CD for ML

GitHub Actions ML Pipeline

# .github/workflows/ml-cicd.yml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'src/**'
      - 'params.yaml'
      - 'dvc.yaml'
  schedule:
    - cron: '0 2 * * 1' # Auto-retrain every Monday at 2 AM

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run unit tests
        run: pytest tests/ -v --cov=src
      - name: Data validation
        run: python src/validate_data.py

  train-and-evaluate:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt "dvc[s3]"
      - name: Configure DVC remote
        run: |
          dvc remote modify --local myremote access_key_id ${{ secrets.AWS_ACCESS_KEY_ID }}
          dvc remote modify --local myremote secret_access_key ${{ secrets.AWS_SECRET_ACCESS_KEY }}
      - name: Pull data
        run: dvc pull
      - name: Run DVC pipeline
        run: dvc repro
      - name: Log metrics to MLflow
        run: python src/log_results.py
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
      - name: Check model performance gate
        run: |
          python src/check_performance_gate.py \
            --min-accuracy 0.95 \
            --min-f1 0.93
      - name: Push results
        run: |
          dvc push
          git add reports/metrics.json dvc.lock
          git commit -m "chore: update metrics [skip ci]"
          git push

  deploy-staging:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Promote model to Staging
        run: python src/promote_model.py --stage Staging
      - name: Run integration tests
        run: pytest tests/integration/ -v
      - name: Deploy to staging endpoint
        run: kubectl apply -f k8s/staging/

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Promote model to Production
        run: python src/promote_model.py --stage Production
      - name: Blue/Green deployment
        run: ./scripts/blue_green_deploy.sh
      - name: Smoke tests
        run: pytest tests/smoke/ -v

Automated Retraining Trigger

# src/check_retrain_trigger.py
import mlflow
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

def should_retrain(
    current_data,
    reference_data,
    performance_threshold=0.92,
    drift_threshold=0.3
) -> tuple[bool, str]:
    """Determine whether retraining is needed."""

    # 1. Performance-based trigger
    current_metrics = get_current_metrics()
    if current_metrics["accuracy"] < performance_threshold:
        return True, f"Performance degradation: accuracy={current_metrics['accuracy']:.3f}"

    # 2. Data drift trigger
    report = Report(metrics=[DataDriftPreset()])
    report.run(reference_data=reference_data, current_data=current_data)
    results = report.as_dict()
    drift_share = results["metrics"][0]["result"]["share_of_drifted_columns"]

    if drift_share > drift_threshold:
        return True, f"Data drift: {drift_share:.1%} of features drifted"

    return False, "Retraining not required"

Model Monitoring & Drift Detection

Data Drift vs Concept Drift

Data Drift: The statistical distribution of input features changes. P(X) changes but P(Y|X) remains stable. Examples: shift in user age distribution, changes in transaction amount distribution.

Concept Drift: The relationship between inputs and outputs changes. P(Y|X) changes. Examples: new fraud patterns emerge, user preferences shift.
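Data drift can be detected without labels using distribution-distance statistics. A minimal sketch of one common choice, the Population Stability Index (PSI); the binning scheme and the 0.1/0.25 rule-of-thumb thresholds are conventions, not a standard:

```python
import math

def psi(reference, current, bins=10):
    """Population Stability Index between two numeric samples.

    Bins span the reference sample's range; an epsilon keeps empty bins
    from zeroing the log. Rule of thumb (a convention, not a standard):
    < 0.1 stable, 0.1-0.25 moderate shift, > 0.25 significant drift.
    """
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / bins or 1.0  # guard against a constant reference
    eps = 1e-6

    def shares(sample):
        counts = [0] * bins
        for x in sample:
            idx = min(max(int((x - lo) / width), 0), bins - 1)  # clip to edge bins
            counts[idx] += 1
        return [c / len(sample) + eps for c in counts]

    ref_s, cur_s = shares(reference), shares(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_s, cur_s))

reference = [i / 100 for i in range(1000)]  # roughly uniform on [0, 10)
shifted = [x + 3 for x in reference]        # same shape, shifted right
print(psi(reference, reference))            # 0.0 - identical distributions
print(psi(reference, shifted) > 0.25)       # True - the shift is flagged as drift
```

Concept drift, by contrast, cannot be caught by any input-only statistic like this; it requires labels (or delayed proxy metrics) and shows up as performance degradation.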

Evidently Drift Monitoring

import pandas as pd
from evidently.report import Report
from evidently.test_suite import TestSuite
from evidently import ColumnMapping
from evidently.metric_preset import (
    DataDriftPreset,
    DataQualityPreset,
    TargetDriftPreset,
    ClassificationPreset
)
from evidently.tests import (
    TestNumberOfDriftedColumns,
    TestShareOfDriftedColumns,
    TestColumnDrift
)

# Column mapping configuration
column_mapping = ColumnMapping(
    target="fraud_label",
    prediction="fraud_score",
    numerical_features=["amount", "transaction_count_7d", "avg_amount"],
    categorical_features=["merchant_category", "payment_method"]
)

# Comprehensive drift report
report = Report(metrics=[
    DataDriftPreset(),
    DataQualityPreset(),
    TargetDriftPreset(),
    ClassificationPreset()
])
report.run(
    reference_data=reference_df,
    current_data=production_df,
    column_mapping=column_mapping
)
report.save_html("monitoring/report.html")

# Alerting test suite
test_suite = TestSuite(tests=[
    TestNumberOfDriftedColumns(lt=3),
    TestShareOfDriftedColumns(lt=0.3),
    TestColumnDrift(column_name="amount"),
    TestColumnDrift(column_name="transaction_count_7d"),
])
test_suite.run(
    reference_data=reference_df,
    current_data=production_df
)

# Alert on test failures
results = test_suite.as_dict()
failed_tests = [t for t in results["tests"] if t["status"] == "FAIL"]
if failed_tests:
    send_alert(f"Monitoring alert: {len(failed_tests)} tests failed")

Prometheus + Grafana Metrics

# src/monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge

prediction_counter = Counter(
    "model_predictions_total",
    "Total prediction count",
    ["model_version", "result"]
)
prediction_latency = Histogram(
    "model_prediction_latency_seconds",
    "Prediction latency in seconds",
    buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]
)
model_accuracy = Gauge(
    "model_accuracy_current",
    "Current model accuracy"
)

import time

def predict_with_monitoring(features, model_version="v2.1"):
    # `model` is assumed loaded at service startup (e.g., via mlflow.pyfunc.load_model)
    start_time = time.time()
    prediction = model.predict(features)
    latency = time.time() - start_time

    prediction_latency.observe(latency)
    prediction_counter.labels(
        model_version=model_version,
        result="fraud" if prediction[0] == 1 else "normal"
    ).inc()

    return prediction

LLMOps

LLMOps extends MLOps practices to the development, deployment, and operation of large language models.

LLM Pipeline Unique Challenges

  • Non-deterministic outputs: Same input may produce different outputs — complex to evaluate
  • Prompt sensitivity: Small changes cause large performance swings
  • High-cost fine-tuning: Requires significant GPU resources
  • Hallucination: Model generates factually incorrect information
  • Context length management: Handling long contexts efficiently
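The non-determinism problem can at least be measured: sample the model several times on the same input and report how often the outputs agree. A sketch with a stand-in `generate` function (a real LLM client call would replace it; the canned answers are made up):

```python
import random
from collections import Counter

def generate(prompt: str, seed: int) -> str:
    """Stand-in for a sampled LLM call; a real client (OpenAI, vLLM, ...) goes here."""
    rng = random.Random(seed)  # the seed stands in for sampling randomness
    return rng.choice([
        "Refunds are available within 30 days.",
        "Refunds are available within 30 days.",
        "We do not offer refunds.",  # the occasional inconsistent answer
    ])

def self_consistency(prompt: str, n: int = 10) -> float:
    """Share of n samples agreeing with the most common answer (1.0 = fully consistent)."""
    outputs = [generate(prompt, seed) for seed in range(n)]
    _, top_count = Counter(outputs).most_common(1)[0]
    return top_count / n

score = self_consistency("What is your refund policy?")
print(0.5 <= score <= 1.0)  # True - the majority answer dominates
```

Tracking a consistency score like this per prompt version makes regressions visible, which is exactly what tracing platforms such as LangSmith automate.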

LangSmith for LLM Tracing

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langsmith import Client
import os

# LangSmith configuration
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "production-chatbot"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"

# LangChain chain (automatically traced in LangSmith)
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
prompt = ChatPromptTemplate.from_template(
    "You are a helpful customer service agent.\n\nQuestion: {question}\n\nAnswer:"
)
chain = prompt | llm

# Invocation - auto-traced
response = chain.invoke({"question": "What is your refund policy?"})

# Evaluation with LangSmith client
langsmith_client = Client()

dataset = langsmith_client.create_dataset(
    dataset_name="customer-service-eval",
    description="Customer service chatbot evaluation dataset"
)

langsmith_client.create_examples(
    inputs=[{"question": "What is your refund policy?"}],
    outputs=[{"answer": "Refunds are available within 30 days of purchase."}],
    dataset_id=dataset.id
)

from langsmith.evaluation import evaluate, LangChainStringEvaluator

evaluators = [
    LangChainStringEvaluator("cot_qa"),
    LangChainStringEvaluator("labeled_criteria", config={"criteria": "correctness"})
]
results = evaluate(
    lambda x: chain.invoke(x),
    data=dataset.name,
    evaluators=evaluators,
    experiment_prefix="gpt4o-baseline"
)

Prompt Version Control

# prompt_registry.py
import mlflow
from dataclasses import dataclass
from typing import Optional

@dataclass
class PromptVersion:
    template: str
    version: str
    description: str
    metrics: Optional[dict] = None

class PromptRegistry:
    def __init__(self, mlflow_uri: str):
        mlflow.set_tracking_uri(mlflow_uri)
        self.experiment_name = "prompt-versions"
        mlflow.set_experiment(self.experiment_name)

    def register_prompt(self, prompt: PromptVersion) -> str:
        with mlflow.start_run(run_name=f"prompt-{prompt.version}") as run:
            mlflow.log_param("version", prompt.version)
            mlflow.log_param("description", prompt.description)
            mlflow.log_text(prompt.template, "prompt_template.txt")
            if prompt.metrics:
                mlflow.log_metrics(prompt.metrics)
            return run.info.run_id

    def get_prompt(self, version: str) -> str:
        client = mlflow.tracking.MlflowClient()
        runs = client.search_runs(
            experiment_ids=[mlflow.get_experiment_by_name(self.experiment_name).experiment_id],
            filter_string=f"params.version = '{version}'"
        )
        if not runs:
            raise ValueError(f"Prompt version {version} not found")
        artifact_uri = runs[0].info.artifact_uri
        return mlflow.artifacts.load_text(f"{artifact_uri}/prompt_template.txt")

# Usage
registry = PromptRegistry("http://mlflow-server:5000")
registry.register_prompt(PromptVersion(
    template="You are a {role}. {context}\n\nQuestion: {question}\nAnswer:",
    version="v1.2.0",
    description="Improved prompt with context injection",
    metrics={"accuracy": 0.87, "hallucination_rate": 0.03}
))

LLM Fine-tuning Pipeline

# fine_tuning_pipeline.py
from transformers import AutoModelForCausalLM, TrainingArguments, Trainer
from peft import LoraConfig, get_peft_model, TaskType
import mlflow

def fine_tune_with_lora(
    base_model: str,
    dataset_path: str,
    output_dir: str,
    lora_r: int = 16,
    lora_alpha: int = 32
):
    mlflow.set_experiment("llm-fine-tuning")

    with mlflow.start_run():
        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            r=lora_r,
            lora_alpha=lora_alpha,
            target_modules=["q_proj", "v_proj"],
            lora_dropout=0.05,
            bias="none"
        )
        mlflow.log_params({
            "base_model": base_model,
            "lora_r": lora_r,
            "lora_alpha": lora_alpha
        })

        model = AutoModelForCausalLM.from_pretrained(base_model)
        from transformers import AutoTokenizer  # tokenizer is needed for log_model below
        tokenizer = AutoTokenizer.from_pretrained(base_model)
        model = get_peft_model(model, lora_config)
        model.print_trainable_parameters()

        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=3,
            per_device_train_batch_size=4,
            gradient_accumulation_steps=4,
            learning_rate=2e-4,
            fp16=True,
            report_to="mlflow"
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,  # dataset assumed prepared by an upstream stage
        )
        trainer.train()

        model.save_pretrained(output_dir)
        mlflow.transformers.log_model(
            transformers_model={"model": model, "tokenizer": tokenizer},
            artifact_path="fine-tuned-model",
            registered_model_name="customer-service-llm"
        )

Quiz

Q1. List and explain the 4 automated CT (Continuous Training) trigger conditions in MLOps Level 2.

Answer: Data trigger, performance trigger, drift trigger, schedule trigger

Explanation:

  1. Data trigger: Automatic retraining starts when new training data reaches a threshold (e.g., 100k records) or a new data batch arrives in the pipeline.
  2. Performance trigger: Fires when production model accuracy, F1-score, or other KPIs fall below a predefined threshold (e.g., accuracy below 0.92).
  3. Drift trigger: Fires when the ratio of drifted features detected by tools like Evidently exceeds a threshold (e.g., over 30% of features show drift).
  4. Schedule trigger: Periodic retraining based on business requirements (e.g., every Monday at 2 AM) to maintain data freshness.

Q2. Explain why online and offline stores are kept separate in a feature store.

Answer: To independently optimize for the different access patterns and performance requirements of training and inference.

Explanation:

  • The offline store (S3, BigQuery) serves model training. It must batch-scan millions of historical records, so throughput and cost-efficiency matter most. High latency (seconds to minutes) is acceptable.
  • The online store (Redis, DynamoDB) serves real-time inference. It must retrieve the latest feature values for a given entity (user ID, product ID) within milliseconds, so it is optimized for low-latency single-key lookups.
  • Without separation, large batch scans during training would interfere with real-time inference queries, or the cost would explode when trying to meet real-time requirements from a single system.

Q3. Explain the difference between data drift and concept drift, and how to detect each.

Answer: Data drift is a change in P(X); concept drift is a change in P(Y|X).

Explanation:

  • Data drift: The statistical distribution of input features changes. Detected using Kolmogorov-Smirnov tests, Population Stability Index (PSI), or JS Divergence — all without requiring labels. Evidently's DataDriftPreset is a popular tool.
  • Concept drift: The correct output for the same input changes over time. For example, a new type of fraud emerges that the existing model does not recognize. Requires actual labels and is detected via model performance degradation (accuracy, F1 decline). When labels are delayed, proxy metrics can be used.

Q4. Explain how DVC manages large ML data alongside Git.

Answer: DVC commits pointer (metadata) files to Git and stores actual data in remote storage.

Explanation: DVC does not store large files (datasets, models) directly in Git. Instead it creates .dvc metadata files containing the MD5 hash, size, and path of the actual data, and these pointer files are committed to Git. The actual data is uploaded to remote storage (S3, GCS, Azure Blob) with dvc push. Any environment can download the exact same version of the data with dvc pull. Because each Git commit is linked 1:1 with a DVC data version, full experiment reproducibility is guaranteed.

Q5. Describe the validation checklist before promoting a model from Staging to Production in MLflow Model Registry.

Answer: Performance validation, fairness validation, integration tests, latency testing, data schema compatibility check.

Explanation:

  1. Performance validation: Confirm that accuracy, F1, AUC, or other metrics on a holdout test set or recent production data are equal to or better than the current Production model.
  2. Fairness validation: Review per-slice metrics to ensure no performance bias across demographic groups or age cohorts.
  3. Integration tests: Verify end-to-end prediction works correctly in the actual serving environment (API, feature store connections).
  4. Latency testing: Run load tests to confirm mean response time and P99 latency meet the defined SLA.
  5. Schema compatibility: Confirm that the input feature schema and output format are compatible with the current serving infrastructure.