Skip to content
Published on

BentoMLでMLモデルサービングパイプラインを構築する:パッケージングからKubernetesデプロイまで

Authors
  • Name
    Twitter

はじめに

MLモデルを学習させることとプロダクションでサービングすることは全く別の問題です。BentoMLはこのギャップを埋めるフレームワークで、モデルをAPIサービスとしてパッケージングし、どこにでもデプロイできるようにします。Flask/FastAPIで直接APIを構築するよりもはるかに体系的なアプローチを提供します。

BentoML vs 直接実装

項目Flask/FastAPI直接実装BentoML
API実装手動(ルーティング、シリアライゼーション)デコレータベースの自動化
モデルバージョン管理自前で実装が必要内蔵Model Store
バッチ処理自前で実装Adaptive Batching内蔵
DockerビルドDockerfileの手動作成自動生成
GPUサポート手動設定宣言的設定

インストールと基本的な使い方

pip install bentoml

モデルの保存

# save_model.py
import bentoml
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris

# モデルの学習
X, y = load_iris(return_X_y=True)
model = RandomForestClassifier(n_estimators=100)
model.fit(X, y)

# BentoML Model Storeに保存
saved_model = bentoml.sklearn.save_model(
    "iris_classifier",
    model,
    signatures={"predict": {"batchable": True}},
    labels={"owner": "ml-team", "stage": "production"},
    metadata={"accuracy": 0.96, "dataset": "iris"},
)

print(f"Model saved: {saved_model}")
# Model saved: Model(tag="iris_classifier:abc123")
# 保存されたモデルの確認
bentoml models list
# Tag                          Module    Size    Creation Time
# iris_classifier:abc123       sklearn   1.2MB   2026-03-03 05:00:00

Serviceの定義

# service.py
import bentoml
import numpy as np
from typing import Annotated

@bentoml.service(
    resources={"cpu": "2", "memory": "1Gi"},
    traffic={"timeout": 30, "concurrency": 32},
)
class IrisClassifier:
    model = bentoml.models.get("iris_classifier:latest")

    def __init__(self):
        self.clf = bentoml.sklearn.load_model(self.model)
        self.target_names = ["setosa", "versicolor", "virginica"]

    @bentoml.api
    def predict(
        self,
        features: Annotated[np.ndarray, bentoml.validators.Shape((4,))],
    ) -> dict:
        prediction = self.clf.predict([features])[0]
        probabilities = self.clf.predict_proba([features])[0]
        return {
            "class": self.target_names[prediction],
            "probability": float(max(probabilities)),
            "all_probabilities": {
                name: float(prob)
                for name, prob in zip(self.target_names, probabilities)
            },
        }

    @bentoml.api
    def predict_batch(
        self,
        features: Annotated[np.ndarray, bentoml.validators.Shape((-1, 4))],
    ) -> list[dict]:
        predictions = self.clf.predict(features)
        probabilities = self.clf.predict_proba(features)
        return [
            {
                "class": self.target_names[pred],
                "probability": float(max(probs)),
            }
            for pred, probs in zip(predictions, probabilities)
        ]
# ローカルサービング
bentoml serve service:IrisClassifier

# テスト
curl -X POST http://localhost:3000/predict \
  -H "Content-Type: application/json" \
  -d '{"features": [5.1, 3.5, 1.4, 0.2]}'
# {"class": "setosa", "probability": 0.98, ...}

LLMサービング — OpenLLM連携

# llm_service.py
import bentoml
from vllm import LLM, SamplingParams

@bentoml.service(
    resources={"gpu": 1, "gpu_type": "nvidia-a100"},
    traffic={"timeout": 120, "concurrency": 16},
)
class LLMService:
    def __init__(self):
        self.llm = LLM(
            model="meta-llama/Llama-3.1-8B-Instruct",
            tensor_parallel_size=1,
            max_model_len=8192,
            gpu_memory_utilization=0.9,
        )

    @bentoml.api
    async def generate(self, prompt: str, max_tokens: int = 512) -> str:
        sampling_params = SamplingParams(
            temperature=0.7,
            top_p=0.9,
            max_tokens=max_tokens,
        )
        outputs = self.llm.generate([prompt], sampling_params)
        return outputs[0].outputs[0].text

    @bentoml.api
    async def chat(self, messages: list[dict]) -> str:
        prompt = self._format_chat(messages)
        return await self.generate(prompt)

    def _format_chat(self, messages):
        formatted = ""
        for msg in messages:
            role = msg["role"]
            content = msg["content"]
            formatted += f"<|{role}|>\n{content}\n"
        formatted += "<|assistant|>\n"
        return formatted

マルチモデルパイプライン

# pipeline_service.py
import bentoml
import numpy as np
from PIL import Image

@bentoml.service(resources={"cpu": "4", "memory": "4Gi"})
class ImageClassificationPipeline:
    # 複数のモデルを組み合わせ
    preprocessor = bentoml.depends(ImagePreprocessor)
    classifier = bentoml.depends(ImageClassifier)
    postprocessor = bentoml.depends(ResultPostprocessor)

    @bentoml.api
    async def classify(self, image: Image.Image) -> dict:
        # 1. 前処理
        features = await self.preprocessor.process(image)

        # 2. 分類
        raw_result = await self.classifier.predict(features)

        # 3. 後処理
        result = await self.postprocessor.format(raw_result)

        return result

@bentoml.service(resources={"cpu": "1"})
class ImagePreprocessor:
    @bentoml.api
    async def process(self, image: Image.Image) -> np.ndarray:
        img = image.resize((224, 224))
        arr = np.array(img) / 255.0
        return arr.transpose(2, 0, 1)

