- Authors
- Name

- What Is MCP and Why Are Guardrails Needed
- MCP Server Implementation: Tool Exposure and Access Control
- MCP Guardrail Gateway Design
- MCP Tool Evaluation Framework
- MCP Security Threat Model and Countermeasures
- MCP Server Operational Monitoring
- MCP Guardrail Adoption Checklist
- Failure Scenario Responses
- Quiz
- References
What Is MCP and Why Are Guardrails Needed
Model Context Protocol (MCP) is an open protocol released by Anthropic in November 2024, designed to provide a standardized way for LLM applications to connect with external data sources and tools. Before MCP, each LLM framework used its own proprietary tool/plugin interface, requiring host applications to write separate integration code for each tool.
MCP follows a Client-Server architecture.
- MCP Host: An application with an embedded LLM (Claude Desktop, IDE extensions, chatbot services)
- MCP Client: A protocol client within the Host that manages the connection with the MCP Server
- MCP Server: A server that exposes external tools, data sources, and APIs via the MCP protocol
Since MCP standardizes tool calling, security threats must also be managed in a standardized manner. As the range of tools provided by MCP Servers widens, designing guardrails around "which tools to allow for whom under what conditions" becomes critical.
This handbook consolidates guardrail design, evaluation frameworks, and operational procedures for MCP-based chatbots into a single document.
MCP Server Implementation: Tool Exposure and Access Control
Basic MCP Server Implementation (Python SDK)
"""
MCP Server example: provides customer order lookup and FAQ search tools.
Uses the mcp package (v1.2+).
pip install mcp
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import json
from typing import Any
# Create MCP Server instance
app = Server("customer-support-tools")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""Called when the MCP Client requests the list of available tools."""
return [
Tool(
name="order_lookup",
description="Looks up order status by order number. Order numbers follow the ORD- prefix format.",
inputSchema={
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "Order number (e.g., ORD-20260304-001)",
"pattern": r"^ORD-\d{8}-\d{3,6}$",
},
},
"required": ["order_id"],
"additionalProperties": False,
},
),
Tool(
name="faq_search",
description="Searches for relevant answers in the customer support FAQ.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The question to search for",
"minLength": 2,
"maxLength": 200,
},
"category": {
"type": "string",
"enum": ["payment", "shipping", "return", "account"],
"description": "FAQ category",
},
},
"required": ["query"],
"additionalProperties": False,
},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
"""Called when the MCP Client requests tool execution."""
if name == "order_lookup":
# In a real implementation, this would query a DB
order_id = arguments["order_id"]
result = await lookup_order_from_db(order_id)
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]
elif name == "faq_search":
query = arguments["query"]
category = arguments.get("category")
results = await search_faq_index(query, category)
return [TextContent(type="text", text=json.dumps(results, ensure_ascii=False))]
else:
raise ValueError(f"Unknown tool: {name}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await app.run(read_stream, write_stream)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
MCP Guardrail Gateway Design
A Gateway is placed between the MCP Client and MCP Server to perform guardrail validation on all tool calls. This Gateway is the core component of this handbook.
"""
MCP Gateway: Intercepts all tool calls between
MCP Client -> Gateway -> MCP Server and performs guardrail validation.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from enum import Enum
import re
import json
import logging
logger = logging.getLogger("mcp_gateway")
class GatewayDecision(Enum):
ALLOW = "allow"
DENY = "deny"
REQUIRE_APPROVAL = "require_approval"
RATE_LIMITED = "rate_limited"
@dataclass
class GatewayPolicy:
"""MCP Gateway policy definition"""
# Per-tool access control
tool_permissions: Dict[str, List[str]] = field(default_factory=dict)
# e.g., {"order_lookup": ["customer", "agent", "admin"],
# "order_cancel": ["agent", "admin"]}
# Per-tool call limits
rate_limits: Dict[str, int] = field(default_factory=dict)
# e.g., {"order_lookup": 30, "faq_search": 60} # per minute
# Argument sanitization rules
argument_sanitizers: Dict[str, dict] = field(default_factory=dict)
# High-risk tool list (requires user confirmation)
high_risk_tools: List[str] = field(default_factory=list)
# Blocked patterns
blocked_argument_patterns: List[str] = field(default_factory=list)
# Default policy
DEFAULT_POLICY = GatewayPolicy(
tool_permissions={
"order_lookup": ["customer", "agent", "admin"],
"faq_search": ["customer", "agent", "admin"],
"order_cancel": ["agent", "admin"],
"refund_process": ["admin"],
"user_data_export": ["admin"],
},
rate_limits={
"order_lookup": 30,
"faq_search": 60,
"order_cancel": 5,
"refund_process": 3,
"user_data_export": 1,
},
high_risk_tools=["order_cancel", "refund_process", "user_data_export"],
blocked_argument_patterns=[
r"(?i)(drop|delete|truncate)\s+(table|database)",
r"(?i)(union\s+select|or\s+1\s*=\s*1)",
r"\.\./", # path traversal
r"<script", # XSS
],
)
class MCPGateway:
def __init__(self, policy: GatewayPolicy = DEFAULT_POLICY):
self.policy = policy
self._call_history: Dict[str, List[datetime]] = {}
def evaluate(
self,
tool_name: str,
arguments: Dict[str, Any],
user_role: str,
session_id: str,
) -> dict:
"""
Performs guardrail validation on a tool call.
Returns: {"decision": GatewayDecision, "reason": str, "sanitized_args": dict}
"""
# 1. Permission check
allowed_roles = self.policy.tool_permissions.get(tool_name, [])
if not allowed_roles:
return self._deny(f"Tool '{tool_name}' is not registered in gateway policy")
if user_role not in allowed_roles:
return self._deny(
f"Role '{user_role}' is not authorized for tool '{tool_name}'. "
f"Required: {allowed_roles}"
)
# 2. Rate limit check
rate_limit = self.policy.rate_limits.get(tool_name, 60)
if self._is_rate_limited(session_id, tool_name, rate_limit):
return {
"decision": GatewayDecision.RATE_LIMITED,
"reason": f"Rate limit exceeded: {rate_limit}/min for '{tool_name}'",
"sanitized_args": arguments,
}
# 3. Argument sanitization
sanitized_args, blocked_patterns = self._sanitize_arguments(arguments)
if blocked_patterns:
logger.warning(
f"Blocked patterns detected in tool '{tool_name}': {blocked_patterns}"
)
return self._deny(
f"Potentially malicious argument patterns detected: {blocked_patterns}"
)
# 4. High-risk tool confirmation
if tool_name in self.policy.high_risk_tools:
return {
"decision": GatewayDecision.REQUIRE_APPROVAL,
"reason": f"Tool '{tool_name}' requires user confirmation before execution",
"sanitized_args": sanitized_args,
}
return {
"decision": GatewayDecision.ALLOW,
"reason": "All checks passed",
"sanitized_args": sanitized_args,
}
def _is_rate_limited(self, session_id: str, tool_name: str, limit: int) -> bool:
key = f"{session_id}:{tool_name}"
now = datetime.utcnow()
cutoff = now - timedelta(minutes=1)
if key not in self._call_history:
self._call_history[key] = []
self._call_history[key] = [
t for t in self._call_history[key] if t > cutoff
]
if len(self._call_history[key]) >= limit:
return True
self._call_history[key].append(now)
return False
def _sanitize_arguments(self, arguments: Dict[str, Any]) -> tuple:
blocked = []
args_str = json.dumps(arguments)
for pattern in self.policy.blocked_argument_patterns:
if re.search(pattern, args_str):
blocked.append(pattern)
return arguments, blocked
def _deny(self, reason: str) -> dict:
return {
"decision": GatewayDecision.DENY,
"reason": reason,
"sanitized_args": {},
}
MCP Tool Evaluation Framework
It is essential to systematically evaluate whether the tools provided by MCP Servers function correctly and whether guardrails properly block attacks.
Tool Correctness Evaluation
"""
Test suite for evaluating the functional correctness of MCP tools.
Validates normal cases, edge cases, and error cases for each tool.
"""
import pytest
from dataclasses import dataclass
from typing import Any, Optional, List
@dataclass
class ToolTestCase:
"""Single tool test case"""
test_id: str
tool_name: str
arguments: dict
expected_status: str # success, error, validation_error
expected_output_contains: Optional[str] = None
expected_error_type: Optional[str] = None
description: str = ""
TOOL_TEST_CASES = [
# Normal case
ToolTestCase(
test_id="order_lookup_valid",
tool_name="order_lookup",
arguments={"order_id": "ORD-20260304-001"},
expected_status="success",
expected_output_contains="order_status",
description="Lookup with a valid order number",
),
# Edge case: non-existent order
ToolTestCase(
test_id="order_lookup_not_found",
tool_name="order_lookup",
arguments={"order_id": "ORD-99991231-999"},
expected_status="success",
expected_output_contains="not_found",
description="Returns proper response for a non-existent order number",
),
# Error case: invalid format
ToolTestCase(
test_id="order_lookup_invalid_format",
tool_name="order_lookup",
arguments={"order_id": "INVALID-FORMAT"},
expected_status="validation_error",
expected_error_type="pattern_mismatch",
description="Invalid order number format",
),
# Security case: SQL injection attempt
ToolTestCase(
test_id="order_lookup_sql_injection",
tool_name="order_lookup",
arguments={"order_id": "ORD-20260304-001' OR '1'='1"},
expected_status="validation_error",
expected_error_type="pattern_mismatch",
description="Blocks SQL injection attempt",
),
# FAQ normal search
ToolTestCase(
test_id="faq_search_valid",
tool_name="faq_search",
arguments={"query": "refund process", "category": "return"},
expected_status="success",
expected_output_contains="refund",
description="Valid FAQ search",
),
# FAQ empty result
ToolTestCase(
test_id="faq_search_no_results",
tool_name="faq_search",
arguments={"query": "xyzzyspoon"},
expected_status="success",
expected_output_contains="no_results",
description="Returns proper empty response when no results found",
),
]
@pytest.mark.parametrize("case", TOOL_TEST_CASES, ids=lambda c: c.test_id)
async def test_mcp_tool(mcp_client, case):
"""Verifies the functional correctness of MCP tools."""
try:
result = await mcp_client.call_tool(case.tool_name, case.arguments)
if case.expected_status == "validation_error":
pytest.fail(
f"Expected validation error but tool executed successfully: {result}"
)
if case.expected_output_contains:
result_text = str(result)
assert case.expected_output_contains in result_text, (
f"Expected output to contain '{case.expected_output_contains}', "
f"got: {result_text[:200]}"
)
except Exception as e:
if case.expected_status == "validation_error":
if case.expected_error_type:
assert case.expected_error_type in str(type(e).__name__).lower() or \
case.expected_error_type in str(e).lower(), (
f"Expected error type '{case.expected_error_type}', got: {type(e).__name__}: {e}"
)
else:
raise
Guardrail Effectiveness Measurement
"""
Quantitatively measures the effectiveness of MCP Gateway guardrails.
Calculates block rate and false positive rate against an attack scenario dataset.
"""
from typing import List, Dict
from dataclasses import dataclass
@dataclass
class GuardrailEvalResult:
total_scenarios: int
true_positives: int # Correctly blocked attacks
false_positives: int # Incorrectly blocked legitimate requests
true_negatives: int # Correctly allowed legitimate requests
false_negatives: int # Missed attacks
precision: float
recall: float
f1_score: float
def evaluate_guardrails(
gateway,
attack_scenarios: List[dict],
benign_scenarios: List[dict],
) -> GuardrailEvalResult:
"""
Measures the precision/recall of guardrails against attack and benign scenarios.
"""
tp = fp = tn = fn = 0
# Attack scenarios: should be blocked
for scenario in attack_scenarios:
result = gateway.evaluate(
tool_name=scenario["tool_name"],
arguments=scenario["arguments"],
user_role=scenario.get("user_role", "customer"),
session_id=scenario.get("session_id", "eval"),
)
if result["decision"] in (GatewayDecision.DENY, GatewayDecision.RATE_LIMITED):
tp += 1
else:
fn += 1
print(f"MISS: Attack scenario not blocked: {scenario.get('name', 'unknown')}")
# Benign scenarios: should be allowed
for scenario in benign_scenarios:
result = gateway.evaluate(
tool_name=scenario["tool_name"],
arguments=scenario["arguments"],
user_role=scenario.get("user_role", "customer"),
session_id=scenario.get("session_id", "eval"),
)
if result["decision"] == GatewayDecision.ALLOW:
tn += 1
elif result["decision"] == GatewayDecision.REQUIRE_APPROVAL:
tn += 1 # Confirmation request for high-risk tools is normal behavior
else:
fp += 1
print(f"FALSE POSITIVE: Benign scenario blocked: {scenario.get('name', 'unknown')}")
total = tp + fp + tn + fn
precision = tp / max(tp + fp, 1)
recall = tp / max(tp + fn, 1)
f1 = 2 * precision * recall / max(precision + recall, 1e-9)
return GuardrailEvalResult(
total_scenarios=total,
true_positives=tp,
false_positives=fp,
true_negatives=tn,
false_negatives=fn,
precision=round(precision, 4),
recall=round(recall, 4),
f1_score=round(f1, 4),
)
MCP Security Threat Model and Countermeasures
A systematic overview of the security threats that can arise in MCP architecture.
| Threat Type | Attack Vector | Impact | Countermeasure |
|---|---|---|---|
| Tool Poisoning | Malicious MCP Server disguised as a legitimate tool | Data exfiltration, system takeover | Server certificate validation, tool allowlisting |
| Rug Pull | Initially normal behavior, then switches to malicious | Trust-based security bypass | Continuous tool behavior monitoring, hash-based integrity checks |
| Argument Injection | Malicious payload inserted in tool arguments | SQL injection, command execution | Strict JSON Schema validation, pattern blocking |
| Excessive Privilege | Tools exposed with unnecessarily broad permissions | Unintended data access | Principle of least privilege, role-based access control |
| Cross-Context Leakage | Context from other sessions leaks | PII exposure | Session isolation, context scoping |
| Denial of Service | Server overload through mass tool calls | Service disruption | Rate limiting, per-session concurrent call limits |
MCP Server Operational Monitoring
# prometheus-rules.yaml
groups:
- name: mcp_server_monitoring
interval: 15s
rules:
- alert: MCPToolCallErrorRateHigh
expr: |
sum(rate(mcp_tool_call_total{status="error"}[5m])) by (tool_name) /
sum(rate(mcp_tool_call_total[5m])) by (tool_name) > 0.05
for: 3m
labels:
severity: warning
team: chatbot
annotations:
summary: 'Error rate for MCP tool {{ $labels.tool_name }} exceeds 5%'
description: |
Current error rate: {{ $value | humanizePercentage }}
Check the tool implementation or downstream service status.
- alert: MCPGatewayBlockRateSpike
expr: |
sum(rate(mcp_gateway_decision_total{decision="deny"}[15m])) /
sum(rate(mcp_gateway_decision_total[15m])) > 0.20
for: 5m
labels:
severity: warning
team: security
annotations:
summary: 'MCP Gateway block rate exceeds 20%'
description: 'Possible increase in attack attempts or policy misconfiguration. Check audit logs.'
- alert: MCPToolLatencyHigh
expr: |
histogram_quantile(0.95,
rate(mcp_tool_call_duration_seconds_bucket[5m])
) > 3.0
for: 3m
labels:
severity: warning
team: chatbot
annotations:
summary: 'MCP tool call p95 latency exceeds 3 seconds'
MCP Guardrail Adoption Checklist
Follow this checklist in order when introducing guardrails to an MCP-based chatbot.
Phase 1: MCP Server Security Foundation
- Apply TLS certificates to all MCP Servers
- Set
additionalProperties: falsein each tool's inputSchema - Specify type, pattern, maxLength, and enum constraints for each tool's arguments
- Deploy blocking rules for known malicious patterns (SQL injection, path traversal)
Phase 2: Gateway Policy Deployment
- Deploy MCP Gateway and confirm all tool calls are routed through it
- Configure role-based tool access control policies
- Set per-tool rate limits
- Apply human-in-the-loop confirmation flow for high-risk tools
Phase 3: Monitoring and Evaluation
- Establish audit log collection and retention policy (minimum 90 days)
- Deploy Prometheus alert rules
- Integrate red team test suite into CI
- Measure guardrail precision/recall (target: precision over 0.95, recall over 0.98)
Phase 4: Continuous Operations
- Weekly audit log review
- Monthly red team test scenario updates
- Quarterly tool permission matrix review
- Establish MCP SDK security patch application procedures
Failure Scenario Responses
Scenario 1: Tool Unavailability Due to MCP Server Connection Failure
Symptom: Chatbot fails on all tool calls. Repeatedly shows "Feature unavailable" to users.
Error log:
MCPConnectionError: Failed to connect to MCP server at unix:///tmp/mcp-customer-support.sock
Timeout after 5000ms
Root cause: MCP Server process terminated due to OOM
Resolution:
1. Restart the MCP Server process (systemd or supervisor)
2. Set memory limits and adjust OOM killer priority
3. Graceful degradation: Switch to general conversation mode without tools when MCP Server is down
4. Add health check endpoint and configure automatic restart
Scenario 2: Legitimate Requests Blocked Due to Guardrail False Positives
Symptom: A legitimate request "Please cancel my order" is blocked by the Gateway
Error log:
Gateway DENY: Blocked pattern detected - "cancel" matched "cancel" block rule
Root cause: Overly broad patterns included in blocked_argument_patterns
("cancel" keyword matches in arguments as well as tool descriptions)
Resolution:
1. Restrict pattern matching scope to argument values only (exclude tool names/descriptions)
2. Require regression testing with benign scenarios before adding new blocked patterns
3. Establish a hotfix procedure to immediately disable patterns when false positives occur
Scenario 3: Power User Inconvenience Due to Rate Limiting
Symptom: CS agents hit rate limits during bulk order lookups, causing work delays
Error log:
Gateway RATE_LIMITED: Rate limit exceeded: 30/min for 'order_lookup'
Resolution:
1. Apply role-based differential rate limits (customer: 30/min, agent: 200/min, admin: unlimited)
2. Provide a separate batch lookup tool (order_batch_lookup)
3. Display rate limit history on dashboards to provide tuning insights
Quiz
Quiz
Q1. Why should guardrails in MCP's Client-Server architecture be separated into a Gateway?
It allows policies to be managed centrally without modifying the MCP Server itself, and enables consistent application of the same security policies across multiple MCP Servers. This separates concerns between server implementation and security policy.
Q2. What is the security reason for setting additionalProperties: false in MCP inputSchema?
It prevents the LLM from generating arbitrary fields not defined in the schema, which could cause unintended behavior. For example, it blocks the insertion of fields like {"admin_bypass": true}.
Q3. What is a Tool Poisoning attack and how do you defend against it?
It is an attack where a malicious MCP Server masquerades as a legitimate tool to exfiltrate data or take over systems. Defense includes server certificate validation, tool allowlisting, and integrity hash verification of tool descriptions and behavior.
Q4. Why is recall more important than precision in guardrail evaluation?
Low recall means attacks pass through without being blocked, leading to security incidents. Low precision means legitimate requests are falsely blocked, degrading user experience but remaining a recoverable issue compared to security breaches. Therefore, the recall target (0.98) is set higher than the precision target (0.95).
Q5. How do you apply Graceful Degradation when the MCP Server fails?
When the MCP Server connection fails, disable tool calling and provide the LLM with context that "tools are currently unavailable," switching to general conversation mode. Inform users of the limited functionality and provide guidance on when to retry.
Q6. Why should rate limiting be applied differently by role?
Because the usage patterns of regular customers and CS agents are fundamentally different. Agents need bulk lookups for their work, and applying the same rate limit reduces work efficiency. Rate limit thresholds should be set based on usage pattern data analysis for each role.
Q7. Why should red team test scenarios be updated monthly?
Because new attack techniques are continuously discovered, and LLM model updates can create new patterns that bypass existing guardrails. Test coverage must be maintained by incorporating OWASP LLM Top 10 updates and new findings from the security community.