- Authors

- Name
- Youngju Kim
- @fjvbn20031
目次
- MLOps概要と成熟度モデル
- 実験追跡: MLflow と Weights and Biases
- データバージョン管理: DVC
- フィーチャーストア
- モデルレジストリ
- ML向けCI/CD
- モデルモニタリングとドリフト検出
- LLMOps
- クイズ
MLOps概要と成熟度モデル
MLOps(Machine Learning Operations)は、MLシステムを本番環境で安定して運用するためのプラクティス、ツール、文化の集合体です。DevOpsの原則をMLワークフローに適用し、モデル開発から展開、モニタリング、再学習までの全ライフサイクルを自動化します。
MLOpsが必要な理由
MLプロジェクトの95%以上が本番デプロイに失敗するという統計があります。主な原因は以下の通りです。
- 再現不可能な実験: コード、データ、環境がバージョン管理されていない
- 手動デプロイプロセス: 遅くてエラーが発生しやすい
- モニタリング不足: モデルのパフォーマンス低下の発見が遅れる
- チームのサイロ化: データサイエンスチームとエンジニアリングチームの断絶
MLOps成熟度レベル
GoogleのMLOps成熟度モデルは3つのステージを定義しています。
Level 0: 手動プロセス
すべてが手動で行われます。データサイエンティストがJupyter Notebookで実験し、結果を手動でデプロイします。
| 特徴 | 説明 |
|---|---|
| デプロイ頻度 | 数ヶ月に1回 |
| 自動化レベル | なし |
| 再現性 | 低い |
| モニタリング | なし、または手動 |
制限事項: 実験追跡なし、コードとデータのバージョン不整合、デプロイエラー、モデル性能低下の検出不可
Level 1: MLパイプラインの自動化
CT(Continuous Training)が導入されます。データパイプラインとモデル学習は自動化されますが、CI/CDはまだ手動です。
主要コンポーネント:
- 自動化されたデータ検証パイプライン
- フィーチャーエンジニアリングパイプライン
- モデル学習パイプライン(Kubeflow Pipelines、Apache Airflowなど)
- モデル性能評価の自動化
- フィーチャーストアの導入
# Kubeflow Pipelineの例 - Level 1 CTパイプライン
import kfp
from kfp import dsl
@dsl.component
def data_validation_op(data_path: str) -> bool:
import great_expectations as ge
ds = ge.read_csv(data_path)
results = ds.expect_column_values_to_not_be_null("target")
return results["success"]
@dsl.component
def train_model_op(data_path: str, model_output: str):
import mlflow
# モデル学習ロジック
pass
@dsl.pipeline(name="CT Pipeline")
def ct_pipeline(data_path: str):
validation = data_validation_op(data_path=data_path)
with dsl.Condition(validation.output == True):
train_model_op(data_path=data_path, model_output="/models/")
Level 2: CI/CDパイプラインの自動化
完全なMLOps自動化のステージです。コード、データ、モデルがすべてバージョン管理され、CI/CD/CTが完全に自動化されます。
自動化トリガー条件:
- 新しい学習データの到達(スケジュールまたはデータ量のしきい値)
- モデルパフォーマンス指標の低下検出
- データドリフトの検出
- コード変更(新機能、アルゴリズムの改善)
Level 2アーキテクチャ:
ソースコード変更またはデータトリガー
↓
CIパイプライン(テスト、ビルド)
↓
CDパイプライン(パイプラインのデプロイ)
↓
CTパイプライン(自動再学習)
↓
モデル評価 → 合格/不合格ゲート
↓
モデルレジストリへの登録
↓
Staging → Productionへの昇格
↓
モニタリングとアラート
実験追跡: MLflow と Weights and Biases
MLflow完全ガイド
MLflowはMLライフサイクル管理のためのオープンソースプラットフォームです。4つのコアコンポーネントで構成されています。
MLflow Tracking
実験パラメータ、メトリクス、アーティファクトを追跡します。
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
# MLflow Trackingサーバーの設定
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("fraud-detection-v2")
with mlflow.start_run(run_name="rf-baseline") as run:
# ハイパーパラメータのログ
params = {
"n_estimators": 100,
"max_depth": 10,
"min_samples_split": 5,
"random_state": 42
}
mlflow.log_params(params)
# モデル学習
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
# メトリクスのログ
y_pred = model.predict(X_test)
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"f1_score": f1_score(y_test, y_pred, average="weighted"),
}
mlflow.log_metrics(metrics)
# モデルの保存(署名付き)
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
registered_model_name="fraud-detection",
input_example=X_test[:5],
signature=mlflow.models.infer_signature(X_train, y_pred)
)
# カスタムアーティファクトのログ
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()
plt.savefig("confusion_matrix.png")
mlflow.log_artifact("confusion_matrix.png")
print(f"Run ID: {run.info.run_id}")
print(f"Accuracy: {metrics['accuracy']:.4f}")
MLflow Autolog
フレームワーク別の自動ログでボイラープレートを最小化:
import mlflow
# フレームワーク自動検出とログ
mlflow.autolog()
# PyTorch専用autolog
mlflow.pytorch.autolog(
log_every_n_epoch=1,
log_models=True,
disable=False,
log_datasets=True
)
# XGBoost専用autolog
mlflow.xgboost.autolog(
log_input_examples=True,
log_model_signatures=True,
log_models=True,
log_datasets=True
)
MLflow Projects
再現可能なMLプロジェクトのパッケージング:
# MLprojectファイル
name: fraud-detection
conda_env: conda.yaml
entry_points:
main:
parameters:
n_estimators: { type: int, default: 100 }
max_depth: { type: int, default: 10 }
data_path: { type: str, default: 'data/train.csv' }
command: 'python train.py --n_estimators {n_estimators} --max_depth {max_depth} --data_path {data_path}'
evaluate:
parameters:
model_uri: { type: str }
test_data: { type: str }
command: 'python evaluate.py --model_uri {model_uri} --test_data {test_data}'
Weights & Biases (W&B)
W&Bは実験追跡、可視化、ハイパーパラメータ最適化を提供するMLOpsプラットフォームです。
import wandb
# W&B実行の初期化
run = wandb.init(
project="image-classification",
config={
"learning_rate": 0.001,
"epochs": 50,
"batch_size": 32,
"architecture": "ResNet50"
}
)
# W&B Sweepによるハイパーパラメータ最適化
sweep_config = {
"method": "bayes",
"metric": {"name": "val_accuracy", "goal": "maximize"},
"parameters": {
"learning_rate": {"min": 1e-5, "max": 1e-2},
"batch_size": {"values": [16, 32, 64]},
"dropout": {"min": 0.1, "max": 0.5}
}
}
sweep_id = wandb.sweep(sweep_config, project="image-classification")
wandb.agent(sweep_id, function=train_fn, count=50)
データバージョン管理: DVC
DVC(Data Version Control)はGitと連携して大規模データセットとMLパイプラインをバージョン管理するツールです。
DVCの仕組み
DVCは大容量ファイルをGitに直接保存する代わりに、.dvcメタデータファイル(ポインタ)を作成してGitにコミットします。実際のデータはS3、GCS、Azure Blob、SSHなどのリモートストレージに保存されます。
# DVC初期化
git init
dvc init
# リモートストレージの設定(S3)
dvc remote add -d myremote s3://my-bucket/dvc-store
dvc remote modify myremote region us-east-1
# データファイルの追加
dvc add data/train.csv
git add data/train.csv.dvc .gitignore
git commit -m "Add training data v1"
dvc push
# 別の環境でデータを取得
git pull
dvc pull
DVCパイプライン(dvc.yaml)
再現可能なMLパイプラインの宣言的定義:
# dvc.yaml
stages:
prepare:
cmd: python src/prepare.py --input data/raw.csv --output data/processed/
deps:
- src/prepare.py
- data/raw.csv
outs:
- data/processed/train.csv
- data/processed/test.csv
params:
- prepare:
- split_ratio
- random_seed
featurize:
cmd: python src/featurize.py
deps:
- src/featurize.py
- data/processed/train.csv
outs:
- data/features/train_features.pkl
params:
- featurize:
- max_features
- ngrams
train:
cmd: python src/train.py
deps:
- src/train.py
- data/features/train_features.pkl
outs:
- models/model.pkl
metrics:
- reports/metrics.json:
cache: false
params:
- train:
- n_estimators
- max_depth
- random_seed
evaluate:
cmd: python src/evaluate.py
deps:
- src/evaluate.py
- models/model.pkl
- data/processed/test.csv
metrics:
- reports/eval_metrics.json:
cache: false
plots:
- reports/plots/confusion_matrix.csv:
cache: false
DVC実験管理
# パイプラインの実行
dvc repro
# 実験ブランチの作成
dvc exp run --set-param train.n_estimators=200 --name exp-200-trees
# 実験の比較
dvc exp show
# メトリクス表の表示
dvc metrics show
dvc metrics diff
フィーチャーストア
フィーチャーストアは、MLフィーチャーを一元的に保存、共有、サービング(提供)するためのデータレイヤーです。
フィーチャーストアが必要な理由
- 学習/サービングスキューの解消: 学習と推論で同一のフィーチャー変換を保証
- フィーチャーの再利用: チーム間でフィーチャーを共有し重複作業を排除
- 低レイテンシーサービング: オンライン予測のためのリアルタイムフィーチャー検索
- フィーチャーの一貫性: バッチとストリーミングパイプライン間の一貫性を維持
オンラインストア vs オフラインストア
| 区分 | オンラインストア | オフラインストア |
|---|---|---|
| 目的 | リアルタイム推論サービング | モデル学習 |
| レイテンシー | ミリ秒 | 秒〜分 |
| ストレージ | Redis、DynamoDB、Cassandra | S3、BigQuery、Hive |
| データ量 | 最新状態(現在値) | 全履歴 |
| クエリパターン | 単件検索(キー基準) | バッチスキャン |
Feastフィーチャーストア
# feature_repo/feature_store.yaml
project: fraud_detection
registry: data/registry.db
provider: local
online_store:
type: redis
connection_string: "localhost:6379"
offline_store:
type: bigquery
dataset: feast_dev
# feature_repo/features.py
from datetime import timedelta
from feast import Entity, Feature, FeatureView, FileSource, ValueType
from feast.types import Float32, Int64
# エンティティの定義
user = Entity(
name="user_id",
value_type=ValueType.INT64,
description="ユーザーID"
)
# データソースの定義
user_stats_source = FileSource(
path="data/user_stats.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created"
)
# フィーチャービューの定義
user_stats_fv = FeatureView(
name="user_stats",
entities=["user_id"],
ttl=timedelta(days=7),
features=[
Feature(name="transaction_count_7d", dtype=Float32),
Feature(name="avg_transaction_amount", dtype=Float32),
Feature(name="days_since_last_login", dtype=Int64),
Feature(name="account_age_days", dtype=Int64),
],
online=True,
source=user_stats_source,
tags={"team": "fraud", "version": "v2"},
)
# フィーチャーストアの使用
from feast import FeatureStore
import pandas as pd
store = FeatureStore(repo_path="feature_repo/")
# 学習データの取得(オフライン)
entity_df = pd.DataFrame({
"user_id": [1001, 1002, 1003],
"event_timestamp": pd.to_datetime(["2026-03-01", "2026-03-01", "2026-03-01"])
})
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_stats:transaction_count_7d",
"user_stats:avg_transaction_amount",
"user_stats:days_since_last_login",
]
).to_df()
# オンラインサービング - リアルタイムフィーチャー取得
feature_vector = store.get_online_features(
features=[
"user_stats:transaction_count_7d",
"user_stats:avg_transaction_amount",
],
entity_rows=[{"user_id": 1001}]
).to_dict()
フィーチャードリフト検出
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
# フィーチャードリフトレポートの生成
report = Report(metrics=[DataDriftPreset()])
report.run(
reference_data=reference_features,
current_data=current_features,
column_mapping=ColumnMapping(target="label")
)
report.save_html("feature_drift_report.html")
# ドリフト結果の確認
results = report.as_dict()
drifted_features = [
col for col, info in results["metrics"][0]["result"]["drift_by_columns"].items()
if info["drift_detected"]
]
print(f"ドリフト検出されたフィーチャー: {drifted_features}")
モデルレジストリ
MLflow Model Registry
MLflow Model Registryは、モデルのバージョン管理、ステージ遷移、チームコラボレーションのための中央リポジトリです。
import mlflow
from mlflow.tracking import MlflowClient
client = MlflowClient()
# 新しいモデルの登録
model_uri = f"runs:/{run_id}/model"
model_version = mlflow.register_model(
model_uri=model_uri,
name="fraud-detection"
)
# モデルの説明を追加
client.update_registered_model(
name="fraud-detection",
description="決済詐欺検出モデル - RandomForestベース"
)
client.update_model_version(
name="fraud-detection",
version=model_version.version,
description=f"Accuracy: 0.956, F1: 0.943 on test set"
)
# Stagingへの遷移
client.transition_model_version_stage(
name="fraud-detection",
version=model_version.version,
stage="Staging",
archive_existing_versions=False
)
# Stagingモデルのロードと検証
staging_model = mlflow.pyfunc.load_model(
model_uri="models:/fraud-detection/Staging"
)
staging_preds = staging_model.predict(X_val)
staging_accuracy = accuracy_score(y_val, staging_preds)
# 検証通過時にProductionへ昇格
if staging_accuracy > 0.95:
client.transition_model_version_stage(
name="fraud-detection",
version=model_version.version,
stage="Production",
archive_existing_versions=True
)
print(f"モデル v{model_version.version} のProduction昇格完了")
Hugging Face Hubモデルレジストリ
from huggingface_hub import HfApi
api = HfApi()
# モデルのアップロード
api.upload_folder(
folder_path="./fine-tuned-model",
repo_id="myorg/sentiment-classifier-v2",
repo_type="model",
)
# 特定バージョンのタグ付け
api.create_tag(
repo_id="myorg/sentiment-classifier-v2",
tag="v2.1.0",
tag_message="Improved accuracy on edge cases"
)
ML向けCI/CD
GitHub Actions MLパイプライン
# .github/workflows/ml-cicd.yml
name: ML CI/CD Pipeline
on:
push:
branches: [main, develop]
paths:
- 'src/**'
- 'params.yaml'
- 'dvc.yaml'
schedule:
- cron: '0 2 * * 1' # 毎週月曜午前2時に自動再学習
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run unit tests
run: pytest tests/ -v --cov=src
- name: Data validation
run: python src/validate_data.py
train-and-evaluate:
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Configure DVC remote
run: |
dvc remote modify myremote access_key_id ${{ secrets.AWS_ACCESS_KEY_ID }}
dvc remote modify myremote secret_access_key ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Pull data
run: dvc pull
- name: Run DVC pipeline
run: dvc repro
- name: Log metrics to MLflow
run: python src/log_results.py
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
- name: Check model performance gate
run: |
python src/check_performance_gate.py \
--min-accuracy 0.95 \
--min-f1 0.93
- name: Push results
run: |
dvc push
git add reports/metrics.json dvc.lock
git commit -m "chore: update metrics [skip ci]"
git push
deploy-staging:
needs: train-and-evaluate
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- name: Promote model to Staging
run: python src/promote_model.py --stage Staging
- name: Run integration tests
run: pytest tests/integration/ -v
- name: Deploy to staging endpoint
run: kubectl apply -f k8s/staging/
deploy-production:
needs: deploy-staging
runs-on: ubuntu-latest
environment: production
steps:
- name: Promote model to Production
run: python src/promote_model.py --stage Production
- name: Blue/Green deployment
run: ./scripts/blue_green_deploy.sh
- name: Smoke tests
run: pytest tests/smoke/ -v
自動再学習トリガー
# src/check_retrain_trigger.py
import mlflow
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
def should_retrain(
current_data,
reference_data,
performance_threshold=0.92,
drift_threshold=0.3
) -> tuple[bool, str]:
"""再学習が必要かどうかを判断する"""
# 1. パフォーマンスベースのトリガー
current_metrics = get_current_metrics()
if current_metrics["accuracy"] < performance_threshold:
return True, f"パフォーマンス低下: accuracy={current_metrics['accuracy']:.3f}"
# 2. データドリフトトリガー
report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=reference_data, current_data=current_data)
results = report.as_dict()
drift_share = results["metrics"][0]["result"]["share_of_drifted_columns"]
if drift_share > drift_threshold:
return True, f"データドリフト: {drift_share:.1%} のフィーチャーにドリフト検出"
return False, "再学習不要"
モデルモニタリングとドリフト検出
データドリフト vs コンセプトドリフト
データドリフト(Data Drift): 入力フィーチャーの統計的分布が変化します。P(X)が変化しますがP(Y|X)は維持されます。例: ユーザー年齢分布の変化、取引金額分布の変化。
コンセプトドリフト(Concept Drift): 入力と出力の関係が変化します。P(Y|X)が変化します。例: 新しい詐欺パターンの出現、ユーザー好みの変化。
Evidentlyによるドリフトモニタリング
import pandas as pd
from evidently.report import Report
from evidently.test_suite import TestSuite
from evidently import ColumnMapping
from evidently.metric_preset import (
DataDriftPreset,
DataQualityPreset,
TargetDriftPreset,
ClassificationPreset
)
from evidently.tests import (
TestNumberOfDriftedColumns,
TestShareOfDriftedColumns,
TestColumnDrift
)
# カラムマッピングの設定
column_mapping = ColumnMapping(
target="fraud_label",
prediction="fraud_score",
numerical_features=["amount", "transaction_count_7d", "avg_amount"],
categorical_features=["merchant_category", "payment_method"]
)
# 総合ドリフトレポート
report = Report(metrics=[
DataDriftPreset(),
DataQualityPreset(),
TargetDriftPreset(),
ClassificationPreset()
])
report.run(
reference_data=reference_df,
current_data=production_df,
column_mapping=column_mapping
)
report.save_html("monitoring/report.html")
# アラートテストスイート
test_suite = TestSuite(tests=[
TestNumberOfDriftedColumns(lt=3),
TestShareOfDriftedColumns(lt=0.3),
TestColumnDrift(column_name="amount"),
TestColumnDrift(column_name="transaction_count_7d"),
])
test_suite.run(
reference_data=reference_df,
current_data=production_df
)
# テスト失敗時のアラート
results = test_suite.as_dict()
failed_tests = [t for t in results["tests"] if t["status"] == "FAIL"]
if failed_tests:
send_alert(f"モニタリングアラート: {len(failed_tests)}件のテストが失敗")
Prometheus + Grafanaメトリクス
# src/monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time
prediction_counter = Counter(
"model_predictions_total",
"総予測数",
["model_version", "result"]
)
prediction_latency = Histogram(
"model_prediction_latency_seconds",
"予測レイテンシー(秒)",
buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]
)
model_accuracy = Gauge(
"model_accuracy_current",
"現在のモデル精度"
)
def predict_with_monitoring(features, model_version="v2.1"):
start_time = time.time()
prediction = model.predict(features)
latency = time.time() - start_time
prediction_latency.observe(latency)
prediction_counter.labels(
model_version=model_version,
result="fraud" if prediction[0] == 1 else "normal"
).inc()
return prediction
LLMOps
LLMOpsは大規模言語モデルの開発、デプロイ、運用のためのMLOpsの拡張です。
LLMパイプラインの特有の課題
- 非決定論的な出力: 同じ入力でも異なる出力 → 評価が複雑
- プロンプトの感度: 小さな変更が大きなパフォーマンス差を生む
- 高コストなファインチューニング: 大規模なGPUリソースが必要
- ハルシネーション(幻覚): 事実と異なる情報の生成
- コンテキスト長の管理: 長いコンテキストの効率的な処理
LangSmithによるLLMトレーシング
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langsmith import Client
import os
# LangSmithの設定
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "production-chatbot"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
# LangChainチェーン(LangSmithに自動トレース記録)
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
prompt = ChatPromptTemplate.from_template(
"あなたは親切なカスタマーサービスエージェントです。\n\n質問: {question}\n\n回答:"
)
chain = prompt | llm
# 実行 - 自動トレーシング
response = chain.invoke({"question": "返金ポリシーを教えてください"})
# LangSmithクライアントによる評価
langsmith_client = Client()
dataset = langsmith_client.create_dataset(
dataset_name="customer-service-eval",
description="カスタマーサービスチャットボット評価データセット"
)
langsmith_client.create_examples(
inputs=[{"question": "返金ポリシーを教えてください"}],
outputs=[{"answer": "購入後30日以内に返金が可能です。"}],
dataset_id=dataset.id
)
from langsmith.evaluation import evaluate, LangChainStringEvaluator
evaluators = [
LangChainStringEvaluator("cot_qa"),
LangChainStringEvaluator("labeled_criteria", config={"criteria": "correctness"})
]
results = evaluate(
lambda x: chain.invoke(x),
data=dataset.name,
evaluators=evaluators,
experiment_prefix="gpt4o-baseline"
)
プロンプトバージョン管理
# prompt_registry.py
import mlflow
from dataclasses import dataclass
from typing import Optional
@dataclass
class PromptVersion:
template: str
version: str
description: str
metrics: Optional[dict] = None
class PromptRegistry:
def __init__(self, mlflow_uri: str):
mlflow.set_tracking_uri(mlflow_uri)
self.experiment_name = "prompt-versions"
mlflow.set_experiment(self.experiment_name)
def register_prompt(self, prompt: PromptVersion) -> str:
with mlflow.start_run(run_name=f"prompt-{prompt.version}") as run:
mlflow.log_param("version", prompt.version)
mlflow.log_param("description", prompt.description)
mlflow.log_text(prompt.template, "prompt_template.txt")
if prompt.metrics:
mlflow.log_metrics(prompt.metrics)
return run.info.run_id
def get_prompt(self, version: str) -> str:
client = mlflow.tracking.MlflowClient()
runs = client.search_runs(
experiment_ids=[mlflow.get_experiment_by_name(self.experiment_name).experiment_id],
filter_string=f"params.version = '{version}'"
)
if not runs:
raise ValueError(f"プロンプトバージョン {version} が見つかりません")
artifact_uri = runs[0].info.artifact_uri
return mlflow.artifacts.load_text(f"{artifact_uri}/prompt_template.txt")
# 使用例
registry = PromptRegistry("http://mlflow-server:5000")
registry.register_prompt(PromptVersion(
template="あなたは{role}です。{context}\n\n質問: {question}\n回答:",
version="v1.2.0",
description="コンテキスト注入付きプロンプトの改善",
metrics={"accuracy": 0.87, "hallucination_rate": 0.03}
))
LLMファインチューニングパイプライン
# fine_tuning_pipeline.py
from transformers import AutoModelForCausalLM, TrainingArguments, Trainer
from peft import LoraConfig, get_peft_model, TaskType
import mlflow
def fine_tune_with_lora(
base_model: str,
dataset_path: str,
output_dir: str,
lora_r: int = 16,
lora_alpha: int = 32
):
mlflow.set_experiment("llm-fine-tuning")
with mlflow.start_run():
lora_config = LoraConfig(
task_type=TaskType.CAUSAL_LM,
r=lora_r,
lora_alpha=lora_alpha,
target_modules=["q_proj", "v_proj"],
lora_dropout=0.05,
bias="none"
)
mlflow.log_params({
"base_model": base_model,
"lora_r": lora_r,
"lora_alpha": lora_alpha
})
model = AutoModelForCausalLM.from_pretrained(base_model)
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()
training_args = TrainingArguments(
output_dir=output_dir,
num_train_epochs=3,
per_device_train_batch_size=4,
gradient_accumulation_steps=4,
learning_rate=2e-4,
fp16=True,
report_to="mlflow"
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
)
trainer.train()
model.save_pretrained(output_dir)
mlflow.transformers.log_model(
transformers_model={"model": model, "tokenizer": tokenizer},
artifact_path="fine-tuned-model",
registered_model_name="customer-service-llm"
)
クイズ
Q1. MLOps Level 2でCT(Continuous Training)が自動化される4つのトリガー条件を説明してください。
答え: データトリガー、パフォーマンストリガー、ドリフトトリガー、スケジュールトリガー
解説:
- データトリガー: 新しい学習データが特定のしきい値(例: 10万件)に達した時、または新しいデータバッチがパイプラインに流入した時に自動再学習が開始されます。
- パフォーマンストリガー: 本番モデルの精度やF1スコアなどが事前定義されたしきい値(例: accuracy < 0.92)を下回った時にトリガーされます。
- ドリフトトリガー: Evidentlyなどのツールで検出されたデータドリフト率がしきい値(例: 30%以上のフィーチャーにドリフト)を超えた時にトリガーされます。
- スケジュールトリガー: ビジネス要件に基づいた定期的な再学習(例: 毎週月曜の午前2時)でデータの鮮度を維持します。
Q2. フィーチャーストアでオンラインとオフラインのストアを分離する理由を説明してください。
答え: 学習と推論の異なるアクセスパターンとパフォーマンス要件をそれぞれ最適化するためです。
解説:
- オフラインストア(S3、BigQuery)はモデル学習用です。数百万件の履歴データをバッチでスキャンする必要があるため、大量処理とコスト効率が重要です。高いレイテンシー(秒〜分)を許容できます。
- オンラインストア(Redis、DynamoDB)はリアルタイム推論用です。特定のエンティティ(ユーザーID、商品ID)の最新フィーチャーを数ミリ秒以内に取得する必要があるため、低レイテンシーの単件検索に最適化されています。
- 分離しないと、学習時の大容量スキャンクエリがリアルタイム推論に干渉するか、リアルタイム要件に合わせることでコストが急増します。
Q3. データドリフトとコンセプトドリフトの違いと、それぞれの検出方法を説明してください。
答え: データドリフトはP(X)の変化、コンセプトドリフトはP(Y|X)の変化です。
解説:
- データドリフト: 入力フィーチャーの統計的分布が変化します。Kolmogorov-Smirnov検定、PSI(Population Stability Index)、JS Divergenceなどで検出します。ラベルなしでも検出可能で、EvidentlyのDataDriftPresetが代表的です。
- コンセプトドリフト: 同じ入力に対する正しい出力が変化します。例えば詐欺検出では新しい詐欺パターンが出現すると既存のモデルが機能しなくなります。実際のラベルが必要で、モデルパフォーマンス(accuracy、F1)の低下として検出します。ラベルが遅れて届く場合はプロキシメトリクスを活用します。
Q4. DVCがGitで大容量MLデータを管理する仕組みを説明してください。
答え: ポインタ(メタデータ)ファイルをGitに保存し、実際のデータはリモートストレージに保存します。
解説:
DVCは大容量ファイル(データセット、モデル)をGitに直接保存しません。代わりに.dvc拡張子のメタデータファイルを作成してGitで追跡します。このファイルには実際のデータのMD5ハッシュ、サイズ、パスなどが保存されます。実際のデータはS3、GCS、Azure Blobなどのリモートストレージにdvc pushでアップロードされます。別の環境でもdvc pullで全く同じバージョンのデータをダウンロードできます。Gitコミットとデータバージョンが1対1で紐付けられ、実験の再現性が保証されます。
Q5. MLflow Model RegistryでStagingからProductionへの昇格前に検証すべき項目を説明してください。
答え: パフォーマンス検証、公平性検証、統合テスト、レイテンシーテスト、データスキーマ互換性の確認
解説:
- パフォーマンス検証: ホールドアウトテストセットまたは最近の本番データで、精度、F1、AUCなどが現在のProductionモデルと同等以上であることを確認します。
- 公平性検証: 特定の人口集団や年齢層などでパフォーマンスのバイアスがないか、スライス別のメトリクスを確認します。
- 統合テスト: 実際のサービング環境(API、フィーチャーストア接続)でエンドツーエンドの予測が正常に動作するか確認します。
- レイテンシーテスト: 平均応答時間とP99レイテンシーがSLAを満たすか、負荷テストを実施します。
- スキーマ互換性: 入力フィーチャースキーマと出力形式が現在のサービングインフラと互換性があるか確認します。