Split View: 챗봇: MCP 가드레일과 평가 핸드북 2026
챗봇: MCP 가드레일과 평가 핸드북 2026

- MCP란 무엇이고 왜 가드레일이 필요한가
- MCP Server 구현: 도구 노출과 접근 제어
- MCP 가드레일 Gateway 설계
- MCP 도구 평가 프레임워크
- MCP 보안 위협 모델과 대응
- MCP Server 운영 모니터링
- MCP 가드레일 도입 체크리스트
- 장애 시나리오별 대응
- 퀴즈
- References
MCP란 무엇이고 왜 가드레일이 필요한가
Model Context Protocol(MCP)은 Anthropic이 2024년 11월에 공개한 오픈 프로토콜로, LLM 애플리케이션이 외부 데이터 소스와 도구에 표준화된 방식으로 연결되도록 설계되었다. MCP 이전에는 각 LLM 프레임워크가 독자적인 tool/plugin 인터페이스를 사용했고, 호스트 애플리케이션이 도구마다 별도의 연동 코드를 작성해야 했다.
MCP는 Client-Server 구조를 따른다.
- MCP Host: LLM을 내장한 애플리케이션 (Claude Desktop, IDE 확장, 챗봇 서비스)
- MCP Client: Host 내부에서 MCP Server와의 연결을 관리하는 프로토콜 클라이언트
- MCP Server: 외부 도구, 데이터 소스, API를 MCP 프로토콜로 노출하는 서버
MCP가 tool calling을 표준화한 만큼, 보안 위협도 표준화된 방식으로 관리해야 한다. MCP Server가 제공하는 도구의 범위가 넓어질수록, "어떤 도구를 누구에게 어떤 조건으로 허용할 것인가"라는 가드레일 설계가 핵심이 된다.
이 핸드북은 MCP 기반 챗봇의 가드레일 설계, 평가 체계, 운영 절차를 하나의 문서로 정리한다.
MCP Server 구현: 도구 노출과 접근 제어
기본 MCP Server 구현 (Python SDK)
"""
MCP Server 예시: 고객 주문 조회와 FAQ 검색 도구를 제공한다.
mcp 패키지(v1.2+)를 사용한다.
pip install mcp
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import json
from typing import Any
# MCP Server 인스턴스 생성
app = Server("customer-support-tools")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""MCP Client가 사용 가능한 도구 목록을 요청할 때 호출된다."""
return [
Tool(
name="order_lookup",
description="주문 번호로 주문 상태를 조회합니다. 주문 번호는 ORD-로 시작하는 형식입니다.",
inputSchema={
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "주문 번호 (예: ORD-20260304-001)",
"pattern": r"^ORD-\d{8}-\d{3,6}$",
},
},
"required": ["order_id"],
"additionalProperties": False,
},
),
Tool(
name="faq_search",
description="고객 지원 FAQ에서 관련 답변을 검색합니다.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "검색할 질문",
"minLength": 2,
"maxLength": 200,
},
"category": {
"type": "string",
"enum": ["payment", "shipping", "return", "account"],
"description": "FAQ 카테고리",
},
},
"required": ["query"],
"additionalProperties": False,
},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
"""MCP Client가 도구 실행을 요청할 때 호출된다."""
if name == "order_lookup":
# 실제 구현에서는 DB 조회
order_id = arguments["order_id"]
result = await lookup_order_from_db(order_id)
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]
elif name == "faq_search":
query = arguments["query"]
category = arguments.get("category")
results = await search_faq_index(query, category)
return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False))]
else:
raise ValueError(f"Unknown tool: {name}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await app.run(read_stream, write_stream)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
MCP 가드레일 Gateway 설계
MCP Client와 MCP Server 사이에 Gateway를 배치하여, 모든 tool call에 대한 가드레일 검증을 수행한다. 이 Gateway가 이 핸드북의 핵심 컴포넌트다.
"""
MCP Gateway: MCP Client -> Gateway -> MCP Server 사이에서
모든 tool call을 가로채어 가드레일 검증을 수행한다.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from enum import Enum
import re
import json
import logging
logger = logging.getLogger("mcp_gateway")
class GatewayDecision(Enum):
ALLOW = "allow"
DENY = "deny"
REQUIRE_APPROVAL = "require_approval"
RATE_LIMITED = "rate_limited"
@dataclass
class GatewayPolicy:
"""MCP Gateway 정책 정의"""
# 도구별 접근 제어
tool_permissions: Dict[str, List[str]] = field(default_factory=dict)
# 예: {"order_lookup": ["customer", "agent", "admin"],
# "order_cancel": ["agent", "admin"]}
# 도구별 호출 제한
rate_limits: Dict[str, int] = field(default_factory=dict)
# 예: {"order_lookup": 30, "faq_search": 60} # per minute
# 인자 정제 규칙
argument_sanitizers: Dict[str, dict] = field(default_factory=dict)
# 위험 도구 목록 (사용자 확인 필요)
high_risk_tools: List[str] = field(default_factory=list)
# 차단 패턴
blocked_argument_patterns: List[str] = field(default_factory=list)
# 기본 정책
DEFAULT_POLICY = GatewayPolicy(
tool_permissions={
"order_lookup": ["customer", "agent", "admin"],
"faq_search": ["customer", "agent", "admin"],
"order_cancel": ["agent", "admin"],
"refund_process": ["admin"],
"user_data_export": ["admin"],
},
rate_limits={
"order_lookup": 30,
"faq_search": 60,
"order_cancel": 5,
"refund_process": 3,
"user_data_export": 1,
},
high_risk_tools=["order_cancel", "refund_process", "user_data_export"],
blocked_argument_patterns=[
r"(?i)(drop|delete|truncate)\s+(table|database)",
r"(?i)(union\s+select|or\s+1\s*=\s*1)",
r"\.\./", # path traversal
r"<script", # XSS
],
)
class MCPGateway:
def __init__(self, policy: GatewayPolicy = DEFAULT_POLICY):
self.policy = policy
self._call_history: Dict[str, List[datetime]] = {}
def evaluate(
self,
tool_name: str,
arguments: Dict[str, Any],
user_role: str,
session_id: str,
) -> dict:
"""
Tool call에 대한 가드레일 검증을 수행한다.
Returns: {"decision": GatewayDecision, "reason": str, "sanitized_args": dict}
"""
# 1. Permission 검증
allowed_roles = self.policy.tool_permissions.get(tool_name, [])
if not allowed_roles:
return self._deny(f"Tool '{tool_name}' is not registered in gateway policy")
if user_role not in allowed_roles:
return self._deny(
f"Role '{user_role}' is not authorized for tool '{tool_name}'. "
f"Required: {allowed_roles}"
)
# 2. Rate limit 검증
rate_limit = self.policy.rate_limits.get(tool_name, 60)
if self._is_rate_limited(session_id, tool_name, rate_limit):
return {
"decision": GatewayDecision.RATE_LIMITED,
"reason": f"Rate limit exceeded: {rate_limit}/min for '{tool_name}'",
"sanitized_args": arguments,
}
# 3. Argument sanitization
sanitized_args, blocked_patterns = self._sanitize_arguments(arguments)
if blocked_patterns:
logger.warning(
f"Blocked patterns detected in tool '{tool_name}': {blocked_patterns}"
)
return self._deny(
f"Potentially malicious argument patterns detected: {blocked_patterns}"
)
# 4. High-risk tool 확인
if tool_name in self.policy.high_risk_tools:
return {
"decision": GatewayDecision.REQUIRE_APPROVAL,
"reason": f"Tool '{tool_name}' requires user confirmation before execution",
"sanitized_args": sanitized_args,
}
return {
"decision": GatewayDecision.ALLOW,
"reason": "All checks passed",
"sanitized_args": sanitized_args,
}
def _is_rate_limited(self, session_id: str, tool_name: str, limit: int) -> bool:
key = f"{session_id}:{tool_name}"
now = datetime.utcnow()
cutoff = now - timedelta(minutes=1)
if key not in self._call_history:
self._call_history[key] = []
self._call_history[key] = [
t for t in self._call_history[key] if t > cutoff
]
if len(self._call_history[key]) >= limit:
return True
self._call_history[key].append(now)
return False
def _sanitize_arguments(self, arguments: Dict[str, Any]) -> tuple:
blocked = []
args_str = json.dumps(arguments)
for pattern in self.policy.blocked_argument_patterns:
if re.search(pattern, args_str):
blocked.append(pattern)
return arguments, blocked
def _deny(self, reason: str) -> dict:
return {
"decision": GatewayDecision.DENY,
"reason": reason,
"sanitized_args": {},
}
MCP 도구 평가 프레임워크
MCP Server가 제공하는 도구들이 올바르게 동작하는지, 가드레일이 공격을 차단하는지 체계적으로 평가해야 한다.
도구 정확성 평가
"""
MCP 도구의 기능적 정확성을 평가하는 테스트 스위트.
각 도구에 대해 정상 케이스, 엣지 케이스, 에러 케이스를 검증한다.
"""
import pytest
from dataclasses import dataclass
from typing import Any, Optional, List
@dataclass
class ToolTestCase:
"""단일 도구 테스트 케이스"""
test_id: str
tool_name: str
arguments: dict
expected_status: str # success, error, validation_error
expected_output_contains: Optional[str] = None
expected_error_type: Optional[str] = None
description: str = ""
TOOL_TEST_CASES = [
# 정상 케이스
ToolTestCase(
test_id="order_lookup_valid",
tool_name="order_lookup",
arguments={"order_id": "ORD-20260304-001"},
expected_status="success",
expected_output_contains="order_status",
description="유효한 주문 번호로 조회",
),
# 엣지 케이스: 존재하지 않는 주문
ToolTestCase(
test_id="order_lookup_not_found",
tool_name="order_lookup",
arguments={"order_id": "ORD-99991231-999"},
expected_status="success",
expected_output_contains="not_found",
description="존재하지 않는 주문 번호 조회 시 정상 응답",
),
# 에러 케이스: 잘못된 형식
ToolTestCase(
test_id="order_lookup_invalid_format",
tool_name="order_lookup",
arguments={"order_id": "INVALID-FORMAT"},
expected_status="validation_error",
expected_error_type="pattern_mismatch",
description="주문 번호 형식이 잘못된 경우",
),
# 보안 케이스: SQL injection 시도
ToolTestCase(
test_id="order_lookup_sql_injection",
tool_name="order_lookup",
arguments={"order_id": "ORD-20260304-001' OR '1'='1"},
expected_status="validation_error",
expected_error_type="pattern_mismatch",
description="SQL injection 시도 차단",
),
# FAQ 정상 검색
ToolTestCase(
test_id="faq_search_valid",
tool_name="faq_search",
arguments={"query": "환불 절차", "category": "return"},
expected_status="success",
expected_output_contains="환불",
description="유효한 FAQ 검색",
),
# FAQ 빈 결과
ToolTestCase(
test_id="faq_search_no_results",
tool_name="faq_search",
arguments={"query": "xyzzyspoon"},
expected_status="success",
expected_output_contains="no_results",
description="검색 결과 없음 시 정상 빈 응답",
),
]
@pytest.mark.parametrize("case", TOOL_TEST_CASES, ids=lambda c: c.test_id)
async def test_mcp_tool(mcp_client, case):
"""MCP 도구의 기능적 정확성을 검증한다."""
try:
result = await mcp_client.call_tool(case.tool_name, case.arguments)
if case.expected_status == "validation_error":
pytest.fail(
f"Expected validation error but tool executed successfully: {result}"
)
if case.expected_output_contains:
result_text = str(result)
assert case.expected_output_contains in result_text, (
f"Expected output to contain '{case.expected_output_contains}', "
f"got: {result_text[:200]}"
)
except Exception as e:
if case.expected_status == "validation_error":
if case.expected_error_type:
assert case.expected_error_type in str(type(e).__name__).lower() or \
case.expected_error_type in str(e).lower(), (
f"Expected error type '{case.expected_error_type}', got: {type(e).__name__}: {e}"
)
else:
raise
가드레일 효과 측정
"""
MCP Gateway 가드레일의 효과를 정량적으로 측정한다.
공격 시나리오 데이터셋에 대한 차단율, 오탐율을 계산한다.
"""
from typing import List, Dict
from dataclasses import dataclass
@dataclass
class GuardrailEvalResult:
total_scenarios: int
true_positives: int # 공격을 정상 차단
false_positives: int # 정상을 오차단
true_negatives: int # 정상을 정상 통과
false_negatives: int # 공격을 미차단
precision: float
recall: float
f1_score: float
def evaluate_guardrails(
gateway,
attack_scenarios: List[dict],
benign_scenarios: List[dict],
) -> GuardrailEvalResult:
"""
공격 시나리오와 정상 시나리오에 대해 가드레일의 정밀도/재현율을 측정한다.
"""
tp = fp = tn = fn = 0
# 공격 시나리오: 차단되어야 정상
for scenario in attack_scenarios:
result = gateway.evaluate(
tool_name=scenario["tool_name"],
arguments=scenario["arguments"],
user_role=scenario.get("user_role", "customer"),
session_id=scenario.get("session_id", "eval"),
)
if result["decision"] in (GatewayDecision.DENY, GatewayDecision.RATE_LIMITED):
tp += 1
else:
fn += 1
print(f"MISS: Attack scenario not blocked: {scenario.get('name', 'unknown')}")
# 정상 시나리오: 통과되어야 정상
for scenario in benign_scenarios:
result = gateway.evaluate(
tool_name=scenario["tool_name"],
arguments=scenario["arguments"],
user_role=scenario.get("user_role", "customer"),
session_id=scenario.get("session_id", "eval"),
)
if result["decision"] == GatewayDecision.ALLOW:
tn += 1
elif result["decision"] == GatewayDecision.REQUIRE_APPROVAL:
tn += 1 # high-risk 도구의 확인 요청은 정상 동작
else:
fp += 1
print(f"FALSE POSITIVE: Benign scenario blocked: {scenario.get('name', 'unknown')}")
total = tp + fp + tn + fn
precision = tp / max(tp + fp, 1)
recall = tp / max(tp + fn, 1)
f1 = 2 * precision * recall / max(precision + recall, 1e-9)
return GuardrailEvalResult(
total_scenarios=total,
true_positives=tp,
false_positives=fp,
true_negatives=tn,
false_negatives=fn,
precision=round(precision, 4),
recall=round(recall, 4),
f1_score=round(f1, 4),
)
MCP 보안 위협 모델과 대응
MCP 아키텍처에서 발생할 수 있는 보안 위협을 체계적으로 정리한다.
| 위협 유형 | 공격 벡터 | 영향 | 대응 방법 |
|---|---|---|---|
| Tool Poisoning | 악성 MCP Server가 정상 도구로 위장 | 데이터 유출, 시스템 장악 | Server 인증서 검증, 도구 화이트리스트 |
| Rug Pull | 초기에 정상 동작 후 악성 동작으로 전환 | 신뢰 기반 보안 우회 | 도구 동작 지속 모니터링, 해시 기반 무결성 검증 |
| Argument Injection | 도구 인자에 악성 페이로드 삽입 | SQL injection, 명령어 실행 | JSON Schema 엄격 검증, 패턴 차단 |
| Excessive Privilege | 불필요하게 넓은 권한의 도구 노출 | 의도하지 않은 데이터 접근 | 최소 권한 원칙, 역할 기반 접근 제어 |
| Cross-Context Leakage | 다른 세션의 컨텍스트가 유출 | 개인정보 노출 | 세션 격리, 컨텍스트 스코핑 |
| Denial of Service | 대량 도구 호출로 서버 과부하 | 서비스 중단 | Rate limiting, 세션별 동시 호출 제한 |
MCP Server 운영 모니터링
# prometheus-rules.yaml
groups:
- name: mcp_server_monitoring
interval: 15s
rules:
- alert: MCPToolCallErrorRateHigh
expr: |
sum(rate(mcp_tool_call_total{status="error"}[5m])) by (tool_name) /
sum(rate(mcp_tool_call_total[5m])) by (tool_name) > 0.05
for: 3m
labels:
severity: warning
team: chatbot
annotations:
summary: 'MCP 도구 {{ $labels.tool_name }}의 에러율이 5%를 초과합니다'
description: |
현재 에러율: {{ $value | humanizePercentage }}
도구 구현 또는 하위 서비스 상태를 확인하세요.
- alert: MCPGatewayBlockRateSpike
expr: |
sum(rate(mcp_gateway_decision_total{decision="deny"}[15m])) /
sum(rate(mcp_gateway_decision_total[15m])) > 0.20
for: 5m
labels:
severity: warning
team: security
annotations:
summary: 'MCP Gateway 차단율이 20%를 초과합니다'
description: '공격 시도 증가 또는 정책 오설정 가능성. 감사 로그를 확인하세요.'
- alert: MCPToolLatencyHigh
expr: |
histogram_quantile(0.95,
rate(mcp_tool_call_duration_seconds_bucket[5m])
) > 3.0
for: 3m
labels:
severity: warning
team: chatbot
annotations:
summary: 'MCP 도구 호출 p95 지연이 3초를 초과합니다'
MCP 가드레일 도입 체크리스트
MCP 기반 챗봇에 가드레일을 도입할 때 순서대로 점검한다.
Phase 1: MCP Server 보안 기반
- 모든 MCP Server에 TLS 인증서 적용
- 도구별 inputSchema에
additionalProperties: false설정 - 도구별 인자의 type, pattern, maxLength, enum 제약 조건 명시
- 알려진 악성 패턴(SQL injection, path traversal) 차단 규칙 배포
Phase 2: Gateway 정책 배포
- MCP Gateway 배포 및 모든 tool call 프록시 경유 확인
- 역할 기반 도구 접근 제어 정책 설정
- 도구별 rate limit 설정
- 고위험 도구에 human-in-the-loop 확인 플로우 적용
Phase 3: 모니터링과 평가
- 감사 로그 수집 및 보관 정책 수립 (최소 90일)
- Prometheus 알림 규칙 배포
- Red team 테스트 스위트 CI 통합
- 가드레일 정밀도/재현율 측정 (목표: precision > 0.95, recall > 0.98)
Phase 4: 지속 운영
- 주 1회 감사 로그 리뷰
- 월 1회 red team 테스트 시나리오 업데이트
- 분기 1회 도구 권한 매트릭스 리뷰
- MCP SDK 보안 패치 적용 절차 수립
장애 시나리오별 대응
시나리오 1: MCP Server 연결 실패로 도구 사용 불가
증상: 챗봇이 모든 도구 호출에 실패. 사용자에게 "기능을 사용할 수 없습니다" 반복 표시.
에러 로그:
MCPConnectionError: Failed to connect to MCP server at unix:///tmp/mcp-customer-support.sock
Timeout after 5000ms
원인: MCP Server 프로세스가 OOM으로 종료됨
해결:
1. MCP Server 프로세스 재시작 (systemd 또는 supervisor)
2. 메모리 제한 설정 및 OOM killer 우선순위 조정
3. Graceful degradation: MCP Server 장애 시 도구 없이 일반 대화 모드로 전환
4. Health check 엔드포인트 추가 및 자동 재시작 설정
시나리오 2: 가드레일 오탐으로 정상 요청 차단
증상: "주문 취소해 주세요"라는 정상 요청이 Gateway에서 차단됨
에러 로그:
Gateway DENY: Blocked pattern detected - "취소" matched "cancel" block rule
원인: blocked_argument_patterns에 과도하게 넓은 패턴이 포함됨
("cancel"이라는 단어가 인자뿐 아니라 도구 설명에서도 매칭)
해결:
1. 패턴 매칭 범위를 인자 값으로 한정 (도구 이름/설명 제외)
2. 차단 패턴 추가 시 정상 시나리오 회귀 테스트 필수화
3. 오탐 발생 시 즉시 패턴 비활성화할 수 있는 핫픽스 절차 마련
시나리오 3: Rate limiting으로 파워 유저 불편
증상: CS 상담사가 대량 주문 조회 시 rate limit에 걸려 업무 지연
에러 로그:
Gateway RATE_LIMITED: Rate limit exceeded: 30/min for 'order_lookup'
해결:
1. 역할별 rate limit 차등 적용 (customer: 30/min, agent: 200/min, admin: 무제한)
2. 배치 조회 도구 (order_batch_lookup) 별도 제공
3. Rate limit 히스토리를 대시보드에 표시하여 임계치 튜닝 근거 확보
퀴즈
퀴즈
Q1. MCP의 Client-Server 구조에서 가드레일을 Gateway로 분리하는 이유는?
||MCP Server 자체를 수정하지 않고도 정책을 중앙에서 관리할 수 있고, 여러 MCP Server에 동일한 보안 정책을 일관되게 적용할 수 있기 때문이다. Server 구현과 보안 정책의 관심사를 분리한다.||
Q2. MCP inputSchema에 additionalProperties: false를 설정해야 하는 보안상 이유는?
||LLM이 스키마에 정의되지 않은 임의의 필드를 생성하여 의도하지 않은 동작을 유발하는 것을 방지한다. 예를 들어 {"admin_bypass": true} 같은 필드가 삽입되는 것을 차단한다.||
Q3. Tool Poisoning 공격이란 무엇이고 어떻게 방어하는가?
||악성 MCP Server가 정상 도구로 위장하여 데이터를 유출하거나 시스템을 장악하는 공격이다. Server 인증서 검증, 도구 화이트리스트, 도구 설명/동작의 무결성 해시 검증으로 방어한다.||
Q4. 가드레일 평가에서 recall이 precision보다 더 중요한 이유는?
||Recall이 낮으면 공격이 차단되지 않고 통과하는 것이므로 보안 사고로 이어진다. Precision이 낮으면 정상 요청이 오차단되어 사용자 경험이 나빠지지만, 보안 사고보다는 복구 가능한 문제다. 따라서 recall 목표(0.98)를 precision 목표(0.95)보다 높게 설정한다.||
Q5. MCP Server 장애 시 Graceful Degradation을 적용하는 방법은?
||MCP Server 연결 실패 시 도구 호출 기능을 비활성화하고, LLM에게 "현재 도구 사용이 불가능합니다"라는 컨텍스트를 제공하여 일반 대화 모드로 전환한다. 사용자에게는 기능 제한을 명시하고 재시도 시점을 안내한다.||
Q6. Rate limiting을 역할별로 차등 적용해야 하는 이유는?
||일반 고객과 CS 상담사의 사용 패턴이 근본적으로 다르기 때문이다. 상담사는 업무상 대량 조회가 필요하고, 동일한 rate limit을 적용하면 업무 효율이 저하된다. 역할별 사용 패턴 데이터를 분석하여 적정 임계치를 설정해야 한다.||
Q7. Red team 테스트 시나리오를 월 1회 업데이트해야 하는 이유는?
||새로운 공격 기법이 지속적으로 발견되고, LLM 모델 업데이트로 기존 가드레일을 우회하는 새로운 패턴이 생길 수 있기 때문이다. OWASP LLM Top 10 업데이트, 보안 커뮤니티의 새로운 발견을 반영하여 테스트 커버리지를 유지해야 한다.||
References
Chatbot: MCP Guardrails and Evaluation Handbook 2026

- What Is MCP and Why Are Guardrails Needed
- MCP Server Implementation: Tool Exposure and Access Control
- MCP Guardrail Gateway Design
- MCP Tool Evaluation Framework
- MCP Security Threat Model and Countermeasures
- MCP Server Operational Monitoring
- MCP Guardrail Adoption Checklist
- Failure Scenario Responses
- Quiz
- References
What Is MCP and Why Are Guardrails Needed
Model Context Protocol (MCP) is an open protocol released by Anthropic in November 2024, designed to provide a standardized way for LLM applications to connect with external data sources and tools. Before MCP, each LLM framework used its own proprietary tool/plugin interface, requiring host applications to write separate integration code for each tool.
MCP follows a Client-Server architecture.
- MCP Host: An application with an embedded LLM (Claude Desktop, IDE extensions, chatbot services)
- MCP Client: A protocol client within the Host that manages the connection with the MCP Server
- MCP Server: A server that exposes external tools, data sources, and APIs via the MCP protocol
Since MCP standardizes tool calling, security threats must also be managed in a standardized manner. As the range of tools provided by MCP Servers widens, designing guardrails around "which tools to allow for whom under what conditions" becomes critical.
This handbook consolidates guardrail design, evaluation frameworks, and operational procedures for MCP-based chatbots into a single document.
MCP Server Implementation: Tool Exposure and Access Control
Basic MCP Server Implementation (Python SDK)
"""
MCP Server example: provides customer order lookup and FAQ search tools.
Uses the mcp package (v1.2+).
pip install mcp
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import json
from typing import Any
# Create MCP Server instance
app = Server("customer-support-tools")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""Called when the MCP Client requests the list of available tools."""
return [
Tool(
name="order_lookup",
description="Looks up order status by order number. Order numbers follow the ORD- prefix format.",
inputSchema={
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "Order number (e.g., ORD-20260304-001)",
"pattern": r"^ORD-\d{8}-\d{3,6}$",
},
},
"required": ["order_id"],
"additionalProperties": False,
},
),
Tool(
name="faq_search",
description="Searches for relevant answers in the customer support FAQ.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The question to search for",
"minLength": 2,
"maxLength": 200,
},
"category": {
"type": "string",
"enum": ["payment", "shipping", "return", "account"],
"description": "FAQ category",
},
},
"required": ["query"],
"additionalProperties": False,
},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
"""Called when the MCP Client requests tool execution."""
if name == "order_lookup":
# In a real implementation, this would query a DB
order_id = arguments["order_id"]
result = await lookup_order_from_db(order_id)
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]
elif name == "faq_search":
query = arguments["query"]
category = arguments.get("category")
results = await search_faq_index(query, category)
return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False))]
else:
raise ValueError(f"Unknown tool: {name}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await app.run(read_stream, write_stream)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
MCP Guardrail Gateway Design
A Gateway is placed between the MCP Client and MCP Server to perform guardrail validation on all tool calls. This Gateway is the core component of this handbook.
"""
MCP Gateway: Intercepts all tool calls between
MCP Client -> Gateway -> MCP Server and performs guardrail validation.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from enum import Enum
import re
import json
import logging
logger = logging.getLogger("mcp_gateway")
class GatewayDecision(Enum):
ALLOW = "allow"
DENY = "deny"
REQUIRE_APPROVAL = "require_approval"
RATE_LIMITED = "rate_limited"
@dataclass
class GatewayPolicy:
"""MCP Gateway policy definition"""
# Per-tool access control
tool_permissions: Dict[str, List[str]] = field(default_factory=dict)
# e.g., {"order_lookup": ["customer", "agent", "admin"],
# "order_cancel": ["agent", "admin"]}
# Per-tool call limits
rate_limits: Dict[str, int] = field(default_factory=dict)
# e.g., {"order_lookup": 30, "faq_search": 60} # per minute
# Argument sanitization rules
argument_sanitizers: Dict[str, dict] = field(default_factory=dict)
# High-risk tool list (requires user confirmation)
high_risk_tools: List[str] = field(default_factory=list)
# Blocked patterns
blocked_argument_patterns: List[str] = field(default_factory=list)
# Default policy
DEFAULT_POLICY = GatewayPolicy(
tool_permissions={
"order_lookup": ["customer", "agent", "admin"],
"faq_search": ["customer", "agent", "admin"],
"order_cancel": ["agent", "admin"],
"refund_process": ["admin"],
"user_data_export": ["admin"],
},
rate_limits={
"order_lookup": 30,
"faq_search": 60,
"order_cancel": 5,
"refund_process": 3,
"user_data_export": 1,
},
high_risk_tools=["order_cancel", "refund_process", "user_data_export"],
blocked_argument_patterns=[
r"(?i)(drop|delete|truncate)\s+(table|database)",
r"(?i)(union\s+select|or\s+1\s*=\s*1)",
r"\.\./", # path traversal
r"<script", # XSS
],
)
class MCPGateway:
def __init__(self, policy: GatewayPolicy = DEFAULT_POLICY):
self.policy = policy
self._call_history: Dict[str, List[datetime]] = {}
def evaluate(
self,
tool_name: str,
arguments: Dict[str, Any],
user_role: str,
session_id: str,
) -> dict:
"""
Performs guardrail validation on a tool call.
Returns: {"decision": GatewayDecision, "reason": str, "sanitized_args": dict}
"""
# 1. Permission check
allowed_roles = self.policy.tool_permissions.get(tool_name, [])
if not allowed_roles:
return self._deny(f"Tool '{tool_name}' is not registered in gateway policy")
if user_role not in allowed_roles:
return self._deny(
f"Role '{user_role}' is not authorized for tool '{tool_name}'. "
f"Required: {allowed_roles}"
)
# 2. Rate limit check
rate_limit = self.policy.rate_limits.get(tool_name, 60)
if self._is_rate_limited(session_id, tool_name, rate_limit):
return {
"decision": GatewayDecision.RATE_LIMITED,
"reason": f"Rate limit exceeded: {rate_limit}/min for '{tool_name}'",
"sanitized_args": arguments,
}
# 3. Argument sanitization
sanitized_args, blocked_patterns = self._sanitize_arguments(arguments)
if blocked_patterns:
logger.warning(
f"Blocked patterns detected in tool '{tool_name}': {blocked_patterns}"
)
return self._deny(
f"Potentially malicious argument patterns detected: {blocked_patterns}"
)
# 4. High-risk tool confirmation
if tool_name in self.policy.high_risk_tools:
return {
"decision": GatewayDecision.REQUIRE_APPROVAL,
"reason": f"Tool '{tool_name}' requires user confirmation before execution",
"sanitized_args": sanitized_args,
}
return {
"decision": GatewayDecision.ALLOW,
"reason": "All checks passed",
"sanitized_args": sanitized_args,
}
def _is_rate_limited(self, session_id: str, tool_name: str, limit: int) -> bool:
key = f"{session_id}:{tool_name}"
now = datetime.utcnow()
cutoff = now - timedelta(minutes=1)
if key not in self._call_history:
self._call_history[key] = []
self._call_history[key] = [
t for t in self._call_history[key] if t > cutoff
]
if len(self._call_history[key]) >= limit:
return True
self._call_history[key].append(now)
return False
def _sanitize_arguments(self, arguments: Dict[str, Any]) -> tuple:
blocked = []
args_str = json.dumps(arguments)
for pattern in self.policy.blocked_argument_patterns:
if re.search(pattern, args_str):
blocked.append(pattern)
return arguments, blocked
def _deny(self, reason: str) -> dict:
return {
"decision": GatewayDecision.DENY,
"reason": reason,
"sanitized_args": {},
}
MCP Tool Evaluation Framework
It is essential to systematically evaluate whether the tools provided by MCP Servers function correctly and whether guardrails properly block attacks.
Tool Correctness Evaluation
"""
Test suite for evaluating the functional correctness of MCP tools.
Validates normal cases, edge cases, and error cases for each tool.
"""
import pytest
from dataclasses import dataclass
from typing import Any, Optional, List
@dataclass
class ToolTestCase:
"""Single tool test case"""
test_id: str
tool_name: str
arguments: dict
expected_status: str # success, error, validation_error
expected_output_contains: Optional[str] = None
expected_error_type: Optional[str] = None
description: str = ""
TOOL_TEST_CASES = [
# Normal case
ToolTestCase(
test_id="order_lookup_valid",
tool_name="order_lookup",
arguments={"order_id": "ORD-20260304-001"},
expected_status="success",
expected_output_contains="order_status",
description="Lookup with a valid order number",
),
# Edge case: non-existent order
ToolTestCase(
test_id="order_lookup_not_found",
tool_name="order_lookup",
arguments={"order_id": "ORD-99991231-999"},
expected_status="success",
expected_output_contains="not_found",
description="Returns proper response for a non-existent order number",
),
# Error case: invalid format
ToolTestCase(
test_id="order_lookup_invalid_format",
tool_name="order_lookup",
arguments={"order_id": "INVALID-FORMAT"},
expected_status="validation_error",
expected_error_type="pattern_mismatch",
description="Invalid order number format",
),
# Security case: SQL injection attempt
ToolTestCase(
test_id="order_lookup_sql_injection",
tool_name="order_lookup",
arguments={"order_id": "ORD-20260304-001' OR '1'='1"},
expected_status="validation_error",
expected_error_type="pattern_mismatch",
description="Blocks SQL injection attempt",
),
# FAQ normal search
ToolTestCase(
test_id="faq_search_valid",
tool_name="faq_search",
arguments={"query": "refund process", "category": "return"},
expected_status="success",
expected_output_contains="refund",
description="Valid FAQ search",
),
# FAQ empty result
ToolTestCase(
test_id="faq_search_no_results",
tool_name="faq_search",
arguments={"query": "xyzzyspoon"},
expected_status="success",
expected_output_contains="no_results",
description="Returns proper empty response when no results found",
),
]
@pytest.mark.parametrize("case", TOOL_TEST_CASES, ids=lambda c: c.test_id)
async def test_mcp_tool(mcp_client, case):
"""Verifies the functional correctness of MCP tools."""
try:
result = await mcp_client.call_tool(case.tool_name, case.arguments)
if case.expected_status == "validation_error":
pytest.fail(
f"Expected validation error but tool executed successfully: {result}"
)
if case.expected_output_contains:
result_text = str(result)
assert case.expected_output_contains in result_text, (
f"Expected output to contain '{case.expected_output_contains}', "
f"got: {result_text[:200]}"
)
except Exception as e:
if case.expected_status == "validation_error":
if case.expected_error_type:
assert case.expected_error_type in str(type(e).__name__).lower() or \
case.expected_error_type in str(e).lower(), (
f"Expected error type '{case.expected_error_type}', got: {type(e).__name__}: {e}"
)
else:
raise
Guardrail Effectiveness Measurement
"""
Quantitatively measures the effectiveness of MCP Gateway guardrails.
Calculates block rate and false positive rate against an attack scenario dataset.
"""
from typing import List, Dict
from dataclasses import dataclass
@dataclass
class GuardrailEvalResult:
total_scenarios: int
true_positives: int # Correctly blocked attacks
false_positives: int # Incorrectly blocked legitimate requests
true_negatives: int # Correctly allowed legitimate requests
false_negatives: int # Missed attacks
precision: float
recall: float
f1_score: float
def evaluate_guardrails(
gateway,
attack_scenarios: List[dict],
benign_scenarios: List[dict],
) -> GuardrailEvalResult:
"""
Measures the precision/recall of guardrails against attack and benign scenarios.
"""
tp = fp = tn = fn = 0
# Attack scenarios: should be blocked
for scenario in attack_scenarios:
result = gateway.evaluate(
tool_name=scenario["tool_name"],
arguments=scenario["arguments"],
user_role=scenario.get("user_role", "customer"),
session_id=scenario.get("session_id", "eval"),
)
if result["decision"] in (GatewayDecision.DENY, GatewayDecision.RATE_LIMITED):
tp += 1
else:
fn += 1
print(f"MISS: Attack scenario not blocked: {scenario.get('name', 'unknown')}")
# Benign scenarios: should be allowed
for scenario in benign_scenarios:
result = gateway.evaluate(
tool_name=scenario["tool_name"],
arguments=scenario["arguments"],
user_role=scenario.get("user_role", "customer"),
session_id=scenario.get("session_id", "eval"),
)
if result["decision"] == GatewayDecision.ALLOW:
tn += 1
elif result["decision"] == GatewayDecision.REQUIRE_APPROVAL:
tn += 1 # Confirmation request for high-risk tools is normal behavior
else:
fp += 1
print(f"FALSE POSITIVE: Benign scenario blocked: {scenario.get('name', 'unknown')}")
total = tp + fp + tn + fn
precision = tp / max(tp + fp, 1)
recall = tp / max(tp + fn, 1)
f1 = 2 * precision * recall / max(precision + recall, 1e-9)
return GuardrailEvalResult(
total_scenarios=total,
true_positives=tp,
false_positives=fp,
true_negatives=tn,
false_negatives=fn,
precision=round(precision, 4),
recall=round(recall, 4),
f1_score=round(f1, 4),
)
MCP Security Threat Model and Countermeasures
A systematic overview of the security threats that can arise in MCP architecture.
| Threat Type | Attack Vector | Impact | Countermeasure |
|---|---|---|---|
| Tool Poisoning | Malicious MCP Server disguised as a legitimate tool | Data exfiltration, system takeover | Server certificate validation, tool allowlisting |
| Rug Pull | Initially normal behavior, then switches to malicious | Trust-based security bypass | Continuous tool behavior monitoring, hash-based integrity checks |
| Argument Injection | Malicious payload inserted in tool arguments | SQL injection, command execution | Strict JSON Schema validation, pattern blocking |
| Excessive Privilege | Tools exposed with unnecessarily broad permissions | Unintended data access | Principle of least privilege, role-based access control |
| Cross-Context Leakage | Context from other sessions leaks | PII exposure | Session isolation, context scoping |
| Denial of Service | Server overload through mass tool calls | Service disruption | Rate limiting, per-session concurrent call limits |
MCP Server Operational Monitoring
# prometheus-rules.yaml
groups:
- name: mcp_server_monitoring
interval: 15s
rules:
- alert: MCPToolCallErrorRateHigh
expr: |
sum(rate(mcp_tool_call_total{status="error"}[5m])) by (tool_name) /
sum(rate(mcp_tool_call_total[5m])) by (tool_name) > 0.05
for: 3m
labels:
severity: warning
team: chatbot
annotations:
summary: 'Error rate for MCP tool {{ $labels.tool_name }} exceeds 5%'
description: |
Current error rate: {{ $value | humanizePercentage }}
Check the tool implementation or downstream service status.
- alert: MCPGatewayBlockRateSpike
expr: |
sum(rate(mcp_gateway_decision_total{decision="deny"}[15m])) /
sum(rate(mcp_gateway_decision_total[15m])) > 0.20
for: 5m
labels:
severity: warning
team: security
annotations:
summary: 'MCP Gateway block rate exceeds 20%'
description: 'Possible increase in attack attempts or policy misconfiguration. Check audit logs.'
- alert: MCPToolLatencyHigh
expr: |
histogram_quantile(0.95,
rate(mcp_tool_call_duration_seconds_bucket[5m])
) > 3.0
for: 3m
labels:
severity: warning
team: chatbot
annotations:
summary: 'MCP tool call p95 latency exceeds 3 seconds'
MCP Guardrail Adoption Checklist
Follow this checklist in order when introducing guardrails to an MCP-based chatbot.
Phase 1: MCP Server Security Foundation
- Apply TLS certificates to all MCP Servers
- Set
additionalProperties: falsein each tool's inputSchema - Specify type, pattern, maxLength, and enum constraints for each tool's arguments
- Deploy blocking rules for known malicious patterns (SQL injection, path traversal)
Phase 2: Gateway Policy Deployment
- Deploy MCP Gateway and confirm all tool calls are routed through it
- Configure role-based tool access control policies
- Set per-tool rate limits
- Apply human-in-the-loop confirmation flow for high-risk tools
Phase 3: Monitoring and Evaluation
- Establish audit log collection and retention policy (minimum 90 days)
- Deploy Prometheus alert rules
- Integrate red team test suite into CI
- Measure guardrail precision/recall (target: precision over 0.95, recall over 0.98)
Phase 4: Continuous Operations
- Weekly audit log review
- Monthly red team test scenario updates
- Quarterly tool permission matrix review
- Establish MCP SDK security patch application procedures
Failure Scenario Responses
Scenario 1: Tool Unavailability Due to MCP Server Connection Failure
Symptom: Chatbot fails on all tool calls. Repeatedly shows "Feature unavailable" to users.
Error log:
MCPConnectionError: Failed to connect to MCP server at unix:///tmp/mcp-customer-support.sock
Timeout after 5000ms
Root cause: MCP Server process terminated due to OOM
Resolution:
1. Restart the MCP Server process (systemd or supervisor)
2. Set memory limits and adjust OOM killer priority
3. Graceful degradation: Switch to general conversation mode without tools when MCP Server is down
4. Add health check endpoint and configure automatic restart
Scenario 2: Legitimate Requests Blocked Due to Guardrail False Positives
Symptom: A legitimate request "Please cancel my order" is blocked by the Gateway
Error log:
Gateway DENY: Blocked pattern detected - "cancel" matched "cancel" block rule
Root cause: Overly broad patterns included in blocked_argument_patterns
("cancel" keyword matches in arguments as well as tool descriptions)
Resolution:
1. Restrict pattern matching scope to argument values only (exclude tool names/descriptions)
2. Require regression testing with benign scenarios before adding new blocked patterns
3. Establish a hotfix procedure to immediately disable patterns when false positives occur
Scenario 3: Power User Inconvenience Due to Rate Limiting
Symptom: CS agents hit rate limits during bulk order lookups, causing work delays
Error log:
Gateway RATE_LIMITED: Rate limit exceeded: 30/min for 'order_lookup'
Resolution:
1. Apply role-based differential rate limits (customer: 30/min, agent: 200/min, admin: unlimited)
2. Provide a separate batch lookup tool (order_batch_lookup)
3. Display rate limit history on dashboards to provide tuning insights
Quiz
Quiz
Q1. Why should guardrails in MCP's Client-Server architecture be separated into a Gateway?
It allows policies to be managed centrally without modifying the MCP Server itself, and enables consistent application of the same security policies across multiple MCP Servers. This separates concerns between server implementation and security policy.
Q2. What is the security reason for setting additionalProperties: false in MCP inputSchema?
It prevents the LLM from generating arbitrary fields not defined in the schema, which could cause unintended behavior. For example, it blocks the insertion of fields like {"admin_bypass": true}.
Q3. What is a Tool Poisoning attack and how do you defend against it?
It is an attack where a malicious MCP Server masquerades as a legitimate tool to exfiltrate data or take over systems. Defense includes server certificate validation, tool allowlisting, and integrity hash verification of tool descriptions and behavior.
Q4. Why is recall more important than precision in guardrail evaluation?
Low recall means attacks pass through without being blocked, leading to security incidents. Low precision means legitimate requests are falsely blocked, degrading user experience but remaining a recoverable issue compared to security breaches. Therefore, the recall target (0.98) is set higher than the precision target (0.95).
Q5. How do you apply Graceful Degradation when the MCP Server fails?
When the MCP Server connection fails, disable tool calling and provide the LLM with context that "tools are currently unavailable," switching to general conversation mode. Inform users of the limited functionality and provide guidance on when to retry.
Q6. Why should rate limiting be applied differently by role?
Because the usage patterns of regular customers and CS agents are fundamentally different. Agents need bulk lookups for their work, and applying the same rate limit reduces work efficiency. Rate limit thresholds should be set based on usage pattern data analysis for each role.
Q7. Why should red team test scenarios be updated monthly?
Because new attack techniques are continuously discovered, and LLM model updates can create new patterns that bypass existing guardrails. Test coverage must be maintained by incorporating OWASP LLM Top 10 updates and new findings from the security community.