Skip to content
Published on

Chatbot: MCP Guardrails and Evaluation Handbook 2026

Authors
  • Name
    Twitter
Chatbot: MCP Guardrails and Evaluation Handbook 2026

What Is MCP and Why Are Guardrails Needed

Model Context Protocol (MCP) is an open protocol released by Anthropic in November 2024, designed to provide a standardized way for LLM applications to connect with external data sources and tools. Before MCP, each LLM framework used its own proprietary tool/plugin interface, requiring host applications to write separate integration code for each tool.

MCP follows a Client-Server architecture.

  • MCP Host: An application with an embedded LLM (Claude Desktop, IDE extensions, chatbot services)
  • MCP Client: A protocol client within the Host that manages the connection with the MCP Server
  • MCP Server: A server that exposes external tools, data sources, and APIs via the MCP protocol

Since MCP standardizes tool calling, security threats must also be managed in a standardized manner. As the range of tools provided by MCP Servers widens, designing guardrails around "which tools to allow for whom under what conditions" becomes critical.

This handbook consolidates guardrail design, evaluation frameworks, and operational procedures for MCP-based chatbots into a single document.

MCP Server Implementation: Tool Exposure and Access Control

Basic MCP Server Implementation (Python SDK)

"""
MCP Server example: provides customer order lookup and FAQ search tools.
Uses the mcp package (v1.2+).
pip install mcp
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import json
from typing import Any

# Create MCP Server instance
app = Server("customer-support-tools")

@app.list_tools()
async def list_tools() -> list[Tool]:
    """Called when the MCP Client requests the list of available tools."""
    return [
        Tool(
            name="order_lookup",
            description="Looks up order status by order number. Order numbers follow the ORD- prefix format.",
            inputSchema={
                "type": "object",
                "properties": {
                    "order_id": {
                        "type": "string",
                        "description": "Order number (e.g., ORD-20260304-001)",
                        "pattern": r"^ORD-\d{8}-\d{3,6}$",
                    },
                },
                "required": ["order_id"],
                "additionalProperties": False,
            },
        ),
        Tool(
            name="faq_search",
            description="Searches for relevant answers in the customer support FAQ.",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The question to search for",
                        "minLength": 2,
                        "maxLength": 200,
                    },
                    "category": {
                        "type": "string",
                        "enum": ["payment", "shipping", "return", "account"],
                        "description": "FAQ category",
                    },
                },
                "required": ["query"],
                "additionalProperties": False,
            },
        ),
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
    """Called when the MCP Client requests tool execution."""
    if name == "order_lookup":
        # In a real implementation, this would query a DB
        order_id = arguments["order_id"]
        result = await lookup_order_from_db(order_id)
        return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]

    elif name == "faq_search":
        query = arguments["query"]
        category = arguments.get("category")
        results = await search_faq_index(query, category)
        return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False))]

    else:
        raise ValueError(f"Unknown tool: {name}")

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream)

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

MCP Guardrail Gateway Design

A Gateway is placed between the MCP Client and MCP Server to perform guardrail validation on all tool calls. This Gateway is the core component of this handbook.

"""
MCP Gateway: Intercepts all tool calls between
MCP Client -> Gateway -> MCP Server and performs guardrail validation.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from enum import Enum
import re
import json
import logging

logger = logging.getLogger("mcp_gateway")

class GatewayDecision(Enum):
    ALLOW = "allow"
    DENY = "deny"
    REQUIRE_APPROVAL = "require_approval"
    RATE_LIMITED = "rate_limited"

@dataclass
class GatewayPolicy:
    """MCP Gateway policy definition"""
    # Per-tool access control
    tool_permissions: Dict[str, List[str]] = field(default_factory=dict)
    # e.g., {"order_lookup": ["customer", "agent", "admin"],
    #        "order_cancel": ["agent", "admin"]}

    # Per-tool call limits
    rate_limits: Dict[str, int] = field(default_factory=dict)
    # e.g., {"order_lookup": 30, "faq_search": 60}  # per minute

    # Argument sanitization rules
    argument_sanitizers: Dict[str, dict] = field(default_factory=dict)

    # High-risk tool list (requires user confirmation)
    high_risk_tools: List[str] = field(default_factory=list)

    # Blocked patterns
    blocked_argument_patterns: List[str] = field(default_factory=list)

