- Published on
AI Agent Multi-Agent Orchestration Patterns: A Practical Guide to Hierarchical, Pipeline, and Swarm Architectures
- Authors
- Name
- Why Multi-Agent Now
- Multi-Agent System Overview
- Four Orchestration Patterns
- Framework Comparison
- Advanced Supervisor Pattern
- MCP Protocol Integration
- Real-World Example: Customer Support Multi-Agent System
- Failure Handling Strategies
- Observability
- Production Deployment Checklist
- Pattern Selection Guide
- Security Considerations
- Performance Optimization
- Conclusion
- References

Why Multi-Agent Now
Gartner projects that agentic AI will be embedded in 40% of enterprise applications by 2026. The paradigm is shifting from single general-purpose agents to domain-specialized multi-agent collaboration. The NIST AI Agent Standards Initiative has formalized security and interoperability standards.
This article analyzes four major multi-agent orchestration patterns and provides implementation code for LangGraph, CrewAI, and AutoGen frameworks.
Multi-Agent System Overview
Limitations of Single Agents
Single agents face the following constraints:
| Limitation | Description |
|---|---|
| Context window saturation | Complex tasks require longer prompts, degrading performance |
| Tool overload | Assigning dozens of tools to one agent reduces selection accuracy |
| Single point of failure | One agent failure halts the entire workflow |
| Lack of specialization | General prompts cannot achieve domain-optimal results |
| Scalability constraints | Cannot scale horizontally as workload increases |
What Multi-Agent Systems Solve
Multi-agent systems overcome these limits through division of labor and collaboration:
- Specialization: Each agent is optimized for a specific domain
- Parallel processing: Independent tasks execute simultaneously
- Fault isolation: One agent's failure does not affect the entire system
- Dynamic composition: Agent combinations adapt flexibly to tasks
Four Orchestration Patterns
Pattern 1: Single Agent
The most basic pattern where one agent handles all tasks.
from langchain.agents import create_tool_calling_agent
from langchain_openai import ChatOpenAI
from langchain.tools import tool
@tool
def search_web(query: str) -> str:
"""Search the web for information."""
return f"Search results for: {query}"
@tool
def calculate(expression: str) -> str:
"""Perform mathematical calculations."""
return str(eval(expression))
@tool
def write_file(filename: str, content: str) -> str:
"""Write content to a file."""
with open(filename, "w") as f:
f.write(content)
return f"File {filename} written successfully"
llm = ChatOpenAI(model="gpt-4o")
tools = [search_web, calculate, write_file]
agent = create_tool_calling_agent(llm, tools, prompt_template)
Best for: Simple tasks with 5 or fewer tools
Pattern 2: Hierarchical Multi-Agent
A supervisor agent distributes tasks to subordinate agents and aggregates results.
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Literal
import operator
class SupervisorState(TypedDict):
messages: Annotated[list, operator.add]
next_agent: str
final_answer: str
llm = ChatOpenAI(model="gpt-4o")
# Define subordinate agents
researcher = create_react_agent(
llm,
tools=[search_web],
state_modifier="You are a research specialist. Find accurate information."
)
analyst = create_react_agent(
llm,
tools=[calculate],
state_modifier="You are a data analyst. Analyze data and provide insights."
)
writer = create_react_agent(
llm,
tools=[write_file],
state_modifier="You are a technical writer. Create clear documentation."
)
# Supervisor routing logic
def supervisor_router(state: SupervisorState) -> Literal["researcher", "analyst", "writer", "__end__"]:
"""Supervisor determines the next agent."""
last_message = state["messages"][-1]
response = llm.invoke([
{"role": "system", "content": """You are a supervisor managing a team.
Route to: researcher (for information), analyst (for data), writer (for documentation).
Return __end__ when the task is complete."""},
{"role": "user", "content": last_message.content}
])
return response.content.strip()
# Build the graph
graph = StateGraph(SupervisorState)
graph.add_node("supervisor", supervisor_router)
graph.add_node("researcher", researcher)
graph.add_node("analyst", analyst)
graph.add_node("writer", writer)
graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", supervisor_router)
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyst", "supervisor")
graph.add_edge("writer", "supervisor")
app = graph.compile()
Best for: Centralized control with dynamic task ordering
Pattern 3: Sequential Pipeline
Agents process tasks in a fixed order, with each agent's output feeding into the next agent's input.
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator
class PipelineState(TypedDict):
messages: Annotated[list, operator.add]
research_output: str
analysis_output: str
report_output: str
def research_node(state: PipelineState) -> PipelineState:
"""Stage 1: Information gathering"""
result = researcher.invoke({"messages": state["messages"]})
return {"research_output": result["messages"][-1].content}
def analysis_node(state: PipelineState) -> PipelineState:
"""Stage 2: Analysis"""
analysis_prompt = f"Analyze this research: {state['research_output']}"
result = analyst.invoke({"messages": [{"role": "user", "content": analysis_prompt}]})
return {"analysis_output": result["messages"][-1].content}
def report_node(state: PipelineState) -> PipelineState:
"""Stage 3: Report generation"""
report_prompt = f"""Write a report based on:
Research: {state['research_output']}
Analysis: {state['analysis_output']}"""
result = writer.invoke({"messages": [{"role": "user", "content": report_prompt}]})
return {"report_output": result["messages"][-1].content}
# Pipeline graph
pipeline = StateGraph(PipelineState)
pipeline.add_node("research", research_node)
pipeline.add_node("analysis", analysis_node)
pipeline.add_node("report", report_node)
pipeline.add_edge(START, "research")
pipeline.add_edge("research", "analysis")
pipeline.add_edge("analysis", "report")
pipeline.add_edge("report", END)
app = pipeline.compile()
Best for: Well-defined sequential workflows where each stage's output feeds the next
Pattern 4: Decentralized Swarm
Agents collaborate autonomously without a central coordinator.
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated, Literal
import operator
class SwarmState(TypedDict):
messages: Annotated[list, operator.add]
current_agent: str
task_board: dict # Shared task board
def agent_handoff(state: SwarmState, agent_name: str, target: str) -> SwarmState:
"""Handoff between agents"""
return {
"current_agent": target,
"messages": state["messages"] + [
{"role": "system", "content": f"Handoff from {agent_name} to {target}"}
]
}
def triage_agent(state: SwarmState) -> Literal["researcher", "analyst", "writer"]:
"""Triage agent: routes tasks to the appropriate agent"""
last_message = state["messages"][-1]
if "search" in last_message.content.lower():
return "researcher"
elif "analyze" in last_message.content.lower():
return "analyst"
else:
return "writer"
def researcher_with_handoff(state: SwarmState):
"""Researcher processes and hands off to the next agent"""
result = researcher.invoke({"messages": state["messages"]})
return agent_handoff(state, "researcher", "analyst")
def analyst_with_handoff(state: SwarmState):
"""Analyst processes and hands off to the next agent"""
result = analyst.invoke({"messages": state["messages"]})
return agent_handoff(state, "analyst", "writer")
# Swarm graph
swarm = StateGraph(SwarmState)
swarm.add_node("triage", triage_agent)
swarm.add_node("researcher", researcher_with_handoff)
swarm.add_node("analyst", analyst_with_handoff)
swarm.add_node("writer", writer)
swarm.add_edge(START, "triage")
swarm.add_conditional_edges("triage", triage_agent)
swarm.add_edge("researcher", "analyst")
swarm.add_edge("analyst", "writer")
swarm.add_edge("writer", END)
app = swarm.compile()
Best for: Tasks requiring autonomous decision-making and flexible collaboration
Framework Comparison
LangGraph vs CrewAI vs AutoGen
| Feature | LangGraph | CrewAI | AutoGen |
|---|---|---|---|
| Architecture | Graph-based state machine | Role-based agent teams | Conversation-based multi-agent |
| Flexibility | Very high (low-level control) | Medium (abstracted API) | High (customizable) |
| Learning curve | High | Low | Medium |
| State management | Built-in (checkpoint support) | Basic | Conversation history-based |
| Human-in-the-Loop | Native support | Basic support | Native support |
| Streaming | Native support | Limited | Event-based |
| Production readiness | High | Medium | High |
| Community size | Large | Medium | Large |
| License | MIT | MIT | MIT |
CrewAI Implementation
from crewai import Agent, Task, Crew, Process
# Define agents
researcher = Agent(
role="Senior Research Analyst",
goal="Find comprehensive and accurate information about the given topic",
backstory="""You are an expert researcher with decades of experience
in gathering and synthesizing information from multiple sources.""",
verbose=True,
allow_delegation=True,
tools=[search_tool, scrape_tool]
)
analyst = Agent(
role="Data Analyst",
goal="Analyze research findings and extract actionable insights",
backstory="""You are a skilled data analyst who excels at finding
patterns and drawing meaningful conclusions from data.""",
verbose=True,
tools=[analysis_tool, chart_tool]
)
writer = Agent(
role="Technical Writer",
goal="Create clear and comprehensive reports",
backstory="""You are an experienced technical writer who can transform
complex analyses into readable documents.""",
verbose=True,
tools=[write_tool]
)
# Define tasks
research_task = Task(
description="Research the latest trends in AI agent orchestration",
expected_output="A comprehensive summary of findings with sources",
agent=researcher
)
analysis_task = Task(
description="Analyze the research findings and identify key patterns",
expected_output="An analytical report with data-driven insights",
agent=analyst,
context=[research_task] # Reference previous task results
)
report_task = Task(
description="Write a final report combining research and analysis",
expected_output="A polished report ready for stakeholders",
agent=writer,
context=[research_task, analysis_task]
)
# Build and execute crew
crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, report_task],
process=Process.sequential, # or Process.hierarchical
verbose=True
)
result = crew.kickoff()
print(result)
AutoGen Implementation
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
# Agent configuration
config_list = [{"model": "gpt-4o", "api_key": "YOUR_API_KEY"}]
researcher = AssistantAgent(
name="Researcher",
system_message="""You are a research specialist.
Find accurate and relevant information.
When your research is complete, say RESEARCH_DONE.""",
llm_config={"config_list": config_list}
)
analyst = AssistantAgent(
name="Analyst",
system_message="""You are a data analyst.
Analyze the research findings and provide insights.
When analysis is complete, say ANALYSIS_DONE.""",
llm_config={"config_list": config_list}
)
writer = AssistantAgent(
name="Writer",
system_message="""You are a technical writer.
Create clear documentation based on research and analysis.
When the report is complete, say TERMINATE.""",
llm_config={"config_list": config_list}
)
user_proxy = UserProxyAgent(
name="Admin",
human_input_mode="NEVER",
code_execution_config={"work_dir": "output"},
is_termination_msg=lambda x: "TERMINATE" in x.get("content", "")
)
# Group chat setup
group_chat = GroupChat(
agents=[user_proxy, researcher, analyst, writer],
messages=[],
max_round=20,
speaker_selection_method="round_robin"
)
manager = GroupChatManager(
groupchat=group_chat,
llm_config={"config_list": config_list}
)
# Execute
user_proxy.initiate_chat(
manager,
message="Research AI agent orchestration patterns and write a report."
)
Advanced Supervisor Pattern
Dynamic Routing Implementation
An advanced supervisor that analyzes tasks and routes them to the optimal agent.
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import Literal
class RouteDecision(BaseModel):
"""Supervisor routing decision"""
next_agent: Literal["researcher", "analyst", "writer", "FINISH"] = Field(
description="The next agent to route to"
)
reasoning: str = Field(
description="Why this agent was chosen"
)
task_description: str = Field(
description="Specific task for the chosen agent"
)
llm = ChatOpenAI(model="gpt-4o")
structured_llm = llm.with_structured_output(RouteDecision)
SUPERVISOR_PROMPT = """You are a supervisor managing a team of agents.
Based on the current state and conversation, decide:
1. Which agent should work next
2. What specific task they should perform
3. Whether the overall task is complete (FINISH)
Available agents:
- researcher: Searches for information and gathers data
- analyst: Analyzes data and provides insights
- writer: Creates reports and documentation
Current conversation:
{messages}
Task Board:
{task_board}
"""
def supervisor_node(state):
"""Supervisor node: dynamic routing"""
decision = structured_llm.invoke(
SUPERVISOR_PROMPT.format(
messages=state["messages"],
task_board=state.get("task_board", "Empty")
)
)
return {
"next_agent": decision.next_agent,
"messages": state["messages"] + [
{"role": "system",
"content": f"Supervisor routed to {decision.next_agent}: {decision.task_description}"}
]
}
Human-in-the-Loop Integration
Inserting human approval steps into the workflow.
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
checkpointer = MemorySaver()
def human_approval_node(state):
"""Node that awaits human approval"""
return {
"messages": state["messages"] + [
{"role": "system", "content": "Awaiting human approval..."}
],
"approval_status": "pending"
}
def check_approval(state) -> Literal["approved", "rejected"]:
"""Check approval status"""
return state.get("approval_status", "pending")
# Add Human-in-the-Loop to graph
graph = StateGraph(SupervisorState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher)
graph.add_node("human_review", human_approval_node)
graph.add_node("writer", writer)
graph.add_edge(START, "supervisor")
graph.add_edge("supervisor", "researcher")
graph.add_edge("researcher", "human_review")
graph.add_conditional_edges(
"human_review",
check_approval,
{"approved": "writer", "rejected": "supervisor"}
)
graph.add_edge("writer", END)
# Compile with checkpointer for state persistence
app = graph.compile(checkpointer=checkpointer, interrupt_before=["human_review"])
# Execute and pause at interrupt point
config = {"configurable": {"thread_id": "review-thread-1"}}
result = app.invoke(initial_state, config)
# Resume after human approval
app.invoke(None, config)
MCP Protocol Integration
What is Model Context Protocol (MCP)
MCP is an interoperability protocol published by Anthropic that enables agents to access external tools and data sources through a standardized interface.
# MCP server implementation
from mcp import Server, Tool
import asyncio
server = Server("analytics-server")
@server.tool()
async def query_database(query: str) -> str:
"""Execute a SQL query against the database."""
result = await db.execute(query)
return str(result)
@server.tool()
async def generate_chart(data: str, chart_type: str) -> str:
"""Generate a chart from the given data."""
return f"Chart generated: {chart_type}"
@server.resource("schema://tables")
async def list_tables() -> str:
"""List available database tables"""
tables = await db.get_tables()
return "\n".join(tables)
# Run server
async def main():
async with server.run_stdio() as running:
await running.wait()
asyncio.run(main())
MCP Client with Multi-Agent Integration
from mcp import ClientSession, StdioServerParameters
from langchain_mcp_adapters.tools import load_mcp_tools
from langgraph.prebuilt import create_react_agent
# MCP server connection
server_params = StdioServerParameters(
command="python",
args=["analytics_server.py"]
)
async def create_mcp_agent():
"""Create an agent with MCP tools"""
async with ClientSession(*server_params) as session:
await session.initialize()
# Convert MCP tools to LangChain tools
tools = await load_mcp_tools(session)
# Create agent
agent = create_react_agent(
ChatOpenAI(model="gpt-4o"),
tools,
state_modifier="You are a data analyst with access to database tools."
)
return agent
Real-World Example: Customer Support Multi-Agent System
Architecture Design
A production customer support system implemented with hierarchical multi-agent orchestration.
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Literal
import operator
class CustomerSupportState(TypedDict):
messages: Annotated[list, operator.add]
customer_id: str
issue_category: str
sentiment: str
resolution: str
escalated: bool
# Triage agent
def triage_agent(state: CustomerSupportState) -> CustomerSupportState:
"""Classify customer inquiries and route to specialized agents"""
llm = ChatOpenAI(model="gpt-4o")
response = llm.invoke([
{"role": "system", "content": """Classify the customer issue into one of:
- billing: Payment, invoice, subscription issues
- technical: Product bugs, errors, configuration
- general: General inquiries, feedback
Also assess sentiment: positive, neutral, negative, urgent"""},
{"role": "user", "content": state["messages"][-1].content}
])
return {
"issue_category": "technical",
"sentiment": "negative"
}
# Technical support agent
def technical_support_agent(state: CustomerSupportState) -> CustomerSupportState:
"""Diagnose technical issues and provide solutions"""
llm = ChatOpenAI(model="gpt-4o")
response = llm.invoke([
{"role": "system", "content": """You are a technical support specialist.
Diagnose the issue and provide step-by-step solutions.
If the issue requires engineering escalation, set escalated=true."""},
{"role": "user", "content": str(state["messages"])}
])
return {
"resolution": response.content,
"messages": [{"role": "assistant", "content": response.content}]
}
# Billing support agent
def billing_support_agent(state: CustomerSupportState) -> CustomerSupportState:
"""Handle payment-related issues"""
llm = ChatOpenAI(model="gpt-4o")
response = llm.invoke([
{"role": "system", "content": """You are a billing specialist.
Handle payment issues, refunds, and subscription changes."""},
{"role": "user", "content": str(state["messages"])}
])
return {
"resolution": response.content,
"messages": [{"role": "assistant", "content": response.content}]
}
# Escalation agent
def escalation_agent(state: CustomerSupportState) -> CustomerSupportState:
"""Escalate complex issues to higher-level support"""
return {
"escalated": True,
"messages": [
{"role": "system",
"content": f"Issue escalated for customer {state['customer_id']}"}
]
}
# Routing functions
def route_issue(state: CustomerSupportState) -> Literal["technical", "billing", "general"]:
return state["issue_category"]
def check_escalation(state: CustomerSupportState) -> Literal["escalate", "resolve"]:
if state.get("escalated"):
return "escalate"
return "resolve"
# Build graph
workflow = StateGraph(CustomerSupportState)
workflow.add_node("triage", triage_agent)
workflow.add_node("technical", technical_support_agent)
workflow.add_node("billing", billing_support_agent)
workflow.add_node("escalation", escalation_agent)
workflow.add_edge(START, "triage")
workflow.add_conditional_edges("triage", route_issue, {
"technical": "technical",
"billing": "billing",
"general": "billing"
})
workflow.add_conditional_edges("technical", check_escalation, {
"escalate": "escalation",
"resolve": END
})
workflow.add_edge("billing", END)
workflow.add_edge("escalation", END)
app = workflow.compile()
Failure Handling Strategies
Retry and Fallback Patterns
from tenacity import retry, stop_after_attempt, wait_exponential
import logging
logger = logging.getLogger(__name__)
class AgentWithRetry:
"""Agent wrapper with retry logic"""
def __init__(self, agent, max_retries=3, fallback_agent=None):
self.agent = agent
self.max_retries = max_retries
self.fallback_agent = fallback_agent
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=30)
)
async def invoke_with_retry(self, state):
"""Invoke agent with retry logic"""
try:
return await self.agent.ainvoke(state)
except Exception as e:
logger.error(f"Agent failed: {e}")
raise
async def invoke(self, state):
"""Invoke agent with fallback"""
try:
return await self.invoke_with_retry(state)
except Exception as e:
if self.fallback_agent:
logger.warning(f"Falling back to backup agent: {e}")
return await self.fallback_agent.ainvoke(state)
raise
# Circuit breaker pattern
class CircuitBreaker:
"""Circuit breaker for agent resilience"""
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.state = "closed" # closed, open, half-open
self.last_failure_time = None
def can_execute(self) -> bool:
if self.state == "closed":
return True
if self.state == "open":
import time
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "half-open"
return True
return False
return True # half-open
def record_success(self):
self.failure_count = 0
self.state = "closed"
def record_failure(self):
import time
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
Dead Letter Queue Pattern
import json
from datetime import datetime
class DeadLetterQueue:
"""Store failed messages for later retry"""
def __init__(self, storage_path="dead_letters.json"):
self.storage_path = storage_path
self.messages = []
def add(self, message: dict, error: str, agent_name: str):
"""Add a failed message to the queue"""
entry = {
"timestamp": datetime.now().isoformat(),
"agent": agent_name,
"message": message,
"error": str(error),
"retry_count": 0
}
self.messages.append(entry)
self._persist()
def retry_all(self, agent_registry: dict):
"""Retry all messages in the queue"""
for entry in self.messages:
agent = agent_registry.get(entry["agent"])
if agent:
try:
agent.invoke(entry["message"])
self.messages.remove(entry)
except Exception as e:
entry["retry_count"] += 1
entry["last_error"] = str(e)
self._persist()
def _persist(self):
with open(self.storage_path, "w") as f:
json.dump(self.messages, f, indent=2)
Observability
LangSmith Integration
import os
# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "multi-agent-orchestration"
from langsmith import Client
client = Client()
def track_agent_metrics(agent_name: str, duration: float, tokens: int, success: bool):
"""Track agent execution metrics"""
client.create_run(
name=f"agent-{agent_name}",
run_type="chain",
inputs={"agent": agent_name},
outputs={
"duration_ms": duration * 1000,
"total_tokens": tokens,
"success": success
}
)
OpenTelemetry Integration
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# Set up tracer
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("multi-agent-system")
def traced_agent_node(agent_name: str):
"""Agent node with OpenTelemetry tracing"""
def node_fn(state):
with tracer.start_as_current_span(f"agent.{agent_name}") as span:
span.set_attribute("agent.name", agent_name)
span.set_attribute("agent.input_messages", len(state["messages"]))
try:
result = agent.invoke(state)
span.set_attribute("agent.success", True)
return result
except Exception as e:
span.set_attribute("agent.success", False)
span.record_exception(e)
raise
return node_fn
Production Deployment Checklist
Design Phase
- Agent roles and tools are clearly defined
- Inter-agent communication protocol is standardized
- State management strategy is established (local vs distributed)
- Failure scenarios have corresponding response strategies
- Human-in-the-Loop intervention points are identified
Implementation Phase
- Appropriate models are assigned to each agent (cost vs performance)
- Tool execution timeouts are configured
- Retry logic and circuit breakers are implemented
- Dead letter queue tracks failed tasks
- Input/output validation (guardrails) is applied
Deployment Phase
- Observability pipeline is configured (LangSmith / OTEL)
- Per-agent cost tracking is available
- Rate limiting is applied
- Security audit logs are enabled
- Rollback strategy is established
Operations Phase
- Agent performance dashboard is built
- Anomaly detection alerts are configured
- Prompt version management is applied
- A/B testing framework is ready
- Regular prompt optimization process is in place
Pattern Selection Guide
Decision Flowchart
Task Type Assessment
|
+-- Simple task (5 or fewer tools) ---------> Single Agent
|
+-- Fixed-order multi-step task ------------> Pipeline
|
+-- Dynamic routing needed -----------------> Hierarchical (Supervisor)
|
+-- Autonomous collaboration required ------> Swarm
Pattern Comparison Summary
| Pattern | Strengths | Weaknesses | Complexity | Scale |
|---|---|---|---|---|
| Single Agent | Simple to implement, easy to debug | Limited scalability, context saturation | Low | Small |
| Hierarchical | Central control, dynamic routing | Supervisor bottleneck, single point of failure | Medium | Medium |
| Pipeline | Predictable, easy to test | Inflexible, sequential latency | Low-Medium | Medium |
| Swarm | High flexibility, autonomous collaboration | Hard to debug, unpredictable | High | Large |
Security Considerations
Agent Isolation
class SandboxedAgent:
"""Agent running in an isolated environment"""
def __init__(self, agent, allowed_tools: list, max_tokens: int = 4096):
self.agent = agent
self.allowed_tools = set(allowed_tools)
self.max_tokens = max_tokens
def invoke(self, state):
# Verify tool access permissions
requested_tools = self._extract_tool_calls(state)
unauthorized = requested_tools - self.allowed_tools
if unauthorized:
raise PermissionError(
f"Agent attempted to use unauthorized tools: {unauthorized}"
)
# Enforce token limit
if self._estimate_tokens(state) > self.max_tokens:
raise ResourceError("Token limit exceeded")
return self.agent.invoke(state)
def _extract_tool_calls(self, state) -> set:
return set()
def _estimate_tokens(self, state) -> int:
return 0
Prompt Injection Defense
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, validator
class SafeAgentOutput(BaseModel):
"""Agent output validation schema"""
response: str
confidence: float
sources: list[str]
@validator("response")
def validate_response(cls, v):
forbidden_patterns = [
"ignore previous instructions",
"system prompt",
"bypass",
"jailbreak"
]
for pattern in forbidden_patterns:
if pattern.lower() in v.lower():
raise ValueError(f"Suspicious pattern detected: {pattern}")
return v
parser = PydanticOutputParser(pydantic_object=SafeAgentOutput)
Performance Optimization
Parallel Execution Strategy
from langgraph.graph import StateGraph, START, END
import asyncio
class ParallelState(TypedDict):
messages: Annotated[list, operator.add]
research_result: str
analysis_result: str
async def parallel_execution(state):
"""Execute independent agents in parallel"""
research_task = asyncio.create_task(
researcher.ainvoke({"messages": state["messages"]})
)
analysis_task = asyncio.create_task(
analyst.ainvoke({"messages": state["messages"]})
)
research_result, analysis_result = await asyncio.gather(
research_task, analysis_task
)
return {
"research_result": research_result["messages"][-1].content,
"analysis_result": analysis_result["messages"][-1].content
}
# LangGraph fan-out pattern
graph = StateGraph(ParallelState)
graph.add_node("research", researcher)
graph.add_node("analysis", analyst)
graph.add_node("synthesis", writer)
# Parallel execution: fan-out from START to both nodes
graph.add_edge(START, "research")
graph.add_edge(START, "analysis")
# Both results converge into synthesis
graph.add_edge("research", "synthesis")
graph.add_edge("analysis", "synthesis")
graph.add_edge("synthesis", END)
Caching Strategy
from functools import lru_cache
import hashlib
import json
class AgentCache:
"""Cache agent responses"""
def __init__(self, ttl_seconds=3600):
self.cache = {}
self.ttl = ttl_seconds
def get_cache_key(self, state: dict) -> str:
"""Generate cache key from state"""
state_str = json.dumps(state, sort_keys=True, default=str)
return hashlib.sha256(state_str.encode()).hexdigest()
def get(self, state: dict):
"""Look up result in cache"""
key = self.get_cache_key(state)
if key in self.cache:
entry = self.cache[key]
import time
if time.time() - entry["timestamp"] < self.ttl:
return entry["result"]
del self.cache[key]
return None
def set(self, state: dict, result):
"""Store result in cache"""
import time
key = self.get_cache_key(state)
self.cache[key] = {
"result": result,
"timestamp": time.time()
}
Conclusion
Multi-agent orchestration is not merely about connecting multiple agents. It is about selecting the right pattern for the task and building robust failure handling and observability.
Key takeaways:
- Start with a single agent and transition to multi-agent as complexity grows
- Hierarchical pattern suits centralized control scenarios
- Pipeline pattern is optimal for fixed sequential workflows
- Swarm pattern fits complex scenarios requiring high autonomy
- Choose frameworks based on need: LangGraph (flexibility), CrewAI (rapid prototyping), AutoGen (conversation-based)