Skip to content
Published on

チャットボット: MCPガードレールと評価ハンドブック 2026

Authors
  • Name
    Twitter
チャットボット: MCPガードレールと評価ハンドブック 2026

MCPとは何か、なぜガードレールが必要なのか

Model Context Protocol(MCP)は、Anthropicが2024年11月に公開したオープンプロトコルで、LLMアプリケーションが外部データソースやツールに標準化された方式で接続できるように設計されている。MCP以前は、各LLMフレームワークが独自のtool/pluginインターフェースを使用しており、ホストアプリケーションはツールごとに個別の連携コードを記述する必要があった。

MCPはClient-Server構造に従う。

  • MCP Host: LLMを内蔵したアプリケーション(Claude Desktop、IDE拡張、チャットボットサービス)
  • MCP Client: Host内部でMCP Serverとの接続を管理するプロトコルクライアント
  • MCP Server: 外部ツール、データソース、APIをMCPプロトコルで公開するサーバー

MCPがtool callingを標準化した以上、セキュリティ上の脅威も標準化された方法で管理する必要がある。MCP Serverが提供するツールの範囲が広がるほど、「どのツールを誰にどの条件で許可するか」というガードレール設計が重要になる。

本ハンドブックでは、MCPベースのチャットボットにおけるガードレール設計、評価体系、運用手順を一つのドキュメントにまとめる。

MCP Serverの実装: ツール公開とアクセス制御

基本的なMCP Server実装(Python SDK)