# Default policy
DEFAULT_POLICY = GatewayPolicy(
    tool_permissions={
        "order_lookup": ["customer", "agent", "admin"],
        "faq_search": ["customer", "agent", "admin"],
        "order_cancel": ["agent", "admin"],
        "refund_process": ["admin"],
        "user_data_export": ["admin"],
    },
    rate_limits={
        "order_lookup": 30,
        "faq_search": 60,
        "order_cancel": 5,
        "refund_process": 3,
        "user_data_export": 1,
    },
    high_risk_tools=["order_cancel", "refund_process", "user_data_export"],
    blocked_argument_patterns=[
        r"(?i)(drop|delete|truncate)\s+(table|database)",
        r"(?i)(union\s+select|or\s+1\s*=\s*1)",
        r"\.\./",  # path traversal
        r"<script",  # XSS
    ],
)

class MCPGateway:
    def __init__(self, policy: GatewayPolicy = DEFAULT_POLICY):
        self.policy = policy
        self._call_history: Dict[str, List[datetime]] = {}

    def evaluate(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        user_role: str,
        session_id: str,
    ) -> dict:
        """
        Performs guardrail validation on a tool call.
        Returns: {"decision": GatewayDecision, "reason": str, "sanitized_args": dict}
        """
        # 1. Permission check
        allowed_roles = self.policy.tool_permissions.get(tool_name, [])
        if not allowed_roles:
            return self._deny(f"Tool '{tool_name}' is not registered in gateway policy")
        if user_role not in allowed_roles:
            return self._deny(
                f"Role '{user_role}' is not authorized for tool '{tool_name}'. "
                f"Required: {allowed_roles}"
            )

        # 2. Rate limit check
        rate_limit = self.policy.rate_limits.get(tool_name, 60)
        if self._is_rate_limited(session_id, tool_name, rate_limit):
            return {
                "decision": GatewayDecision.RATE_LIMITED,
                "reason": f"Rate limit exceeded: {rate_limit}/min for '{tool_name}'",
                "sanitized_args": arguments,
            }

        # 3. Argument sanitization
        sanitized_args, blocked_patterns = self._sanitize_arguments(arguments)
        if blocked_patterns:
            logger.warning(
                f"Blocked patterns detected in tool '{tool_name}': {blocked_patterns}"
            )
            return self._deny(
                f"Potentially malicious argument patterns detected: {blocked_patterns}"
            )

        # 4. High-risk tool confirmation
        if tool_name in self.policy.high_risk_tools:
            return {
                "decision": GatewayDecision.REQUIRE_APPROVAL,
                "reason": f"Tool '{tool_name}' requires user confirmation before execution",
                "sanitized_args": sanitized_args,
            }

        return {
            "decision": GatewayDecision.ALLOW,
            "reason": "All checks passed",
            "sanitized_args": sanitized_args,
        }

    def _is_rate_limited(self, session_id: str, tool_name: str, limit: int) -> bool:
        key = f"{session_id}:{tool_name}"
        now = datetime.utcnow()
        cutoff = now - timedelta(minutes=1)

        if key not in self._call_history:
            self._call_history[key] = []

        self._call_history[key] = [
            t for t in self._call_history[key] if t > cutoff
        ]

        if len(self._call_history[key]) >= limit:
            return True

        self._call_history[key].append(now)
        return False

    def _sanitize_arguments(self, arguments: Dict[str, Any]) -> tuple:
        blocked = []
        args_str = json.dumps(arguments)

        for pattern in self.policy.blocked_argument_patterns:
            if re.search(pattern, args_str):
                blocked.append(pattern)

        return arguments, blocked

    def _deny(self, reason: str) -> dict:
        return {
            "decision": GatewayDecision.DENY,
            "reason": reason,
            "sanitized_args": {},
        }

MCP Tool Evaluation Framework

It is essential to systematically evaluate whether the tools provided by MCP Servers function correctly and whether guardrails properly block attacks.

Tool Correctness Evaluation

"""
Test suite for evaluating the functional correctness of MCP tools.
Validates normal cases, edge cases, and error cases for each tool.
"""
import pytest
from dataclasses import dataclass
from typing import Any, Optional, List