@bentoml.service(resources={"gpu": 1})
class ImageClassifier:
    model = bentoml.models.get("resnet50:latest")

    def __init__(self):
        import torch
        self.model = bentoml.pytorch.load_model(self.model)
        self.model.eval()
        self.device = torch.device("cuda")
        self.model.to(self.device)

    @bentoml.api
    async def predict(self, features: np.ndarray) -> np.ndarray:
        import torch
        tensor = torch.tensor(features).unsqueeze(0).float().to(self.device)
        with torch.no_grad():
            output = self.model(tensor)
        return output.cpu().numpy()

BentoビルドとDocker

bentofile.yaml

# bentofile.yaml
service: 'service:IrisClassifier'
labels:
  owner: ml-team
  project: iris-classifier
include:
  - '*.py'
python:
  packages:
    - scikit-learn==1.5.0
    - numpy
docker:
  python_version: '3.11'
  system_packages:
    - libgomp1
  env:
    BENTOML_PORT: '3000'
# Bentoビルド
bentoml build

# ビルドされたBentoの確認
bentoml list
# Tag                              Size     Creation Time
# iris_classifier_service:xyz789   45MB     2026-03-03

# Dockerイメージの生成
bentoml containerize iris_classifier_service:latest

# Dockerで実行
docker run -p 3000:3000 iris_classifier_service:latest

Kubernetesデプロイ

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: iris-classifier
  namespace: ml-serving
spec:
  replicas: 3
  selector:
    matchLabels:
      app: iris-classifier
  template:
    metadata:
      labels:
        app: iris-classifier
    spec:
      containers:
        - name: bento
          image: registry.example.com/iris_classifier_service:latest
          ports:
            - containerPort: 3000
          resources:
            requests:
              cpu: '1'
              memory: '1Gi'
            limits:
              cpu: '2'
              memory: '2Gi'
          readinessProbe:
            httpGet:
              path: /healthz
              port: 3000
            initialDelaySeconds: 10
          livenessProbe:
            httpGet:
              path: /healthz
              port: 3000
            initialDelaySeconds: 30
---
apiVersion: v1
kind: Service
metadata:
  name: iris-classifier
  namespace: ml-serving
spec:
  selector:
    app: iris-classifier
  ports:
    - port: 80
      targetPort: 3000
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: iris-classifier
  namespace: ml-serving
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: iris-classifier
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

Adaptive Batching

BentoMLの中核機能で、複数のリクエストを自動的にまとめてGPU利用率を最大化します。

@bentoml.service(
    traffic={
        "timeout": 30,
    },
)
class EmbeddingService:
    model = bentoml.models.get("sentence-transformer:latest")

    def __init__(self):
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer(self.model.path)

    @bentoml.api(
        batchable=True,
        batch_dim=0,
        max_batch_size=64,
        max_latency_ms=100,
    )
    async def encode(self, texts: list[str]) -> np.ndarray:
        # 個別のリクエストが自動的にバッチにまとめられて実行される
        embeddings = self.model.encode(texts)
        return embeddings

モニタリング

# カスタムメトリクスの追加
import bentoml
from prometheus_client import Counter, Histogram

prediction_counter = Counter(
    "predictions_total", "Total predictions", ["model", "class"]
)
latency_histogram = Histogram(
    "prediction_latency_seconds", "Prediction latency"
)

@bentoml.service
class MonitoredClassifier:
    @bentoml.api
    def predict(self, features: np.ndarray) -> dict:
        with latency_histogram.time():
            result = self.clf.predict([features])[0]
            prediction_counter.labels(
                model="iris_v1", class_name=result
            ).inc()
            return {"class": result}
# Prometheusメトリクスエンドポイント
curl http://localhost:3000/metrics

まとめ

BentoMLはMLモデルサービングの複雑さを大幅に軽減します:

  • 簡単なAPI実装: デコレータベースで数行でREST APIを作成
  • モデルバージョン管理: 内蔵Model Storeで体系的に管理
  • Adaptive Batching: GPU利用率を最大化
  • Docker自動化: bentofile.yamlで再現可能なビルド
  • Kubernetesネイティブ: HPAと連携した自動スケーリング

クイズ:BentoML理解度チェック(7問)

Q1. BentoMLのModel Storeとは?

学習済みモデルをバージョン管理とメタデータと共にローカルに保存するリポジトリです。bentoml.sklearn.save_model()などで保存します。

Q2. Adaptive Batchingの動作原理は?

個別のリクエストを自動的に収集し、max_batch_sizeまたはmax_latency_msに達した時点で一括処理することで、GPU効率を最大化します。

Q3. bentoml.depends()の役割は?

マルチモデルパイプラインで他のBentoMLサービスを依存性として注入し、サービス間の通信を自動管理します。

Q4. bentofile.yamlで定義するものは?

サービスのエントリポイント、Pythonパッケージの依存関係、Docker設定、含めるファイルなどを宣言します。

Q5. BentoMLの/healthzエンドポイントの用途は?

KubernetesのReadiness/Liveness Probeに使用して、サービスの準備状態と生存確認を行います。

Q6. GPUリソースを指定する方法は?

@bentoml.service(resources={"gpu": 1, "gpu_type": "nvidia-a100"})デコレータで宣言します。

Q7. BentoMLがFlask/FastAPI直接実装より優れている点は?

モデルバージョン管理、Adaptive Batching、Docker自動ビルド、宣言的リソース管理などが内蔵されており、プロダクション対応が迅速です。