"""
MCP Serverの例: 顧客の注文照会とFAQ検索ツールを提供する。
mcpパッケージ(v1.2+)を使用する。
pip install mcp
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import json
from typing import Any

# MCP Serverインスタンスの作成
app = Server("customer-support-tools")

@app.list_tools()
async def list_tools() -> list[Tool]:
    """MCP Clientが利用可能なツール一覧を要求した際に呼び出される。"""
    return [
        Tool(
            name="order_lookup",
            description="注文番号で注文状況を照会します。注文番号はORD-で始まる形式です。",
            inputSchema={
                "type": "object",
                "properties": {
                    "order_id": {
                        "type": "string",
                        "description": "注文番号(例: ORD-20260304-001)",
                        "pattern": r"^ORD-\d{8}-\d{3,6}$",
                    },
                },
                "required": ["order_id"],
                "additionalProperties": False,
            },
        ),
        Tool(
            name="faq_search",
            description="カスタマーサポートFAQから関連する回答を検索します。",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "検索する質問",
                        "minLength": 2,
                        "maxLength": 200,
                    },
                    "category": {
                        "type": "string",
                        "enum": ["payment", "shipping", "return", "account"],
                        "description": "FAQカテゴリ",
                    },
                },
                "required": ["query"],
                "additionalProperties": False,
            },
        ),
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
    """MCP Clientがツール実行を要求した際に呼び出される。"""
    if name == "order_lookup":
        # 実際の実装ではDB照会を行う
        order_id = arguments["order_id"]
        result = await lookup_order_from_db(order_id)
        return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]

    elif name == "faq_search":
        query = arguments["query"]
        category = arguments.get("category")
        results = await search_faq_index(query, category)
        return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False))]

    else:
        raise ValueError(f"Unknown tool: {name}")

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream)

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

MCPガードレールGateway設計

MCP ClientとMCP Serverの間にGatewayを配置し、すべてのtool callに対するガードレール検証を実行する。このGatewayが本ハンドブックのコアコンポーネントである。

"""
MCP Gateway: MCP Client -> Gateway -> MCP Server間で
すべてのtool callをインターセプトしてガードレール検証を実行する。
"""
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from enum import Enum
import re
import json
import logging

logger = logging.getLogger("mcp_gateway")

class GatewayDecision(Enum):
    ALLOW = "allow"
    DENY = "deny"
    REQUIRE_APPROVAL = "require_approval"
    RATE_LIMITED = "rate_limited"

@dataclass
class GatewayPolicy:
    """MCP Gatewayポリシー定義"""
    # ツール別アクセス制御
    tool_permissions: Dict[str, List[str]] = field(default_factory=dict)
    # 例: {"order_lookup": ["customer", "agent", "admin"],
    #      "order_cancel": ["agent", "admin"]}

    # ツール別呼び出し制限
    rate_limits: Dict[str, int] = field(default_factory=dict)
    # 例: {"order_lookup": 30, "faq_search": 60}  # per minute

    # 引数サニタイズルール
    argument_sanitizers: Dict[str, dict] = field(default_factory=dict)

    # 高リスクツールリスト(ユーザー確認が必要)
    high_risk_tools: List[str] = field(default_factory=list)

    # ブロックパターン
    blocked_argument_patterns: List[str] = field(default_factory=list)

# デフォルトポリシー
DEFAULT_POLICY = GatewayPolicy(
    tool_permissions={
        "order_lookup": ["customer", "agent", "admin"],
        "faq_search": ["customer", "agent", "admin"],
        "order_cancel": ["agent", "admin"],
        "refund_process": ["admin"],
        "user_data_export": ["admin"],
    },
    rate_limits={
        "order_lookup": 30,
        "faq_search": 60,
        "order_cancel": 5,
        "refund_process": 3,
        "user_data_export": 1,
    },
    high_risk_tools=["order_cancel", "refund_process", "user_data_export"],
    blocked_argument_patterns=[
        r"(?i)(drop|delete|truncate)\s+(table|database)",
        r"(?i)(union\s+select|or\s+1\s*=\s*1)",
        r"\.\./",  # path traversal
        r"<script",  # XSS
    ],
)

class MCPGateway:
    def __init__(self, policy: GatewayPolicy = DEFAULT_POLICY):
        self.policy = policy
        self._call_history: Dict[str, List[datetime]] = {}

    def evaluate(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        user_role: str,
        session_id: str,
    ) -> dict:
        """
        Tool callに対するガードレール検証を実行する。
        Returns: {"decision": GatewayDecision, "reason": str, "sanitized_args": dict}
        """
        # 1. パーミッション検証
        allowed_roles = self.policy.tool_permissions.get(tool_name, [])
        if not allowed_roles:
            return self._deny(f"Tool '{tool_name}' is not registered in gateway policy")
        if user_role not in allowed_roles:
            return self._deny(
                f"Role '{user_role}' is not authorized for tool '{tool_name}'. "
                f"Required: {allowed_roles}"
            )

        # 2. レートリミット検証
        rate_limit = self.policy.rate_limits.get(tool_name, 60)
        if self._is_rate_limited(session_id, tool_name, rate_limit):
            return {
                "decision": GatewayDecision.RATE_LIMITED,
                "reason": f"Rate limit exceeded: {rate_limit}/min for '{tool_name}'",
                "sanitized_args": arguments,
            }

        # 3. 引数サニタイズ
        sanitized_args, blocked_patterns = self._sanitize_arguments(arguments)
        if blocked_patterns:
            logger.warning(
                f"Blocked patterns detected in tool '{tool_name}': {blocked_patterns}"
            )
            return self._deny(
                f"Potentially malicious argument patterns detected: {blocked_patterns}"
            )

        # 4. 高リスクツールの確認
        if tool_name in self.policy.high_risk_tools:
            return {
                "decision": GatewayDecision.REQUIRE_APPROVAL,
                "reason": f"Tool '{tool_name}' requires user confirmation before execution",
                "sanitized_args": sanitized_args,
            }

        return {
            "decision": GatewayDecision.ALLOW,
            "reason": "All checks passed",
            "sanitized_args": sanitized_args,
        }

    def _is_rate_limited(self, session_id: str, tool_name: str, limit: int) -> bool:
        key = f"{session_id}:{tool_name}"
        now = datetime.utcnow()
        cutoff = now - timedelta(minutes=1)

        if key not in self._call_history:
            self._call_history[key] = []

        self._call_history[key] = [
            t for t in self._call_history[key] if t > cutoff
        ]

        if len(self._call_history[key]) >= limit:
            return True

        self._call_history[key].append(now)
        return False

    def _sanitize_arguments(self, arguments: Dict[str, Any]) -> tuple:
        blocked = []
        args_str = json.dumps(arguments)

        for pattern in self.policy.blocked_argument_patterns:
            if re.search(pattern, args_str):
                blocked.append(pattern)

        return arguments, blocked

    def _deny(self, reason: str) -> dict:
        return {
            "decision": GatewayDecision.DENY,
            "reason": reason,
            "sanitized_args": {},
        }

MCPツール評価フレームワーク

MCP Serverが提供するツールが正しく動作しているか、ガードレールが攻撃を遮断しているかを体系的に評価する必要がある。

ツール正確性評価

"""
MCPツールの機能的正確性を評価するテストスイート。
各ツールについて正常ケース、エッジケース、エラーケースを検証する。
"""
import pytest
from dataclasses import dataclass
from typing import Any, Optional, List

@dataclass
class ToolTestCase:
    """単一ツールテストケース"""
    test_id: str
    tool_name: str
    arguments: dict
    expected_status: str  # success, error, validation_error
    expected_output_contains: Optional[str] = None
    expected_error_type: Optional[str] = None
    description: str = ""

TOOL_TEST_CASES = [
    # 正常ケース
    ToolTestCase(
        test_id="order_lookup_valid",
        tool_name="order_lookup",
        arguments={"order_id": "ORD-20260304-001"},
        expected_status="success",
        expected_output_contains="order_status",
        description="有効な注文番号での照会",
    ),
    # エッジケース: 存在しない注文
    ToolTestCase(
        test_id="order_lookup_not_found",
        tool_name="order_lookup",
        arguments={"order_id": "ORD-99991231-999"},
        expected_status="success",
        expected_output_contains="not_found",
        description="存在しない注文番号での照会時に正常応答を返す",
    ),
    # エラーケース: 不正なフォーマット
    ToolTestCase(
        test_id="order_lookup_invalid_format",
        tool_name="order_lookup",
        arguments={"order_id": "INVALID-FORMAT"},
        expected_status="validation_error",
        expected_error_type="pattern_mismatch",
        description="注文番号の形式が不正な場合",
    ),
    # セキュリティケース: SQLインジェクション試行
    ToolTestCase(
        test_id="order_lookup_sql_injection",
        tool_name="order_lookup",
        arguments={"order_id": "ORD-20260304-001' OR '1'='1"},
        expected_status="validation_error",
        expected_error_type="pattern_mismatch",
        description="SQLインジェクション試行のブロック",
    ),
    # FAQ正常検索
    ToolTestCase(
        test_id="faq_search_valid",
        tool_name="faq_search",
        arguments={"query": "返金手続き", "category": "return"},
        expected_status="success",
        expected_output_contains="返金",
        description="有効なFAQ検索",
    ),
    # FAQ空の結果
    ToolTestCase(
        test_id="faq_search_no_results",
        tool_name="faq_search",
        arguments={"query": "xyzzyspoon"},
        expected_status="success",
        expected_output_contains="no_results",
        description="検索結果がない場合の正常な空応答",
    ),
]

@pytest.mark.parametrize("case", TOOL_TEST_CASES, ids=lambda c: c.test_id)
async def test_mcp_tool(mcp_client, case):
    """MCPツールの機能的正確性を検証する。"""
    try:
        result = await mcp_client.call_tool(case.tool_name, case.arguments)

        if case.expected_status == "validation_error":
            pytest.fail(
                f"Expected validation error but tool executed successfully: {result}"
            )

        if case.expected_output_contains:
            result_text = str(result)
            assert case.expected_output_contains in result_text, (
                f"Expected output to contain '{case.expected_output_contains}', "
                f"got: {result_text[:200]}"
            )

    except Exception as e:
        if case.expected_status == "validation_error":
            if case.expected_error_type:
                assert case.expected_error_type in str(type(e).__name__).lower() or \
                       case.expected_error_type in str(e).lower(), (
                    f"Expected error type '{case.expected_error_type}', got: {type(e).__name__}: {e}"
                )
        else:
            raise

ガードレール効果の測定

"""
MCP Gatewayガードレールの効果を定量的に測定する。
攻撃シナリオデータセットに対するブロック率、誤検出率を計算する。
"""
from typing import List, Dict
from dataclasses import dataclass

@dataclass
class GuardrailEvalResult:
    total_scenarios: int
    true_positives: int     # 攻撃を正常にブロック
    false_positives: int    # 正常リクエストを誤ブロック
    true_negatives: int     # 正常リクエストを正常に通過
    false_negatives: int    # 攻撃を見逃し
    precision: float
    recall: float
    f1_score: float

def evaluate_guardrails(
    gateway,
    attack_scenarios: List[dict],
    benign_scenarios: List[dict],
) -> GuardrailEvalResult:
    """
    攻撃シナリオと正常シナリオに対してガードレールの精度/再現率を測定する。
    """
    tp = fp = tn = fn = 0

    # 攻撃シナリオ: ブロックされるべき
    for scenario in attack_scenarios:
        result = gateway.evaluate(
            tool_name=scenario["tool_name"],
            arguments=scenario["arguments"],
            user_role=scenario.get("user_role", "customer"),
            session_id=scenario.get("session_id", "eval"),
        )
        if result["decision"] in (GatewayDecision.DENY, GatewayDecision.RATE_LIMITED):
            tp += 1
        else:
            fn += 1
            print(f"MISS: Attack scenario not blocked: {scenario.get('name', 'unknown')}")

    # 正常シナリオ: 通過されるべき
    for scenario in benign_scenarios:
        result = gateway.evaluate(
            tool_name=scenario["tool_name"],
            arguments=scenario["arguments"],
            user_role=scenario.get("user_role", "customer"),
            session_id=scenario.get("session_id", "eval"),
        )
        if result["decision"] == GatewayDecision.ALLOW:
            tn += 1
        elif result["decision"] == GatewayDecision.REQUIRE_APPROVAL:
            tn += 1  # 高リスクツールの確認要求は正常動作
        else:
            fp += 1
            print(f"FALSE POSITIVE: Benign scenario blocked: {scenario.get('name', 'unknown')}")

    total = tp + fp + tn + fn
    precision = tp / max(tp + fp, 1)
    recall = tp / max(tp + fn, 1)
    f1 = 2 * precision * recall / max(precision + recall, 1e-9)

    return GuardrailEvalResult(
        total_scenarios=total,
        true_positives=tp,
        false_positives=fp,
        true_negatives=tn,
        false_negatives=fn,
        precision=round(precision, 4),
        recall=round(recall, 4),
        f1_score=round(f1, 4),
    )

MCPセキュリティ脅威モデルと対策

MCPアーキテクチャで発生しうるセキュリティ上の脅威を体系的に整理する。

脅威タイプ攻撃ベクター影響対策
Tool Poisoning悪意あるMCP Serverが正規ツールに偽装データ流出、システム乗っ取りServer証明書検証、ツールホワイトリスト
Rug Pull初期は正常動作し、後に悪意ある動作に切り替え信頼ベースのセキュリティ回避ツール動作の継続的モニタリング、ハッシュベースの整合性検証
Argument Injectionツール引数に悪意あるペイロードを挿入SQLインジェクション、コマンド実行JSON Schemaの厳格な検証、パターンブロック
Excessive Privilege不必要に広い権限でツールを公開意図しないデータアクセス最小権限の原則、ロールベースアクセス制御
Cross-Context Leakage他のセッションのコンテキストが漏洩個人情報の露出セッション分離、コンテキストスコーピング
Denial of Service大量のツール呼び出しによるサーバー過負荷サービス中断レートリミット、セッション別同時呼び出し制限

MCP Server運用モニタリング

# prometheus-rules.yaml
groups:
  - name: mcp_server_monitoring
    interval: 15s
    rules:
      - alert: MCPToolCallErrorRateHigh
        expr: |
          sum(rate(mcp_tool_call_total{status="error"}[5m])) by (tool_name) /
          sum(rate(mcp_tool_call_total[5m])) by (tool_name) > 0.05
        for: 3m
        labels:
          severity: warning
          team: chatbot
        annotations:
          summary: 'MCPツール{{ $labels.tool_name }}のエラー率が5%を超えています'
          description: |
            現在のエラー率: {{ $value | humanizePercentage }}
            ツール実装または下流サービスの状態を確認してください。

      - alert: MCPGatewayBlockRateSpike
        expr: |
          sum(rate(mcp_gateway_decision_total{decision="deny"}[15m])) /
          sum(rate(mcp_gateway_decision_total[15m])) > 0.20
        for: 5m
        labels:
          severity: warning
          team: security
        annotations:
          summary: 'MCP Gatewayのブロック率が20%を超えています'
          description: '攻撃試行の増加またはポリシーの誤設定の可能性があります。監査ログを確認してください。'

      - alert: MCPToolLatencyHigh
        expr: |
          histogram_quantile(0.95,
            rate(mcp_tool_call_duration_seconds_bucket[5m])
          ) > 3.0
        for: 3m
        labels:
          severity: warning
          team: chatbot
        annotations:
          summary: 'MCPツール呼び出しp95レイテンシーが3秒を超えています'

MCPガードレール導入チェックリスト

MCPベースのチャットボットにガードレールを導入する際、順番に確認する。

Phase 1: MCP Serverセキュリティ基盤

  • すべてのMCP ServerにTLS証明書を適用
  • 各ツールのinputSchemaにadditionalProperties: falseを設定
  • 各ツールの引数にtype、pattern、maxLength、enum制約を明示
  • 既知の悪意あるパターン(SQLインジェクション、パストラバーサル)のブロックルールを配布

Phase 2: Gatewayポリシー配布

  • MCP Gatewayを配備し、すべてのtool callがGateway経由であることを確認
  • ロールベースのツールアクセス制御ポリシーを設定
  • ツール別レートリミットを設定
  • 高リスクツールにhuman-in-the-loop確認フローを適用

Phase 3: モニタリングと評価

  • 監査ログの収集と保持ポリシーを策定(最低90日)
  • Prometheusアラートルールを配布
  • Red teamテストスイートをCIに統合
  • ガードレールの精度/再現率を測定(目標: precision 0.95超、recall 0.98超)

Phase 4: 継続運用

  • 週1回の監査ログレビュー
  • 月1回のRed teamテストシナリオ更新
  • 四半期1回のツール権限マトリクスレビュー
  • MCP SDKセキュリティパッチ適用手順の策定

障害シナリオ別対応

シナリオ1: MCP Server接続失敗によるツール使用不可

症状: チャットボットがすべてのツール呼び出しに失敗。ユーザーに「機能を利用できません」と繰り返し表示。
エラーログ:
  MCPConnectionError: Failed to connect to MCP server at unix:///tmp/mcp-customer-support.sock
  Timeout after 5000ms

原因: MCP ServerプロセスがOOMで終了

解決:
  1. MCP Serverプロセスの再起動(systemdまたはsupervisor)
  2. メモリ制限の設定とOOM killerの優先度調整
  3. グレースフルデグラデーション: MCP Server障害時にツールなしの一般会話モードに切り替え
  4. ヘルスチェックエンドポイントの追加と自動再起動の設定

シナリオ2: ガードレールの誤検出による正常リクエストのブロック

症状: 「注文をキャンセルしてください」という正常なリクエストがGatewayでブロックされる
エラーログ:
  Gateway DENY: Blocked pattern detected - "キャンセル" matched "cancel" block rule

原因: blocked_argument_patternsに過度に広いパターンが含まれている
"cancel"というキーワードが引数だけでなくツール説明でもマッチング)

解決:
  1. パターンマッチングの範囲を引数値に限定(ツール名/説明を除外)
  2. ブロックパターン追加時に正常シナリオの回帰テストを必須化
  3. 誤検出発生時に即座にパターンを無効化できるホットフィックス手順を整備

シナリオ3: レートリミットによるパワーユーザーの不便

症状: CSエージェントが大量の注文照会時にレートリミットに引っかかり業務が遅延
エラーログ:
  Gateway RATE_LIMITED: Rate limit exceeded: 30/min for 'order_lookup'

解決:
  1. ロール別レートリミットの差異化(customer: 30/min、agent: 200/min、admin: 無制限)
  2. バッチ照会ツール(order_batch_lookup)の別途提供
  3. レートリミット履歴をダッシュボードに表示してしきい値チューニングの根拠を確保

クイズ

クイズ

Q1. MCPのClient-Server構造でガードレールをGatewayとして分離する理由は?

MCP Server自体を修正せずにポリシーを中央で管理でき、複数のMCP Serverに同一のセキュリティポリシーを一貫して適用できるためである。Server実装とセキュリティポリシーの関心事を分離する。

Q2. MCP inputSchemaにadditionalProperties: falseを設定すべきセキュリティ上の理由は?

LLMがスキーマに定義されていない任意のフィールドを生成し、意図しない動作を引き起こすことを防止する。例えば{"admin_bypass": true}のようなフィールドの挿入をブロックする。

Q3. Tool Poisoning攻撃とは何か、どのように防御するか?

悪意あるMCP Serverが正規ツールに偽装してデータを流出させたりシステムを乗っ取る攻撃である。Server証明書の検証、ツールホワイトリスト、ツールの説明/動作の整合性ハッシュ検証で防御する。

Q4. ガードレール評価でrecallがprecisionより重要な理由は?

Recallが低いと攻撃がブロックされずに通過するため、セキュリティインシデントにつながる。Precisionが低いと正常リクエストが誤ブロックされてユーザー体験が悪化するが、セキュリティインシデントと比較すれば回復可能な問題である。そのためrecall目標(0.98)をprecision目標(0.95)より高く設定する。

Q5. MCP Server障害時のグレースフルデグラデーションの適用方法は?

MCP Server接続失敗時にツール呼び出し機能を無効化し、LLMに「現在ツールが利用できません」というコンテキストを提供して一般会話モードに切り替える。ユーザーには機能制限を明示し、リトライのタイミングを案内する。

Q6. レートリミットをロール別に差異化すべき理由は?

一般顧客とCSエージェントの利用パターンが根本的に異なるためである。エージェントは業務上大量照会が必要であり、同一のレートリミットを適用すると業務効率が低下する。ロール別の利用パターンデータを分析して適切なしきい値を設定すべきである。

Q7. Red teamテストシナリオを月1回更新すべき理由は?

新しい攻撃手法が継続的に発見され、LLMモデルの更新によって既存のガードレールを迂回する新しいパターンが生じる可能性があるためである。OWASP LLM Top 10の更新やセキュリティコミュニティの新しい発見を反映し、テストカバレッジを維持する必要がある。

参考資料