@dataclass
class ToolTestCase:
    """Single tool test case"""
    test_id: str
    tool_name: str
    arguments: dict
    expected_status: str  # success, error, validation_error
    expected_output_contains: Optional[str] = None
    expected_error_type: Optional[str] = None
    description: str = ""

TOOL_TEST_CASES = [
    # Normal case
    ToolTestCase(
        test_id="order_lookup_valid",
        tool_name="order_lookup",
        arguments={"order_id": "ORD-20260304-001"},
        expected_status="success",
        expected_output_contains="order_status",
        description="Lookup with a valid order number",
    ),
    # Edge case: non-existent order
    ToolTestCase(
        test_id="order_lookup_not_found",
        tool_name="order_lookup",
        arguments={"order_id": "ORD-99991231-999"},
        expected_status="success",
        expected_output_contains="not_found",
        description="Returns proper response for a non-existent order number",
    ),
    # Error case: invalid format
    ToolTestCase(
        test_id="order_lookup_invalid_format",
        tool_name="order_lookup",
        arguments={"order_id": "INVALID-FORMAT"},
        expected_status="validation_error",
        expected_error_type="pattern_mismatch",
        description="Invalid order number format",
    ),
    # Security case: SQL injection attempt
    ToolTestCase(
        test_id="order_lookup_sql_injection",
        tool_name="order_lookup",
        arguments={"order_id": "ORD-20260304-001' OR '1'='1"},
        expected_status="validation_error",
        expected_error_type="pattern_mismatch",
        description="Blocks SQL injection attempt",
    ),
    # FAQ normal search
    ToolTestCase(
        test_id="faq_search_valid",
        tool_name="faq_search",
        arguments={"query": "refund process", "category": "return"},
        expected_status="success",
        expected_output_contains="refund",
        description="Valid FAQ search",
    ),
    # FAQ empty result
    ToolTestCase(
        test_id="faq_search_no_results",
        tool_name="faq_search",
        arguments={"query": "xyzzyspoon"},
        expected_status="success",
        expected_output_contains="no_results",
        description="Returns proper empty response when no results found",
    ),
]

@pytest.mark.parametrize("case", TOOL_TEST_CASES, ids=lambda c: c.test_id)
async def test_mcp_tool(mcp_client, case):
    """Verifies the functional correctness of MCP tools."""
    try:
        result = await mcp_client.call_tool(case.tool_name, case.arguments)

        if case.expected_status == "validation_error":
            pytest.fail(
                f"Expected validation error but tool executed successfully: {result}"
            )

        if case.expected_output_contains:
            result_text = str(result)
            assert case.expected_output_contains in result_text, (
                f"Expected output to contain '{case.expected_output_contains}', "
                f"got: {result_text[:200]}"
            )

    except Exception as e:
        if case.expected_status == "validation_error":
            if case.expected_error_type:
                assert case.expected_error_type in str(type(e).__name__).lower() or \
                       case.expected_error_type in str(e).lower(), (
                    f"Expected error type '{case.expected_error_type}', got: {type(e).__name__}: {e}"
                )
        else:
            raise

Guardrail Effectiveness Measurement

"""
Quantitatively measures the effectiveness of MCP Gateway guardrails.
Calculates block rate and false positive rate against an attack scenario dataset.
"""
from typing import List, Dict
from dataclasses import dataclass

@dataclass
class GuardrailEvalResult:
    total_scenarios: int
    true_positives: int     # Correctly blocked attacks
    false_positives: int    # Incorrectly blocked legitimate requests
    true_negatives: int     # Correctly allowed legitimate requests
    false_negatives: int    # Missed attacks
    precision: float
    recall: float
    f1_score: float

