Chatbot: MCP Guardrails and Evaluation Handbook 2026

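What Is MCP and Why Are Guardrails Needed

Model Context Protocol (MCP) is an open protocol that Anthropic released in November 2024 to give LLM applications a standardized way to connect to external data sources and tools. Before MCP, each LLM framework used its own tool/plugin interface, so host applications had to write separate integration code for every tool.

MCP follows a Client-Server architecture.

  • MCP Host: An application with an embedded LLM (Claude Desktop, IDE extensions, chatbot services)
  • MCP Client: A protocol client inside the Host that manages the connection to an MCP Server
  • MCP Server: A server that exposes external tools, data sources, and APIs over the MCP protocol

Because MCP standardizes tool calling, security threats should be managed in an equally standardized way. As the range of tools an MCP Server exposes grows, the central guardrail design question becomes which tools to allow, for whom, and under what conditions.

This handbook consolidates guardrail design, evaluation, and operational procedures for MCP-based chatbots into a single document.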
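
To make the Host/Client/Server split concrete, the sketch below builds the JSON-RPC 2.0 message that an MCP Client sends when the Host's LLM decides to call a tool. The `tools/call` method and the `name`/`arguments` parameters follow the MCP specification; the helper name `build_tool_call_request` and the request id are illustrative assumptions and not part of any SDK.

```python
import json
from typing import Any


def build_tool_call_request(request_id: int, tool_name: str, arguments: dict[str, Any]) -> str:
    """Serialize a JSON-RPC 2.0 `tools/call` request (hypothetical helper)."""
    message = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        # The server looks up the tool by name and receives the arguments as-is,
        # which is why every guardrail in this handbook inspects these two fields.
        "params": {"name": tool_name, "arguments": arguments},
    }
    return json.dumps(message, ensure_ascii=False)
```

A guardrail placed anywhere on the path between Client and Server sees exactly this payload, so policy decisions can be expressed in terms of the tool name and its arguments.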

MCP Server Implementation: Tool Exposure and Access Control

Basic MCP Server Implementation (Python SDK)

"""
MCP Server example: provides customer order lookup and FAQ search tools.
Uses the mcp package (v1.2+).
pip install mcp
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import json
from typing import Any

# Create the MCP Server instance
app = Server("customer-support-tools")

@app.list_tools()
async def list_tools() -> list[Tool]:
    """Called when the MCP Client requests the list of available tools."""
    return [
        Tool(
            name="order_lookup",
            description="Looks up order status by order number. Order numbers start with the ORD- prefix.",
            inputSchema={
                "type": "object",
                "properties": {
                    "order_id": {
                        "type": "string",
                        "description": "Order number (e.g., ORD-20260304-001)",
                        "pattern": r"^ORD-\d{8}-\d{3,6}$",
                    },
                },
                "required": ["order_id"],
                "additionalProperties": False,
            },
        ),
        Tool(
            name="faq_search",
            description="Searches the customer support FAQ for relevant answers.",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The question to search for",
                        "minLength": 2,
                        "maxLength": 200,
                    },
                    "category": {
                        "type": "string",
                        "enum": ["payment", "shipping", "return", "account"],
                        "description": "FAQ category",
                    },
                },
                "required": ["query"],
                "additionalProperties": False,
            },
        ),
    ]

@app.call_tool()
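async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
    """Called when the MCP Client requests tool execution."""
    if name == "order_lookup":
        # In a real implementation this queries a database. lookup_order_from_db
        # and search_faq_index are placeholders that are not defined in this example.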
        order_id = arguments["order_id"]
        result = await lookup_order_from_db(order_id)
        return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]

    elif name == "faq_search":
        query = arguments["query"]
        category = arguments.get("category")
        results = await search_faq_index(query, category)
        return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False))]

    else:
        raise ValueError(f"Unknown tool: {name}")

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream)

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
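
The server above declares constraints in `inputSchema`, but a handler should not assume they have already been enforced by the time `call_tool` runs. The following is a minimal, standard-library-only sketch of checking arguments against the subset of JSON Schema keywords used in this handbook. The function name `validate_arguments` and the error strings are assumptions made for illustration; a production server would more likely rely on a full JSON Schema validator.

```python
import re
from typing import Any

ORDER_LOOKUP_SCHEMA = {
    "type": "object",
    "properties": {"order_id": {"type": "string", "pattern": r"^ORD-\d{8}-\d{3,6}$"}},
    "required": ["order_id"],
    "additionalProperties": False,
}


def validate_arguments(schema: dict[str, Any], arguments: dict[str, Any]) -> list[str]:
    """Return a list of violations; an empty list means the arguments pass.

    Handles only required, additionalProperties, string type, pattern,
    minLength, maxLength, and enum. It is not a general JSON Schema validator.
    """
    errors: list[str] = []
    properties = schema.get("properties", {})
    for name in schema.get("required", []):
        if name not in arguments:
            errors.append(f"missing required field: {name}")
    if schema.get("additionalProperties") is False:
        for name in arguments:
            if name not in properties:
                errors.append(f"unexpected field: {name}")
    for name, rules in properties.items():
        if name not in arguments:
            continue
        value = arguments[name]
        if rules.get("type") == "string" and not isinstance(value, str):
            errors.append(f"{name}: expected string")
            continue
        if isinstance(value, str):
            if "pattern" in rules and not re.search(rules["pattern"], value):
                errors.append(f"{name}: pattern_mismatch")
            if len(value) < rules.get("minLength", 0):
                errors.append(f"{name}: too short")
            if "maxLength" in rules and len(value) > rules["maxLength"]:
                errors.append(f"{name}: too long")
        if "enum" in rules and value not in rules["enum"]:
            errors.append(f"{name}: not in enum")
    return errors
```

Calling `validate_arguments(ORDER_LOOKUP_SCHEMA, arguments)` at the top of the handler and rejecting the call when the list is non-empty keeps malformed input away from the database layer regardless of what the client did.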

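MCP Guardrail Gateway Design

A Gateway sits between the MCP Client and the MCP Server and performs guardrail validation on every tool call. This Gateway is the core component of this handbook.

"""
MCP Gateway: sits on the path MCP Client -> Gateway -> MCP Server and
intercepts every tool call to perform guardrail validation.
"""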
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from enum import Enum
import re
import json
import logging

logger = logging.getLogger("mcp_gateway")

class GatewayDecision(Enum):
    ALLOW = "allow"
    DENY = "deny"
    REQUIRE_APPROVAL = "require_approval"
    RATE_LIMITED = "rate_limited"

@dataclass
class GatewayPolicy:
    """MCP Gateway policy definition"""
    # Per-tool access control
    tool_permissions: Dict[str, List[str]] = field(default_factory=dict)
    # e.g., {"order_lookup": ["customer", "agent", "admin"],
    #        "order_cancel": ["agent", "admin"]}

    # Per-tool call limits
    rate_limits: Dict[str, int] = field(default_factory=dict)
    # e.g., {"order_lookup": 30, "faq_search": 60}  # per minute

    # Argument sanitization rules
    argument_sanitizers: Dict[str, dict] = field(default_factory=dict)

    # High-risk tools (require user confirmation)
    high_risk_tools: List[str] = field(default_factory=list)

    # Blocked patterns
    blocked_argument_patterns: List[str] = field(default_factory=list)

# Default policy
DEFAULT_POLICY = GatewayPolicy(
    tool_permissions={
        "order_lookup": ["customer", "agent", "admin"],
        "faq_search": ["customer", "agent", "admin"],
        "order_cancel": ["agent", "admin"],
        "refund_process": ["admin"],
        "user_data_export": ["admin"],
    },
    rate_limits={
        "order_lookup": 30,
        "faq_search": 60,
        "order_cancel": 5,
        "refund_process": 3,
        "user_data_export": 1,
    },
    high_risk_tools=["order_cancel", "refund_process", "user_data_export"],
    blocked_argument_patterns=[
        r"(?i)(drop|delete|truncate)\s+(table|database)",
        r"(?i)(union\s+select|or\s+1\s*=\s*1)",
        r"\.\./",  # path traversal
        r"<script",  # XSS
    ],
)

class MCPGateway:
    def __init__(self, policy: GatewayPolicy = DEFAULT_POLICY):
        self.policy = policy
        self._call_history: Dict[str, List[datetime]] = {}

    def evaluate(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        user_role: str,
        session_id: str,
    ) -> dict:
        """
        Performs guardrail validation on a tool call.
        Returns: {"decision": GatewayDecision, "reason": str, "sanitized_args": dict}
        """
        # 1. Permission check
        allowed_roles = self.policy.tool_permissions.get(tool_name, [])
        if not allowed_roles:
            return self._deny(f"Tool '{tool_name}' is not registered in gateway policy")
        if user_role not in allowed_roles:
            return self._deny(
                f"Role '{user_role}' is not authorized for tool '{tool_name}'. "
                f"Required: {allowed_roles}"
            )

        # 2. Rate limit check
        rate_limit = self.policy.rate_limits.get(tool_name, 60)
        if self._is_rate_limited(session_id, tool_name, rate_limit):
            return {
                "decision": GatewayDecision.RATE_LIMITED,
                "reason": f"Rate limit exceeded: {rate_limit}/min for '{tool_name}'",
                "sanitized_args": arguments,
            }

        # 3. Argument sanitization
        sanitized_args, blocked_patterns = self._sanitize_arguments(arguments)
        if blocked_patterns:
            logger.warning(
                f"Blocked patterns detected in tool '{tool_name}': {blocked_patterns}"
            )
            return self._deny(
                f"Potentially malicious argument patterns detected: {blocked_patterns}"
            )

        # 4. High-risk tool confirmation
        if tool_name in self.policy.high_risk_tools:
            return {
                "decision": GatewayDecision.REQUIRE_APPROVAL,
                "reason": f"Tool '{tool_name}' requires user confirmation before execution",
                "sanitized_args": sanitized_args,
            }

        return {
            "decision": GatewayDecision.ALLOW,
            "reason": "All checks passed",
            "sanitized_args": sanitized_args,
        }

    def _is_rate_limited(self, session_id: str, tool_name: str, limit: int) -> bool:
        key = f"{session_id}:{tool_name}"
        now = datetime.utcnow()
        cutoff = now - timedelta(minutes=1)

        if key not in self._call_history:
            self._call_history[key] = []

        self._call_history[key] = [
            t for t in self._call_history[key] if t > cutoff
        ]

        if len(self._call_history[key]) >= limit:
            return True

        self._call_history[key].append(now)
        return False

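    def _sanitize_arguments(self, arguments: Dict[str, Any]) -> tuple:
        """Scan the serialized arguments for blocked patterns.

        The arguments are returned unchanged; this step only detects patterns.
        """
        blocked = []
        # ensure_ascii=False keeps non-ASCII text (e.g., Korean) literal so that
        # patterns are matched against the text the user actually sent.
        args_str = json.dumps(arguments, ensure_ascii=False)

        for pattern in self.policy.blocked_argument_patterns:
            if re.search(pattern, args_str):
                blocked.append(pattern)

        return arguments, blocked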

    def _deny(self, reason: str) -> dict:
        return {
            "decision": GatewayDecision.DENY,
            "reason": reason,
            "sanitized_args": {},
        }
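
The gateway only returns a verdict; the Host still has to act on it. The sketch below shows one way a caller might branch on the four decisions. `Decision` mirrors the `GatewayDecision` enum above so that the example is self-contained, and `handle_decision`, `execute`, and `ask_user` are hypothetical names introduced here for illustration.

```python
from enum import Enum
from typing import Any, Callable


class Decision(Enum):
    """Stand-in for GatewayDecision so this sketch runs on its own."""
    ALLOW = "allow"
    DENY = "deny"
    REQUIRE_APPROVAL = "require_approval"
    RATE_LIMITED = "rate_limited"


def handle_decision(
    verdict: dict[str, Any],
    execute: Callable[[dict[str, Any]], str],
    ask_user: Callable[[str], bool],
) -> str:
    """Run, confirm, or refuse a tool call based on a gateway verdict."""
    decision = verdict["decision"]
    if decision is Decision.ALLOW:
        return execute(verdict["sanitized_args"])
    if decision is Decision.REQUIRE_APPROVAL:
        # Human-in-the-loop: execute only after explicit confirmation.
        if ask_user(verdict["reason"]):
            return execute(verdict["sanitized_args"])
        return "cancelled: user declined"
    if decision is Decision.RATE_LIMITED:
        return f"retry later: {verdict['reason']}"
    # DENY and anything unexpected fall through to refusal.
    return f"refused: {verdict['reason']}"
```

Treating every unrecognized decision as a refusal keeps the caller fail-closed if new decision types are added to the gateway later.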

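MCP Tool Evaluation Framework

The tools an MCP Server provides must be evaluated systematically, both for correct behavior and for whether the guardrails actually block attacks.

Tool Correctness Evaluation

"""
Test suite for evaluating the functional correctness of MCP tools.
Validates normal cases, edge cases, and error cases for each tool.
"""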
import pytest
from dataclasses import dataclass
from typing import Any, Optional, List

@dataclass
class ToolTestCase:
    """Single tool test case"""
    test_id: str
    tool_name: str
    arguments: dict
    expected_status: str  # success, error, validation_error
    expected_output_contains: Optional[str] = None
    expected_error_type: Optional[str] = None
    description: str = ""

TOOL_TEST_CASES = [
    # Normal case
    ToolTestCase(
        test_id="order_lookup_valid",
        tool_name="order_lookup",
        arguments={"order_id": "ORD-20260304-001"},
        expected_status="success",
        expected_output_contains="order_status",
        description="Lookup with a valid order number",
    ),
    # Edge case: non-existent order
    ToolTestCase(
        test_id="order_lookup_not_found",
        tool_name="order_lookup",
        arguments={"order_id": "ORD-99991231-999"},
        expected_status="success",
        expected_output_contains="not_found",
        description="Returns a normal response for a non-existent order number",
    ),
    # Error case: invalid format
    ToolTestCase(
        test_id="order_lookup_invalid_format",
        tool_name="order_lookup",
        arguments={"order_id": "INVALID-FORMAT"},
        expected_status="validation_error",
        expected_error_type="pattern_mismatch",
        description="Invalid order number format",
    ),
    # Security case: SQL injection attempt
    ToolTestCase(
        test_id="order_lookup_sql_injection",
        tool_name="order_lookup",
        arguments={"order_id": "ORD-20260304-001' OR '1'='1"},
        expected_status="validation_error",
        expected_error_type="pattern_mismatch",
        description="Blocks a SQL injection attempt",
    ),
    # FAQ normal search
    ToolTestCase(
        test_id="faq_search_valid",
        tool_name="faq_search",
        arguments={"query": "refund process", "category": "return"},
        expected_status="success",
        expected_output_contains="refund",
        description="Valid FAQ search",
    ),
    # FAQ empty result
    ToolTestCase(
        test_id="faq_search_no_results",
        tool_name="faq_search",
        arguments={"query": "xyzzyspoon"},
        expected_status="success",
        expected_output_contains="no_results",
        description="Returns a normal empty response when nothing matches",
    ),
]

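# Assumes the pytest-asyncio plugin is installed; without an async plugin,
# pytest does not await coroutine tests. mcp_client is a fixture you provide.
@pytest.mark.asyncio
@pytest.mark.parametrize("case", TOOL_TEST_CASES, ids=lambda c: c.test_id)
async def test_mcp_tool(mcp_client, case):
    """Verifies the functional correctness of MCP tools."""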
    try:
        result = await mcp_client.call_tool(case.tool_name, case.arguments)

        if case.expected_status == "validation_error":
            pytest.fail(
                f"Expected validation error but tool executed successfully: {result}"
            )

        if case.expected_output_contains:
            result_text = str(result)
            assert case.expected_output_contains in result_text, (
                f"Expected output to contain '{case.expected_output_contains}', "
                f"got: {result_text[:200]}"
            )

    except Exception as e:
        if case.expected_status == "validation_error":
            if case.expected_error_type:
                assert case.expected_error_type in str(type(e).__name__).lower() or \
                       case.expected_error_type in str(e).lower(), (
                    f"Expected error type '{case.expected_error_type}', got: {type(e).__name__}: {e}"
                )
        else:
            raise

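Guardrail Effectiveness Measurement

"""
Quantitatively measures the effectiveness of the MCP Gateway guardrails.
Calculates the block rate and false positive rate against an attack scenario dataset.
"""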
from typing import List, Dict
from dataclasses import dataclass

@dataclass
class GuardrailEvalResult:
    total_scenarios: int
    true_positives: int     # Attacks correctly blocked
    false_positives: int    # Legitimate requests incorrectly blocked
    true_negatives: int     # Legitimate requests correctly allowed
    false_negatives: int    # Attacks that were not blocked
    precision: float
    recall: float
    f1_score: float

def evaluate_guardrails(
    gateway,
    attack_scenarios: List[dict],
    benign_scenarios: List[dict],
) -> GuardrailEvalResult:
    """
    Measures guardrail precision/recall against attack and benign scenarios.
    GatewayDecision is the enum defined alongside the MCP Gateway.
    """
    tp = fp = tn = fn = 0

    # Attack scenarios: each one should be blocked.
    # Every scenario defaults to its own session id so that the per-session
    # rate limiter does not trip mid-run and distort the counts.
    for i, scenario in enumerate(attack_scenarios):
        result = gateway.evaluate(
            tool_name=scenario["tool_name"],
            arguments=scenario["arguments"],
            user_role=scenario.get("user_role", "customer"),
            session_id=scenario.get("session_id", f"eval-attack-{i}"),
        )
        if result["decision"] in (GatewayDecision.DENY, GatewayDecision.RATE_LIMITED):
            tp += 1
        else:
            fn += 1
            print(f"MISS: Attack scenario not blocked: {scenario.get('name', 'unknown')}")

    # Benign scenarios: each one should pass.
    for i, scenario in enumerate(benign_scenarios):
        result = gateway.evaluate(
            tool_name=scenario["tool_name"],
            arguments=scenario["arguments"],
            user_role=scenario.get("user_role", "customer"),
            session_id=scenario.get("session_id", f"eval-benign-{i}"),
        )
        if result["decision"] == GatewayDecision.ALLOW:
            tn += 1
        elif result["decision"] == GatewayDecision.REQUIRE_APPROVAL:
            tn += 1  # A confirmation request for a high-risk tool is expected behavior
        else:
            fp += 1
            print(f"FALSE POSITIVE: Benign scenario blocked: {scenario.get('name', 'unknown')}")

    total = tp + fp + tn + fn
    precision = tp / max(tp + fp, 1)
    recall = tp / max(tp + fn, 1)
    f1 = 2 * precision * recall / max(precision + recall, 1e-9)

    return GuardrailEvalResult(
        total_scenarios=total,
        true_positives=tp,
        false_positives=fp,
        true_negatives=tn,
        false_negatives=fn,
        precision=round(precision, 4),
        recall=round(recall, 4),
        f1_score=round(f1, 4),
    )
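
As a worked example of the arithmetic above, the sketch below computes the three metrics directly from confusion-matrix counts. The counts in the usage note are made-up illustration values, not measurements, and `precision_recall_f1` is a hypothetical helper name.

```python
def precision_recall_f1(tp: int, fp: int, fn: int) -> tuple[float, float, float]:
    """Compute precision, recall, and F1 from raw counts.

    "Positive" means "blocked": tp = attacks blocked, fp = benign requests
    blocked, fn = attacks that got through. Empty denominators yield 0.0.
    """
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1
```

For instance, suppose 100 attack scenarios yield 99 blocks and 1 miss, and 200 benign scenarios yield 4 wrongful blocks. Then `precision_recall_f1(99, 4, 1)` gives precision 99/103, recall 99/100, and F1 198/203. True negatives do not enter any of the three formulas, which is why a large benign set cannot mask missed attacks.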

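MCP Security Threat Model and Mitigations

This section catalogs the security threats that can arise in an MCP architecture.

| Threat type | Attack vector | Impact | Mitigation |
|---|---|---|---|
| Tool Poisoning | A malicious MCP Server masquerades as a legitimate tool | Data exfiltration, system compromise | Server certificate verification, tool allowlist |
| Rug Pull | Behaves normally at first, then switches to malicious behavior | Bypass of trust-based security | Continuous monitoring of tool behavior, hash-based integrity verification |
| Argument Injection | Malicious payload inserted into tool arguments | SQL injection, command execution | Strict JSON Schema validation, pattern blocking |
| Excessive Privilege | Tools exposed with unnecessarily broad permissions | Unintended data access | Principle of least privilege, role-based access control |
| Cross-Context Leakage | Context from another session leaks | Exposure of personal data | Session isolation, context scoping |
| Denial of Service | Server overloaded by a flood of tool calls | Service outage | Rate limiting, per-session concurrency limits |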
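
The hash-based integrity verification listed for Rug Pull can be sketched as follows: fingerprint each tool definition when it is first reviewed, pin the hash, and compare on every later `list_tools` response. The function names and the pinning store (a plain dict here) are assumptions for illustration. Note that this detects changes to the advertised definition only, not changes in what the tool does when executed, which still requires behavioral monitoring.

```python
import hashlib
import json
from typing import Any


def tool_fingerprint(tool_definition: dict[str, Any]) -> str:
    """SHA-256 over a canonical JSON form, so key order does not affect the hash."""
    canonical = json.dumps(
        tool_definition, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def find_changed_tools(pinned: dict[str, str], advertised: list[dict[str, Any]]) -> list[str]:
    """Return names of tools that are unpinned or whose definition no longer matches."""
    changed = []
    for tool in advertised:
        name = tool["name"]
        if pinned.get(name) != tool_fingerprint(tool):
            changed.append(name)
    return changed
```

A Host could refuse to expose any tool returned by `find_changed_tools` until a human re-reviews the new description and schema.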

MCP Server Operational Monitoring

# prometheus-rules.yaml
groups:
  - name: mcp_server_monitoring
    interval: 15s
    rules:
      - alert: MCPToolCallErrorRateHigh
        expr: |
          sum(rate(mcp_tool_call_total{status="error"}[5m])) by (tool_name) /
          sum(rate(mcp_tool_call_total[5m])) by (tool_name) > 0.05
        for: 3m
        labels:
          severity: warning
          team: chatbot
        annotations:
          summary: 'Error rate for MCP tool {{ $labels.tool_name }} exceeds 5%'
          description: |
            Current error rate: {{ $value | humanizePercentage }}
            Check the tool implementation or the health of downstream services.

      - alert: MCPGatewayBlockRateSpike
        expr: |
          sum(rate(mcp_gateway_decision_total{decision="deny"}[15m])) /
          sum(rate(mcp_gateway_decision_total[15m])) > 0.20
        for: 5m
        labels:
          severity: warning
          team: security
        annotations:
          summary: 'MCP Gateway block rate exceeds 20%'
          description: 'Possible increase in attack attempts or a policy misconfiguration. Check the audit logs.'

      - alert: MCPToolLatencyHigh
        expr: |
          histogram_quantile(0.95,
            rate(mcp_tool_call_duration_seconds_bucket[5m])
          ) > 3.0
        for: 3m
        labels:
          severity: warning
          team: chatbot
        annotations:
          summary: 'MCP tool call p95 latency exceeds 3 seconds'

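MCP Guardrail Adoption Checklist

When introducing guardrails to an MCP-based chatbot, work through the following checks in order.

Phase 1: MCP Server Security Foundation

  • Apply TLS certificates to all MCP Servers
  • Set additionalProperties: false in every tool's inputSchema
  • Specify type, pattern, maxLength, and enum constraints for each tool argument
  • Deploy blocking rules for known malicious patterns (SQL injection, path traversal)

Phase 2: Gateway Policy Deployment

  • Deploy the MCP Gateway and confirm that every tool call passes through the proxy
  • Configure role-based tool access control policies
  • Set per-tool rate limits
  • Apply a human-in-the-loop confirmation flow to high-risk tools

Phase 3: Monitoring and Evaluation

  • Establish an audit log collection and retention policy (at least 90 days)
  • Deploy Prometheus alerting rules
  • Integrate the red team test suite into CI
  • Measure guardrail precision/recall (targets: precision > 0.95, recall > 0.98)

Phase 4: Ongoing Operations

  • Review audit logs weekly
  • Update red team test scenarios monthly
  • Review the tool permission matrix quarterly
  • Establish a procedure for applying MCP SDK security patches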

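Incident Response by Scenario

Scenario 1: Tools Unavailable Due to MCP Server Connection Failure

Symptom: The chatbot fails on every tool call and repeatedly tells users "This feature is unavailable."
Error log:
  MCPConnectionError: Failed to connect to MCP server at unix:///tmp/mcp-customer-support.sock
  Timeout after 5000ms

Cause: The MCP Server process was terminated after running out of memory (OOM)

Resolution:
  1. Restart the MCP Server process (systemd or supervisor)
  2. Set memory limits and adjust the OOM killer priority
  3. Graceful degradation: when the MCP Server is down, fall back to plain conversation mode without tools
  4. Add a health check endpoint and configure automatic restarts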
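
Step 3 above (graceful degradation) can be sketched as a thin wrapper around the tool call. `call_tool_or_degrade`, `FALLBACK_NOTICE`, and the result shape are hypothetical names chosen for this example; the `call_tool` parameter stands in for whatever async client method the Host actually uses.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Text handed to the LLM as context when tools cannot be reached (assumed wording).
FALLBACK_NOTICE = "Tools are currently unavailable. Answer without tool calls and say so."


async def call_tool_or_degrade(
    call_tool: Callable[[str, dict[str, Any]], Awaitable[str]],
    name: str,
    arguments: dict[str, Any],
    timeout_s: float = 5.0,
) -> dict[str, Any]:
    """Call a tool; on connection failure or timeout, return a fallback instead of raising."""
    try:
        text = await asyncio.wait_for(call_tool(name, arguments), timeout=timeout_s)
        return {"ok": True, "text": text}
    except (OSError, asyncio.TimeoutError) as exc:
        # OSError covers ConnectionError and its subclasses (refused, reset, ...).
        return {"ok": False, "text": FALLBACK_NOTICE, "error": type(exc).__name__}
```

Only transport-level failures are swallowed here. Errors raised by the tool itself still propagate, so genuine bugs are not silently converted into a degraded conversation.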

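Scenario 2: Legitimate Request Blocked by a Guardrail False Positive

Symptom: A legitimate request, "주문 취소해 주세요" ("Please cancel my order"), is blocked by the Gateway
Error log:
  Gateway DENY: Blocked pattern detected - "취소" matched "cancel" block rule

Cause: blocked_argument_patterns contains an overly broad pattern
     (the word "cancel" is matched not only in arguments but also in tool descriptions)

Resolution:
  1. Restrict pattern matching to argument values (exclude tool names and descriptions)
  2. Require a regression test against benign scenarios whenever a blocking pattern is added
  3. Prepare a hotfix procedure that can disable a pattern immediately when a false positive occurs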
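
One way to implement step 1 is to walk the arguments and test only string values, never keys, tool names, or descriptions. The sketch below assumes a trimmed pattern list for brevity; `iter_string_values` and `find_blocked_patterns` are hypothetical helper names.

```python
import re
from typing import Any, Iterator

# A subset of the handbook's blocked patterns, precompiled once.
BLOCKED_PATTERNS = [
    re.compile(r"(?i)(drop|delete|truncate)\s+(table|database)"),
    re.compile(r"\.\./"),
]


def iter_string_values(value: Any) -> Iterator[str]:
    """Yield every string value nested inside dicts and lists, skipping dict keys."""
    if isinstance(value, str):
        yield value
    elif isinstance(value, dict):
        for item in value.values():
            yield from iter_string_values(item)
    elif isinstance(value, (list, tuple)):
        for item in value:
            yield from iter_string_values(item)


def find_blocked_patterns(arguments: dict[str, Any]) -> list[str]:
    """Return the source of each blocked pattern found in any argument value."""
    hits: list[str] = []
    for text in iter_string_values(arguments):
        for pattern in BLOCKED_PATTERNS:
            if pattern.search(text) and pattern.pattern not in hits:
                hits.append(pattern.pattern)
    return hits
```

For step 2, a benign phrase such as "How do I cancel my order?" can be kept in the regression suite with the expectation that `find_blocked_patterns` returns an empty list for it.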

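Scenario 3: Rate Limiting Inconveniences Power Users

Symptom: A customer support agent hits the rate limit while looking up many orders, which delays their work
Error log:
  Gateway RATE_LIMITED: Rate limit exceeded: 30/min for 'order_lookup'

Resolution:
  1. Apply different rate limits per role (customer: 30/min, agent: 200/min, admin: unlimited)
  2. Provide a separate batch lookup tool (order_batch_lookup)
  3. Show rate limit history on a dashboard to provide evidence for tuning thresholds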
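
The role-based limits in step 1 can be sketched as a sliding-window limiter keyed by session and tool. The limits mirror the numbers above; `RoleRateLimiter`, `ROLE_LIMITS`, and the injectable `clock` parameter (used so the window can be tested without sleeping) are assumptions for illustration. State is held in process memory, so a multi-instance gateway would need a shared store instead.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Optional

# Calls per minute by role; None means unlimited.
ROLE_LIMITS: dict[str, Optional[int]] = {"customer": 30, "agent": 200, "admin": None}


class RoleRateLimiter:
    def __init__(self, window_s: float = 60.0, clock: Callable[[], float] = time.monotonic):
        self._window_s = window_s
        self._clock = clock
        self._calls: dict[tuple[str, str], deque] = defaultdict(deque)

    def allow(self, session_id: str, tool_name: str, role: str) -> bool:
        """Record the call and return True if it fits within the role's limit."""
        limit = ROLE_LIMITS.get(role, 0)  # unknown roles get no quota
        if limit is None:
            return True
        now = self._clock()
        calls = self._calls[(session_id, tool_name)]
        # Drop timestamps that have aged out of the sliding window.
        while calls and calls[0] <= now - self._window_s:
            calls.popleft()
        if len(calls) >= limit:
            return False
        calls.append(now)
        return True
```

Using a monotonic clock rather than wall-clock time keeps the window stable across system clock adjustments.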

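Quiz

Q1. Why separate guardrails into a Gateway in MCP's Client-Server architecture?

||Policies can be managed centrally without modifying the MCP Servers themselves, and the same security policy can be applied consistently across multiple MCP Servers. This separates the concerns of server implementation and security policy.||

Q2. What is the security reason for setting additionalProperties: false in an MCP inputSchema?

||It prevents the LLM from generating arbitrary fields that are not defined in the schema and triggering unintended behavior. For example, it blocks the insertion of a field such as {"admin_bypass": true}.||

Q3. What is a Tool Poisoning attack, and how do you defend against it?

||A malicious MCP Server masquerades as a legitimate tool to exfiltrate data or take over the system. Defenses include server certificate verification, a tool allowlist, and integrity hash verification of tool descriptions and behavior.||

Q4. Why is recall more important than precision in guardrail evaluation?

||Low recall means attacks pass through unblocked, which leads to security incidents. Low precision means legitimate requests are blocked and the user experience suffers, but that is more recoverable than a security incident. The recall target (0.98) is therefore set higher than the precision target (0.95).||

Q5. How do you apply graceful degradation when an MCP Server fails?

||When the connection to the MCP Server fails, disable tool calling and give the LLM the context "Tools are currently unavailable" so that it switches to plain conversation mode. Tell the user explicitly which features are limited and when to retry.||

Q6. Why should rate limiting be applied differently per role?

||Because the usage patterns of ordinary customers and customer support agents are fundamentally different. Agents need to run many lookups as part of their work, and applying the same rate limit reduces their efficiency. Thresholds should be set by analyzing usage data for each role.||

Q7. Why should red team test scenarios be updated monthly?

||New attack techniques are discovered continuously, and LLM model updates can introduce new patterns that bypass existing guardrails. Test coverage should be maintained by incorporating OWASP LLM Top 10 updates and new findings from the security community.||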

References

Chatbot: MCP Guardrails and Evaluation Handbook 2026

Chatbot: MCP Guardrails and Evaluation Handbook 2026

What Is MCP and Why Are Guardrails Needed

Model Context Protocol (MCP) is an open protocol released by Anthropic in November 2024, designed to provide a standardized way for LLM applications to connect with external data sources and tools. Before MCP, each LLM framework used its own proprietary tool/plugin interface, requiring host applications to write separate integration code for each tool.

MCP follows a Client-Server architecture.

  • MCP Host: An application with an embedded LLM (Claude Desktop, IDE extensions, chatbot services)
  • MCP Client: A protocol client within the Host that manages the connection with the MCP Server
  • MCP Server: A server that exposes external tools, data sources, and APIs via the MCP protocol

Since MCP standardizes tool calling, security threats must also be managed in a standardized manner. As the range of tools provided by MCP Servers widens, designing guardrails around "which tools to allow for whom under what conditions" becomes critical.

This handbook consolidates guardrail design, evaluation frameworks, and operational procedures for MCP-based chatbots into a single document.

MCP Server Implementation: Tool Exposure and Access Control

Basic MCP Server Implementation (Python SDK)

"""
MCP Server example: provides customer order lookup and FAQ search tools.
Uses the mcp package (v1.2+).
pip install mcp
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import json
from typing import Any

# Create MCP Server instance
app = Server("customer-support-tools")

@app.list_tools()
async def list_tools() -> list[Tool]:
    """Called when the MCP Client requests the list of available tools."""
    return [
        Tool(
            name="order_lookup",
            description="Looks up order status by order number. Order numbers follow the ORD- prefix format.",
            inputSchema={
                "type": "object",
                "properties": {
                    "order_id": {
                        "type": "string",
                        "description": "Order number (e.g., ORD-20260304-001)",
                        "pattern": r"^ORD-\d{8}-\d{3,6}$",
                    },
                },
                "required": ["order_id"],
                "additionalProperties": False,
            },
        ),
        Tool(
            name="faq_search",
            description="Searches for relevant answers in the customer support FAQ.",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The question to search for",
                        "minLength": 2,
                        "maxLength": 200,
                    },
                    "category": {
                        "type": "string",
                        "enum": ["payment", "shipping", "return", "account"],
                        "description": "FAQ category",
                    },
                },
                "required": ["query"],
                "additionalProperties": False,
            },
        ),
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
    """Called when the MCP Client requests tool execution."""
    if name == "order_lookup":
        # In a real implementation, this would query a DB
        order_id = arguments["order_id"]
        result = await lookup_order_from_db(order_id)
        return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]

    elif name == "faq_search":
        query = arguments["query"]
        category = arguments.get("category")
        results = await search_faq_index(query, category)
        return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False))]

    else:
        raise ValueError(f"Unknown tool: {name}")

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream)

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

MCP Guardrail Gateway Design

A Gateway is placed between the MCP Client and MCP Server to perform guardrail validation on all tool calls. This Gateway is the core component of this handbook.

"""
MCP Gateway: Intercepts all tool calls between
MCP Client -> Gateway -> MCP Server and performs guardrail validation.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from enum import Enum
import re
import json
import logging

logger = logging.getLogger("mcp_gateway")

class GatewayDecision(Enum):
    ALLOW = "allow"
    DENY = "deny"
    REQUIRE_APPROVAL = "require_approval"
    RATE_LIMITED = "rate_limited"

@dataclass
class GatewayPolicy:
    """MCP Gateway policy definition"""
    # Per-tool access control
    tool_permissions: Dict[str, List[str]] = field(default_factory=dict)
    # e.g., {"order_lookup": ["customer", "agent", "admin"],
    #        "order_cancel": ["agent", "admin"]}

    # Per-tool call limits
    rate_limits: Dict[str, int] = field(default_factory=dict)
    # e.g., {"order_lookup": 30, "faq_search": 60}  # per minute

    # Argument sanitization rules
    argument_sanitizers: Dict[str, dict] = field(default_factory=dict)

    # High-risk tool list (requires user confirmation)
    high_risk_tools: List[str] = field(default_factory=list)

    # Blocked patterns
    blocked_argument_patterns: List[str] = field(default_factory=list)

# Default policy
DEFAULT_POLICY = GatewayPolicy(
    tool_permissions={
        "order_lookup": ["customer", "agent", "admin"],
        "faq_search": ["customer", "agent", "admin"],
        "order_cancel": ["agent", "admin"],
        "refund_process": ["admin"],
        "user_data_export": ["admin"],
    },
    rate_limits={
        "order_lookup": 30,
        "faq_search": 60,
        "order_cancel": 5,
        "refund_process": 3,
        "user_data_export": 1,
    },
    high_risk_tools=["order_cancel", "refund_process", "user_data_export"],
    blocked_argument_patterns=[
        r"(?i)(drop|delete|truncate)\s+(table|database)",
        r"(?i)(union\s+select|or\s+1\s*=\s*1)",
        r"\.\./",  # path traversal
        r"<script",  # XSS
    ],
)

class MCPGateway:
    def __init__(self, policy: GatewayPolicy = DEFAULT_POLICY):
        self.policy = policy
        self._call_history: Dict[str, List[datetime]] = {}

    def evaluate(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        user_role: str,
        session_id: str,
    ) -> dict:
        """
        Performs guardrail validation on a tool call.
        Returns: {"decision": GatewayDecision, "reason": str, "sanitized_args": dict}
        """
        # 1. Permission check
        allowed_roles = self.policy.tool_permissions.get(tool_name, [])
        if not allowed_roles:
            return self._deny(f"Tool '{tool_name}' is not registered in gateway policy")
        if user_role not in allowed_roles:
            return self._deny(
                f"Role '{user_role}' is not authorized for tool '{tool_name}'. "
                f"Required: {allowed_roles}"
            )

        # 2. Rate limit check
        rate_limit = self.policy.rate_limits.get(tool_name, 60)
        if self._is_rate_limited(session_id, tool_name, rate_limit):
            return {
                "decision": GatewayDecision.RATE_LIMITED,
                "reason": f"Rate limit exceeded: {rate_limit}/min for '{tool_name}'",
                "sanitized_args": arguments,
            }

        # 3. Argument sanitization
        sanitized_args, blocked_patterns = self._sanitize_arguments(arguments)
        if blocked_patterns:
            logger.warning(
                f"Blocked patterns detected in tool '{tool_name}': {blocked_patterns}"
            )
            return self._deny(
                f"Potentially malicious argument patterns detected: {blocked_patterns}"
            )

        # 4. High-risk tool confirmation
        if tool_name in self.policy.high_risk_tools:
            return {
                "decision": GatewayDecision.REQUIRE_APPROVAL,
                "reason": f"Tool '{tool_name}' requires user confirmation before execution",
                "sanitized_args": sanitized_args,
            }

        return {
            "decision": GatewayDecision.ALLOW,
            "reason": "All checks passed",
            "sanitized_args": sanitized_args,
        }

    def _is_rate_limited(self, session_id: str, tool_name: str, limit: int) -> bool:
        key = f"{session_id}:{tool_name}"
        now = datetime.utcnow()
        cutoff = now - timedelta(minutes=1)

        if key not in self._call_history:
            self._call_history[key] = []

        self._call_history[key] = [
            t for t in self._call_history[key] if t > cutoff
        ]

        if len(self._call_history[key]) >= limit:
            return True

        self._call_history[key].append(now)
        return False

    def _sanitize_arguments(self, arguments: Dict[str, Any]) -> tuple:
        blocked = []
        args_str = json.dumps(arguments)

        for pattern in self.policy.blocked_argument_patterns:
            if re.search(pattern, args_str):
                blocked.append(pattern)

        return arguments, blocked

    def _deny(self, reason: str) -> dict:
        return {
            "decision": GatewayDecision.DENY,
            "reason": reason,
            "sanitized_args": {},
        }

MCP Tool Evaluation Framework

It is essential to systematically evaluate whether the tools provided by MCP Servers function correctly and whether guardrails properly block attacks.

Tool Correctness Evaluation

"""
Test suite for evaluating the functional correctness of MCP tools.
Validates normal cases, edge cases, and error cases for each tool.
"""
import pytest
from dataclasses import dataclass
from typing import Any, Optional, List

@dataclass
class ToolTestCase:
    """Single tool test case"""
    test_id: str
    tool_name: str
    arguments: dict
    expected_status: str  # success, error, validation_error
    expected_output_contains: Optional[str] = None
    expected_error_type: Optional[str] = None
    description: str = ""

TOOL_TEST_CASES = [
    # Normal case
    ToolTestCase(
        test_id="order_lookup_valid",
        tool_name="order_lookup",
        arguments={"order_id": "ORD-20260304-001"},
        expected_status="success",
        expected_output_contains="order_status",
        description="Lookup with a valid order number",
    ),
    # Edge case: non-existent order
    ToolTestCase(
        test_id="order_lookup_not_found",
        tool_name="order_lookup",
        arguments={"order_id": "ORD-99991231-999"},
        expected_status="success",
        expected_output_contains="not_found",
        description="Returns proper response for a non-existent order number",
    ),
    # Error case: invalid format
    ToolTestCase(
        test_id="order_lookup_invalid_format",
        tool_name="order_lookup",
        arguments={"order_id": "INVALID-FORMAT"},
        expected_status="validation_error",
        expected_error_type="pattern_mismatch",
        description="Invalid order number format",
    ),
    # Security case: SQL injection attempt
    ToolTestCase(
        test_id="order_lookup_sql_injection",
        tool_name="order_lookup",
        arguments={"order_id": "ORD-20260304-001' OR '1'='1"},
        expected_status="validation_error",
        expected_error_type="pattern_mismatch",
        description="Blocks SQL injection attempt",
    ),
    # FAQ normal search
    ToolTestCase(
        test_id="faq_search_valid",
        tool_name="faq_search",
        arguments={"query": "refund process", "category": "return"},
        expected_status="success",
        expected_output_contains="refund",
        description="Valid FAQ search",
    ),
    # FAQ empty result
    ToolTestCase(
        test_id="faq_search_no_results",
        tool_name="faq_search",
        arguments={"query": "xyzzyspoon"},
        expected_status="success",
        expected_output_contains="no_results",
        description="Returns proper empty response when no results found",
    ),
]

@pytest.mark.parametrize("case", TOOL_TEST_CASES, ids=lambda c: c.test_id)
async def test_mcp_tool(mcp_client, case):
    """Verifies the functional correctness of MCP tools."""
    try:
        result = await mcp_client.call_tool(case.tool_name, case.arguments)

        if case.expected_status == "validation_error":
            pytest.fail(
                f"Expected validation error but tool executed successfully: {result}"
            )

        if case.expected_output_contains:
            result_text = str(result)
            assert case.expected_output_contains in result_text, (
                f"Expected output to contain '{case.expected_output_contains}', "
                f"got: {result_text[:200]}"
            )

    except Exception as e:
        if case.expected_status == "validation_error":
            if case.expected_error_type:
                assert case.expected_error_type in str(type(e).__name__).lower() or \
                       case.expected_error_type in str(e).lower(), (
                    f"Expected error type '{case.expected_error_type}', got: {type(e).__name__}: {e}"
                )
        else:
            raise

Guardrail Effectiveness Measurement

"""
Quantitatively measures the effectiveness of MCP Gateway guardrails.
Calculates block rate and false positive rate against an attack scenario dataset.
"""
from typing import List
from dataclasses import dataclass
# GatewayDecision is assumed to be the decision enum defined with the MCP Gateway implementation.

@dataclass
class GuardrailEvalResult:
    total_scenarios: int
    true_positives: int     # Correctly blocked attacks
    false_positives: int    # Incorrectly blocked legitimate requests
    true_negatives: int     # Correctly allowed legitimate requests
    false_negatives: int    # Missed attacks
    precision: float
    recall: float
    f1_score: float

def evaluate_guardrails(
    gateway,
    attack_scenarios: List[dict],
    benign_scenarios: List[dict],
) -> GuardrailEvalResult:
    """
    Measures the precision/recall of guardrails against attack and benign scenarios.
    """
    tp = fp = tn = fn = 0

    # Attack scenarios: should be blocked
    for scenario in attack_scenarios:
        result = gateway.evaluate(
            tool_name=scenario["tool_name"],
            arguments=scenario["arguments"],
            user_role=scenario.get("user_role", "customer"),
            session_id=scenario.get("session_id", "eval"),
        )
        if result["decision"] in (GatewayDecision.DENY, GatewayDecision.RATE_LIMITED):
            tp += 1
        else:
            fn += 1
            print(f"MISS: Attack scenario not blocked: {scenario.get('name', 'unknown')}")

    # Benign scenarios: should be allowed
    for scenario in benign_scenarios:
        result = gateway.evaluate(
            tool_name=scenario["tool_name"],
            arguments=scenario["arguments"],
            user_role=scenario.get("user_role", "customer"),
            session_id=scenario.get("session_id", "eval"),
        )
        if result["decision"] == GatewayDecision.ALLOW:
            tn += 1
        elif result["decision"] == GatewayDecision.REQUIRE_APPROVAL:
            tn += 1  # Confirmation request for high-risk tools is normal behavior
        else:
            fp += 1
            print(f"FALSE POSITIVE: Benign scenario blocked: {scenario.get('name', 'unknown')}")

    total = tp + fp + tn + fn
    precision = tp / max(tp + fp, 1)
    recall = tp / max(tp + fn, 1)
    f1 = 2 * precision * recall / max(precision + recall, 1e-9)

    return GuardrailEvalResult(
        total_scenarios=total,
        true_positives=tp,
        false_positives=fp,
        true_negatives=tn,
        false_negatives=fn,
        precision=round(precision, 4),
        recall=round(recall, 4),
        f1_score=round(f1, 4),
    )

MCP Security Threat Model and Countermeasures

A systematic overview of the security threats that can arise in MCP architecture.

| Threat Type | Attack Vector | Impact | Countermeasure |
| --- | --- | --- | --- |
| Tool Poisoning | Malicious MCP Server disguised as a legitimate tool | Data exfiltration, system takeover | Server certificate validation, tool allowlisting |
| Rug Pull | Initially normal behavior, then switches to malicious | Trust-based security bypass | Continuous tool behavior monitoring, hash-based integrity checks |
| Argument Injection | Malicious payload inserted in tool arguments | SQL injection, command execution | Strict JSON Schema validation, pattern blocking |
| Excessive Privilege | Tools exposed with unnecessarily broad permissions | Unintended data access | Principle of least privilege, role-based access control |
| Cross-Context Leakage | Context from other sessions leaks | PII exposure | Session isolation, context scoping |
| Denial of Service | Server overload through mass tool calls | Service disruption | Rate limiting, per-session concurrent call limits |
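
The hash-based integrity check listed for Rug Pull can be sketched as follows. This is a minimal illustration, not part of any MCP SDK: it assumes tool definitions are available as plain dictionaries (name, description, inputSchema), and the helper names `tool_fingerprint` and `find_changed_tools` are hypothetical.

```python
import hashlib
import json


def tool_fingerprint(tool_def: dict) -> str:
    """Return a SHA-256 digest of a tool definition in canonical JSON form."""
    # Sorting keys and fixing separators makes the digest independent of key order.
    canonical = json.dumps(
        tool_def, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def find_changed_tools(pinned: dict[str, str], advertised: list[dict]) -> list[str]:
    """Return names of advertised tools that are unpinned or differ from their pinned digest."""
    changed = []
    for tool in advertised:
        name = tool.get("name", "<unnamed>")
        if pinned.get(name) != tool_fingerprint(tool):
            changed.append(name)
    return changed


# Hypothetical tool definition reviewed and approved at onboarding time.
APPROVED_TOOL = {
    "name": "order_lookup",
    "description": "Looks up order status by order number.",
    "inputSchema": {
        "type": "object",
        "properties": {"order_id": {"type": "string"}},
        "required": ["order_id"],
        "additionalProperties": False,
    },
}
PINNED_DIGESTS = {APPROVED_TOOL["name"]: tool_fingerprint(APPROVED_TOOL)}
```

A Gateway could run `find_changed_tools` each time a server's tool list is refreshed and refuse to route calls to any tool it returns until the new definition has been reviewed. This only detects changes in the advertised definition; behavioral changes still need the continuous monitoring listed in the table.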

MCP Server Operational Monitoring

# prometheus-rules.yaml
groups:
  - name: mcp_server_monitoring
    interval: 15s
    rules:
      - alert: MCPToolCallErrorRateHigh
        expr: |
          sum(rate(mcp_tool_call_total{status="error"}[5m])) by (tool_name) /
          sum(rate(mcp_tool_call_total[5m])) by (tool_name) > 0.05
        for: 3m
        labels:
          severity: warning
          team: chatbot
        annotations:
          summary: 'Error rate for MCP tool {{ $labels.tool_name }} exceeds 5%'
          description: |
            Current error rate: {{ $value | humanizePercentage }}
            Check the tool implementation or downstream service status.

      - alert: MCPGatewayBlockRateSpike
        expr: |
          sum(rate(mcp_gateway_decision_total{decision="deny"}[15m])) /
          sum(rate(mcp_gateway_decision_total[15m])) > 0.20
        for: 5m
        labels:
          severity: warning
          team: security
        annotations:
          summary: 'MCP Gateway block rate exceeds 20%'
          description: 'Possible increase in attack attempts or policy misconfiguration. Check audit logs.'

      - alert: MCPToolLatencyHigh
        expr: |
          histogram_quantile(0.95,
            rate(mcp_tool_call_duration_seconds_bucket[5m])
          ) > 3.0
        for: 3m
        labels:
          severity: warning
          team: chatbot
        annotations:
          summary: 'MCP tool call p95 latency exceeds 3 seconds'

MCP Guardrail Adoption Checklist

Follow this checklist in order when introducing guardrails to an MCP-based chatbot.

Phase 1: MCP Server Security Foundation

  • Apply TLS certificates to all MCP Servers
  • Set additionalProperties: false in each tool's inputSchema
  • Specify type, pattern, maxLength, and enum constraints for each tool's arguments
  • Deploy blocking rules for known malicious patterns (SQL injection, path traversal)
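
To make the Phase 1 schema items concrete, here is a minimal, standard-library-only sketch of how `additionalProperties: false`, `pattern`, `maxLength`, and `enum` reject bad arguments. It handles string properties only and is not a full JSON Schema validator; a production setup would more likely rely on an established validation library. The `maxLength` value of 32 and the function name `validate_arguments` are assumptions made for illustration.

```python
import re

# Mirrors the order_lookup schema; maxLength is an illustrative assumption.
ORDER_LOOKUP_SCHEMA = {
    "type": "object",
    "properties": {
        "order_id": {
            "type": "string",
            "pattern": r"^ORD-\d{8}-\d{3,6}$",
            "maxLength": 32,
        },
    },
    "required": ["order_id"],
    "additionalProperties": False,
}


def validate_arguments(arguments: dict, schema: dict) -> list[str]:
    """Return a list of violations; an empty list means the arguments are accepted."""
    errors = []
    properties = schema.get("properties", {})

    # additionalProperties: false -> any field not declared in the schema is rejected.
    if schema.get("additionalProperties") is False:
        for key in arguments:
            if key not in properties:
                errors.append(f"unexpected field: {key}")

    for key in schema.get("required", []):
        if key not in arguments:
            errors.append(f"missing field: {key}")

    for key, rules in properties.items():
        if key not in arguments or rules.get("type") != "string":
            continue  # this sketch only checks string properties
        value = arguments[key]
        if not isinstance(value, str):
            errors.append(f"{key}: expected string")
            continue
        if "maxLength" in rules and len(value) > rules["maxLength"]:
            errors.append(f"{key}: too long")
        if "pattern" in rules and not re.search(rules["pattern"], value):
            errors.append(f"{key}: pattern_mismatch")
        if "enum" in rules and value not in rules["enum"]:
            errors.append(f"{key}: not in enum")
    return errors
```

With this schema, the SQL injection payload from the test cases fails on the pattern check alone, and a stray field such as `admin_bypass` is rejected before the tool implementation ever sees it.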

Phase 2: Gateway Policy Deployment

  • Deploy MCP Gateway and confirm all tool calls are routed through it
  • Configure role-based tool access control policies
  • Set per-tool rate limits
  • Apply human-in-the-loop confirmation flow for high-risk tools

Phase 3: Monitoring and Evaluation

  • Establish audit log collection and retention policy (minimum 90 days)
  • Deploy Prometheus alert rules
  • Integrate red team test suite into CI
  • Measure guardrail precision/recall (target: precision over 0.95, recall over 0.98)
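
The precision/recall targets in Phase 3 can be enforced as a CI gate. The sketch below is one possible shape, assuming the confusion-matrix counts come from an evaluation run such as the guardrail effectiveness measurement; the function names `compute_metrics` and `meets_targets` are hypothetical.

```python
def compute_metrics(tp: int, fp: int, fn: int) -> tuple[float, float]:
    """Return (precision, recall) from confusion-matrix counts.

    tp: attacks correctly blocked
    fp: legitimate requests incorrectly blocked
    fn: attacks that were not blocked
    """
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall


def meets_targets(
    tp: int,
    fp: int,
    fn: int,
    min_precision: float = 0.95,
    min_recall: float = 0.98,
) -> bool:
    """True only if both metrics are strictly above their targets."""
    precision, recall = compute_metrics(tp, fp, fn)
    return precision > min_precision and recall > min_recall
```

For example, 99 blocked attacks, 1 missed attack, and 2 blocked benign requests give a precision of 99/101 and a recall of 99/100, which passes both targets. Missing 3 attacks out of 100 drops recall to 0.97 and should fail the build.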

Phase 4: Continuous Operations

  • Weekly audit log review
  • Monthly red team test scenario updates
  • Quarterly tool permission matrix review
  • Establish MCP SDK security patch application procedures

Failure Scenario Responses

Scenario 1: Tool Unavailability Due to MCP Server Connection Failure

Symptom: Chatbot fails on all tool calls. Repeatedly shows "Feature unavailable" to users.
Error log:
  MCPConnectionError: Failed to connect to MCP server at unix:///tmp/mcp-customer-support.sock
  Timeout after 5000ms

Root cause: MCP Server process terminated due to OOM

Resolution:
  1. Restart the MCP Server process (systemd or supervisor)
  2. Set memory limits and adjust OOM killer priority
  3. Graceful degradation: Switch to general conversation mode without tools when MCP Server is down
  4. Add health check endpoint and configure automatic restart
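
Resolution step 3 (graceful degradation) might look like the following sketch. It assumes the MCP client exposes an async `call_tool(tool_name, arguments)` callable and that connection failures surface as `OSError` subclasses or timeouts; a real client may raise its own exception type (such as the `MCPConnectionError` in the log above), which would need to be added to the `except` clause. The wrapper name and the notice text are hypothetical.

```python
import asyncio

# Hypothetical notice injected into the LLM context when tools are down.
TOOLS_UNAVAILABLE_NOTICE = (
    "Tools are currently unavailable. Answer without tool results and tell the "
    "user that lookups are temporarily limited and can be retried later."
)


async def call_tool_or_degrade(call_tool, tool_name, arguments, timeout_s=5.0):
    """Call a tool, falling back to a degraded-mode marker on timeout or connection failure."""
    try:
        result = await asyncio.wait_for(
            call_tool(tool_name, arguments), timeout=timeout_s
        )
        return {"mode": "tools", "result": result}
    except (asyncio.TimeoutError, OSError) as exc:
        # ConnectionError is a subclass of OSError, so refused or missing sockets land here too.
        return {
            "mode": "degraded",
            "notice": TOOLS_UNAVAILABLE_NOTICE,
            "error": type(exc).__name__,
        }
```

The host can check `mode` and, when it is `"degraded"`, continue the conversation without tools while passing `notice` to the model, so that users see a clear explanation instead of repeated failures.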

Scenario 2: Legitimate Requests Blocked Due to Guardrail False Positives

Symptom: A legitimate request "Please cancel my order" is blocked by the Gateway
Error log:
  Gateway DENY: Blocked pattern detected - "cancel" matched "cancel" block rule

Root cause: blocked_argument_patterns contains an overly broad pattern
     (the bare keyword "cancel" is matched against tool names and descriptions as well as argument values)

Resolution:
  1. Restrict pattern matching scope to argument values only (exclude tool names/descriptions)
  2. Require regression testing with benign scenarios before adding new blocked patterns
  3. Establish a hotfix procedure to immediately disable patterns when false positives occur
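
Resolution steps 1 and 2 can be sketched together: scan only argument values, and keep a benign regression set that must pass before any new pattern ships. The two patterns, the `order_cancel` tool name, and all helper names below are illustrative assumptions, not the Gateway's actual rule set.

```python
import re

# Illustrative patterns only; note there is no bare keyword such as "cancel".
BLOCKED_ARGUMENT_PATTERNS = [
    re.compile(r"'\s*or\s+'?\d+'?\s*=\s*'?\d+", re.IGNORECASE),  # SQL tautology
    re.compile(r"\.\./"),  # path traversal
]


def iter_string_values(value):
    """Yield every string inside nested argument values (dictionary keys are skipped)."""
    if isinstance(value, str):
        yield value
    elif isinstance(value, dict):
        for item in value.values():
            yield from iter_string_values(item)
    elif isinstance(value, (list, tuple)):
        for item in value:
            yield from iter_string_values(item)


def find_blocked_pattern(arguments: dict):
    """Return the first matching pattern source, or None if the arguments are clean.

    Tool names and descriptions are never passed in, so they cannot trigger a block.
    """
    for text in iter_string_values(arguments):
        for pattern in BLOCKED_ARGUMENT_PATTERNS:
            if pattern.search(text):
                return pattern.pattern
    return None


# Benign requests that must keep passing whenever a pattern is added.
BENIGN_REGRESSION_CASES = [
    {"tool_name": "order_cancel",  # hypothetical tool
     "arguments": {"order_id": "ORD-20260304-001", "reason": "Please cancel my order"}},
    {"tool_name": "faq_search",
     "arguments": {"query": "how to cancel a subscription"}},
]


def benign_false_positives() -> list[dict]:
    """Return the benign cases that the current pattern set would block."""
    return [
        case for case in BENIGN_REGRESSION_CASES
        if find_blocked_pattern(case["arguments"]) is not None
    ]
```

Running `benign_false_positives()` in CI and failing on a non-empty result gives the regression check described in step 2.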

Scenario 3: Power User Inconvenience Due to Rate Limiting

Symptom: CS agents hit rate limits during bulk order lookups, causing work delays
Error log:
  Gateway RATE_LIMITED: Rate limit exceeded: 30/min for 'order_lookup'

Resolution:
  1. Apply role-based differential rate limits (customer: 30/min, agent: 200/min, admin: unlimited)
  2. Provide a separate batch lookup tool (order_batch_lookup)
  3. Display rate limit history on dashboards to provide tuning insights
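
Resolution step 1 can be sketched as a per-session sliding-window limiter keyed by role. The limits are the ones listed above; the class name, the fallback of unknown roles to the customer limit, and the injectable clock are assumptions made for this illustration.

```python
import time
from collections import defaultdict, deque

# Calls per minute by role; None means unlimited.
ROLE_LIMITS_PER_MINUTE = {"customer": 30, "agent": 200, "admin": None}


class RoleRateLimiter:
    """Sliding-window rate limiter applied per (session, tool), with role-based limits."""

    def __init__(self, limits=None, window_s=60.0, clock=time.monotonic):
        self.limits = dict(limits or ROLE_LIMITS_PER_MINUTE)
        self.window_s = window_s
        self.clock = clock  # injectable so tests can control time
        self._calls = defaultdict(deque)

    def allow(self, session_id: str, role: str, tool_name: str) -> bool:
        """Record the call and return True if it is within the role's limit."""
        # Unknown roles fall back to the strictest (customer) limit.
        limit = self.limits.get(role, self.limits["customer"])
        if limit is None:
            return True
        now = self.clock()
        calls = self._calls[(session_id, tool_name)]
        # Drop timestamps that have left the window.
        while calls and now - calls[0] >= self.window_s:
            calls.popleft()
        if len(calls) >= limit:
            return False
        calls.append(now)
        return True
```

The state here lives in process memory, so a Gateway running multiple replicas would need a shared store instead. Rejected calls are not recorded, which means a client that keeps retrying does not extend its own lockout.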

Quiz


Q1. Why should guardrails in MCP's Client-Server architecture be separated into a Gateway?

It allows policies to be managed centrally without modifying the MCP Server itself, and enables consistent application of the same security policies across multiple MCP Servers. This separates concerns between server implementation and security policy.

Q2. What is the security reason for setting additionalProperties: false in MCP inputSchema?

It makes schema validation reject any field not defined in the schema, so arbitrary fields generated by the LLM cannot reach the tool and cause unintended behavior. For example, it blocks the insertion of fields like {"admin_bypass": true}.

Q3. What is a Tool Poisoning attack and how do you defend against it?

It is an attack where a malicious MCP Server masquerades as a legitimate tool to exfiltrate data or take over systems. Defense includes server certificate validation, tool allowlisting, and integrity hash verification of tool descriptions and behavior.

Q4. Why is recall more important than precision in guardrail evaluation?

Low recall means attacks pass through without being blocked, leading to security incidents. Low precision means legitimate requests are blocked by mistake, which degrades the user experience but is recoverable, unlike a security breach. Therefore, the recall target (0.98) is set higher than the precision target (0.95).

Q5. How do you apply Graceful Degradation when the MCP Server fails?

When the MCP Server connection fails, disable tool calling and provide the LLM with context that "tools are currently unavailable," switching to general conversation mode. Inform users of the limited functionality and provide guidance on when to retry.

Q6. Why should rate limiting be applied differently by role?

Because the usage patterns of regular customers and CS agents are fundamentally different. Agents need bulk lookups for their work, and applying the same rate limit reduces work efficiency. Rate limit thresholds should be set based on usage pattern data analysis for each role.

Q7. Why should red team test scenarios be updated monthly?

Because new attack techniques are continuously discovered, and model updates can introduce new patterns that bypass existing guardrails. Test coverage must be maintained by incorporating OWASP LLM Top 10 updates and new findings from the security community.

References