- Authors
- Name

- MCPとは何か、なぜガードレールが必要なのか
- MCP Serverの実装: ツール公開とアクセス制御
- MCPガードレールGateway設計
- MCPツール評価フレームワーク
- MCPセキュリティ脅威モデルと対策
- MCP Server運用モニタリング
- MCPガードレール導入チェックリスト
- 障害シナリオ別対応
- クイズ
- 参考資料
MCPとは何か、なぜガードレールが必要なのか
Model Context Protocol(MCP)は、Anthropicが2024年11月に公開したオープンプロトコルで、LLMアプリケーションが外部データソースやツールに標準化された方式で接続できるように設計されている。MCP以前は、各LLMフレームワークが独自のtool/pluginインターフェースを使用しており、ホストアプリケーションはツールごとに個別の連携コードを記述する必要があった。
MCPはClient-Server構造に従う。
- MCP Host: LLMを内蔵したアプリケーション(Claude Desktop、IDE拡張、チャットボットサービス)
- MCP Client: Host内部でMCP Serverとの接続を管理するプロトコルクライアント
- MCP Server: 外部ツール、データソース、APIをMCPプロトコルで公開するサーバー
MCPがtool callingを標準化した以上、セキュリティ上の脅威も標準化された方法で管理する必要がある。MCP Serverが提供するツールの範囲が広がるほど、「どのツールを誰にどの条件で許可するか」というガードレール設計が重要になる。
本ハンドブックでは、MCPベースのチャットボットにおけるガードレール設計、評価体系、運用手順を一つのドキュメントにまとめる。
MCP Serverの実装: ツール公開とアクセス制御
基本的なMCP Server実装(Python SDK)
"""
MCP Serverの例: 顧客の注文照会とFAQ検索ツールを提供する。
mcpパッケージ(v1.2+)を使用する。
pip install mcp
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import json
from typing import Any
# MCP Serverインスタンスの作成
app = Server("customer-support-tools")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""MCP Clientが利用可能なツール一覧を要求した際に呼び出される。"""
return [
Tool(
name="order_lookup",
description="注文番号で注文状況を照会します。注文番号はORD-で始まる形式です。",
inputSchema={
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "注文番号(例: ORD-20260304-001)",
"pattern": r"^ORD-\d{8}-\d{3,6}$",
},
},
"required": ["order_id"],
"additionalProperties": False,
},
),
Tool(
name="faq_search",
description="カスタマーサポートFAQから関連する回答を検索します。",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "検索する質問",
"minLength": 2,
"maxLength": 200,
},
"category": {
"type": "string",
"enum": ["payment", "shipping", "return", "account"],
"description": "FAQカテゴリ",
},
},
"required": ["query"],
"additionalProperties": False,
},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
"""MCP Clientがツール実行を要求した際に呼び出される。"""
if name == "order_lookup":
# 実際の実装ではDB照会を行う
order_id = arguments["order_id"]
result = await lookup_order_from_db(order_id)
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]
elif name == "faq_search":
query = arguments["query"]
category = arguments.get("category")
results = await search_faq_index(query, category)
return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False))]
else:
raise ValueError(f"Unknown tool: {name}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await app.run(read_stream, write_stream)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
MCPガードレールGateway設計
MCP ClientとMCP Serverの間にGatewayを配置し、すべてのtool callに対するガードレール検証を実行する。このGatewayが本ハンドブックのコアコンポーネントである。
"""
MCP Gateway: MCP Client -> Gateway -> MCP Server間で
すべてのtool callをインターセプトしてガードレール検証を実行する。
"""
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from enum import Enum
import re
import json
import logging
logger = logging.getLogger("mcp_gateway")
class GatewayDecision(Enum):
ALLOW = "allow"
DENY = "deny"
REQUIRE_APPROVAL = "require_approval"
RATE_LIMITED = "rate_limited"
@dataclass
class GatewayPolicy:
"""MCP Gatewayポリシー定義"""
# ツール別アクセス制御
tool_permissions: Dict[str, List[str]] = field(default_factory=dict)
# 例: {"order_lookup": ["customer", "agent", "admin"],
# "order_cancel": ["agent", "admin"]}
# ツール別呼び出し制限
rate_limits: Dict[str, int] = field(default_factory=dict)
# 例: {"order_lookup": 30, "faq_search": 60} # per minute
# 引数サニタイズルール
argument_sanitizers: Dict[str, dict] = field(default_factory=dict)
# 高リスクツールリスト(ユーザー確認が必要)
high_risk_tools: List[str] = field(default_factory=list)
# ブロックパターン
blocked_argument_patterns: List[str] = field(default_factory=list)
# デフォルトポリシー
DEFAULT_POLICY = GatewayPolicy(
tool_permissions={
"order_lookup": ["customer", "agent", "admin"],
"faq_search": ["customer", "agent", "admin"],
"order_cancel": ["agent", "admin"],
"refund_process": ["admin"],
"user_data_export": ["admin"],
},
rate_limits={
"order_lookup": 30,
"faq_search": 60,
"order_cancel": 5,
"refund_process": 3,
"user_data_export": 1,
},
high_risk_tools=["order_cancel", "refund_process", "user_data_export"],
blocked_argument_patterns=[
r"(?i)(drop|delete|truncate)\s+(table|database)",
r"(?i)(union\s+select|or\s+1\s*=\s*1)",
r"\.\./", # path traversal
r"<script", # XSS
],
)
class MCPGateway:
def __init__(self, policy: GatewayPolicy = DEFAULT_POLICY):
self.policy = policy
self._call_history: Dict[str, List[datetime]] = {}
def evaluate(
self,
tool_name: str,
arguments: Dict[str, Any],
user_role: str,
session_id: str,
) -> dict:
"""
Tool callに対するガードレール検証を実行する。
Returns: {"decision": GatewayDecision, "reason": str, "sanitized_args": dict}
"""
# 1. パーミッション検証
allowed_roles = self.policy.tool_permissions.get(tool_name, [])
if not allowed_roles:
return self._deny(f"Tool '{tool_name}' is not registered in gateway policy")
if user_role not in allowed_roles:
return self._deny(
f"Role '{user_role}' is not authorized for tool '{tool_name}'. "
f"Required: {allowed_roles}"
)
# 2. レートリミット検証
rate_limit = self.policy.rate_limits.get(tool_name, 60)
if self._is_rate_limited(session_id, tool_name, rate_limit):
return {
"decision": GatewayDecision.RATE_LIMITED,
"reason": f"Rate limit exceeded: {rate_limit}/min for '{tool_name}'",
"sanitized_args": arguments,
}
# 3. 引数サニタイズ
sanitized_args, blocked_patterns = self._sanitize_arguments(arguments)
if blocked_patterns:
logger.warning(
f"Blocked patterns detected in tool '{tool_name}': {blocked_patterns}"
)
return self._deny(
f"Potentially malicious argument patterns detected: {blocked_patterns}"
)
# 4. 高リスクツールの確認
if tool_name in self.policy.high_risk_tools:
return {
"decision": GatewayDecision.REQUIRE_APPROVAL,
"reason": f"Tool '{tool_name}' requires user confirmation before execution",
"sanitized_args": sanitized_args,
}
return {
"decision": GatewayDecision.ALLOW,
"reason": "All checks passed",
"sanitized_args": sanitized_args,
}
def _is_rate_limited(self, session_id: str, tool_name: str, limit: int) -> bool:
key = f"{session_id}:{tool_name}"
now = datetime.utcnow()
cutoff = now - timedelta(minutes=1)
if key not in self._call_history:
self._call_history[key] = []
self._call_history[key] = [
t for t in self._call_history[key] if t > cutoff
]
if len(self._call_history[key]) >= limit:
return True
self._call_history[key].append(now)
return False
def _sanitize_arguments(self, arguments: Dict[str, Any]) -> tuple:
blocked = []
args_str = json.dumps(arguments)
for pattern in self.policy.blocked_argument_patterns:
if re.search(pattern, args_str):
blocked.append(pattern)
return arguments, blocked
def _deny(self, reason: str) -> dict:
return {
"decision": GatewayDecision.DENY,
"reason": reason,
"sanitized_args": {},
}
MCPツール評価フレームワーク
MCP Serverが提供するツールが正しく動作しているか、ガードレールが攻撃を遮断しているかを体系的に評価する必要がある。
ツール正確性評価
"""
MCPツールの機能的正確性を評価するテストスイート。
各ツールについて正常ケース、エッジケース、エラーケースを検証する。
"""
import pytest
from dataclasses import dataclass
from typing import Any, Optional, List
@dataclass
class ToolTestCase:
"""単一ツールテストケース"""
test_id: str
tool_name: str
arguments: dict
expected_status: str # success, error, validation_error
expected_output_contains: Optional[str] = None
expected_error_type: Optional[str] = None
description: str = ""
TOOL_TEST_CASES = [
# 正常ケース
ToolTestCase(
test_id="order_lookup_valid",
tool_name="order_lookup",
arguments={"order_id": "ORD-20260304-001"},
expected_status="success",
expected_output_contains="order_status",
description="有効な注文番号での照会",
),
# エッジケース: 存在しない注文
ToolTestCase(
test_id="order_lookup_not_found",
tool_name="order_lookup",
arguments={"order_id": "ORD-99991231-999"},
expected_status="success",
expected_output_contains="not_found",
description="存在しない注文番号での照会時に正常応答を返す",
),
# エラーケース: 不正なフォーマット
ToolTestCase(
test_id="order_lookup_invalid_format",
tool_name="order_lookup",
arguments={"order_id": "INVALID-FORMAT"},
expected_status="validation_error",
expected_error_type="pattern_mismatch",
description="注文番号の形式が不正な場合",
),
# セキュリティケース: SQLインジェクション試行
ToolTestCase(
test_id="order_lookup_sql_injection",
tool_name="order_lookup",
arguments={"order_id": "ORD-20260304-001' OR '1'='1"},
expected_status="validation_error",
expected_error_type="pattern_mismatch",
description="SQLインジェクション試行のブロック",
),
# FAQ正常検索
ToolTestCase(
test_id="faq_search_valid",
tool_name="faq_search",
arguments={"query": "返金手続き", "category": "return"},
expected_status="success",
expected_output_contains="返金",
description="有効なFAQ検索",
),
# FAQ空の結果
ToolTestCase(
test_id="faq_search_no_results",
tool_name="faq_search",
arguments={"query": "xyzzyspoon"},
expected_status="success",
expected_output_contains="no_results",
description="検索結果がない場合の正常な空応答",
),
]
@pytest.mark.parametrize("case", TOOL_TEST_CASES, ids=lambda c: c.test_id)
async def test_mcp_tool(mcp_client, case):
"""MCPツールの機能的正確性を検証する。"""
try:
result = await mcp_client.call_tool(case.tool_name, case.arguments)
if case.expected_status == "validation_error":
pytest.fail(
f"Expected validation error but tool executed successfully: {result}"
)
if case.expected_output_contains:
result_text = str(result)
assert case.expected_output_contains in result_text, (
f"Expected output to contain '{case.expected_output_contains}', "
f"got: {result_text[:200]}"
)
except Exception as e:
if case.expected_status == "validation_error":
if case.expected_error_type:
assert case.expected_error_type in str(type(e).__name__).lower() or \
case.expected_error_type in str(e).lower(), (
f"Expected error type '{case.expected_error_type}', got: {type(e).__name__}: {e}"
)
else:
raise
ガードレール効果の測定
"""
MCP Gatewayガードレールの効果を定量的に測定する。
攻撃シナリオデータセットに対するブロック率、誤検出率を計算する。
"""
from typing import List, Dict
from dataclasses import dataclass
@dataclass
class GuardrailEvalResult:
total_scenarios: int
true_positives: int # 攻撃を正常にブロック
false_positives: int # 正常リクエストを誤ブロック
true_negatives: int # 正常リクエストを正常に通過
false_negatives: int # 攻撃を見逃し
precision: float
recall: float
f1_score: float
def evaluate_guardrails(
gateway,
attack_scenarios: List[dict],
benign_scenarios: List[dict],
) -> GuardrailEvalResult:
"""
攻撃シナリオと正常シナリオに対してガードレールの精度/再現率を測定する。
"""
tp = fp = tn = fn = 0
# 攻撃シナリオ: ブロックされるべき
for scenario in attack_scenarios:
result = gateway.evaluate(
tool_name=scenario["tool_name"],
arguments=scenario["arguments"],
user_role=scenario.get("user_role", "customer"),
session_id=scenario.get("session_id", "eval"),
)
if result["decision"] in (GatewayDecision.DENY, GatewayDecision.RATE_LIMITED):
tp += 1
else:
fn += 1
print(f"MISS: Attack scenario not blocked: {scenario.get('name', 'unknown')}")
# 正常シナリオ: 通過されるべき
for scenario in benign_scenarios:
result = gateway.evaluate(
tool_name=scenario["tool_name"],
arguments=scenario["arguments"],
user_role=scenario.get("user_role", "customer"),
session_id=scenario.get("session_id", "eval"),
)
if result["decision"] == GatewayDecision.ALLOW:
tn += 1
elif result["decision"] == GatewayDecision.REQUIRE_APPROVAL:
tn += 1 # 高リスクツールの確認要求は正常動作
else:
fp += 1
print(f"FALSE POSITIVE: Benign scenario blocked: {scenario.get('name', 'unknown')}")
total = tp + fp + tn + fn
precision = tp / max(tp + fp, 1)
recall = tp / max(tp + fn, 1)
f1 = 2 * precision * recall / max(precision + recall, 1e-9)
return GuardrailEvalResult(
total_scenarios=total,
true_positives=tp,
false_positives=fp,
true_negatives=tn,
false_negatives=fn,
precision=round(precision, 4),
recall=round(recall, 4),
f1_score=round(f1, 4),
)
MCPセキュリティ脅威モデルと対策
MCPアーキテクチャで発生しうるセキュリティ上の脅威を体系的に整理する。
| 脅威タイプ | 攻撃ベクター | 影響 | 対策 |
|---|---|---|---|
| Tool Poisoning | 悪意あるMCP Serverが正規ツールに偽装 | データ流出、システム乗っ取り | Server証明書検証、ツールホワイトリスト |
| Rug Pull | 初期は正常動作し、後に悪意ある動作に切り替え | 信頼ベースのセキュリティ回避 | ツール動作の継続的モニタリング、ハッシュベースの整合性検証 |
| Argument Injection | ツール引数に悪意あるペイロードを挿入 | SQLインジェクション、コマンド実行 | JSON Schemaの厳格な検証、パターンブロック |
| Excessive Privilege | 不必要に広い権限でツールを公開 | 意図しないデータアクセス | 最小権限の原則、ロールベースアクセス制御 |
| Cross-Context Leakage | 他のセッションのコンテキストが漏洩 | 個人情報の露出 | セッション分離、コンテキストスコーピング |
| Denial of Service | 大量のツール呼び出しによるサーバー過負荷 | サービス中断 | レートリミット、セッション別同時呼び出し制限 |
MCP Server運用モニタリング
# prometheus-rules.yaml
groups:
- name: mcp_server_monitoring
interval: 15s
rules:
- alert: MCPToolCallErrorRateHigh
expr: |
sum(rate(mcp_tool_call_total{status="error"}[5m])) by (tool_name) /
sum(rate(mcp_tool_call_total[5m])) by (tool_name) > 0.05
for: 3m
labels:
severity: warning
team: chatbot
annotations:
summary: 'MCPツール{{ $labels.tool_name }}のエラー率が5%を超えています'
description: |
現在のエラー率: {{ $value | humanizePercentage }}
ツール実装または下流サービスの状態を確認してください。
- alert: MCPGatewayBlockRateSpike
expr: |
sum(rate(mcp_gateway_decision_total{decision="deny"}[15m])) /
sum(rate(mcp_gateway_decision_total[15m])) > 0.20
for: 5m
labels:
severity: warning
team: security
annotations:
summary: 'MCP Gatewayのブロック率が20%を超えています'
description: '攻撃試行の増加またはポリシーの誤設定の可能性があります。監査ログを確認してください。'
- alert: MCPToolLatencyHigh
expr: |
histogram_quantile(0.95,
rate(mcp_tool_call_duration_seconds_bucket[5m])
) > 3.0
for: 3m
labels:
severity: warning
team: chatbot
annotations:
summary: 'MCPツール呼び出しp95レイテンシーが3秒を超えています'
MCPガードレール導入チェックリスト
MCPベースのチャットボットにガードレールを導入する際、順番に確認する。
Phase 1: MCP Serverセキュリティ基盤
- すべてのMCP ServerにTLS証明書を適用
- 各ツールのinputSchemaに
additionalProperties: falseを設定 - 各ツールの引数にtype、pattern、maxLength、enum制約を明示
- 既知の悪意あるパターン(SQLインジェクション、パストラバーサル)のブロックルールを配布
Phase 2: Gatewayポリシー配布
- MCP Gatewayを配備し、すべてのtool callがGateway経由であることを確認
- ロールベースのツールアクセス制御ポリシーを設定
- ツール別レートリミットを設定
- 高リスクツールにhuman-in-the-loop確認フローを適用
Phase 3: モニタリングと評価
- 監査ログの収集と保持ポリシーを策定(最低90日)
- Prometheusアラートルールを配布
- Red teamテストスイートをCIに統合
- ガードレールの精度/再現率を測定(目標: precision 0.95超、recall 0.98超)
Phase 4: 継続運用
- 週1回の監査ログレビュー
- 月1回のRed teamテストシナリオ更新
- 四半期1回のツール権限マトリクスレビュー
- MCP SDKセキュリティパッチ適用手順の策定
障害シナリオ別対応
シナリオ1: MCP Server接続失敗によるツール使用不可
症状: チャットボットがすべてのツール呼び出しに失敗。ユーザーに「機能を利用できません」と繰り返し表示。
エラーログ:
MCPConnectionError: Failed to connect to MCP server at unix:///tmp/mcp-customer-support.sock
Timeout after 5000ms
原因: MCP ServerプロセスがOOMで終了
解決:
1. MCP Serverプロセスの再起動(systemdまたはsupervisor)
2. メモリ制限の設定とOOM killerの優先度調整
3. グレースフルデグラデーション: MCP Server障害時にツールなしの一般会話モードに切り替え
4. ヘルスチェックエンドポイントの追加と自動再起動の設定
シナリオ2: ガードレールの誤検出による正常リクエストのブロック
症状: 「注文をキャンセルしてください」という正常なリクエストがGatewayでブロックされる
エラーログ:
Gateway DENY: Blocked pattern detected - "キャンセル" matched "cancel" block rule
原因: blocked_argument_patternsに過度に広いパターンが含まれている
("cancel"というキーワードが引数だけでなくツール説明でもマッチング)
解決:
1. パターンマッチングの範囲を引数値に限定(ツール名/説明を除外)
2. ブロックパターン追加時に正常シナリオの回帰テストを必須化
3. 誤検出発生時に即座にパターンを無効化できるホットフィックス手順を整備
シナリオ3: レートリミットによるパワーユーザーの不便
症状: CSエージェントが大量の注文照会時にレートリミットに引っかかり業務が遅延
エラーログ:
Gateway RATE_LIMITED: Rate limit exceeded: 30/min for 'order_lookup'
解決:
1. ロール別レートリミットの差異化(customer: 30/min、agent: 200/min、admin: 無制限)
2. バッチ照会ツール(order_batch_lookup)の別途提供
3. レートリミット履歴をダッシュボードに表示してしきい値チューニングの根拠を確保
クイズ
クイズ
Q1. MCPのClient-Server構造でガードレールをGatewayとして分離する理由は?
MCP Server自体を修正せずにポリシーを中央で管理でき、複数のMCP Serverに同一のセキュリティポリシーを一貫して適用できるためである。Server実装とセキュリティポリシーの関心事を分離する。
Q2. MCP inputSchemaにadditionalProperties: falseを設定すべきセキュリティ上の理由は?
LLMがスキーマに定義されていない任意のフィールドを生成し、意図しない動作を引き起こすことを防止する。例えば{"admin_bypass": true}のようなフィールドの挿入をブロックする。
Q3. Tool Poisoning攻撃とは何か、どのように防御するか?
悪意あるMCP Serverが正規ツールに偽装してデータを流出させたりシステムを乗っ取る攻撃である。Server証明書の検証、ツールホワイトリスト、ツールの説明/動作の整合性ハッシュ検証で防御する。
Q4. ガードレール評価でrecallがprecisionより重要な理由は?
Recallが低いと攻撃がブロックされずに通過するため、セキュリティインシデントにつながる。Precisionが低いと正常リクエストが誤ブロックされてユーザー体験が悪化するが、セキュリティインシデントと比較すれば回復可能な問題である。そのためrecall目標(0.98)をprecision目標(0.95)より高く設定する。
Q5. MCP Server障害時のグレースフルデグラデーションの適用方法は?
MCP Server接続失敗時にツール呼び出し機能を無効化し、LLMに「現在ツールが利用できません」というコンテキストを提供して一般会話モードに切り替える。ユーザーには機能制限を明示し、リトライのタイミングを案内する。
Q6. レートリミットをロール別に差異化すべき理由は?
一般顧客とCSエージェントの利用パターンが根本的に異なるためである。エージェントは業務上大量照会が必要であり、同一のレートリミットを適用すると業務効率が低下する。ロール別の利用パターンデータを分析して適切なしきい値を設定すべきである。
Q7. Red teamテストシナリオを月1回更新すべき理由は?
新しい攻撃手法が継続的に発見され、LLMモデルの更新によって既存のガードレールを迂回する新しいパターンが生じる可能性があるためである。OWASP LLM Top 10の更新やセキュリティコミュニティの新しい発見を反映し、テストカバレッジを維持する必要がある。