def evaluate_guardrails(
    gateway,
    attack_scenarios: List[dict],
    benign_scenarios: List[dict],
) -> GuardrailEvalResult:
    """
    Measures the precision/recall of guardrails against attack and benign scenarios.
    """
    tp = fp = tn = fn = 0

    # Attack scenarios: should be blocked
    for scenario in attack_scenarios:
        result = gateway.evaluate(
            tool_name=scenario["tool_name"],
            arguments=scenario["arguments"],
            user_role=scenario.get("user_role", "customer"),
            session_id=scenario.get("session_id", "eval"),
        )
        if result["decision"] in (GatewayDecision.DENY, GatewayDecision.RATE_LIMITED):
            tp += 1
        else:
            fn += 1
            print(f"MISS: Attack scenario not blocked: {scenario.get('name', 'unknown')}")

    # Benign scenarios: should be allowed
    for scenario in benign_scenarios:
        result = gateway.evaluate(
            tool_name=scenario["tool_name"],
            arguments=scenario["arguments"],
            user_role=scenario.get("user_role", "customer"),
            session_id=scenario.get("session_id", "eval"),
        )
        if result["decision"] == GatewayDecision.ALLOW:
            tn += 1
        elif result["decision"] == GatewayDecision.REQUIRE_APPROVAL:
            tn += 1  # Confirmation request for high-risk tools is normal behavior
        else:
            fp += 1
            print(f"FALSE POSITIVE: Benign scenario blocked: {scenario.get('name', 'unknown')}")

    total = tp + fp + tn + fn
    precision = tp / max(tp + fp, 1)
    recall = tp / max(tp + fn, 1)
    f1 = 2 * precision * recall / max(precision + recall, 1e-9)

    return GuardrailEvalResult(
        total_scenarios=total,
        true_positives=tp,
        false_positives=fp,
        true_negatives=tn,
        false_negatives=fn,
        precision=round(precision, 4),
        recall=round(recall, 4),
        f1_score=round(f1, 4),
    )

MCP Security Threat Model and Countermeasures

A systematic overview of the security threats that can arise in MCP architecture.

Threat TypeAttack VectorImpactCountermeasure
Tool PoisoningMalicious MCP Server disguised as a legitimate toolData exfiltration, system takeoverServer certificate validation, tool allowlisting
Rug PullInitially normal behavior, then switches to maliciousTrust-based security bypassContinuous tool behavior monitoring, hash-based integrity checks
Argument InjectionMalicious payload inserted in tool argumentsSQL injection, command executionStrict JSON Schema validation, pattern blocking
Excessive PrivilegeTools exposed with unnecessarily broad permissionsUnintended data accessPrinciple of least privilege, role-based access control
Cross-Context LeakageContext from other sessions leaksPII exposureSession isolation, context scoping
Denial of ServiceServer overload through mass tool callsService disruptionRate limiting, per-session concurrent call limits

MCP Server Operational Monitoring

# prometheus-rules.yaml
groups:
  - name: mcp_server_monitoring
    interval: 15s
    rules:
      - alert: MCPToolCallErrorRateHigh
        expr: |
          sum(rate(mcp_tool_call_total{status="error"}[5m])) by (tool_name) /
          sum(rate(mcp_tool_call_total[5m])) by (tool_name) > 0.05
        for: 3m
        labels:
          severity: warning
          team: chatbot
        annotations:
          summary: 'Error rate for MCP tool {{ $labels.tool_name }} exceeds 5%'
          description: |
            Current error rate: {{ $value | humanizePercentage }}
            Check the tool implementation or downstream service status.

      - alert: MCPGatewayBlockRateSpike
        expr: |
          sum(rate(mcp_gateway_decision_total{decision="deny"}[15m])) /
          sum(rate(mcp_gateway_decision_total[15m])) > 0.20
        for: 5m
        labels:
          severity: warning
          team: security
        annotations:
          summary: 'MCP Gateway block rate exceeds 20%'
          description: 'Possible increase in attack attempts or policy misconfiguration. Check audit logs.'

      - alert: MCPToolLatencyHigh
        expr: |
          histogram_quantile(0.95,
            rate(mcp_tool_call_duration_seconds_bucket[5m])
          ) > 3.0
        for: 3m
        labels:
          severity: warning
          team: chatbot
        annotations:
          summary: 'MCP tool call p95 latency exceeds 3 seconds'

MCP Guardrail Adoption Checklist

Follow this checklist in order when introducing guardrails to an MCP-based chatbot.

Phase 1: MCP Server Security Foundation

  • Apply TLS certificates to all MCP Servers
  • Set additionalProperties: false in each tool's inputSchema
  • Specify type, pattern, maxLength, and enum constraints for each tool's arguments
  • Deploy blocking rules for known malicious patterns (SQL injection, path traversal)

Phase 2: Gateway Policy Deployment

  • Deploy MCP Gateway and confirm all tool calls are routed through it
  • Configure role-based tool access control policies
  • Set per-tool rate limits
  • Apply human-in-the-loop confirmation flow for high-risk tools

