AI Agent Multi-Agent Orchestration Patterns: A Practical Guide to Hierarchical, Pipeline, and Swarm Architectures



Why Multi-Agent Now

Gartner projects that agentic AI will be embedded in 40% of enterprise applications by 2026. The paradigm is shifting from single general-purpose agents to domain-specialized multi-agent collaboration. The NIST AI Agent Standards Initiative has formalized security and interoperability standards.

This article analyzes four major multi-agent orchestration patterns and provides implementation code for LangGraph, CrewAI, and AutoGen frameworks.


Multi-Agent System Overview

Limitations of Single Agents

Single agents face the following constraints:

| Limitation | Description |
|---|---|
| Context window saturation | Complex tasks require longer prompts, degrading performance |
| Tool overload | Assigning dozens of tools to one agent reduces selection accuracy |
| Single point of failure | One agent failure halts the entire workflow |
| Lack of specialization | General prompts cannot achieve domain-optimal results |
| Scalability constraints | Cannot scale horizontally as workload increases |

What Multi-Agent Systems Solve

Multi-agent systems overcome these limits through division of labor and collaboration:

  • Specialization: Each agent is optimized for a specific domain
  • Parallel processing: Independent tasks execute simultaneously
  • Fault isolation: One agent's failure can be contained and handled instead of halting the entire workflow
  • Dynamic composition: Agent combinations adapt flexibly to tasks
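
To make the parallel processing and fault isolation points concrete, here is a minimal, framework-free sketch using only the standard library. The `run_agent` function is a hypothetical stand-in for a real agent call, and the agent names are illustrative only.

```python
import asyncio


async def run_agent(name: str, should_fail: bool = False) -> str:
    """Hypothetical stand-in for an agent call; a real agent would call an LLM."""
    await asyncio.sleep(0.01)  # simulate I/O latency
    if should_fail:
        raise RuntimeError(f"{name} failed")
    return f"{name}: done"


async def run_all(specs: dict[str, bool]) -> dict[str, str]:
    """Run independent agents concurrently and record failures per agent."""
    names = list(specs)
    results = await asyncio.gather(
        *(run_agent(name, specs[name]) for name in names),
        return_exceptions=True,  # one failing agent does not cancel the others
    )
    return {
        name: (f"error: {result}" if isinstance(result, Exception) else result)
        for name, result in zip(names, results)
    }


if __name__ == "__main__":
    print(asyncio.run(run_all({"researcher": False, "analyst": True, "writer": False})))
```

With `return_exceptions=True`, the failing agent's exception is returned as a value, so the caller can decide whether to retry, fall back, or continue with partial results.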

Four Orchestration Patterns

Pattern 1: Single Agent

The most basic pattern: one agent handles all tasks with a single set of tools.

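from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

@tool
def search_web(query: str) -> str:
    """Search the web for information."""
    # Placeholder: a real implementation would call a search API
    return f"Search results for: {query}"

@tool
def calculate(expression: str) -> str:
    """Perform mathematical calculations."""
    # WARNING: eval() executes arbitrary code. Replace it with a safe
    # expression parser before exposing this tool to model-generated input.
    return str(eval(expression))

@tool
def write_file(filename: str, content: str) -> str:
    """Write content to a file."""
    with open(filename, "w") as f:
        f.write(content)
    return f"File {filename} written successfully"

llm = ChatOpenAI(model="gpt-4o")
tools = [search_web, calculate, write_file]

# A tool-calling agent needs a prompt with an agent_scratchpad placeholder
prompt_template = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

agent = create_tool_calling_agent(llm, tools, prompt_template)
# AgentExecutor runs the call-tool-observe loop around the agent
executor = AgentExecutor(agent=agent, tools=tools)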

Best for: Simple tasks with 5 or fewer tools

Pattern 2: Hierarchical Multi-Agent

A supervisor agent distributes tasks to subordinate agents and aggregates results.

from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
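from langgraph.graph.message import add_messages
from typing import TypedDict, Annotated, Literal

class SupervisorState(TypedDict):
    # add_messages merges by message ID, so the full histories returned by
    # sub-agents are not appended twice (operator.add would duplicate them)
    messages: Annotated[list, add_messages]
    next_agent: str
    final_answer: str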

llm = ChatOpenAI(model="gpt-4o")

# Define subordinate agents
researcher = create_react_agent(
    llm,
    tools=[search_web],
    state_modifier="You are a research specialist. Find accurate information."
)

analyst = create_react_agent(
    llm,
    tools=[calculate],
    state_modifier="You are a data analyst. Analyze data and provide insights."
)

writer = create_react_agent(
    llm,
    tools=[write_file],
    state_modifier="You are a technical writer. Create clear documentation."
)

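# Supervisor node: asks the LLM which agent should act next
def supervisor_node(state: SupervisorState) -> dict:
    """Supervisor determines the next agent and stores the decision in state."""
    last_message = state["messages"][-1]

    response = llm.invoke([
        {"role": "system", "content": """You are a supervisor managing a team.
        Route to: researcher (for information), analyst (for data), writer (for documentation).
        Reply with exactly one word: researcher, analyst, writer, or __end__ when the task is complete."""},
        {"role": "user", "content": last_message.content}
    ])

    choice = response.content.strip()
    # End the run if the model returns a label that is not a known node
    if choice not in {"researcher", "analyst", "writer"}:
        choice = "__end__"
    return {"next_agent": choice}

# Routing function: a node must return a state update, so routing is separate
def supervisor_router(state: SupervisorState) -> Literal["researcher", "analyst", "writer", "__end__"]:
    """Read the supervisor's decision from state."""
    return state["next_agent"]

# Build the graph
graph = StateGraph(SupervisorState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher)
graph.add_node("analyst", analyst)
graph.add_node("writer", writer)

graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", supervisor_router)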
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyst", "supervisor")
graph.add_edge("writer", "supervisor")

app = graph.compile()

Best for: Centralized control with dynamic task ordering

Pattern 3: Sequential Pipeline

Agents process tasks in a fixed order, with each agent's output feeding into the next agent's input.

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator

class PipelineState(TypedDict):
    messages: Annotated[list, operator.add]
    research_output: str
    analysis_output: str
    report_output: str

def research_node(state: PipelineState) -> PipelineState:
    """Stage 1: Information gathering"""
    result = researcher.invoke({"messages": state["messages"]})
    return {"research_output": result["messages"][-1].content}

def analysis_node(state: PipelineState) -> PipelineState:
    """Stage 2: Analysis"""
    analysis_prompt = f"Analyze this research: {state['research_output']}"
    result = analyst.invoke({"messages": [{"role": "user", "content": analysis_prompt}]})
    return {"analysis_output": result["messages"][-1].content}

def report_node(state: PipelineState) -> PipelineState:
    """Stage 3: Report generation"""
    report_prompt = f"""Write a report based on:
    Research: {state['research_output']}
    Analysis: {state['analysis_output']}"""
    result = writer.invoke({"messages": [{"role": "user", "content": report_prompt}]})
    return {"report_output": result["messages"][-1].content}

# Pipeline graph
pipeline = StateGraph(PipelineState)
pipeline.add_node("research", research_node)
pipeline.add_node("analysis", analysis_node)
pipeline.add_node("report", report_node)

pipeline.add_edge(START, "research")
pipeline.add_edge("research", "analysis")
pipeline.add_edge("analysis", "report")
pipeline.add_edge("report", END)

app = pipeline.compile()

Best for: Well-defined sequential workflows where each stage's output feeds the next

Pattern 4: Decentralized Swarm

Agents collaborate autonomously without a central coordinator.

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated, Literal
import operator

class SwarmState(TypedDict):
    messages: Annotated[list, operator.add]
    current_agent: str
    task_board: dict  # Shared task board

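def message_text(message) -> str:
    """Read content from either a dict message or a LangChain message object"""
    return message["content"] if isinstance(message, dict) else message.content

def agent_handoff(agent_name: str, target: str, new_messages: list) -> dict:
    """Handoff between agents: keep the agent's new messages and record the target"""
    return {
        "current_agent": target,
        # operator.add appends these to state, so return only the new messages
        "messages": new_messages + [
            {"role": "system", "content": f"Handoff from {agent_name} to {target}"}
        ]
    }

def triage_node(state: SwarmState) -> dict:
    """Triage node: marks the entry point (a node must return a state update)"""
    return {"current_agent": "triage"}

def triage_router(state: SwarmState) -> Literal["researcher", "analyst", "writer"]:
    """Triage routing: picks the appropriate agent by keyword"""
    text = message_text(state["messages"][-1]).lower()

    if "search" in text:
        return "researcher"
    elif "analyze" in text:
        return "analyst"
    else:
        return "writer"

def researcher_with_handoff(state: SwarmState) -> dict:
    """Researcher processes and hands off to the next agent"""
    result = researcher.invoke({"messages": state["messages"]})
    # The agent returns the full history; keep only what it added
    new_messages = result["messages"][len(state["messages"]):]
    return agent_handoff("researcher", "analyst", new_messages)

def analyst_with_handoff(state: SwarmState) -> dict:
    """Analyst processes and hands off to the next agent"""
    result = analyst.invoke({"messages": state["messages"]})
    new_messages = result["messages"][len(state["messages"]):]
    return agent_handoff("analyst", "writer", new_messages)

def writer_node(state: SwarmState) -> dict:
    """Writer produces the final output"""
    result = writer.invoke({"messages": state["messages"]})
    return {"messages": result["messages"][len(state["messages"]):]}

# Swarm graph
swarm = StateGraph(SwarmState)
swarm.add_node("triage", triage_node)
swarm.add_node("researcher", researcher_with_handoff)
swarm.add_node("analyst", analyst_with_handoff)
swarm.add_node("writer", writer_node)

swarm.add_edge(START, "triage")
swarm.add_conditional_edges("triage", triage_router)
swarm.add_edge("researcher", "analyst")
swarm.add_edge("analyst", "writer")
swarm.add_edge("writer", END)

app = swarm.compile()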

Best for: Tasks requiring autonomous decision-making and flexible collaboration


Framework Comparison

LangGraph vs CrewAI vs AutoGen

| Feature | LangGraph | CrewAI | AutoGen |
|---|---|---|---|
| Architecture | Graph-based state machine | Role-based agent teams | Conversation-based multi-agent |
| Flexibility | Very high (low-level control) | Medium (abstracted API) | High (customizable) |
| Learning curve | High | Low | Medium |
| State management | Built-in (checkpoint support) | Basic | Conversation history-based |
| Human-in-the-Loop | Native support | Basic support | Native support |
| Streaming | Native support | Limited | Event-based |
| Production readiness | High | Medium | High |
| Community size | Large | Medium | Large |
| License | MIT | MIT | MIT |

CrewAI Implementation

from crewai import Agent, Task, Crew, Process

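# Define agents
# NOTE: search_tool, scrape_tool, analysis_tool, chart_tool, and write_tool are
# assumed to be tool instances defined elsewhere; they are not created in this listing.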
researcher = Agent(
    role="Senior Research Analyst",
    goal="Find comprehensive and accurate information about the given topic",
    backstory="""You are an expert researcher with decades of experience
    in gathering and synthesizing information from multiple sources.""",
    verbose=True,
    allow_delegation=True,
    tools=[search_tool, scrape_tool]
)

analyst = Agent(
    role="Data Analyst",
    goal="Analyze research findings and extract actionable insights",
    backstory="""You are a skilled data analyst who excels at finding
    patterns and drawing meaningful conclusions from data.""",
    verbose=True,
    tools=[analysis_tool, chart_tool]
)

writer = Agent(
    role="Technical Writer",
    goal="Create clear and comprehensive reports",
    backstory="""You are an experienced technical writer who can transform
    complex analyses into readable documents.""",
    verbose=True,
    tools=[write_tool]
)

# Define tasks
research_task = Task(
    description="Research the latest trends in AI agent orchestration",
    expected_output="A comprehensive summary of findings with sources",
    agent=researcher
)

analysis_task = Task(
    description="Analyze the research findings and identify key patterns",
    expected_output="An analytical report with data-driven insights",
    agent=analyst,
    context=[research_task]  # Reference previous task results
)

report_task = Task(
    description="Write a final report combining research and analysis",
    expected_output="A polished report ready for stakeholders",
    agent=writer,
    context=[research_task, analysis_task]
)

# Build and execute crew
crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, analysis_task, report_task],
    process=Process.sequential,  # or Process.hierarchical
    verbose=True
)

result = crew.kickoff()
print(result)

AutoGen Implementation

import os

from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager

# Agent configuration: read the key from the environment instead of hardcoding it
config_list = [{"model": "gpt-4o", "api_key": os.environ["OPENAI_API_KEY"]}]

researcher = AssistantAgent(
    name="Researcher",
    system_message="""You are a research specialist.
    Find accurate and relevant information.
    When your research is complete, say RESEARCH_DONE.""",
    llm_config={"config_list": config_list}
)

analyst = AssistantAgent(
    name="Analyst",
    system_message="""You are a data analyst.
    Analyze the research findings and provide insights.
    When analysis is complete, say ANALYSIS_DONE.""",
    llm_config={"config_list": config_list}
)

writer = AssistantAgent(
    name="Writer",
    system_message="""You are a technical writer.
    Create clear documentation based on research and analysis.
    When the report is complete, say TERMINATE.""",
    llm_config={"config_list": config_list}
)

user_proxy = UserProxyAgent(
    name="Admin",
    human_input_mode="NEVER",
    code_execution_config={"work_dir": "output"},
    # content can be None (for example on tool-call messages), so guard before the substring test
    is_termination_msg=lambda x: "TERMINATE" in (x.get("content") or "")
)

# Group chat setup
group_chat = GroupChat(
    agents=[user_proxy, researcher, analyst, writer],
    messages=[],
    max_round=20,
    speaker_selection_method="round_robin"
)

manager = GroupChatManager(
    groupchat=group_chat,
    llm_config={"config_list": config_list}
)

# Execute
user_proxy.initiate_chat(
    manager,
    message="Research AI agent orchestration patterns and write a report."
)

Advanced Supervisor Pattern

Dynamic Routing Implementation

This supervisor uses structured output to analyze the task, choose the next agent, and state the specific subtask it should perform.

from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import Literal

class RouteDecision(BaseModel):
    """Supervisor routing decision"""
    next_agent: Literal["researcher", "analyst", "writer", "FINISH"] = Field(
        description="The next agent to route to"
    )
    reasoning: str = Field(
        description="Why this agent was chosen"
    )
    task_description: str = Field(
        description="Specific task for the chosen agent"
    )

llm = ChatOpenAI(model="gpt-4o")
structured_llm = llm.with_structured_output(RouteDecision)

SUPERVISOR_PROMPT = """You are a supervisor managing a team of agents.
Based on the current state and conversation, decide:
1. Which agent should work next
2. What specific task they should perform
3. Whether the overall task is complete (FINISH)

Available agents:
- researcher: Searches for information and gathers data
- analyst: Analyzes data and provides insights
- writer: Creates reports and documentation

Current conversation:
{messages}

Task Board:
{task_board}
"""

def supervisor_node(state):
    """Supervisor node: dynamic routing"""
    decision = structured_llm.invoke(
        SUPERVISOR_PROMPT.format(
            messages=state["messages"],
            task_board=state.get("task_board", "Empty")
        )
    )

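    return {
        "next_agent": decision.next_agent,
        # Return only the new message; the state's list reducer appends it,
        # so re-sending state["messages"] would duplicate the history
        "messages": [
            {"role": "system",
             "content": f"Supervisor routed to {decision.next_agent}: {decision.task_description}"}
        ]
    }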

Human-in-the-Loop Integration

Human approval steps can be inserted into the workflow by pausing the graph before a review node and resuming once a decision is recorded.

from typing import Annotated, Literal, TypedDict
import operator

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END

class ReviewState(TypedDict):
    messages: Annotated[list, operator.add]
    next_agent: str
    approval_status: str  # "pending", "approved", or "rejected"

checkpointer = MemorySaver()

def human_approval_node(state: ReviewState) -> dict:
    """Runs after a human has recorded a decision via update_state"""
    status = state.get("approval_status", "pending")
    return {
        # Return only the new message; operator.add appends it to state
        "messages": [
            {"role": "system", "content": f"Human review result: {status}"}
        ]
    }

def check_approval(state: ReviewState) -> Literal["approved", "rejected"]:
    """Treat anything other than an explicit approval as a rejection"""
    return "approved" if state.get("approval_status") == "approved" else "rejected"

# Add Human-in-the-Loop to graph
graph = StateGraph(ReviewState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher)
graph.add_node("human_review", human_approval_node)
graph.add_node("writer", writer)

graph.add_edge(START, "supervisor")
graph.add_edge("supervisor", "researcher")
graph.add_edge("researcher", "human_review")
graph.add_conditional_edges(
    "human_review",
    check_approval,
    {"approved": "writer", "rejected": "supervisor"}
)
graph.add_edge("writer", END)

# Compile with checkpointer for state persistence
app = graph.compile(checkpointer=checkpointer, interrupt_before=["human_review"])

# Execute; the run pauses before the human_review node
config = {"configurable": {"thread_id": "review-thread-1"}}
initial_state = {"messages": [{"role": "user", "content": "Research AI agent orchestration patterns."}]}
result = app.invoke(initial_state, config)

# Record the human decision in the checkpointed state, then resume
app.update_state(config, {"approval_status": "approved"})
app.invoke(None, config)

MCP Protocol Integration

What is Model Context Protocol (MCP)

MCP is an interoperability protocol published by Anthropic that enables agents to access external tools and data sources through a standardized interface.

# MCP server implementation (FastMCP from the official Python SDK)
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("analytics-server")

# NOTE: `db` is a hypothetical async database client; it is not defined in this listing.

@mcp.tool()
async def query_database(query: str) -> str:
    """Execute a SQL query against the database."""
    # Restrict this to validated, read-only queries in any real deployment
    result = await db.execute(query)
    return str(result)

@mcp.tool()
async def generate_chart(data: str, chart_type: str) -> str:
    """Generate a chart from the given data."""
    return f"Chart generated: {chart_type}"

@mcp.resource("schema://tables")
async def list_tables() -> str:
    """List available database tables"""
    tables = await db.get_tables()
    return "\n".join(tables)

# Run server over stdio
if __name__ == "__main__":
    mcp.run(transport="stdio")

MCP Client with Multi-Agent Integration

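from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

# MCP server connection
server_params = StdioServerParameters(
    command="python",
    args=["analytics_server.py"]
)

async def run_mcp_agent(question: str) -> str:
    """Create an agent with MCP tools and run it while the session is open"""
    # stdio_client starts the server process and yields its read/write streams
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Convert MCP tools to LangChain tools
            tools = await load_mcp_tools(session)

            # Create agent
            agent = create_react_agent(
                ChatOpenAI(model="gpt-4o"),
                tools,
                state_modifier="You are a data analyst with access to database tools."
            )

            # The tools only work while the session is open, so invoke here
            # instead of returning the agent to the caller
            result = await agent.ainvoke(
                {"messages": [{"role": "user", "content": question}]}
            )
            return result["messages"][-1].content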

Real-World Example: Customer Support Multi-Agent System

Architecture Design

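This example implements a customer support system with hierarchical multi-agent orchestration: a triage agent classifies each inquiry and routes it to a specialized agent, with an escalation path for unresolved technical issues.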

from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Literal
import operator

class CustomerSupportState(TypedDict):
    messages: Annotated[list, operator.add]
    customer_id: str
    issue_category: str
    sentiment: str
    resolution: str
    escalated: bool

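# Triage agent
from pydantic import BaseModel

class TriageResult(BaseModel):
    """Structured classification returned by the triage model"""
    issue_category: Literal["billing", "technical", "general"]
    sentiment: Literal["positive", "neutral", "negative", "urgent"]

def triage_agent(state: CustomerSupportState) -> CustomerSupportState:
    """Classify customer inquiries and route to specialized agents"""
    llm = ChatOpenAI(model="gpt-4o").with_structured_output(TriageResult)
    result = llm.invoke([
        {"role": "system", "content": """Classify the customer issue into one of:
        - billing: Payment, invoice, subscription issues
        - technical: Product bugs, errors, configuration
        - general: General inquiries, feedback
        Also assess sentiment: positive, neutral, negative, urgent"""},
        {"role": "user", "content": state["messages"][-1].content}
    ])
    # Use the model's classification instead of hardcoded values
    return {
        "issue_category": result.issue_category,
        "sentiment": result.sentiment
    }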

# Technical support agent
def technical_support_agent(state: CustomerSupportState) -> CustomerSupportState:
    """Diagnose technical issues and provide solutions"""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke([
        {"role": "system", "content": """You are a technical support specialist.
        Diagnose the issue and provide step-by-step solutions.
        If the issue requires engineering escalation, start your reply with ESCALATE."""},
        {"role": "user", "content": str(state["messages"])}
    ])
    # The model cannot write to state directly, so derive the flag from its reply
    needs_escalation = response.content.strip().upper().startswith("ESCALATE")
    return {
        "resolution": response.content,
        "escalated": needs_escalation,
        "messages": [{"role": "assistant", "content": response.content}]
    }

# Billing support agent
def billing_support_agent(state: CustomerSupportState) -> CustomerSupportState:
    """Handle payment-related issues"""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke([
        {"role": "system", "content": """You are a billing specialist.
        Handle payment issues, refunds, and subscription changes."""},
        {"role": "user", "content": str(state["messages"])}
    ])
    return {
        "resolution": response.content,
        "messages": [{"role": "assistant", "content": response.content}]
    }

# Escalation agent
def escalation_agent(state: CustomerSupportState) -> CustomerSupportState:
    """Escalate complex issues to higher-level support"""
    return {
        "escalated": True,
        "messages": [
            {"role": "system",
             "content": f"Issue escalated for customer {state['customer_id']}"}
        ]
    }

# Routing functions
def route_issue(state: CustomerSupportState) -> Literal["technical", "billing", "general"]:
    return state["issue_category"]

def check_escalation(state: CustomerSupportState) -> Literal["escalate", "resolve"]:
    if state.get("escalated"):
        return "escalate"
    return "resolve"

# Build graph
workflow = StateGraph(CustomerSupportState)
workflow.add_node("triage", triage_agent)
workflow.add_node("technical", technical_support_agent)
workflow.add_node("billing", billing_support_agent)
workflow.add_node("escalation", escalation_agent)

workflow.add_edge(START, "triage")
workflow.add_conditional_edges("triage", route_issue, {
    "technical": "technical",
    "billing": "billing",
    "general": "billing"
})
workflow.add_conditional_edges("technical", check_escalation, {
    "escalate": "escalation",
    "resolve": END
})
workflow.add_edge("billing", END)
workflow.add_edge("escalation", END)

app = workflow.compile()

Failure Handling Strategies

Retry and Fallback Patterns

from tenacity import AsyncRetrying, stop_after_attempt, wait_exponential
import logging

logger = logging.getLogger(__name__)

class AgentWithRetry:
    """Agent wrapper with retry logic"""

    def __init__(self, agent, max_retries=3, fallback_agent=None):
        self.agent = agent
        self.max_retries = max_retries
        self.fallback_agent = fallback_agent

    async def invoke_with_retry(self, state):
        """Invoke agent, making up to max_retries attempts with exponential backoff"""
        # AsyncRetrying is configured per instance, so max_retries is honored
        async for attempt in AsyncRetrying(
            stop=stop_after_attempt(self.max_retries),
            wait=wait_exponential(multiplier=1, min=2, max=30),
            reraise=True,  # surface the last exception instead of RetryError
        ):
            with attempt:
                try:
                    return await self.agent.ainvoke(state)
                except Exception as e:
                    logger.error(f"Agent failed: {e}")
                    raise

    async def invoke(self, state):
        """Invoke agent with fallback"""
        try:
            return await self.invoke_with_retry(state)
        except Exception as e:
            if self.fallback_agent:
                logger.warning(f"Falling back to backup agent: {e}")
                return await self.fallback_agent.ainvoke(state)
            raise

# Circuit breaker pattern
import time

class CircuitBreaker:
    """Circuit breaker for agent resilience"""

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = "closed"  # closed, open, half-open
        self.last_failure_time = None

    def can_execute(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            # After the recovery timeout, allow a trial call (half-open)
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half-open"
                return True
            return False
        return True  # half-open

    def record_success(self):
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
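
The circuit breaker class above only tracks state; something still has to consult it around each call. The following is a minimal sketch of that wrapper. `call_with_breaker` and `CircuitOpenError` are hypothetical names, and `SimpleBreaker` is a reduced stand-in that exposes the same three methods so the example runs on its own.

```python
class CircuitOpenError(RuntimeError):
    """Raised when a call is skipped because the circuit is open."""


class SimpleBreaker:
    """Reduced stand-in exposing can_execute / record_success / record_failure."""

    def __init__(self, failure_threshold: int = 2):
        self.failure_threshold = failure_threshold
        self.failure_count = 0

    def can_execute(self) -> bool:
        return self.failure_count < self.failure_threshold

    def record_success(self) -> None:
        self.failure_count = 0

    def record_failure(self) -> None:
        self.failure_count += 1


def call_with_breaker(breaker, fn, *args, **kwargs):
    """Call fn through the breaker, recording the outcome either way."""
    if not breaker.can_execute():
        # Fail fast without touching the unhealthy dependency
        raise CircuitOpenError("circuit is open; call skipped")
    try:
        result = fn(*args, **kwargs)
    except Exception:
        breaker.record_failure()
        raise
    breaker.record_success()
    return result
```

Because the helper only relies on the three method names, it should work with any breaker that follows the same interface.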

Dead Letter Queue Pattern

import json
from datetime import datetime

class DeadLetterQueue:
    """Store failed messages for later retry"""

    def __init__(self, storage_path="dead_letters.json"):
        self.storage_path = storage_path
        self.messages = []

    def add(self, message: dict, error: str, agent_name: str):
        """Add a failed message to the queue"""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "agent": agent_name,
            "message": message,
            "error": str(error),
            "retry_count": 0
        }
        self.messages.append(entry)
        self._persist()

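    def retry_all(self, agent_registry: dict):
        """Retry all messages in the queue"""
        remaining = []
        for entry in self.messages:
            agent = agent_registry.get(entry["agent"])
            if agent is None:
                remaining.append(entry)  # no agent registered; keep for later
                continue
            try:
                agent.invoke(entry["message"])
            except Exception as e:
                entry["retry_count"] += 1
                entry["last_error"] = str(e)
                remaining.append(entry)
        # Rebuild the list; removing items while iterating would skip entries
        self.messages = remaining
        self._persist()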

    def _persist(self):
        with open(self.storage_path, "w") as f:
            json.dump(self.messages, f, indent=2)

Observability

LangSmith Integration

import os

# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "multi-agent-orchestration"

from langsmith import Client

client = Client()

def track_agent_metrics(agent_name: str, duration: float, tokens: int, success: bool):
    """Track agent execution metrics"""
    client.create_run(
        name=f"agent-{agent_name}",
        run_type="chain",
        inputs={"agent": agent_name},
        outputs={
            "duration_ms": duration * 1000,
            "total_tokens": tokens,
            "success": success
        }
    )

OpenTelemetry Integration

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Set up tracer
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("multi-agent-system")

def traced_agent_node(agent_name: str, agent):
    """Wrap an agent as a graph node with OpenTelemetry tracing"""
    def node_fn(state):
        with tracer.start_as_current_span(f"agent.{agent_name}") as span:
            span.set_attribute("agent.name", agent_name)
            span.set_attribute("agent.input_messages", len(state["messages"]))

            try:
                # `agent` is passed in explicitly so the node does not rely on a global
                result = agent.invoke(state)
                span.set_attribute("agent.success", True)
                return result
            except Exception as e:
                span.set_attribute("agent.success", False)
                span.record_exception(e)
                raise

    return node_fn

Production Deployment Checklist

Design Phase

  • Agent roles and tools are clearly defined
  • Inter-agent communication protocol is standardized
  • State management strategy is established (local vs distributed)
  • Failure scenarios have corresponding response strategies
  • Human-in-the-Loop intervention points are identified

Implementation Phase

  • Appropriate models are assigned to each agent (cost vs performance)
  • Tool execution timeouts are configured
  • Retry logic and circuit breakers are implemented
  • Dead letter queue tracks failed tasks
  • Input/output validation (guardrails) is applied
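
As one way to satisfy the tool timeout item, here is a minimal sketch built on `asyncio.wait_for`. The helper name `call_tool_with_timeout` and the `slow_tool` / `fast_tool` functions are hypothetical; a real tool would perform network or database I/O.

```python
import asyncio


async def call_tool_with_timeout(tool_fn, *args, timeout_s: float = 5.0, default=None):
    """Await an async tool call; return `default` if it exceeds timeout_s."""
    try:
        return await asyncio.wait_for(tool_fn(*args), timeout=timeout_s)
    except asyncio.TimeoutError:
        # wait_for cancels the underlying call before raising
        return default


async def slow_tool(query: str) -> str:
    """Hypothetical tool that takes too long."""
    await asyncio.sleep(1.0)
    return f"result for {query}"


async def fast_tool(query: str) -> str:
    """Hypothetical tool that answers quickly."""
    await asyncio.sleep(0.01)
    return f"result for {query}"
```

Returning a default keeps the agent loop moving. Re-raising instead would suit cases where the caller's retry or circuit breaker logic should see the failure.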

Deployment Phase

  • Observability pipeline is configured (LangSmith / OTEL)
  • Per-agent cost tracking is available
  • Rate limiting is applied
  • Security audit logs are enabled
  • Rollback strategy is established
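
For the rate limiting item, a token bucket is one common approach. The sketch below is an in-process illustration only; the `TokenBucket` class and its parameters are assumptions, and a multi-instance deployment would need a shared store.

```python
import time


class TokenBucket:
    """Allow bursts up to `capacity`, refilling at `rate_per_s` tokens per second."""

    def __init__(self, rate_per_s: float, capacity: int, clock=time.monotonic):
        self.rate = rate_per_s
        self.capacity = capacity
        self.tokens = float(capacity)  # start with a full bucket
        self.clock = clock             # injectable for testing
        self.last = clock()

    def allow(self) -> bool:
        """Consume one token if available; return whether the call may proceed."""
        now = self.clock()
        # Refill in proportion to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

One bucket per agent or per API key lets a noisy agent be throttled without starving the others.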

Operations Phase

  • Agent performance dashboard is built
  • Anomaly detection alerts are configured
  • Prompt version management is applied
  • A/B testing framework is ready
  • Regular prompt optimization process is in place

Pattern Selection Guide

Decision Flowchart

Task Type Assessment
  |
  +-- Simple task (5 or fewer tools) ---------> Single Agent
  |
  +-- Fixed-order multi-step task ------------> Pipeline
  |
  +-- Dynamic routing needed -----------------> Hierarchical (Supervisor)
  |
  +-- Autonomous collaboration required ------> Swarm
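
The flowchart can also be expressed as a small function. This is a sketch under assumptions: the `TaskProfile` fields are hypothetical, the order in which the branches are checked is a choice made here (the flowchart does not rank them), and the final fallback for tasks the flowchart does not cover is also an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskProfile:
    """Hypothetical description of a task, used only for this sketch."""
    tool_count: int
    fixed_order_steps: bool = False
    needs_dynamic_routing: bool = False
    needs_autonomous_collaboration: bool = False


def select_pattern(task: TaskProfile) -> str:
    """Map a task profile to one of the four patterns."""
    # Assumed precedence: the most demanding requirement wins
    if task.needs_autonomous_collaboration:
        return "swarm"
    if task.needs_dynamic_routing:
        return "hierarchical"
    if task.fixed_order_steps:
        return "pipeline"
    if task.tool_count <= 5:
        return "single_agent"
    # Not covered by the flowchart: assume many tools call for a supervisor
    # that splits them across specialists
    return "hierarchical"
```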

Pattern Comparison Summary

| Pattern | Strengths | Weaknesses | Complexity | Scale |
|---|---|---|---|---|
| Single Agent | Simple to implement, easy to debug | Limited scalability, context saturation | Low | Small |
| Hierarchical | Central control, dynamic routing | Supervisor bottleneck, single point of failure | Medium | Medium |
| Pipeline | Predictable, easy to test | Inflexible, sequential latency | Low-Medium | Medium |
| Swarm | High flexibility, autonomous collaboration | Hard to debug, unpredictable | High | Large |

Security Considerations

Agent Isolation

class ResourceError(Exception):
    """Raised when an agent exceeds its resource budget"""

class SandboxedAgent:
    """Agent running in an isolated environment"""

    def __init__(self, agent, allowed_tools: list, max_tokens: int = 4096):
        self.agent = agent
        self.allowed_tools = set(allowed_tools)
        self.max_tokens = max_tokens

    def invoke(self, state):
        # Verify tool access permissions
        requested_tools = self._extract_tool_calls(state)
        unauthorized = requested_tools - self.allowed_tools
        if unauthorized:
            raise PermissionError(
                f"Agent attempted to use unauthorized tools: {unauthorized}"
            )

        # Enforce token limit
        if self._estimate_tokens(state) > self.max_tokens:
            raise ResourceError("Token limit exceeded")

        return self.agent.invoke(state)

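    def _extract_tool_calls(self, state) -> set:
        # Placeholder: always returns an empty set, so the permission check
        # above never fires until this is implemented
        return set()

    def _estimate_tokens(self, state) -> int:
        # Placeholder: always returns 0, so the token limit is never enforced
        return 0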
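
The `_extract_tool_calls` stub above leaves the permission check inert. One possible way to fill it in is sketched here, assuming messages carry a `tool_calls` list whose items have a `name` (either as dicts or as objects with attributes). That message shape is an assumption and should be checked against the framework in use; `extract_tool_names` and `unauthorized_tools` are hypothetical helper names.

```python
def extract_tool_names(messages: list) -> set[str]:
    """Collect the names of all tools requested in a list of messages."""
    names: set[str] = set()
    for message in messages:
        # Support both dict-style messages and message objects
        if isinstance(message, dict):
            calls = message.get("tool_calls") or []
        else:
            calls = getattr(message, "tool_calls", None) or []
        for call in calls:
            name = call.get("name") if isinstance(call, dict) else getattr(call, "name", None)
            if name:
                names.add(name)
    return names


def unauthorized_tools(messages: list, allowed: set[str]) -> set[str]:
    """Return requested tool names that are not on the allow-list."""
    return extract_tool_names(messages) - set(allowed)
```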

Prompt Injection Defense

from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, field_validator

class SafeAgentOutput(BaseModel):
    """Agent output validation schema"""
    response: str
    confidence: float
    sources: list[str]

    # Pydantic v2 style; the v1 @validator decorator is deprecated
    @field_validator("response")
    @classmethod
    def validate_response(cls, v: str) -> str:
        # Simple phrase blocklist; this is a coarse filter, not a complete defense
        forbidden_patterns = [
            "ignore previous instructions",
            "system prompt",
            "bypass",
            "jailbreak"
        ]
        lowered = v.lower()
        for pattern in forbidden_patterns:
            if pattern in lowered:
                raise ValueError(f"Suspicious pattern detected: {pattern}")
        return v

parser = PydanticOutputParser(pydantic_object=SafeAgentOutput)

Performance Optimization

Parallel Execution Strategy

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import asyncio
import operator

class ParallelState(TypedDict):
    messages: Annotated[list, operator.add]
    research_result: str
    analysis_result: str

async def parallel_execution(state):
    """Execute independent agents in parallel"""
    research_task = asyncio.create_task(
        researcher.ainvoke({"messages": state["messages"]})
    )
    analysis_task = asyncio.create_task(
        analyst.ainvoke({"messages": state["messages"]})
    )

    research_result, analysis_result = await asyncio.gather(
        research_task, analysis_task
    )

    return {
        "research_result": research_result["messages"][-1].content,
        "analysis_result": analysis_result["messages"][-1].content
    }

# LangGraph fan-out pattern
graph = StateGraph(ParallelState)
graph.add_node("research", researcher)
graph.add_node("analysis", analyst)
graph.add_node("synthesis", writer)

# Parallel execution: fan-out from START to both nodes
graph.add_edge(START, "research")
graph.add_edge(START, "analysis")

# Both results converge into synthesis
graph.add_edge("research", "synthesis")
graph.add_edge("analysis", "synthesis")
graph.add_edge("synthesis", END)

Caching Strategy

import hashlib
import json

class AgentCache:
    """Cache agent responses"""

    def __init__(self, ttl_seconds=3600):
        self.cache = {}
        self.ttl = ttl_seconds

    def get_cache_key(self, state: dict) -> str:
        """Generate cache key from state"""
        state_str = json.dumps(state, sort_keys=True, default=str)
        return hashlib.sha256(state_str.encode()).hexdigest()

    def get(self, state: dict):
        """Look up result in cache"""
        key = self.get_cache_key(state)
        if key in self.cache:
            entry = self.cache[key]
            import time
            if time.time() - entry["timestamp"] < self.ttl:
                return entry["result"]
            del self.cache[key]
        return None

    def set(self, state: dict, result):
        """Store result in cache"""
        import time
        key = self.get_cache_key(state)
        self.cache[key] = {
            "result": result,
            "timestamp": time.time()
        }
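
A cache like the one above is typically used in a read-through fashion: look up first, compute on a miss, then store. The helper below sketches that flow. `get_or_compute` is a hypothetical name, and `DictCache` is a reduced stand-in with the same `get` / `set` interface (without TTL) so the example runs on its own.

```python
import json
from typing import Any, Callable


class DictCache:
    """Reduced stand-in with get/set keyed on the serialized state (no TTL)."""

    def __init__(self):
        self.store: dict[str, Any] = {}

    def _key(self, state: dict) -> str:
        return json.dumps(state, sort_keys=True, default=str)

    def get(self, state: dict):
        return self.store.get(self._key(state))

    def set(self, state: dict, result) -> None:
        self.store[self._key(state)] = result


def get_or_compute(cache, state: dict, compute: Callable[[dict], Any]) -> Any:
    """Return the cached result for state, computing and storing it on a miss."""
    cached = cache.get(state)
    if cached is not None:
        return cached
    result = compute(state)  # e.g. an agent invocation
    cache.set(state, result)
    return result
```

Since a miss is signalled by `None`, a result that is itself `None` would be recomputed on every call. Caching is also only appropriate for agents whose output should be stable for identical input.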

Conclusion

Multi-agent orchestration is not merely about connecting multiple agents. It is about selecting the right pattern for the task and building robust failure handling and observability.

Key takeaways:

  1. Start with a single agent and transition to multi-agent as complexity grows
  2. The hierarchical pattern suits scenarios that need centralized control and dynamic routing
  3. The pipeline pattern is optimal for fixed sequential workflows
  4. The swarm pattern fits complex scenarios requiring high autonomy
  5. Choose frameworks based on need: LangGraph (flexibility), CrewAI (rapid prototyping), AutoGen (conversation-based)
