Skip to content

필사 모드: MLOps & モデルライフサイクル完全ガイド: MLflow、DVC、LLMOpsまで

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

目次

1. [MLOps概要と成熟度モデル](#mlops概要と成熟度モデル)

2. [実験追跡: MLflow と Weights and Biases](#実験追跡-mlflow-と-weights-and-biases)

3. [データバージョン管理: DVC](#データバージョン管理-dvc)

4. [フィーチャーストア](#フィーチャーストア)

5. [モデルレジストリ](#モデルレジストリ)

6. [ML向けCI/CD](#ml向けcicd)

7. [モデルモニタリングとドリフト検出](#モデルモニタリングとドリフト検出)

8. [LLMOps](#llmops)

9. [クイズ](#クイズ)

MLOps概要と成熟度モデル

MLOps(Machine Learning Operations)は、MLシステムを本番環境で安定して運用するためのプラクティス、ツール、文化の集合体です。DevOpsの原則をMLワークフローに適用し、モデル開発から展開、モニタリング、再学習までの全ライフサイクルを自動化します。

MLOpsが必要な理由

MLプロジェクトの95%以上が本番デプロイに失敗するという統計があります。主な原因は以下の通りです。

- **再現不可能な実験**: コード、データ、環境がバージョン管理されていない

- **手動デプロイプロセス**: 遅くてエラーが発生しやすい

- **モニタリング不足**: モデルのパフォーマンス低下の発見が遅れる

- **チームのサイロ化**: データサイエンスチームとエンジニアリングチームの断絶

MLOps成熟度レベル

GoogleのMLOps成熟度モデルは3つのステージを定義しています。

Level 0: 手動プロセス

すべてが手動で行われます。データサイエンティストがJupyter Notebookで実験し、結果を手動でデプロイします。

| 特徴 | 説明 |

| ------------ | ---------------- |

| デプロイ頻度 | 数ヶ月に1回 |

| 自動化レベル | なし |

| 再現性 | 低い |

| モニタリング | なし、または手動 |

**制限事項**: 実験追跡なし、コードとデータのバージョン不整合、デプロイエラー、モデル性能低下の検出不可

Level 1: MLパイプラインの自動化

CT(Continuous Training)が導入されます。データパイプラインとモデル学習は自動化されますが、CI/CDはまだ手動です。

**主要コンポーネント**:

- 自動化されたデータ検証パイプライン

- フィーチャーエンジニアリングパイプライン

- モデル学習パイプライン(Kubeflow Pipelines、Apache Airflowなど)

- モデル性能評価の自動化

- フィーチャーストアの導入

Kubeflow Pipelineの例 - Level 1 CTパイプライン

from kfp import dsl

@dsl.component

def data_validation_op(data_path: str) -> bool:

ds = ge.read_csv(data_path)

results = ds.expect_column_values_to_not_be_null("target")

return results["success"]

@dsl.component

def train_model_op(data_path: str, model_output: str):

モデル学習ロジック

pass

@dsl.pipeline(name="CT Pipeline")

def ct_pipeline(data_path: str):

validation = data_validation_op(data_path=data_path)

with dsl.Condition(validation.output == True):

train_model_op(data_path=data_path, model_output="/models/")

Level 2: CI/CDパイプラインの自動化

完全なMLOps自動化のステージです。コード、データ、モデルがすべてバージョン管理され、CI/CD/CTが完全に自動化されます。

**自動化トリガー条件**:

- 新しい学習データの到達(スケジュールまたはデータ量のしきい値)

- モデルパフォーマンス指標の低下検出

- データドリフトの検出

- コード変更(新機能、アルゴリズムの改善)

**Level 2アーキテクチャ**:

ソースコード変更またはデータトリガー

CIパイプライン(テスト、ビルド)

CDパイプライン(パイプラインのデプロイ)

CTパイプライン(自動再学習)

モデル評価 → 合格/不合格ゲート

モデルレジストリへの登録

Staging → Productionへの昇格

モニタリングとアラート

実験追跡: MLflow と Weights and Biases

MLflow完全ガイド

MLflowはMLライフサイクル管理のためのオープンソースプラットフォームです。4つのコアコンポーネントで構成されています。

MLflow Tracking

実験パラメータ、メトリクス、アーティファクトを追跡します。

from sklearn.ensemble import RandomForestClassifier

from sklearn.metrics import accuracy_score, f1_score

MLflow Trackingサーバーの設定

mlflow.set_tracking_uri("http://mlflow-server:5000")

mlflow.set_experiment("fraud-detection-v2")

with mlflow.start_run(run_name="rf-baseline") as run:

ハイパーパラメータのログ

params = {

"n_estimators": 100,

"max_depth": 10,

"min_samples_split": 5,

"random_state": 42

}

mlflow.log_params(params)

モデル学習

model = RandomForestClassifier(**params)

model.fit(X_train, y_train)

メトリクスのログ

y_pred = model.predict(X_test)

metrics = {

"accuracy": accuracy_score(y_test, y_pred),

"f1_score": f1_score(y_test, y_pred, average="weighted"),

}

mlflow.log_metrics(metrics)

モデルの保存(署名付き)

mlflow.sklearn.log_model(

sk_model=model,

artifact_path="model",

registered_model_name="fraud-detection",

input_example=X_test[:5],

signature=mlflow.models.infer_signature(X_train, y_pred)

)

カスタムアーティファクトのログ

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

cm = confusion_matrix(y_test, y_pred)

disp = ConfusionMatrixDisplay(confusion_matrix=cm)

disp.plot()

plt.savefig("confusion_matrix.png")

mlflow.log_artifact("confusion_matrix.png")

print(f"Run ID: {run.info.run_id}")

print(f"Accuracy: {metrics['accuracy']:.4f}")

MLflow Autolog

フレームワーク別の自動ログでボイラープレートを最小化:

フレームワーク自動検出とログ

mlflow.autolog()

PyTorch専用autolog

mlflow.pytorch.autolog(

log_every_n_epoch=1,

log_models=True,

disable=False,

log_datasets=True

)

XGBoost専用autolog

mlflow.xgboost.autolog(

log_input_examples=True,

log_model_signatures=True,

log_models=True,

log_datasets=True

)

MLflow Projects

再現可能なMLプロジェクトのパッケージング:

MLprojectファイル

name: fraud-detection

conda_env: conda.yaml

entry_points:

main:

parameters:

n_estimators: { type: int, default: 100 }

max_depth: { type: int, default: 10 }

data_path: { type: str, default: 'data/train.csv' }

command: 'python train.py --n_estimators {n_estimators} --max_depth {max_depth} --data_path {data_path}'

evaluate:

parameters:

model_uri: { type: str }

test_data: { type: str }

command: 'python evaluate.py --model_uri {model_uri} --test_data {test_data}'

Weights & Biases (W&B)

W&Bは実験追跡、可視化、ハイパーパラメータ最適化を提供するMLOpsプラットフォームです。

W&B実行の初期化

run = wandb.init(

project="image-classification",

config={

"learning_rate": 0.001,

"epochs": 50,

"batch_size": 32,

"architecture": "ResNet50"

}

)

W&B Sweepによるハイパーパラメータ最適化

sweep_config = {

"method": "bayes",

"metric": {"name": "val_accuracy", "goal": "maximize"},

"parameters": {

"learning_rate": {"min": 1e-5, "max": 1e-2},

"batch_size": {"values": [16, 32, 64]},

"dropout": {"min": 0.1, "max": 0.5}

}

}

sweep_id = wandb.sweep(sweep_config, project="image-classification")

wandb.agent(sweep_id, function=train_fn, count=50)

データバージョン管理: DVC

DVC(Data Version Control)はGitと連携して大規模データセットとMLパイプラインをバージョン管理するツールです。

DVCの仕組み

DVCは大容量ファイルをGitに直接保存する代わりに、`.dvc`メタデータファイル(ポインタ)を作成してGitにコミットします。実際のデータはS3、GCS、Azure Blob、SSHなどのリモートストレージに保存されます。

DVC初期化

git init

dvc init

リモートストレージの設定(S3)

dvc remote add -d myremote s3://my-bucket/dvc-store

dvc remote modify myremote region us-east-1

データファイルの追加

dvc add data/train.csv

git add data/train.csv.dvc .gitignore

git commit -m "Add training data v1"

dvc push

別の環境でデータを取得

git pull

dvc pull

DVCパイプライン(dvc.yaml)

再現可能なMLパイプラインの宣言的定義:

dvc.yaml

stages:

prepare:

cmd: python src/prepare.py --input data/raw.csv --output data/processed/

deps:

- src/prepare.py

- data/raw.csv

outs:

- data/processed/train.csv

- data/processed/test.csv

params:

- prepare:

- split_ratio

- random_seed

featurize:

cmd: python src/featurize.py

deps:

- src/featurize.py

- data/processed/train.csv

outs:

- data/features/train_features.pkl

params:

- featurize:

- max_features

- ngrams

train:

cmd: python src/train.py

deps:

- src/train.py

- data/features/train_features.pkl

outs:

- models/model.pkl

metrics:

- reports/metrics.json:

cache: false

params:

- train:

- n_estimators

- max_depth

- random_seed

evaluate:

cmd: python src/evaluate.py

deps:

- src/evaluate.py

- models/model.pkl

- data/processed/test.csv

metrics:

- reports/eval_metrics.json:

cache: false

plots:

- reports/plots/confusion_matrix.csv:

cache: false

DVC実験管理

パイプラインの実行

dvc repro

実験ブランチの作成

dvc exp run --set-param train.n_estimators=200 --name exp-200-trees

実験の比較

dvc exp show

メトリクス表の表示

dvc metrics show

dvc metrics diff

フィーチャーストア

フィーチャーストアは、MLフィーチャーを一元的に保存、共有、サービング(提供)するためのデータレイヤーです。

フィーチャーストアが必要な理由

- **学習/サービングスキューの解消**: 学習と推論で同一のフィーチャー変換を保証

- **フィーチャーの再利用**: チーム間でフィーチャーを共有し重複作業を排除

- **低レイテンシーサービング**: オンライン予測のためのリアルタイムフィーチャー検索

- **フィーチャーの一貫性**: バッチとストリーミングパイプライン間の一貫性を維持

オンラインストア vs オフラインストア

| 区分 | オンラインストア | オフラインストア |

| -------------- | -------------------------- | ------------------ |

| 目的 | リアルタイム推論サービング | モデル学習 |

| レイテンシー | ミリ秒 | 秒〜分 |

| ストレージ | Redis、DynamoDB、Cassandra | S3、BigQuery、Hive |

| データ量 | 最新状態(現在値) | 全履歴 |

| クエリパターン | 単件検索(キー基準) | バッチスキャン |

Feastフィーチャーストア

feature_repo/feature_store.yaml

project: fraud_detection

registry: data/registry.db

provider: local

online_store:

type: redis

connection_string: "localhost:6379"

offline_store:

type: bigquery

dataset: feast_dev

feature_repo/features.py

from datetime import timedelta

from feast import Entity, Feature, FeatureView, FileSource, ValueType

from feast.types import Float32, Int64

エンティティの定義

user = Entity(

name="user_id",

value_type=ValueType.INT64,

description="ユーザーID"

)

データソースの定義

user_stats_source = FileSource(

path="data/user_stats.parquet",

timestamp_field="event_timestamp",

created_timestamp_column="created"

)

フィーチャービューの定義

user_stats_fv = FeatureView(

name="user_stats",

entities=["user_id"],

ttl=timedelta(days=7),

features=[

Feature(name="transaction_count_7d", dtype=Float32),

Feature(name="avg_transaction_amount", dtype=Float32),

Feature(name="days_since_last_login", dtype=Int64),

Feature(name="account_age_days", dtype=Int64),

],

online=True,

source=user_stats_source,

tags={"team": "fraud", "version": "v2"},

)

フィーチャーストアの使用

from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

学習データの取得(オフライン)

entity_df = pd.DataFrame({

"user_id": [1001, 1002, 1003],

"event_timestamp": pd.to_datetime(["2026-03-01", "2026-03-01", "2026-03-01"])

})

training_df = store.get_historical_features(

entity_df=entity_df,

features=[

"user_stats:transaction_count_7d",

"user_stats:avg_transaction_amount",

"user_stats:days_since_last_login",

]

).to_df()

オンラインサービング - リアルタイムフィーチャー取得

feature_vector = store.get_online_features(

features=[

"user_stats:transaction_count_7d",

"user_stats:avg_transaction_amount",

],

entity_rows=[{"user_id": 1001}]

).to_dict()

フィーチャードリフト検出

from evidently import ColumnMapping

from evidently.report import Report

from evidently.metric_preset import DataDriftPreset

フィーチャードリフトレポートの生成

report = Report(metrics=[DataDriftPreset()])

report.run(

reference_data=reference_features,

current_data=current_features,

column_mapping=ColumnMapping(target="label")

)

report.save_html("feature_drift_report.html")

ドリフト結果の確認

results = report.as_dict()

drifted_features = [

col for col, info in results["metrics"][0]["result"]["drift_by_columns"].items()

if info["drift_detected"]

]

print(f"ドリフト検出されたフィーチャー: {drifted_features}")

モデルレジストリ

MLflow Model Registry

MLflow Model Registryは、モデルのバージョン管理、ステージ遷移、チームコラボレーションのための中央リポジトリです。

from mlflow.tracking import MlflowClient

client = MlflowClient()

新しいモデルの登録

model_uri = f"runs:/{run_id}/model"

model_version = mlflow.register_model(

model_uri=model_uri,

name="fraud-detection"

)

モデルの説明を追加

client.update_registered_model(

name="fraud-detection",

description="決済詐欺検出モデル - RandomForestベース"

)

client.update_model_version(

name="fraud-detection",

version=model_version.version,

description=f"Accuracy: 0.956, F1: 0.943 on test set"

)

Stagingへの遷移

client.transition_model_version_stage(

name="fraud-detection",

version=model_version.version,

stage="Staging",

archive_existing_versions=False

)

Stagingモデルのロードと検証

staging_model = mlflow.pyfunc.load_model(

model_uri="models:/fraud-detection/Staging"

)

staging_preds = staging_model.predict(X_val)

staging_accuracy = accuracy_score(y_val, staging_preds)

検証通過時にProductionへ昇格

if staging_accuracy > 0.95:

client.transition_model_version_stage(

name="fraud-detection",

version=model_version.version,

stage="Production",

archive_existing_versions=True

)

print(f"モデル v{model_version.version} のProduction昇格完了")

Hugging Face Hubモデルレジストリ

from huggingface_hub import HfApi

api = HfApi()

モデルのアップロード

api.upload_folder(

folder_path="./fine-tuned-model",

repo_id="myorg/sentiment-classifier-v2",

repo_type="model",

)

特定バージョンのタグ付け

api.create_tag(

repo_id="myorg/sentiment-classifier-v2",

tag="v2.1.0",

tag_message="Improved accuracy on edge cases"

)

ML向けCI/CD

GitHub Actions MLパイプライン

.github/workflows/ml-cicd.yml

name: ML CI/CD Pipeline

on:

push:

branches: [main, develop]

paths:

- 'src/**'

- 'params.yaml'

- 'dvc.yaml'

schedule:

- cron: '0 2 * * 1' # 毎週月曜午前2時に自動再学習

jobs:

test:

runs-on: ubuntu-latest

steps:

- uses: actions/checkout@v4

- name: Set up Python

uses: actions/setup-python@v4

with:

python-version: '3.11'

- name: Install dependencies

run: pip install -r requirements.txt

- name: Run unit tests

run: pytest tests/ -v --cov=src

- name: Data validation

run: python src/validate_data.py

train-and-evaluate:

needs: test

runs-on: ubuntu-latest

steps:

- uses: actions/checkout@v4

- name: Configure DVC remote

run: |

dvc remote modify myremote access_key_id ${{ secrets.AWS_ACCESS_KEY_ID }}

dvc remote modify myremote secret_access_key ${{ secrets.AWS_SECRET_ACCESS_KEY }}

- name: Pull data

run: dvc pull

- name: Run DVC pipeline

run: dvc repro

- name: Log metrics to MLflow

run: python src/log_results.py

env:

MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}

- name: Check model performance gate

run: |

python src/check_performance_gate.py \

--min-accuracy 0.95 \

--min-f1 0.93

- name: Push results

run: |

dvc push

git add reports/metrics.json dvc.lock

git commit -m "chore: update metrics [skip ci]"

git push

deploy-staging:

needs: train-and-evaluate

runs-on: ubuntu-latest

if: github.ref == 'refs/heads/main'

steps:

- name: Promote model to Staging

run: python src/promote_model.py --stage Staging

- name: Run integration tests

run: pytest tests/integration/ -v

- name: Deploy to staging endpoint

run: kubectl apply -f k8s/staging/

deploy-production:

needs: deploy-staging

runs-on: ubuntu-latest

environment: production

steps:

- name: Promote model to Production

run: python src/promote_model.py --stage Production

- name: Blue/Green deployment

run: ./scripts/blue_green_deploy.sh

- name: Smoke tests

run: pytest tests/smoke/ -v

自動再学習トリガー

src/check_retrain_trigger.py

from evidently.report import Report

from evidently.metric_preset import DataDriftPreset

def should_retrain(

current_data,

reference_data,

performance_threshold=0.92,

drift_threshold=0.3

) -> tuple[bool, str]:

"""再学習が必要かどうかを判断する"""

1. パフォーマンスベースのトリガー

current_metrics = get_current_metrics()

if current_metrics["accuracy"] < performance_threshold:

return True, f"パフォーマンス低下: accuracy={current_metrics['accuracy']:.3f}"

2. データドリフトトリガー

report = Report(metrics=[DataDriftPreset()])

report.run(reference_data=reference_data, current_data=current_data)

results = report.as_dict()

drift_share = results["metrics"][0]["result"]["share_of_drifted_columns"]

if drift_share > drift_threshold:

return True, f"データドリフト: {drift_share:.1%} のフィーチャーにドリフト検出"

return False, "再学習不要"

モデルモニタリングとドリフト検出

データドリフト vs コンセプトドリフト

**データドリフト(Data Drift)**: 入力フィーチャーの統計的分布が変化します。P(X)が変化しますがP(Y|X)は維持されます。例: ユーザー年齢分布の変化、取引金額分布の変化。

**コンセプトドリフト(Concept Drift)**: 入力と出力の関係が変化します。P(Y|X)が変化します。例: 新しい詐欺パターンの出現、ユーザー好みの変化。

Evidentlyによるドリフトモニタリング

from evidently.report import Report

from evidently.test_suite import TestSuite

from evidently import ColumnMapping

from evidently.metric_preset import (

DataDriftPreset,

DataQualityPreset,

TargetDriftPreset,

ClassificationPreset

)

from evidently.tests import (

TestNumberOfDriftedColumns,

TestShareOfDriftedColumns,

TestColumnDrift

)

カラムマッピングの設定

column_mapping = ColumnMapping(

target="fraud_label",

prediction="fraud_score",

numerical_features=["amount", "transaction_count_7d", "avg_amount"],

categorical_features=["merchant_category", "payment_method"]

)

総合ドリフトレポート

report = Report(metrics=[

DataDriftPreset(),

DataQualityPreset(),

TargetDriftPreset(),

ClassificationPreset()

])

report.run(

reference_data=reference_df,

current_data=production_df,

column_mapping=column_mapping

)

report.save_html("monitoring/report.html")

アラートテストスイート

test_suite = TestSuite(tests=[

TestNumberOfDriftedColumns(lt=3),

TestShareOfDriftedColumns(lt=0.3),

TestColumnDrift(column_name="amount"),

TestColumnDrift(column_name="transaction_count_7d"),

])

test_suite.run(

reference_data=reference_df,

current_data=production_df

)

テスト失敗時のアラート

results = test_suite.as_dict()

failed_tests = [t for t in results["tests"] if t["status"] == "FAIL"]

if failed_tests:

send_alert(f"モニタリングアラート: {len(failed_tests)}件のテストが失敗")

Prometheus + Grafanaメトリクス

src/monitoring/metrics.py

from prometheus_client import Counter, Histogram, Gauge

prediction_counter = Counter(

"model_predictions_total",

"総予測数",

["model_version", "result"]

)

prediction_latency = Histogram(

"model_prediction_latency_seconds",

"予測レイテンシー(秒)",

buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]

)

model_accuracy = Gauge(

"model_accuracy_current",

"現在のモデル精度"

)

def predict_with_monitoring(features, model_version="v2.1"):

start_time = time.time()

prediction = model.predict(features)

latency = time.time() - start_time

prediction_latency.observe(latency)

prediction_counter.labels(

model_version=model_version,

result="fraud" if prediction[0] == 1 else "normal"

).inc()

return prediction

LLMOps

LLMOpsは大規模言語モデルの開発、デプロイ、運用のためのMLOpsの拡張です。

LLMパイプラインの特有の課題

- **非決定論的な出力**: 同じ入力でも異なる出力 → 評価が複雑

- **プロンプトの感度**: 小さな変更が大きなパフォーマンス差を生む

- **高コストなファインチューニング**: 大規模なGPUリソースが必要

- **ハルシネーション(幻覚)**: 事実と異なる情報の生成

- **コンテキスト長の管理**: 長いコンテキストの効率的な処理

LangSmithによるLLMトレーシング

from langchain_openai import ChatOpenAI

from langchain.prompts import ChatPromptTemplate

from langsmith import Client

LangSmithの設定

os.environ["LANGCHAIN_TRACING_V2"] = "true"

os.environ["LANGCHAIN_PROJECT"] = "production-chatbot"

os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"

LangChainチェーン(LangSmithに自動トレース記録)

llm = ChatOpenAI(model="gpt-4o", temperature=0.1)

prompt = ChatPromptTemplate.from_template(

"あなたは親切なカスタマーサービスエージェントです。\n\n質問: {question}\n\n回答:"

)

chain = prompt | llm

実行 - 自動トレーシング

response = chain.invoke({"question": "返金ポリシーを教えてください"})

LangSmithクライアントによる評価

langsmith_client = Client()

dataset = langsmith_client.create_dataset(

dataset_name="customer-service-eval",

description="カスタマーサービスチャットボット評価データセット"

)

langsmith_client.create_examples(

inputs=[{"question": "返金ポリシーを教えてください"}],

outputs=[{"answer": "購入後30日以内に返金が可能です。"}],

dataset_id=dataset.id

)

from langsmith.evaluation import evaluate, LangChainStringEvaluator

evaluators = [

LangChainStringEvaluator("cot_qa"),

LangChainStringEvaluator("labeled_criteria", config={"criteria": "correctness"})

]

results = evaluate(

lambda x: chain.invoke(x),

data=dataset.name,

evaluators=evaluators,

experiment_prefix="gpt4o-baseline"

)

プロンプトバージョン管理

prompt_registry.py

from dataclasses import dataclass

from typing import Optional

@dataclass

class PromptVersion:

template: str

version: str

description: str

metrics: Optional[dict] = None

class PromptRegistry:

def __init__(self, mlflow_uri: str):

mlflow.set_tracking_uri(mlflow_uri)

self.experiment_name = "prompt-versions"

mlflow.set_experiment(self.experiment_name)

def register_prompt(self, prompt: PromptVersion) -> str:

with mlflow.start_run(run_name=f"prompt-{prompt.version}") as run:

mlflow.log_param("version", prompt.version)

mlflow.log_param("description", prompt.description)

mlflow.log_text(prompt.template, "prompt_template.txt")

if prompt.metrics:

mlflow.log_metrics(prompt.metrics)

return run.info.run_id

def get_prompt(self, version: str) -> str:

client = mlflow.tracking.MlflowClient()

runs = client.search_runs(

experiment_ids=[mlflow.get_experiment_by_name(self.experiment_name).experiment_id],

filter_string=f"params.version = '{version}'"

)

if not runs:

raise ValueError(f"プロンプトバージョン {version} が見つかりません")

artifact_uri = runs[0].info.artifact_uri

return mlflow.artifacts.load_text(f"{artifact_uri}/prompt_template.txt")

使用例

registry = PromptRegistry("http://mlflow-server:5000")

registry.register_prompt(PromptVersion(

template="あなたは{role}です。{context}\n\n質問: {question}\n回答:",

version="v1.2.0",

description="コンテキスト注入付きプロンプトの改善",

metrics={"accuracy": 0.87, "hallucination_rate": 0.03}

))

LLMファインチューニングパイプライン

fine_tuning_pipeline.py

from transformers import AutoModelForCausalLM, TrainingArguments, Trainer

from peft import LoraConfig, get_peft_model, TaskType

def fine_tune_with_lora(

base_model: str,

dataset_path: str,

output_dir: str,

lora_r: int = 16,

lora_alpha: int = 32

):

mlflow.set_experiment("llm-fine-tuning")

with mlflow.start_run():

lora_config = LoraConfig(

task_type=TaskType.CAUSAL_LM,

r=lora_r,

lora_alpha=lora_alpha,

target_modules=["q_proj", "v_proj"],

lora_dropout=0.05,

bias="none"

)

mlflow.log_params({

"base_model": base_model,

"lora_r": lora_r,

"lora_alpha": lora_alpha

})

model = AutoModelForCausalLM.from_pretrained(base_model)

model = get_peft_model(model, lora_config)

model.print_trainable_parameters()

training_args = TrainingArguments(

output_dir=output_dir,

num_train_epochs=3,

per_device_train_batch_size=4,

gradient_accumulation_steps=4,

learning_rate=2e-4,

fp16=True,

report_to="mlflow"

)

trainer = Trainer(

model=model,

args=training_args,

train_dataset=train_dataset,

)

trainer.train()

model.save_pretrained(output_dir)

mlflow.transformers.log_model(

transformers_model={"model": model, "tokenizer": tokenizer},

artifact_path="fine-tuned-model",

registered_model_name="customer-service-llm"

)

クイズ

**答え**: データトリガー、パフォーマンストリガー、ドリフトトリガー、スケジュールトリガー

**解説**:

1. **データトリガー**: 新しい学習データが特定のしきい値(例: 10万件)に達した時、または新しいデータバッチがパイプラインに流入した時に自動再学習が開始されます。

2. **パフォーマンストリガー**: 本番モデルの精度やF1スコアなどが事前定義されたしきい値(例: accuracy < 0.92)を下回った時にトリガーされます。

3. **ドリフトトリガー**: Evidentlyなどのツールで検出されたデータドリフト率がしきい値(例: 30%以上のフィーチャーにドリフト)を超えた時にトリガーされます。

4. **スケジュールトリガー**: ビジネス要件に基づいた定期的な再学習(例: 毎週月曜の午前2時)でデータの鮮度を維持します。

**答え**: 学習と推論の異なるアクセスパターンとパフォーマンス要件をそれぞれ最適化するためです。

**解説**:

- **オフラインストア**(S3、BigQuery)はモデル学習用です。数百万件の履歴データをバッチでスキャンする必要があるため、大量処理とコスト効率が重要です。高いレイテンシー(秒〜分)を許容できます。

- **オンラインストア**(Redis、DynamoDB)はリアルタイム推論用です。特定のエンティティ(ユーザーID、商品ID)の最新フィーチャーを数ミリ秒以内に取得する必要があるため、低レイテンシーの単件検索に最適化されています。

- 分離しないと、学習時の大容量スキャンクエリがリアルタイム推論に干渉するか、リアルタイム要件に合わせることでコストが急増します。

**答え**: データドリフトはP(X)の変化、コンセプトドリフトはP(Y|X)の変化です。

**解説**:

- **データドリフト**: 入力フィーチャーの統計的分布が変化します。Kolmogorov-Smirnov検定、PSI(Population Stability Index)、JS Divergenceなどで検出します。ラベルなしでも検出可能で、EvidentlyのDataDriftPresetが代表的です。

- **コンセプトドリフト**: 同じ入力に対する正しい出力が変化します。例えば詐欺検出では新しい詐欺パターンが出現すると既存のモデルが機能しなくなります。実際のラベルが必要で、モデルパフォーマンス(accuracy、F1)の低下として検出します。ラベルが遅れて届く場合はプロキシメトリクスを活用します。

**答え**: ポインタ(メタデータ)ファイルをGitに保存し、実際のデータはリモートストレージに保存します。

**解説**:

DVCは大容量ファイル(データセット、モデル)をGitに直接保存しません。代わりに`.dvc`拡張子のメタデータファイルを作成してGitで追跡します。このファイルには実際のデータのMD5ハッシュ、サイズ、パスなどが保存されます。実際のデータはS3、GCS、Azure Blobなどのリモートストレージに`dvc push`でアップロードされます。別の環境でも`dvc pull`で全く同じバージョンのデータをダウンロードできます。Gitコミットとデータバージョンが1対1で紐付けられ、実験の再現性が保証されます。

**答え**: パフォーマンス検証、公平性検証、統合テスト、レイテンシーテスト、データスキーマ互換性の確認

**解説**:

1. **パフォーマンス検証**: ホールドアウトテストセットまたは最近の本番データで、精度、F1、AUCなどが現在のProductionモデルと同等以上であることを確認します。

2. **公平性検証**: 特定の人口集団や年齢層などでパフォーマンスのバイアスがないか、スライス別のメトリクスを確認します。

3. **統合テスト**: 実際のサービング環境(API、フィーチャーストア接続)でエンドツーエンドの予測が正常に動作するか確認します。

4. **レイテンシーテスト**: 平均応答時間とP99レイテンシーがSLAを満たすか、負荷テストを実施します。

5. **スキーマ互換性**: 入力フィーチャースキーマと出力形式が現在のサービングインフラと互換性があるか確認します。

현재 단락 (1/673)

1. [MLOps概要と成熟度モデル](#mlops概要と成熟度モデル)

작성 글자: 0원문 글자: 20,391작성 단락: 0/673