Phase 3: Monitoring and Evaluation

  • Establish audit log collection and retention policy (minimum 90 days)
  • Deploy Prometheus alert rules
  • Integrate red team test suite into CI
  • Measure guardrail precision/recall (target: precision over 0.95, recall over 0.98)

Phase 4: Continuous Operations

  • Weekly audit log review
  • Monthly red team test scenario updates
  • Quarterly tool permission matrix review
  • Establish MCP SDK security patch application procedures

Failure Scenario Responses

Scenario 1: Tool Unavailability Due to MCP Server Connection Failure

Symptom: Chatbot fails on all tool calls. Repeatedly shows "Feature unavailable" to users.
Error log:
  MCPConnectionError: Failed to connect to MCP server at unix:///tmp/mcp-customer-support.sock
  Timeout after 5000ms

Root cause: MCP Server process terminated due to OOM

Resolution:
  1. Restart the MCP Server process (systemd or supervisor)
  2. Set memory limits and adjust OOM killer priority
  3. Graceful degradation: Switch to general conversation mode without tools when MCP Server is down
  4. Add health check endpoint and configure automatic restart

Scenario 2: Legitimate Requests Blocked Due to Guardrail False Positives

Symptom: A legitimate request "Please cancel my order" is blocked by the Gateway
Error log:
  Gateway DENY: Blocked pattern detected - "cancel" matched "cancel" block rule

Root cause: Overly broad patterns included in blocked_argument_patterns
     ("cancel" keyword matches in arguments as well as tool descriptions)

Resolution:
  1. Restrict pattern matching scope to argument values only (exclude tool names/descriptions)
  2. Require regression testing with benign scenarios before adding new blocked patterns
  3. Establish a hotfix procedure to immediately disable patterns when false positives occur

Scenario 3: Power User Inconvenience Due to Rate Limiting

Symptom: CS agents hit rate limits during bulk order lookups, causing work delays
Error log:
  Gateway RATE_LIMITED: Rate limit exceeded: 30/min for 'order_lookup'

Resolution:
  1. Apply role-based differential rate limits (customer: 30/min, agent: 200/min, admin: unlimited)
  2. Provide a separate batch lookup tool (order_batch_lookup)
  3. Display rate limit history on dashboards to provide tuning insights

Quiz

Quiz

Q1. Why should guardrails in MCP's Client-Server architecture be separated into a Gateway?

It allows policies to be managed centrally without modifying the MCP Server itself, and enables consistent application of the same security policies across multiple MCP Servers. This separates concerns between server implementation and security policy.

Q2. What is the security reason for setting additionalProperties: false in MCP inputSchema?

It prevents the LLM from generating arbitrary fields not defined in the schema, which could cause unintended behavior. For example, it blocks the insertion of fields like {"admin_bypass": true}.

Q3. What is a Tool Poisoning attack and how do you defend against it?

It is an attack where a malicious MCP Server masquerades as a legitimate tool to exfiltrate data or take over systems. Defense includes server certificate validation, tool allowlisting, and integrity hash verification of tool descriptions and behavior.

Q4. Why is recall more important than precision in guardrail evaluation?

Low recall means attacks pass through without being blocked, leading to security incidents. Low precision means legitimate requests are falsely blocked, degrading user experience but remaining a recoverable issue compared to security breaches. Therefore, the recall target (0.98) is set higher than the precision target (0.95).

Q5. How do you apply Graceful Degradation when the MCP Server fails?

When the MCP Server connection fails, disable tool calling and provide the LLM with context that "tools are currently unavailable," switching to general conversation mode. Inform users of the limited functionality and provide guidance on when to retry.

Q6. Why should rate limiting be applied differently by role?

Because the usage patterns of regular customers and CS agents are fundamentally different. Agents need bulk lookups for their work, and applying the same rate limit reduces work efficiency. Rate limit thresholds should be set based on usage pattern data analysis for each role.

Q7. Why should red team test scenarios be updated monthly?

Because new attack techniques are continuously discovered, and LLM model updates can create new patterns that bypass existing guardrails. Test coverage must be maintained by incorporating OWASP LLM Top 10 updates and new findings from the security community.

References