AI Agent Multi-Agent Orchestration Patterns: A Practical Guide to Hierarchical, Pipeline, and Swarm Architectures

AI Agent Multi-Agent Orchestration Patterns

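Why Multi-Agent Now

Gartner projects that agentic AI will be embedded in 40% of enterprise applications by 2026. The paradigm is shifting from a single general-purpose agent to collaboration among domain-specialized agents. With the announcement of the NIST AI Agent Standards Initiative, standardization of security and interoperability is now under way in earnest.

This article analyzes four major multi-agent orchestration patterns and provides implementation code for the LangGraph, CrewAI, and AutoGen frameworks.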


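Multi-Agent System Overview

Limitations of a Single Agent

A single agent runs into the following limits.

| Limitation | Description |
| --- | --- |
| Context window saturation | The more complex the task, the longer the prompt grows and the more performance degrades |
| Tool overload | Giving one agent dozens of tools lowers its tool-selection accuracy |
| Single point of failure | If the one agent fails, the entire workflow stops |
| Lack of specialization | A general-purpose prompt rarely produces the best result for a specific domain |
| Scalability constraints | The agent cannot scale horizontally as the workload grows |

What Multi-Agent Systems Solve

Multi-agent systems overcome these limits through division of labor and collaboration.

  • Specialization: each agent focuses on a specific domain
  • Parallel processing: independent tasks run at the same time
  • Fault isolation: the failure of one agent does not bring down the whole system
  • Dynamic composition: the combination of agents changes flexibly with the task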
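The parallel-processing and fault-isolation points can be illustrated without any agent framework. The following is a minimal sketch, not a production design: the three `*_agent` coroutines are hypothetical stand-ins for LLM-backed agents, and `run_team` is a name invented for this example. It relies on `asyncio.gather(..., return_exceptions=True)` so that one failing agent does not cancel the others.

```python
import asyncio

# Hypothetical stand-ins for LLM-backed agents; each handles one domain.
async def research_agent(task: str) -> str:
    return f"research notes on {task}"

async def analysis_agent(task: str) -> str:
    return f"analysis of {task}"

async def flaky_agent(task: str) -> str:
    raise RuntimeError("upstream tool unavailable")

async def run_team(task: str) -> dict:
    """Run independent agents concurrently and isolate individual failures."""
    agents = {
        "research": research_agent,
        "analysis": analysis_agent,
        "flaky": flaky_agent,
    }
    # return_exceptions=True hands each failure back as a value instead of
    # raising, so the remaining agents still finish.
    outcomes = await asyncio.gather(
        *(agent(task) for agent in agents.values()),
        return_exceptions=True,
    )
    results, failures = {}, {}
    for name, outcome in zip(agents, outcomes):
        if isinstance(outcome, Exception):
            failures[name] = str(outcome)
        else:
            results[name] = outcome
    return {"results": results, "failures": failures}

report = asyncio.run(run_team("agent orchestration"))
```

Here `report["results"]` holds the two successful outputs and `report["failures"]` records the failed agent, so a caller can decide whether a partial result is good enough.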

Four Orchestration Patterns

Pattern 1: Single Agent

The most basic pattern: one agent handles every task.

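import ast
import operator

from langchain.agents import create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

# Operators the calculator accepts; everything else is rejected.
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub,
        ast.Mult: operator.mul, ast.Div: operator.truediv}

def _safe_eval(node):
    """Evaluate a parsed arithmetic expression without eval()."""
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
        return _OPS[type(node.op)](_safe_eval(node.left), _safe_eval(node.right))
    raise ValueError("Unsupported expression")

@tool
def search_web(query: str) -> str:
    """Search the web for information."""
    # Search logic goes here
    return f"Search results for: {query}"

@tool
def calculate(expression: str) -> str:
    """Evaluate a basic arithmetic expression."""
    # eval() on model-supplied text can run arbitrary code, so parse instead.
    return str(_safe_eval(ast.parse(expression, mode="eval").body))

@tool
def write_file(filename: str, content: str) -> str:
    """Write content to a file."""
    with open(filename, "w") as f:
        f.write(content)
    return f"File {filename} written successfully"

llm = ChatOpenAI(model="gpt-4o")
tools = [search_web, calculate, write_file]

# The prompt needs an agent_scratchpad placeholder; the system text is a placeholder too.
prompt_template = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

agent = create_tool_calling_agent(llm, tools, prompt_template)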

Best for: simple tasks that need five or fewer tools

Pattern 2: Hierarchical Multi-Agent

A supervisor agent distributes work to subordinate agents and combines their results.

from typing import Annotated, Literal, TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import create_react_agent

class SupervisorState(TypedDict):
    # add_messages merges by message ID, so history returned by a sub-agent
    # is not appended a second time.
    messages: Annotated[list, add_messages]
    next_agent: str
    final_answer: str

llm = ChatOpenAI(model="gpt-4o")

# Subordinate agents. Recent LangGraph releases take the system prompt as
# `prompt`; older releases called this parameter `state_modifier`.
researcher = create_react_agent(
    llm,
    tools=[search_web],
    prompt="You are a research specialist. Find accurate information."
)

analyst = create_react_agent(
    llm,
    tools=[calculate],
    prompt="You are a data analyst. Analyze data and provide insights."
)

writer = create_react_agent(
    llm,
    tools=[write_file],
    prompt="You are a technical writer. Create clear documentation."
)

AGENT_NAMES = ("researcher", "analyst", "writer")

# Supervisor node: a node must return a state update, not a node name
def supervisor_node(state: SupervisorState) -> dict:
    """The supervisor decides which agent acts next and records it in state."""
    last_message = state["messages"][-1]

    response = llm.invoke([
        {"role": "system", "content": """You are a supervisor managing a team.
        Route to: researcher (for information), analyst (for data), writer (for documentation).
        Reply with the agent name only, or __end__ when the task is complete."""},
        {"role": "user", "content": last_message.content}
    ])

    choice = response.content.strip()
    # Any unexpected reply ends the run instead of failing on an unknown node.
    return {"next_agent": choice if choice in AGENT_NAMES else END}

def route_from_supervisor(state: SupervisorState) -> Literal["researcher", "analyst", "writer", "__end__"]:
    """Conditional edge: follow the decision stored by supervisor_node."""
    return state["next_agent"]

# Build the graph
graph = StateGraph(SupervisorState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher)
graph.add_node("analyst", analyst)
graph.add_node("writer", writer)

graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", route_from_supervisor)
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyst", "supervisor")
graph.add_edge("writer", "supervisor")

app = graph.compile()

Best for: cases that need centralized control and where the order of work is decided dynamically

Pattern 3: Sequential Pipeline

Agents process the work in a fixed order, and each agent's output becomes the next agent's input.

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator

class PipelineState(TypedDict):
    messages: Annotated[list, operator.add]
    research_output: str
    analysis_output: str
    report_output: str

def research_node(state: PipelineState) -> dict:
    """Stage 1: gather information."""
    result = researcher.invoke({"messages": state["messages"]})
    return {"research_output": result["messages"][-1].content}

def analysis_node(state: PipelineState) -> dict:
    """Stage 2: analysis."""
    analysis_prompt = f"Analyze this research: {state['research_output']}"
    result = analyst.invoke({"messages": [{"role": "user", "content": analysis_prompt}]})
    return {"analysis_output": result["messages"][-1].content}

def report_node(state: PipelineState) -> dict:
    """Stage 3: write the report."""
    report_prompt = f"""Write a report based on:
    Research: {state['research_output']}
    Analysis: {state['analysis_output']}"""
    result = writer.invoke({"messages": [{"role": "user", "content": report_prompt}]})
    return {"report_output": result["messages"][-1].content}

# Pipeline graph
pipeline = StateGraph(PipelineState)
pipeline.add_node("research", research_node)
pipeline.add_node("analysis", analysis_node)
pipeline.add_node("report", report_node)

pipeline.add_edge(START, "research")
pipeline.add_edge("research", "analysis")
pipeline.add_edge("analysis", "report")
pipeline.add_edge("report", END)

app = pipeline.compile()

Best for: cases where the order of work is clear and each stage's output is the next stage's input

Pattern 4: Decentralized Swarm

Agents collaborate autonomously and carry out the work without a central coordinator.

from typing import Annotated, Literal, TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class SwarmState(TypedDict):
    # add_messages merges by message ID, so returning a sub-agent's full
    # history does not duplicate earlier messages.
    messages: Annotated[list, add_messages]
    current_agent: str
    task_board: dict  # shared task board

def agent_handoff(agent_name: str, target: str, agent_messages: list) -> dict:
    """Build the state update that hands control from one agent to another."""
    note = {"role": "system", "content": f"Handoff from {agent_name} to {target}"}
    return {"current_agent": target, "messages": list(agent_messages) + [note]}

def triage_node(state: SwarmState) -> dict:
    """Triage agent: pick the first agent from keywords in the latest message."""
    text = state["messages"][-1].content.lower()
    if "search" in text:
        target = "researcher"
    elif "analyze" in text:
        target = "analyst"
    else:
        target = "writer"
    return {"current_agent": target}

def route_from_triage(state: SwarmState) -> Literal["researcher", "analyst", "writer"]:
    """Conditional edge: go to the agent chosen by triage."""
    return state["current_agent"]

def researcher_with_handoff(state: SwarmState) -> dict:
    """The researcher does its work, then hands off to the next agent."""
    result = researcher.invoke({"messages": state["messages"]})
    # This sketch always hands off to the analyst; a fuller version would
    # decide here whether analysis is needed or the run can end.
    return agent_handoff("researcher", "analyst", result["messages"])

def analyst_with_handoff(state: SwarmState) -> dict:
    """The analyst does its work, then hands off to the next agent."""
    result = analyst.invoke({"messages": state["messages"]})
    return agent_handoff("analyst", "writer", result["messages"])

# Swarm graph
swarm = StateGraph(SwarmState)
swarm.add_node("triage", triage_node)
swarm.add_node("researcher", researcher_with_handoff)
swarm.add_node("analyst", analyst_with_handoff)
swarm.add_node("writer", writer)

swarm.add_edge(START, "triage")
swarm.add_conditional_edges("triage", route_from_triage)
swarm.add_edge("researcher", "analyst")
swarm.add_edge("analyst", "writer")
swarm.add_edge("writer", END)

app = swarm.compile()

Best for: cases where agents must make their own decisions and flexible collaboration is needed
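The core of the swarm idea, each agent deciding for itself who acts next, can be sketched in plain Python. This is a framework-free illustration under stated assumptions: `research`, `analyze`, `write`, and `run_swarm` are hypothetical names, each agent is a plain function that returns its output together with the name of the next agent (or `None` to stop), and a hop limit guards against agents handing work back and forth forever.

```python
from typing import Callable, Optional

# Each agent returns (output, name of the next agent or None to stop).
AgentFn = Callable[[str], tuple[str, Optional[str]]]

def research(task: str) -> tuple[str, Optional[str]]:
    return f"facts({task})", "analyst"

def analyze(task: str) -> tuple[str, Optional[str]]:
    return f"insights({task})", "writer"

def write(task: str) -> tuple[str, Optional[str]]:
    return f"report({task})", None  # no further handoff

def run_swarm(agents: dict[str, AgentFn], start: str, task: str, max_hops: int = 10):
    """Follow agent-chosen handoffs until an agent stops or max_hops is reached."""
    trail, current, payload = [], start, task
    for _ in range(max_hops):
        payload, next_agent = agents[current](payload)
        trail.append(current)
        if next_agent is None:
            return payload, trail
        current = next_agent
    raise RuntimeError(f"No agent finished within {max_hops} hops")

output, trail = run_swarm(
    {"researcher": research, "analyst": analyze, "writer": write},
    start="researcher",
    task="Q3 churn",
)
```

No central coordinator appears in the loop: `run_swarm` only follows the handoffs the agents return, and `trail` records the path for later debugging.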


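Framework Comparison

LangGraph vs CrewAI vs AutoGen

| Characteristic | LangGraph | CrewAI | AutoGen |
| --- | --- | --- | --- |
| Architecture | Graph-based state machine | Role-based agent team | Conversation-based multi-agent |
| Flexibility | Very high (low-level control) | Medium (abstracted API) | High (customizable) |
| Learning curve | Steep | Gentle | Moderate |
| State management | Built in (checkpoint support) | Basic | Based on conversation history |
| Human-in-the-Loop | Native support | Basic support | Native support |
| Streaming | Native support | Limited | Event-based |
| Production readiness | High | Medium | High |
| Community size | Large | Medium | Large |
| License | MIT | MIT | MIT |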

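CrewAI Implementation Example

from crewai import Agent, Task, Crew, Process

# Agent definitions. search_tool, scrape_tool, analysis_tool, chart_tool, and
# write_tool are assumed to be CrewAI tool instances defined elsewhere.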
researcher = Agent(
    role="Senior Research Analyst",
    goal="Find comprehensive and accurate information about the given topic",
    backstory="""You are an expert researcher with decades of experience
    in gathering and synthesizing information from multiple sources.""",
    verbose=True,
    allow_delegation=True,
    tools=[search_tool, scrape_tool]
)

analyst = Agent(
    role="Data Analyst",
    goal="Analyze research findings and extract actionable insights",
    backstory="""You are a skilled data analyst who excels at finding
    patterns and drawing meaningful conclusions from data.""",
    verbose=True,
    tools=[analysis_tool, chart_tool]
)

writer = Agent(
    role="Technical Writer",
    goal="Create clear and comprehensive reports",
    backstory="""You are an experienced technical writer who can transform
    complex analyses into readable documents.""",
    verbose=True,
    tools=[write_tool]
)

# Task definitions
research_task = Task(
    description="Research the latest trends in AI agent orchestration",
    expected_output="A comprehensive summary of findings with sources",
    agent=researcher
)

analysis_task = Task(
    description="Analyze the research findings and identify key patterns",
    expected_output="An analytical report with data-driven insights",
    agent=analyst,
    context=[research_task]  # reference the previous task's result
)

report_task = Task(
    description="Write a final report combining research and analysis",
    expected_output="A polished report ready for stakeholders",
    agent=writer,
    context=[research_task, analysis_task]
)

# Assemble the crew and run it
crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, analysis_task, report_task],
    process=Process.sequential,  # or Process.hierarchical (also needs manager_llm or manager_agent)
    verbose=True
)

result = crew.kickoff()
print(result)

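AutoGen Implementation Example

import os

from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager

# Agent configuration (classic AutoGen 0.2-style API); the key is read from
# the environment instead of being hard-coded
config_list = [{"model": "gpt-4o", "api_key": os.environ["OPENAI_API_KEY"]}]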

researcher = AssistantAgent(
    name="Researcher",
    system_message="""You are a research specialist.
    Find accurate and relevant information.
    When your research is complete, say RESEARCH_DONE.""",
    llm_config={"config_list": config_list}
)

analyst = AssistantAgent(
    name="Analyst",
    system_message="""You are a data analyst.
    Analyze the research findings and provide insights.
    When analysis is complete, say ANALYSIS_DONE.""",
    llm_config={"config_list": config_list}
)

writer = AssistantAgent(
    name="Writer",
    system_message="""You are a technical writer.
    Create clear documentation based on research and analysis.
    When the report is complete, say TERMINATE.""",
    llm_config={"config_list": config_list}
)

user_proxy = UserProxyAgent(
    name="Admin",
    human_input_mode="NEVER",
    code_execution_config={"work_dir": "output"},
    # content can be None (for example on tool-call messages), so guard against it
    is_termination_msg=lambda x: "TERMINATE" in (x.get("content") or "")
)

# Group chat setup
group_chat = GroupChat(
    agents=[user_proxy, researcher, analyst, writer],
    messages=[],
    max_round=20,
    speaker_selection_method="round_robin"
)

manager = GroupChatManager(
    groupchat=group_chat,
    llm_config={"config_list": config_list}
)

# Run
user_proxy.initiate_chat(
    manager,
    message="Research AI agent orchestration patterns and write a report."
)

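Supervisor Pattern in Depth

Implementing Dynamic Routing

This is a more advanced implementation in which the supervisor analyzes the work and routes it to the best-suited agent.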

from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import Literal

class RouteDecision(BaseModel):
    """The supervisor's routing decision."""
    next_agent: Literal["researcher", "analyst", "writer", "FINISH"] = Field(
        description="The next agent to route to"
    )
    reasoning: str = Field(
        description="Why this agent was chosen"
    )
    task_description: str = Field(
        description="Specific task for the chosen agent"
    )

llm = ChatOpenAI(model="gpt-4o")
structured_llm = llm.with_structured_output(RouteDecision)

SUPERVISOR_PROMPT = """You are a supervisor managing a team of agents.
Based on the current state and conversation, decide:
1. Which agent should work next
2. What specific task they should perform
3. Whether the overall task is complete (FINISH)

Available agents:
- researcher: Searches for information and gathers data
- analyst: Analyzes data and provides insights
- writer: Creates reports and documentation

Current conversation:
{messages}

Task Board:
{task_board}
"""

def supervisor_node(state):
    """Supervisor node: dynamic routing."""
    decision = structured_llm.invoke(
        SUPERVISOR_PROMPT.format(
            messages=state["messages"],
            task_board=state.get("task_board", "Empty")
        )
    )

    # Return only the new message. The reducer on `messages` appends it to
    # the history, so re-sending state["messages"] would duplicate it.
    return {
        "next_agent": decision.next_agent,
        "messages": [
            {"role": "system",
             "content": f"Supervisor routed to {decision.next_agent}: {decision.task_description}"}
        ]
    }

Human-in-the-Loop Integration

This pattern inserts steps that require human approval into the workflow.

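from typing import Literal

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END

checkpointer = MemorySaver()

class ReviewState(SupervisorState):
    approval_status: str  # "approved" or "rejected", written by the reviewer

def human_approval_node(state: ReviewState) -> dict:
    """Record the reviewer's decision once the run has been resumed."""
    # The graph pauses before this node (see interrupt_before below). The
    # reviewer's decision is written with update_state, then the run resumes.
    status = state.get("approval_status", "rejected")
    return {"messages": [{"role": "system", "content": f"Human review: {status}"}]}

def check_approval(state: ReviewState) -> Literal["approved", "rejected"]:
    """Check the approval status; anything other than "approved" is a rejection."""
    return "approved" if state.get("approval_status") == "approved" else "rejected"

# Add Human-in-the-Loop to the graph
graph = StateGraph(ReviewState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher)
graph.add_node("human_review", human_approval_node)
graph.add_node("writer", writer)

graph.add_edge(START, "supervisor")
graph.add_edge("supervisor", "researcher")
graph.add_edge("researcher", "human_review")
graph.add_conditional_edges(
    "human_review",
    check_approval,
    {"approved": "writer", "rejected": "supervisor"}
)
graph.add_edge("writer", END)

# The checkpointer saves and restores state across the pause
app = graph.compile(checkpointer=checkpointer, interrupt_before=["human_review"])

# Run until the pause point (the initial message is a placeholder)
config = {"configurable": {"thread_id": "review-thread-1"}}
initial_state = {"messages": [{"role": "user", "content": "Research AI agent orchestration patterns."}]}
result = app.invoke(initial_state, config)

# The reviewer records a decision, then the run resumes from the checkpoint
app.update_state(config, {"approval_status": "approved"})
app.invoke(None, config)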

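MCP Protocol Integration

What Is the Model Context Protocol (MCP)

MCP is an open protocol announced by Anthropic that lets agents access external tools and data sources in a standardized way. Because the tool interface is shared, agents built on different frameworks can use the same MCP servers.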

# MCP server example, using the FastMCP API from the MCP Python SDK
from mcp.server.fastmcp import FastMCP

server = FastMCP("analytics-server")

# `db` below is a placeholder for an async database client defined elsewhere.

@server.tool()
async def query_database(query: str) -> str:
    """Run a SQL query against the database."""
    # Connect to the real database and run the query
    result = await db.execute(query)
    return str(result)

@server.tool()
async def generate_chart(data: str, chart_type: str) -> str:
    """Generate a chart from the data."""
    # Chart generation logic goes here
    return f"Chart generated: {chart_type}"

@server.resource("schema://tables")
async def list_tables() -> str:
    """List the available database tables."""
    tables = await db.get_tables()
    return "\n".join(tables)

# Run the server; FastMCP serves over stdio by default
if __name__ == "__main__":
    server.run()

Connecting an MCP Client to Multi-Agent Systems

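from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# MCP server connection settings
server_params = StdioServerParameters(
    command="python",
    args=["analytics_server.py"]
)

async def run_mcp_agent(question: str):
    """Create an agent that uses MCP tools and run it while the session is open."""
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Convert MCP tools into LangChain tools
            tools = await load_mcp_tools(session)

            # Create the agent (`prompt` was `state_modifier` in older releases)
            agent = create_react_agent(
                ChatOpenAI(model="gpt-4o"),
                tools,
                prompt="You are a data analyst with access to database tools."
            )

            # The tools only work while the session is open, so invoke here
            return await agent.ainvoke({"messages": [{"role": "user", "content": question}]})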

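Case Study: A Multi-Agent Customer Support System

Architecture Design

This is a practical example that implements a customer support system as a hierarchical multi-agent system.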

from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Literal
import operator

class CustomerSupportState(TypedDict):
    messages: Annotated[list, operator.add]
    customer_id: str
    issue_category: str
    sentiment: str
    resolution: str
    escalated: bool

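from pydantic import BaseModel

class TriageResult(BaseModel):
    """Structured triage output."""
    issue_category: Literal["billing", "technical", "general"]
    sentiment: Literal["positive", "neutral", "negative", "urgent"]

# Triage agent
def triage_agent(state: CustomerSupportState) -> dict:
    """Classify the customer inquiry so it can be routed to the right specialist."""
    llm = ChatOpenAI(model="gpt-4o").with_structured_output(TriageResult)
    result = llm.invoke([
        {"role": "system", "content": """Classify the customer issue into one of:
        - billing: Payment, invoice, subscription issues
        - technical: Product bugs, errors, configuration
        - general: General inquiries, feedback
        Also assess sentiment: positive, neutral, negative, urgent"""},
        {"role": "user", "content": state["messages"][-1].content}
    ])
    # Structured output replaces hand-parsing of the model's free-text reply
    return {
        "issue_category": result.issue_category,
        "sentiment": result.sentiment
    }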

# Technical support agent
def technical_support_agent(state: CustomerSupportState) -> dict:
    """Diagnose the technical problem and propose a solution."""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke([
        {"role": "system", "content": """You are a technical support specialist.
        Diagnose the issue and provide step-by-step solutions.
        If the issue requires engineering escalation, start your reply with ESCALATE."""},
        {"role": "user", "content": str(state["messages"])}
    ])
    return {
        "resolution": response.content,
        # The escalation branch only fires if this flag is actually set
        "escalated": response.content.strip().upper().startswith("ESCALATE"),
        "messages": [{"role": "assistant", "content": response.content}]
    }

# Billing support agent
def billing_support_agent(state: CustomerSupportState) -> CustomerSupportState:
    """Handle payment-related issues."""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke([
        {"role": "system", "content": """You are a billing specialist.
        Handle payment issues, refunds, and subscription changes."""},
        {"role": "user", "content": str(state["messages"])}
    ])
    return {
        "resolution": response.content,
        "messages": [{"role": "assistant", "content": response.content}]
    }

# Escalation agent
def escalation_agent(state: CustomerSupportState) -> CustomerSupportState:
    """Escalate complex issues to a higher support tier."""
    return {
        "escalated": True,
        "messages": [
            {"role": "system",
             "content": f"Issue escalated for customer {state['customer_id']}"}
        ]
    }

# Routing functions
def route_issue(state: CustomerSupportState) -> Literal["technical", "billing", "general"]:
    return state["issue_category"]

def check_escalation(state: CustomerSupportState) -> Literal["escalate", "resolve"]:
    if state.get("escalated"):
        return "escalate"
    return "resolve"

# Build the graph
workflow = StateGraph(CustomerSupportState)
workflow.add_node("triage", triage_agent)
workflow.add_node("technical", technical_support_agent)
workflow.add_node("billing", billing_support_agent)
workflow.add_node("escalation", escalation_agent)

workflow.add_edge(START, "triage")
workflow.add_conditional_edges("triage", route_issue, {
    "technical": "technical",
    "billing": "billing",
    "general": "billing"  # general inquiries are also handled by billing
})
workflow.add_conditional_edges("technical", check_escalation, {
    "escalate": "escalation",
    "resolve": END
})
workflow.add_edge("billing", END)
workflow.add_edge("escalation", END)

app = workflow.compile()

Failure Handling Strategies

Retry and Fallback Patterns

from tenacity import AsyncRetrying, stop_after_attempt, wait_exponential
from langgraph.graph import StateGraph
import logging

logger = logging.getLogger(__name__)

class AgentWithRetry:
    """Agent wrapper with retry logic."""

    def __init__(self, agent, max_retries=3, fallback_agent=None):
        self.agent = agent
        self.max_retries = max_retries
        self.fallback_agent = fallback_agent

    async def invoke_with_retry(self, state):
        """Invoke the agent, retrying with exponential backoff."""
        # The retry policy is built per call so that max_retries is honored.
        retrying = AsyncRetrying(
            stop=stop_after_attempt(self.max_retries),
            wait=wait_exponential(multiplier=1, min=2, max=30),
            reraise=True,
        )
        async for attempt in retrying:
            with attempt:
                try:
                    return await self.agent.ainvoke(state)
                except Exception as e:
                    logger.error(f"Agent failed: {e}")
                    raise

    async def invoke(self, state):
        """Invoke the agent, falling back to the backup agent if retries run out."""
        try:
            return await self.invoke_with_retry(state)
        except Exception as e:
            if self.fallback_agent:
                logger.warning(f"Falling back to backup agent: {e}")
                return await self.fallback_agent.ainvoke(state)
            raise

# Circuit breaker pattern
class CircuitBreaker:
    """Stops calling a failing agent until a recovery timeout has passed."""

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = "closed"  # closed, open, half-open
        self.last_failure_time = None

    def can_execute(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            import time
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half-open"
                return True
            return False
        return True  # half-open

    def record_success(self):
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self):
        import time
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
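The breaker above only tracks state; something still has to consult it around each call. The sketch below shows one possible way to do that. `guarded_call` and `CircuitOpenError` are hypothetical names introduced for this example, `failing_agent` is a stand-in for a real agent, and the breaker class is repeated in condensed form only so that the snippet runs on its own.

```python
import time

class CircuitBreaker:
    """Condensed copy of the breaker above so this snippet runs on its own."""

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = "closed"
        self.last_failure_time = None

    def can_execute(self) -> bool:
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half-open"
                return True
            return False
        return True

    def record_success(self):
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"

class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the breaker is open."""

def guarded_call(breaker: CircuitBreaker, fn, *args, **kwargs):
    """Call fn through the breaker, recording the outcome either way."""
    if not breaker.can_execute():
        raise CircuitOpenError("circuit is open; call skipped")
    try:
        result = fn(*args, **kwargs)
    except Exception:
        breaker.record_failure()
        raise
    breaker.record_success()
    return result

def failing_agent(_state):
    raise TimeoutError("model endpoint timed out")

breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=60)
errors = []
for _ in range(3):
    try:
        guarded_call(breaker, failing_agent, {"messages": []})
    except Exception as exc:
        errors.append(type(exc).__name__)
```

The first two calls reach the agent and fail, which opens the circuit; the third is rejected without touching the agent at all, which is the point of the pattern.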

Dead Letter Queue Pattern

import json
from datetime import datetime

class DeadLetterQueue:
    """Dead letter queue that stores messages that failed processing."""

    def __init__(self, storage_path="dead_letters.json"):
        self.storage_path = storage_path
        self.messages = []

    def add(self, message: dict, error: str, agent_name: str):
        """Add a failed message to the queue."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "agent": agent_name,
            "message": message,
            "error": str(error),
            "retry_count": 0
        }
        self.messages.append(entry)
        self._persist()

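    def retry_all(self, agent_registry: dict):
        """Retry every message in the queue."""
        remaining = []
        for entry in self.messages:
            agent = agent_registry.get(entry["agent"])
            if agent is None:
                remaining.append(entry)
                continue
            try:
                agent.invoke(entry["message"])
            except Exception as e:
                entry["retry_count"] += 1
                entry["last_error"] = str(e)
                remaining.append(entry)
        # Rebuild the list instead of removing entries while iterating over it
        self.messages = remaining
        self._persist()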

    def _persist(self):
        with open(self.storage_path, "w") as f:
            json.dump(self.messages, f, indent=2)

Observability

LangSmith Integration

import os

# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "multi-agent-orchestration"

# Collect custom metrics
from langsmith import Client

client = Client()

def track_agent_metrics(agent_name: str, duration: float, tokens: int, success: bool):
    """Track execution metrics for an agent."""
    client.create_run(
        name=f"agent-{agent_name}",
        run_type="chain",
        inputs={"agent": agent_name},
        outputs={
            "duration_ms": duration * 1000,
            "total_tokens": tokens,
            "success": success
        }
    )
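The function above expects a duration and a success flag, but the text does not show where those values come from. One way to collect them, sketched here with the standard library only, is a context manager around each agent call. `measure_agent` and the `metrics` list are hypothetical names for this example, and the token count is a made-up value standing in for whatever the model call reports.

```python
import time
from contextlib import contextmanager

@contextmanager
def measure_agent(agent_name: str, sink: list):
    """Time a block of agent work and append one metrics record to sink."""
    record = {"agent": agent_name, "success": False, "duration": 0.0}
    start = time.perf_counter()
    try:
        yield record
        record["success"] = True
    finally:
        # Runs on success and on failure, so every call leaves a record.
        record["duration"] = time.perf_counter() - start
        sink.append(record)

metrics = []

with measure_agent("researcher", metrics) as rec:
    rec["tokens"] = 128  # hypothetical token count reported by the model call

try:
    with measure_agent("analyst", metrics):
        raise ValueError("bad input")
except ValueError:
    pass  # the failure is still recorded in metrics
```

Each record can then be forwarded, for example as `track_agent_metrics(r["agent"], r["duration"], r.get("tokens", 0), r["success"])`.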

OpenTelemetry Integration

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Tracer setup
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("multi-agent-system")

def traced_agent_node(agent_name: str, agent):
    """Agent node wrapped in OpenTelemetry tracing; `agent` is the runnable to invoke."""
    def node_fn(state):
        with tracer.start_as_current_span(f"agent.{agent_name}") as span:
            span.set_attribute("agent.name", agent_name)
            span.set_attribute("agent.input_messages", len(state["messages"]))

            try:
                result = agent.invoke(state)
                span.set_attribute("agent.success", True)
                return result
            except Exception as e:
                span.set_attribute("agent.success", False)
                span.record_exception(e)
                raise

    return node_fn

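Production Deployment Checklist

Design Phase

  • Are each agent's role and tools clearly defined?
  • Is the communication protocol between agents standardized?
  • Is there a state management strategy (local vs. distributed)?
  • Is there a response strategy for each failure scenario?
  • Have the points that need Human-in-the-Loop been identified?

Implementation Phase

  • Is an appropriate model assigned to each agent (cost vs. performance)?
  • Are timeouts set on tool execution?
  • Are retry logic and circuit breakers implemented?
  • Are failed tasks tracked with a dead letter queue?
  • Is input/output validation (guardrails) applied?

Deployment Phase

  • Is the observability pipeline configured (LangSmith / OTEL)?
  • Can costs be tracked per agent?
  • Are rate limits applied?
  • Are security audit logs enabled?
  • Is there a rollback strategy?

Operations Phase

  • Is an agent performance dashboard in place?
  • Are anomaly detection alerts configured?
  • Is prompt version control applied?
  • Is an A/B testing framework ready?
  • Is there a regular prompt optimization process?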

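Pattern Selection Guide

Decision Flowchart

Determine the task type
  |
  ├─ Simple task (5 tools or fewer) ──────────────────> Single agent
  |
  ├─ Multi-step task with a fixed order ──────────────> Pipeline
  |
  ├─ Task that needs dynamic routing ─────────────────> Hierarchical (supervisor)
  |
  └─ Complex task that needs autonomous collaboration ─> Swarm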
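The flowchart can also be written down as a small function, which makes the rule easy to unit-test or embed in a project template. This is only a sketch: `choose_pattern` and its parameter names are invented here, the precedence between branches is an assumption (the flowchart does not rank them), and the final fallback covers a case the flowchart does not mention.

```python
def choose_pattern(tool_count: int, fixed_order: bool = False,
                   needs_dynamic_routing: bool = False,
                   needs_autonomy: bool = False) -> str:
    """Map the four branches of the flowchart to a pattern name."""
    # Assumed precedence: the most demanding requirement wins.
    if needs_autonomy:
        return "swarm"
    if needs_dynamic_routing:
        return "hierarchical"
    if fixed_order:
        return "pipeline"
    if tool_count <= 5:
        return "single agent"
    # Not covered by the flowchart: many tools but no other trait.
    # Splitting the tools across supervised specialists is assumed here.
    return "hierarchical"

examples = {
    "faq bot": choose_pattern(tool_count=3),
    "report generator": choose_pattern(tool_count=8, fixed_order=True),
    "support desk": choose_pattern(tool_count=12, needs_dynamic_routing=True),
    "research collective": choose_pattern(tool_count=20, needs_autonomy=True),
}
```

For the four sample tasks, `examples` maps them to the single agent, pipeline, hierarchical, and swarm patterns respectively.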

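Pros and Cons by Pattern

| Pattern | Pros | Cons | Complexity | Suitable scale |
| --- | --- | --- | --- | --- |
| Single agent | Simple to implement, easy to debug | Limited scalability, context saturation | Low | Small |
| Hierarchical | Central control, dynamic routing | Supervisor bottleneck, single point of failure | Medium | Medium |
| Pipeline | Predictable, easy to test | Inflexible, latency from sequential execution | Low to medium | Medium |
| Swarm | High flexibility, autonomous collaboration | Hard to debug, unpredictable | High | Large |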

Security Considerations

Agent Isolation

class ResourceError(Exception):
    """Raised when an agent exceeds its resource budget."""

class SandboxedAgent:
    """Agent that runs in an isolated environment."""

    def __init__(self, agent, allowed_tools: list, max_tokens: int = 4096):
        self.agent = agent
        self.allowed_tools = set(allowed_tools)
        self.max_tokens = max_tokens

    def invoke(self, state):
        # Verify tool access permissions
        requested_tools = self._extract_tool_calls(state)
        unauthorized = requested_tools - self.allowed_tools
        if unauthorized:
            raise PermissionError(
                f"Agent attempted to use unauthorized tools: {unauthorized}"
            )

        # Enforce the token budget
        if self._estimate_tokens(state) > self.max_tokens:
            raise ResourceError("Token limit exceeded")

        return self.agent.invoke(state)

    def _extract_tool_calls(self, state) -> set:
        # Extract tool calls from the state (stub: returns no tools)
        return set()

    def _estimate_tokens(self, state) -> int:
        # Estimate token usage (stub: returns 0)
        return 0

Prompt Injection Defense

from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, field_validator

class SafeAgentOutput(BaseModel):
    """Validation schema for agent output."""
    response: str
    confidence: float
    sources: list[str]

    # Pydantic v2 style; v1 used @validator instead of @field_validator
    @field_validator("response")
    @classmethod
    def validate_response(cls, v: str) -> str:
        # Detect forbidden patterns
        forbidden_patterns = [
            "ignore previous instructions",
            "system prompt",
            "bypass",
            "jailbreak"
        ]
        for pattern in forbidden_patterns:
            if pattern.lower() in v.lower():
                raise ValueError(f"Suspicious pattern detected: {pattern}")
        return v

parser = PydanticOutputParser(pydantic_object=SafeAgentOutput)

Performance Optimization

Parallel Execution Strategy

from typing import Annotated, TypedDict
import asyncio
import operator

from langgraph.graph import StateGraph, START, END

class ParallelState(TypedDict):
    messages: Annotated[list, operator.add]
    research_result: str
    analysis_result: str

async def parallel_execution(state):
    """Run independent agents in parallel."""
    research_task = asyncio.create_task(
        researcher.ainvoke({"messages": state["messages"]})
    )
    analysis_task = asyncio.create_task(
        analyst.ainvoke({"messages": state["messages"]})
    )

    research_result, analysis_result = await asyncio.gather(
        research_task, analysis_task
    )

    return {
        "research_result": research_result["messages"][-1].content,
        "analysis_result": analysis_result["messages"][-1].content
    }

# LangGraph's fan-out pattern
graph = StateGraph(ParallelState)
graph.add_node("research", researcher)
graph.add_node("analysis", analyst)
graph.add_node("synthesis", writer)

# Parallel execution: branch from START into two nodes at once
graph.add_edge(START, "research")
graph.add_edge(START, "analysis")

# Once both results are complete, continue to synthesis
graph.add_edge("research", "synthesis")
graph.add_edge("analysis", "synthesis")
graph.add_edge("synthesis", END)

Caching Strategy

import hashlib
import json
import time

class AgentCache:
    """Caches agent responses."""

    def __init__(self, ttl_seconds=3600):
        self.cache = {}
        self.ttl = ttl_seconds

    def get_cache_key(self, state: dict) -> str:
        """Build a cache key from the state."""
        state_str = json.dumps(state, sort_keys=True, default=str)
        return hashlib.sha256(state_str.encode()).hexdigest()

    def get(self, state: dict):
        """Look up a result in the cache."""
        key = self.get_cache_key(state)
        if key in self.cache:
            entry = self.cache[key]
            import time
            if time.time() - entry["timestamp"] < self.ttl:
                return entry["result"]
            del self.cache[key]
        return None

    def set(self, state: dict, result):
        """Store a result in the cache."""
        import time
        key = self.get_cache_key(state)
        self.cache[key] = {
            "result": result,
            "timestamp": time.time()
        }
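To show how such a cache sits in front of an agent, the sketch below applies the same key scheme (SHA-256 over the JSON-serialized state) and TTL check as a wrapper function. `cached_agent` and `slow_agent` are hypothetical names for this example, and the wrapper assumes the agent is deterministic enough that reusing an earlier answer for an identical state is acceptable.

```python
import hashlib
import json
import time

def cached_agent(agent_fn, ttl_seconds: float = 3600.0, clock=time.monotonic):
    """Wrap agent_fn so identical states within the TTL reuse the stored result."""
    store = {}

    def wrapper(state: dict):
        key = hashlib.sha256(
            json.dumps(state, sort_keys=True, default=str).encode()
        ).hexdigest()
        hit = store.get(key)
        if hit is not None and clock() - hit[0] < ttl_seconds:
            return hit[1]
        result = agent_fn(state)
        store[key] = (clock(), result)
        return result

    return wrapper

calls = []

def slow_agent(state: dict) -> str:
    # Stand-in for an expensive LLM call; records each real invocation.
    calls.append(state["question"])
    return f"answer to {state['question']}"

agent = cached_agent(slow_agent, ttl_seconds=60)
first = agent({"question": "What is MCP?"})
second = agent({"question": "What is MCP?"})       # served from the cache
third = agent({"question": "What is LangGraph?"})  # different state, new call
```

Only two real invocations are recorded in `calls`, because the repeated question is answered from the cache.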

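Conclusion

Multi-agent orchestration is not simply a matter of wiring several agents together. What matters is choosing the pattern that fits the task and backing it with robust failure handling and observability.

Key takeaways:

  1. Start with a single agent and move to multiple agents as complexity grows
  2. The hierarchical pattern fits cases that need central control
  3. The pipeline pattern is best for workflows with a fixed order
  4. The swarm pattern fits complex scenarios that need a high degree of autonomy
  5. Choose the framework by use case: LangGraph (flexibility), CrewAI (fast prototyping), AutoGen (conversation-based)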


AI Agent Multi-Agent Orchestration Patterns: A Practical Guide to Hierarchical, Pipeline, and Swarm Architectures

AI Agent Multi-Agent Orchestration Patterns

Why Multi-Agent Now

Gartner projects that agentic AI will be embedded in 40% of enterprise applications by 2026. The paradigm is shifting from single general-purpose agents to domain-specialized multi-agent collaboration. The NIST AI Agent Standards Initiative has formalized security and interoperability standards.

This article analyzes four major multi-agent orchestration patterns and provides implementation code for LangGraph, CrewAI, and AutoGen frameworks.


Multi-Agent System Overview

Limitations of Single Agents

Single agents face the following constraints:

LimitationDescription
Context window saturationComplex tasks require longer prompts, degrading performance
Tool overloadAssigning dozens of tools to one agent reduces selection accuracy
Single point of failureOne agent failure halts the entire workflow
Lack of specializationGeneral prompts cannot achieve domain-optimal results
Scalability constraintsCannot scale horizontally as workload increases

What Multi-Agent Systems Solve

Multi-agent systems overcome these limits through division of labor and collaboration:

  • Specialization: Each agent is optimized for a specific domain
  • Parallel processing: Independent tasks execute simultaneously
  • Fault isolation: One agent's failure does not affect the entire system
  • Dynamic composition: Agent combinations adapt flexibly to tasks

Four Orchestration Patterns

Pattern 1: Single Agent

The most basic pattern where one agent handles all tasks.

from langchain.agents import create_tool_calling_agent
from langchain_openai import ChatOpenAI
from langchain.tools import tool

@tool
def search_web(query: str) -> str:
    """Search the web for information."""
    return f"Search results for: {query}"

@tool
def calculate(expression: str) -> str:
    """Perform mathematical calculations."""
    return str(eval(expression))

@tool
def write_file(filename: str, content: str) -> str:
    """Write content to a file."""
    with open(filename, "w") as f:
        f.write(content)
    return f"File {filename} written successfully"

llm = ChatOpenAI(model="gpt-4o")
tools = [search_web, calculate, write_file]

agent = create_tool_calling_agent(llm, tools, prompt_template)

Best for: Simple tasks with 5 or fewer tools

Pattern 2: Hierarchical Multi-Agent

A supervisor agent distributes tasks to subordinate agents and aggregates results.

from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Literal
import operator

class SupervisorState(TypedDict):
    messages: Annotated[list, operator.add]
    next_agent: str
    final_answer: str

llm = ChatOpenAI(model="gpt-4o")

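# Define subordinate agents.
# Note: recent LangGraph releases name this parameter `prompt`; `state_modifier`
# is the older spelling, so check the version you have installed.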
researcher = create_react_agent(
    llm,
    tools=[search_web],
    state_modifier="You are a research specialist. Find accurate information."
)

analyst = create_react_agent(
    llm,
    tools=[calculate],
    state_modifier="You are a data analyst. Analyze data and provide insights."
)

writer = create_react_agent(
    llm,
    tools=[write_file],
    state_modifier="You are a technical writer. Create clear documentation."
)

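# Supervisor routing logic
ROUTES = ("researcher", "analyst", "writer", "__end__")

def supervisor_node(state: SupervisorState) -> dict:
    """Supervisor node: asks the LLM for the next agent and stores the choice."""
    last_message = state["messages"][-1]

    response = llm.invoke([
        {"role": "system", "content": """You are a supervisor managing a team.
        Route to: researcher (for information), analyst (for data), writer (for documentation).
        Reply with exactly one word: researcher, analyst, writer, or __end__ when the task is complete."""},
        {"role": "user", "content": last_message.content}
    ])

    choice = response.content.strip()
    # A node must return a state update; end the run if the label is unknown
    return {"next_agent": choice if choice in ROUTES else "__end__"}

def route_from_supervisor(state: SupervisorState) -> Literal["researcher", "analyst", "writer", "__end__"]:
    """Conditional edge: read the decision stored by the supervisor node."""
    return state["next_agent"]

# Build the graph
graph = StateGraph(SupervisorState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher)
graph.add_node("analyst", analyst)
graph.add_node("writer", writer)

graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", route_from_supervisor)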
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyst", "supervisor")
graph.add_edge("writer", "supervisor")

app = graph.compile()

Best for: Centralized control with dynamic task ordering
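
A supervisor that routes on free-form model output benefits from two guards: validating the returned label and capping the number of routing steps. The following framework-free sketch shows both. `normalize_route`, `run_supervisor`, and the lambda workers are hypothetical names used only for illustration.

```python
ALLOWED_ROUTES = {"researcher", "analyst", "writer", "__end__"}

def normalize_route(raw: str, default: str = "__end__") -> str:
    """Map free-form router text onto a known node name, else the default."""
    cleaned = raw.strip().strip("\"'`.").lower()
    return cleaned if cleaned in ALLOWED_ROUTES else default

def run_supervisor(route_fn, workers: dict, task: str, max_steps: int = 6) -> list:
    """Loop supervisor -> worker -> supervisor until '__end__' or the step cap."""
    transcript = [task]
    for _ in range(max_steps):
        route = normalize_route(route_fn(transcript))
        if route == "__end__":
            break
        # The chosen worker acts on the most recent item in the transcript
        transcript.append(workers[route](transcript[-1]))
    return transcript

workers = {
    "researcher": lambda text: f"research({text})",
    "analyst": lambda text: f"analysis({text})",
    "writer": lambda text: f"report({text})",
}

# A scripted router stands in for the LLM call
script = iter([" Researcher.", "analyst", "no idea"])
transcript = run_supervisor(lambda history: next(script), workers, "task")
```

The third scripted answer is not a valid route, so the run ends instead of failing on an unknown node name.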

Pattern 3: Sequential Pipeline

Agents process tasks in a fixed order, with each agent's output feeding into the next agent's input.

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator

class PipelineState(TypedDict):
    messages: Annotated[list, operator.add]
    research_output: str
    analysis_output: str
    report_output: str

def research_node(state: PipelineState) -> PipelineState:
    """Stage 1: Information gathering"""
    result = researcher.invoke({"messages": state["messages"]})
    return {"research_output": result["messages"][-1].content}

def analysis_node(state: PipelineState) -> PipelineState:
    """Stage 2: Analysis"""
    analysis_prompt = f"Analyze this research: {state['research_output']}"
    result = analyst.invoke({"messages": [{"role": "user", "content": analysis_prompt}]})
    return {"analysis_output": result["messages"][-1].content}

def report_node(state: PipelineState) -> PipelineState:
    """Stage 3: Report generation"""
    report_prompt = f"""Write a report based on:
    Research: {state['research_output']}
    Analysis: {state['analysis_output']}"""
    result = writer.invoke({"messages": [{"role": "user", "content": report_prompt}]})
    return {"report_output": result["messages"][-1].content}

# Pipeline graph
pipeline = StateGraph(PipelineState)
pipeline.add_node("research", research_node)
pipeline.add_node("analysis", analysis_node)
pipeline.add_node("report", report_node)

pipeline.add_edge(START, "research")
pipeline.add_edge("research", "analysis")
pipeline.add_edge("analysis", "report")
pipeline.add_edge("report", END)

app = pipeline.compile()

Best for: Well-defined sequential workflows where each stage's output feeds the next
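
Stripped of the framework, a sequential pipeline is a fold over a list of stage functions, each returning a partial state update. The sketch below assumes that shape; the stage bodies are placeholders for real agent calls.

```python
from functools import reduce
from typing import Callable

Stage = Callable[[dict], dict]

def run_pipeline(stages: list, state: dict) -> dict:
    """Run stages in order; each returns a partial update merged into the state."""
    return reduce(lambda acc, stage: {**acc, **stage(acc)}, stages, state)

def research(state: dict) -> dict:
    return {"research_output": f"notes on {state['topic']}"}

def analysis(state: dict) -> dict:
    return {"analysis_output": f"insights from {state['research_output']}"}

def report(state: dict) -> dict:
    return {"report_output": f"report: {state['analysis_output']}"}

final = run_pipeline([research, analysis, report], {"topic": "agent routing"})
```

Because every stage is a plain function of the state, each one can be unit-tested in isolation, which is the main reason this pattern is easy to test.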

Pattern 4: Decentralized Swarm

Agents collaborate autonomously without a central coordinator.

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated, Literal
import operator

class SwarmState(TypedDict):
    messages: Annotated[list, operator.add]
    current_agent: str
    task_board: dict  # Shared task board

def agent_handoff(agent_name: str, target: str, new_messages: list) -> dict:
    """Handoff between agents; the operator.add reducer appends what is returned"""
    return {
        "current_agent": target,
        "messages": new_messages + [
            {"role": "system", "content": f"Handoff from {agent_name} to {target}"}
        ]
    }

def triage_node(state: SwarmState) -> dict:
    """Triage node: a node must return a state update, not a route name"""
    return {"current_agent": "triage"}

def triage_route(state: SwarmState) -> Literal["researcher", "analyst", "writer"]:
    """Conditional edge: routes the task to the appropriate agent by keyword"""
    text = state["messages"][-1].content.lower()

    if "search" in text:
        return "researcher"
    elif "analyze" in text:
        return "analyst"
    else:
        return "writer"

def researcher_with_handoff(state: SwarmState) -> dict:
    """Researcher processes and hands off to the next agent"""
    result = researcher.invoke({"messages": state["messages"]})
    # Keep only the final message; earlier ones are already in the state
    return agent_handoff("researcher", "analyst", result["messages"][-1:])

def analyst_with_handoff(state: SwarmState) -> dict:
    """Analyst processes and hands off to the next agent"""
    result = analyst.invoke({"messages": state["messages"]})
    return agent_handoff("analyst", "writer", result["messages"][-1:])

# Swarm graph
swarm = StateGraph(SwarmState)
swarm.add_node("triage", triage_node)
swarm.add_node("researcher", researcher_with_handoff)
swarm.add_node("analyst", analyst_with_handoff)
swarm.add_node("writer", writer)

swarm.add_edge(START, "triage")
swarm.add_conditional_edges("triage", triage_route)
swarm.add_edge("researcher", "analyst")
swarm.add_edge("analyst", "writer")
swarm.add_edge("writer", END)

app = swarm.compile()

Best for: Tasks requiring autonomous decision-making and flexible collaboration
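
Without a coordinator, the main operational risk in a swarm is agents handing work back and forth indefinitely. A minimal sketch of a handoff loop with a hop limit follows, assuming each agent returns its output plus the name of the next agent (or `None` to stop). `run_swarm` and the lambda agents are illustrative names, not a framework API.

```python
from typing import Callable, Optional

# Each agent returns (output, next agent name or None to stop)
Handoff = tuple[str, Optional[str]]

def run_swarm(agents: dict[str, Callable[[str], Handoff]],
              start: str, task: str, max_hops: int = 8):
    """Follow agent-chosen handoffs until one returns None or the hop cap is hit."""
    trail, current, payload = [], start, task
    for _ in range(max_hops):
        payload, next_agent = agents[current](payload)
        trail.append(current)
        if next_agent is None:
            return payload, trail
        current = next_agent
    raise RuntimeError(f"handoff limit reached after {max_hops} hops: {trail}")

agents = {
    "researcher": lambda text: (f"research({text})", "analyst"),
    "analyst": lambda text: (f"analysis({text})", "writer"),
    "writer": lambda text: (f"report({text})", None),
}

output, trail = run_swarm(agents, "researcher", "task")
```

The returned `trail` doubles as a lightweight audit log, which helps with the debugging difficulty that swarms are known for.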


Framework Comparison

LangGraph vs CrewAI vs AutoGen

| Feature | LangGraph | CrewAI | AutoGen |
| --- | --- | --- | --- |
| Architecture | Graph-based state machine | Role-based agent teams | Conversation-based multi-agent |
| Flexibility | Very high (low-level control) | Medium (abstracted API) | High (customizable) |
| Learning curve | High | Low | Medium |
| State management | Built-in (checkpoint support) | Basic | Conversation history-based |
| Human-in-the-Loop | Native support | Basic support | Native support |
| Streaming | Native support | Limited | Event-based |
| Production readiness | High | Medium | High |
| Community size | Large | Medium | Large |
| License | MIT | MIT | MIT |

CrewAI Implementation

from crewai import Agent, Task, Crew, Process

# Define agents
researcher = Agent(
    role="Senior Research Analyst",
    goal="Find comprehensive and accurate information about the given topic",
    backstory="""You are an expert researcher with decades of experience
    in gathering and synthesizing information from multiple sources.""",
    verbose=True,
    allow_delegation=True,
    tools=[search_tool, scrape_tool]
)

analyst = Agent(
    role="Data Analyst",
    goal="Analyze research findings and extract actionable insights",
    backstory="""You are a skilled data analyst who excels at finding
    patterns and drawing meaningful conclusions from data.""",
    verbose=True,
    tools=[analysis_tool, chart_tool]
)

writer = Agent(
    role="Technical Writer",
    goal="Create clear and comprehensive reports",
    backstory="""You are an experienced technical writer who can transform
    complex analyses into readable documents.""",
    verbose=True,
    tools=[write_tool]
)

# Define tasks
research_task = Task(
    description="Research the latest trends in AI agent orchestration",
    expected_output="A comprehensive summary of findings with sources",
    agent=researcher
)

analysis_task = Task(
    description="Analyze the research findings and identify key patterns",
    expected_output="An analytical report with data-driven insights",
    agent=analyst,
    context=[research_task]  # Reference previous task results
)

report_task = Task(
    description="Write a final report combining research and analysis",
    expected_output="A polished report ready for stakeholders",
    agent=writer,
    context=[research_task, analysis_task]
)

# Build and execute crew
crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, analysis_task, report_task],
    process=Process.sequential,  # or Process.hierarchical
    verbose=True
)

result = crew.kickoff()
print(result)

AutoGen Implementation

from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager

# Agent configuration
config_list = [{"model": "gpt-4o", "api_key": "YOUR_API_KEY"}]

researcher = AssistantAgent(
    name="Researcher",
    system_message="""You are a research specialist.
    Find accurate and relevant information.
    When your research is complete, say RESEARCH_DONE.""",
    llm_config={"config_list": config_list}
)

analyst = AssistantAgent(
    name="Analyst",
    system_message="""You are a data analyst.
    Analyze the research findings and provide insights.
    When analysis is complete, say ANALYSIS_DONE.""",
    llm_config={"config_list": config_list}
)

writer = AssistantAgent(
    name="Writer",
    system_message="""You are a technical writer.
    Create clear documentation based on research and analysis.
    When the report is complete, say TERMINATE.""",
    llm_config={"config_list": config_list}
)

user_proxy = UserProxyAgent(
    name="Admin",
    human_input_mode="NEVER",
    code_execution_config={"work_dir": "output"},
    # `content` can be None (e.g. tool-call messages), so guard before the substring test
    is_termination_msg=lambda x: "TERMINATE" in (x.get("content") or "")
)

# Group chat setup
group_chat = GroupChat(
    agents=[user_proxy, researcher, analyst, writer],
    messages=[],
    max_round=20,
    speaker_selection_method="round_robin"
)

manager = GroupChatManager(
    groupchat=group_chat,
    llm_config={"config_list": config_list}
)

# Execute
user_proxy.initiate_chat(
    manager,
    message="Research AI agent orchestration patterns and write a report."
)

Advanced Supervisor Pattern

Dynamic Routing Implementation

An advanced supervisor analyzes each task and routes it to the most suitable agent, returning its decision as structured output so the route can be validated.

from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import Literal

class RouteDecision(BaseModel):
    """Supervisor routing decision"""
    next_agent: Literal["researcher", "analyst", "writer", "FINISH"] = Field(
        description="The next agent to route to"
    )
    reasoning: str = Field(
        description="Why this agent was chosen"
    )
    task_description: str = Field(
        description="Specific task for the chosen agent"
    )

llm = ChatOpenAI(model="gpt-4o")
structured_llm = llm.with_structured_output(RouteDecision)

SUPERVISOR_PROMPT = """You are a supervisor managing a team of agents.
Based on the current state and conversation, decide:
1. Which agent should work next
2. What specific task they should perform
3. Whether the overall task is complete (FINISH)

Available agents:
- researcher: Searches for information and gathers data
- analyst: Analyzes data and provides insights
- writer: Creates reports and documentation

Current conversation:
{messages}

Task Board:
{task_board}
"""

def supervisor_node(state):
    """Supervisor node: dynamic routing"""
    decision = structured_llm.invoke(
        SUPERVISOR_PROMPT.format(
            messages=state["messages"],
            task_board=state.get("task_board", "Empty")
        )
    )

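    # Return only the new message: with an operator.add reducer on `messages`,
    # re-sending state["messages"] would duplicate the whole history
    return {
        "next_agent": decision.next_agent,
        "messages": [
            {"role": "system",
             "content": f"Supervisor routed to {decision.next_agent}: {decision.task_description}"}
        ]
    }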

Human-in-the-Loop Integration

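Human approval steps can be inserted into the workflow: the graph pauses before the review node, a reviewer records a decision, and execution resumes from the checkpoint.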

from typing import Literal

from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END

checkpointer = MemorySaver()

class ReviewState(SupervisorState):
    """SupervisorState plus the reviewer's decision"""
    approval_status: str  # "approved" or "rejected", written by the reviewer

def human_approval_node(state):
    """Runs after the reviewer's decision has been written into the state"""
    status = state.get("approval_status", "rejected")
    return {
        "messages": [
            {"role": "system", "content": f"Human review result: {status}"}
        ]
    }

def check_approval(state) -> Literal["approved", "rejected"]:
    """Treat anything other than an explicit approval as a rejection"""
    return "approved" if state.get("approval_status") == "approved" else "rejected"

# Add Human-in-the-Loop to graph
graph = StateGraph(ReviewState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher)
graph.add_node("human_review", human_approval_node)
graph.add_node("writer", writer)

graph.add_edge(START, "supervisor")
graph.add_edge("supervisor", "researcher")
graph.add_edge("researcher", "human_review")
graph.add_conditional_edges(
    "human_review",
    check_approval,
    {"approved": "writer", "rejected": "supervisor"}
)
graph.add_edge("writer", END)

# Compile with checkpointer for state persistence
app = graph.compile(checkpointer=checkpointer, interrupt_before=["human_review"])

# Execute; the run pauses before the human_review node
config = {"configurable": {"thread_id": "review-thread-1"}}
initial_state = {"messages": [HumanMessage(content="Draft a report on agent orchestration.")]}
result = app.invoke(initial_state, config)

# Record the reviewer's decision, then resume from the checkpoint
app.update_state(config, {"approval_status": "approved"})
app.invoke(None, config)

MCP Protocol Integration

What is Model Context Protocol (MCP)

MCP is an interoperability protocol published by Anthropic that enables agents to access external tools and data sources through a standardized interface.
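
On the wire, MCP messages are JSON-RPC 2.0 requests and responses. The sketch below builds the two requests a client typically sends to discover and call tools. The tool name and arguments are illustrative, and the helper `jsonrpc_request` is a hypothetical name; consult the current MCP specification for the exact fields of each method.

```python
import json
from itertools import count

_ids = count(1)  # JSON-RPC requests carry an id so responses can be matched

def jsonrpc_request(method: str, params: dict) -> str:
    """Serialize a JSON-RPC 2.0 request envelope."""
    return json.dumps({
        "jsonrpc": "2.0",
        "id": next(_ids),
        "method": method,
        "params": params,
    })

# Ask the server which tools it exposes
list_request = jsonrpc_request("tools/list", {})

# Invoke one tool by name with JSON arguments (tool name is illustrative)
call_request = jsonrpc_request(
    "tools/call",
    {"name": "query_database", "arguments": {"query": "SELECT 1"}},
)
```

In practice an SDK builds these envelopes for you; seeing the raw shape mainly helps when debugging traffic between an agent and a server.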

# MCP server implementation (official MCP Python SDK, FastMCP interface)
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("analytics-server")

# `db` is a placeholder for an async database client that you provide;
# it is not part of the MCP SDK.

@mcp.tool()
async def query_database(query: str) -> str:
    """Execute a SQL query against the database."""
    # Tool arguments are model-generated: validate or restrict `query` first
    result = await db.execute(query)
    return str(result)

@mcp.tool()
async def generate_chart(data: str, chart_type: str) -> str:
    """Generate a chart from the given data."""
    return f"Chart generated: {chart_type}"

@mcp.resource("schema://tables")
async def list_tables() -> str:
    """List available database tables"""
    tables = await db.get_tables()
    return "\n".join(tables)

# Run server over stdio
if __name__ == "__main__":
    mcp.run(transport="stdio")

MCP Client with Multi-Agent Integration

from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# MCP server connection
server_params = StdioServerParameters(
    command="python",
    args=["analytics_server.py"]
)

async def run_mcp_agent(question: str):
    """Create an agent with MCP tools and run it while the session is open"""
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Convert MCP tools to LangChain tools
            tools = await load_mcp_tools(session)

            # Create agent
            agent = create_react_agent(
                ChatOpenAI(model="gpt-4o"),
                tools,
                state_modifier="You are a data analyst with access to database tools."
            )

            # The tools need the live session, so invoke before the context exits
            return await agent.ainvoke({"messages": [("user", question)]})

Real-World Example: Customer Support Multi-Agent System

Architecture Design

The following example implements a customer support system with hierarchical multi-agent orchestration: a triage agent classifies each inquiry and routes it to a specialist agent.

from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Literal
import operator

class CustomerSupportState(TypedDict):
    messages: Annotated[list, operator.add]
    customer_id: str
    issue_category: str
    sentiment: str
    resolution: str
    escalated: bool

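from pydantic import BaseModel

class TriageResult(BaseModel):
    """Structured triage decision returned by the LLM"""
    issue_category: Literal["billing", "technical", "general"]
    sentiment: Literal["positive", "neutral", "negative", "urgent"]

class TechnicalResolution(BaseModel):
    """Structured answer from the technical support agent"""
    resolution: str
    escalated: bool

# Triage agent
def triage_agent(state: CustomerSupportState) -> CustomerSupportState:
    """Classify customer inquiries and route to specialized agents"""
    llm = ChatOpenAI(model="gpt-4o").with_structured_output(TriageResult)
    decision = llm.invoke([
        {"role": "system", "content": """Classify the customer issue into one of:
        - billing: Payment, invoice, subscription issues
        - technical: Product bugs, errors, configuration
        - general: General inquiries, feedback
        Also assess sentiment: positive, neutral, negative, urgent"""},
        {"role": "user", "content": state["messages"][-1].content}
    ])
    # Use the model's classification rather than hardcoded values
    return {
        "issue_category": decision.issue_category,
        "sentiment": decision.sentiment
    }

# Technical support agent
def technical_support_agent(state: CustomerSupportState) -> CustomerSupportState:
    """Diagnose technical issues and provide solutions"""
    llm = ChatOpenAI(model="gpt-4o").with_structured_output(TechnicalResolution)
    answer = llm.invoke([
        {"role": "system", "content": """You are a technical support specialist.
        Diagnose the issue and provide step-by-step solutions.
        If the issue requires engineering escalation, set escalated=true."""},
        {"role": "user", "content": str(state["messages"])}
    ])
    # Write `escalated` into the state so check_escalation can act on it
    return {
        "resolution": answer.resolution,
        "escalated": answer.escalated,
        "messages": [{"role": "assistant", "content": answer.resolution}]
    }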

# Billing support agent
def billing_support_agent(state: CustomerSupportState) -> CustomerSupportState:
    """Handle payment-related issues"""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke([
        {"role": "system", "content": """You are a billing specialist.
        Handle payment issues, refunds, and subscription changes."""},
        {"role": "user", "content": str(state["messages"])}
    ])
    return {
        "resolution": response.content,
        "messages": [{"role": "assistant", "content": response.content}]
    }

# Escalation agent
def escalation_agent(state: CustomerSupportState) -> CustomerSupportState:
    """Escalate complex issues to higher-level support"""
    return {
        "escalated": True,
        "messages": [
            {"role": "system",
             "content": f"Issue escalated for customer {state['customer_id']}"}
        ]
    }

# Routing functions
def route_issue(state: CustomerSupportState) -> Literal["technical", "billing", "general"]:
    return state["issue_category"]

def check_escalation(state: CustomerSupportState) -> Literal["escalate", "resolve"]:
    if state.get("escalated"):
        return "escalate"
    return "resolve"

# Build graph
workflow = StateGraph(CustomerSupportState)
workflow.add_node("triage", triage_agent)
workflow.add_node("technical", technical_support_agent)
workflow.add_node("billing", billing_support_agent)
workflow.add_node("escalation", escalation_agent)

workflow.add_edge(START, "triage")
workflow.add_conditional_edges("triage", route_issue, {
    "technical": "technical",
    "billing": "billing",
    "general": "billing"
})
workflow.add_conditional_edges("technical", check_escalation, {
    "escalate": "escalation",
    "resolve": END
})
workflow.add_edge("billing", END)
workflow.add_edge("escalation", END)

app = workflow.compile()

Failure Handling Strategies

Retry and Fallback Patterns

from tenacity import AsyncRetrying, stop_after_attempt, wait_exponential
import logging

logger = logging.getLogger(__name__)

class AgentWithRetry:
    """Agent wrapper with retry logic"""

    def __init__(self, agent, max_retries=3, fallback_agent=None):
        self.agent = agent
        self.max_retries = max_retries
        self.fallback_agent = fallback_agent

    async def invoke_with_retry(self, state):
        """Invoke agent, retrying up to max_retries attempts with exponential backoff"""
        # Build the retry policy per call so max_retries is honored
        # (a decorator with stop_after_attempt(3) would ignore it)
        async for attempt in AsyncRetrying(
            stop=stop_after_attempt(self.max_retries),
            wait=wait_exponential(multiplier=1, min=2, max=30),
            reraise=True,
        ):
            with attempt:
                try:
                    return await self.agent.ainvoke(state)
                except Exception as e:
                    logger.error(f"Agent failed: {e}")
                    raise

    async def invoke(self, state):
        """Invoke agent with fallback"""
        try:
            return await self.invoke_with_retry(state)
        except Exception as e:
            if self.fallback_agent:
                logger.warning(f"Falling back to backup agent: {e}")
                return await self.fallback_agent.ainvoke(state)
            raise

# Circuit breaker pattern
class CircuitBreaker:
    """Circuit breaker for agent resilience"""

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = "closed"  # closed, open, half-open
        self.last_failure_time = None

    def can_execute(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            import time
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half-open"
                return True
            return False
        return True  # half-open

    def record_success(self):
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self):
        import time
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
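
One way to put a breaker like the one above in front of an agent call is a small guard function. The snippet below is a sketch under assumptions: `MiniBreaker` is a trimmed stand-in so the example runs on its own, and `guarded_call`, `CircuitOpenError`, and the injectable `clock` are hypothetical names introduced for illustration.

```python
import time

class CircuitOpenError(RuntimeError):
    """Raised when a call is refused because the circuit is open."""

class MiniBreaker:
    """Trimmed stand-in for a circuit breaker, with an injectable clock."""

    def __init__(self, failure_threshold=2, recovery_timeout=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def can_execute(self) -> bool:
        if self.opened_at is None:
            return True
        # Once the timeout has elapsed, allow trial calls again (half-open)
        return self.clock() - self.opened_at >= self.recovery_timeout

    def record(self, success: bool) -> None:
        if success:
            self.failures, self.opened_at = 0, None
            return
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()

def guarded_call(breaker, fn, *args):
    """Call fn only if the breaker allows it, and report the outcome back."""
    if not breaker.can_execute():
        raise CircuitOpenError("circuit open; skipping call")
    try:
        result = fn(*args)
    except Exception:
        breaker.record(False)
        raise
    breaker.record(True)
    return result
```

Failing fast with `CircuitOpenError` lets a supervisor route around an unhealthy agent instead of waiting on retries that are unlikely to succeed.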

Dead Letter Queue Pattern

import json
from datetime import datetime

class DeadLetterQueue:
    """Store failed messages for later retry"""

    def __init__(self, storage_path="dead_letters.json"):
        self.storage_path = storage_path
        self.messages = []

    def add(self, message: dict, error: str, agent_name: str):
        """Add a failed message to the queue"""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "agent": agent_name,
            "message": message,
            "error": str(error),
            "retry_count": 0
        }
        self.messages.append(entry)
        self._persist()

    def retry_all(self, agent_registry: dict):
        """Retry all messages in the queue"""
        remaining = []
        # Build a new list: removing items while iterating would skip entries
        for entry in self.messages:
            agent = agent_registry.get(entry["agent"])
            if agent is None:
                remaining.append(entry)
                continue
            try:
                agent.invoke(entry["message"])
            except Exception as e:
                entry["retry_count"] += 1
                entry["last_error"] = str(e)
                remaining.append(entry)
        self.messages = remaining
        self._persist()

    def _persist(self):
        with open(self.storage_path, "w") as f:
            json.dump(self.messages, f, indent=2)

Observability

LangSmith Integration

import os

# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "multi-agent-orchestration"

from langsmith import Client

client = Client()

def track_agent_metrics(agent_name: str, duration: float, tokens: int, success: bool):
    """Track agent execution metrics"""
    client.create_run(
        name=f"agent-{agent_name}",
        run_type="chain",
        inputs={"agent": agent_name},
        outputs={
            "duration_ms": duration * 1000,
            "total_tokens": tokens,
            "success": success
        }
    )

OpenTelemetry Integration

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Set up tracer
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("multi-agent-system")

def traced_agent_node(agent_name: str, agent):
    """Wrap `agent` in a node function that emits an OpenTelemetry span"""
    def node_fn(state):
        with tracer.start_as_current_span(f"agent.{agent_name}") as span:
            span.set_attribute("agent.name", agent_name)
            span.set_attribute("agent.input_messages", len(state["messages"]))

            try:
                result = agent.invoke(state)
                span.set_attribute("agent.success", True)
                return result
            except Exception as e:
                span.set_attribute("agent.success", False)
                span.record_exception(e)
                raise

    return node_fn

Production Deployment Checklist

Design Phase

  • Agent roles and tools are clearly defined
  • Inter-agent communication protocol is standardized
  • State management strategy is established (local vs distributed)
  • Failure scenarios have corresponding response strategies
  • Human-in-the-Loop intervention points are identified

Implementation Phase

  • Appropriate models are assigned to each agent (cost vs performance)
  • Tool execution timeouts are configured
  • Retry logic and circuit breakers are implemented
  • Dead letter queue tracks failed tasks
  • Input/output validation (guardrails) is applied
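
As an illustration of the tool-timeout item, an async tool call can be bounded with `asyncio.wait_for`. This is a minimal sketch; `call_tool_with_timeout` and the two sample tools are hypothetical names, and the result envelope is just one possible convention.

```python
import asyncio

async def call_tool_with_timeout(tool, *args, timeout_s: float = 5.0) -> dict:
    """Await an async tool, converting a timeout into an error result."""
    try:
        result = await asyncio.wait_for(tool(*args), timeout=timeout_s)
        return {"ok": True, "result": result}
    except asyncio.TimeoutError:
        # wait_for cancels the underlying task before raising
        return {"ok": False, "error": f"tool timed out after {timeout_s}s"}

async def slow_tool(query: str) -> str:
    await asyncio.sleep(1)  # simulates a hung external service
    return query

async def fast_tool(query: str) -> str:
    return query.upper()

timed_out = asyncio.run(call_tool_with_timeout(slow_tool, "x", timeout_s=0.05))
succeeded = asyncio.run(call_tool_with_timeout(fast_tool, "x"))
```

Returning an error value rather than raising lets the agent see the failure as an ordinary tool result and decide whether to retry or move on.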

Deployment Phase

  • Observability pipeline is configured (LangSmith / OTEL)
  • Per-agent cost tracking is available
  • Rate limiting is applied
  • Security audit logs are enabled
  • Rollback strategy is established
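
For the rate-limiting item, a token bucket is one common choice. The sketch below is a single-process, in-memory version with an injectable clock for testing; the class name and parameters are assumptions, and a multi-instance deployment would need shared state instead.

```python
import time

class TokenBucket:
    """Allow bursts up to `capacity`, refilling at `rate_per_s` tokens per second."""

    def __init__(self, rate_per_s: float, capacity: int, clock=time.monotonic):
        self.rate = rate_per_s
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)
        self.updated = clock()

    def allow(self, cost: float = 1.0) -> bool:
        """Spend `cost` tokens if available; return whether the call may proceed."""
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

Keeping one bucket per agent (or per downstream API key) stops a single looping agent from exhausting a shared quota.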

Operations Phase

  • Agent performance dashboard is built
  • Anomaly detection alerts are configured
  • Prompt version management is applied
  • A/B testing framework is ready
  • Regular prompt optimization process is in place

Pattern Selection Guide

Decision Flowchart

Task Type Assessment
  |
  +-- Simple task (5 or fewer tools) ---------> Single Agent
  |
  +-- Fixed-order multi-step task ------------> Pipeline
  |
  +-- Dynamic routing needed -----------------> Hierarchical (Supervisor)
  |
  +-- Autonomous collaboration required ------> Swarm
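
The flowchart can also be written as a small function. The chart does not say which branch wins when several conditions hold, so the precedence below (autonomy, then dynamic routing, then fixed order, then tool count) is an assumption, as are the function and parameter names.

```python
def choose_pattern(tool_count: int, fixed_order: bool = False,
                   dynamic_routing: bool = False, autonomous: bool = False) -> str:
    """Map task traits to an orchestration pattern (precedence is an assumption)."""
    if autonomous:
        return "swarm"
    if dynamic_routing:
        return "hierarchical"
    if fixed_order:
        return "pipeline"
    if tool_count <= 5:
        return "single_agent"
    # Many tools but none of the other traits: the flowchart has no branch for this
    raise ValueError("case not covered by the flowchart")
```

For example, `choose_pattern(3)` selects the single agent, while `choose_pattern(12, fixed_order=True)` selects the pipeline.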

Pattern Comparison Summary

| Pattern | Strengths | Weaknesses | Complexity | Scale |
| --- | --- | --- | --- | --- |
| Single Agent | Simple to implement, easy to debug | Limited scalability, context saturation | Low | Small |
| Hierarchical | Central control, dynamic routing | Supervisor bottleneck, single point of failure | Medium | Medium |
| Pipeline | Predictable, easy to test | Inflexible, sequential latency | Low-Medium | Medium |
| Swarm | High flexibility, autonomous collaboration | Hard to debug, unpredictable | High | Large |

Security Considerations

Agent Isolation

class ResourceError(Exception):
    """Raised when an agent exceeds its resource budget"""

class SandboxedAgent:
    """Agent running in an isolated environment"""

    def __init__(self, agent, allowed_tools: list, max_tokens: int = 4096):
        self.agent = agent
        self.allowed_tools = set(allowed_tools)
        self.max_tokens = max_tokens

    def invoke(self, state):
        # Verify tool access permissions
        requested_tools = self._extract_tool_calls(state)
        unauthorized = requested_tools - self.allowed_tools
        if unauthorized:
            raise PermissionError(
                f"Agent attempted to use unauthorized tools: {unauthorized}"
            )

        # Enforce token limit
        if self._estimate_tokens(state) > self.max_tokens:
            raise ResourceError("Token limit exceeded")

        return self.agent.invoke(state)

    def _extract_tool_calls(self, state) -> set:
        return set()

    def _estimate_tokens(self, state) -> int:
        return 0
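
The `_extract_tool_calls` stub above returns an empty set, so the permission check never fires as written. A possible implementation is sketched below, assuming each message is either a dict or an object that carries a `tool_calls` list whose items have a `"name"` key (the shape LangChain uses for AI messages). The standalone function names are illustrative.

```python
def extract_tool_calls(state: dict) -> set:
    """Collect the names of all tools requested in the state's messages."""
    names = set()
    for message in state.get("messages", []):
        if isinstance(message, dict):
            calls = message.get("tool_calls")
        else:
            calls = getattr(message, "tool_calls", None)
        for call in calls or []:
            names.add(call["name"])
    return names

def unauthorized_tools(state: dict, allowed: list) -> set:
    """Return requested tool names that are not on the allow-list."""
    return extract_tool_calls(state) - set(allowed)
```

Inspecting the incoming state only sees tool calls that are already recorded, so this check is an audit layer; passing each agent only the tools it is allowed to use remains the stronger control.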

Prompt Injection Defense

from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, field_validator  # Pydantic v2 API

class SafeAgentOutput(BaseModel):
    """Agent output validation schema"""
    response: str
    confidence: float
    sources: list[str]

    @field_validator("response")
    @classmethod
    def validate_response(cls, v):
        forbidden_patterns = [
            "ignore previous instructions",
            "system prompt",
            "bypass",
            "jailbreak"
        ]
        for pattern in forbidden_patterns:
            if pattern.lower() in v.lower():
                raise ValueError(f"Suspicious pattern detected: {pattern}")
        return v

parser = PydanticOutputParser(pydantic_object=SafeAgentOutput)

Performance Optimization

Parallel Execution Strategy

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import asyncio
import operator

class ParallelState(TypedDict):
    messages: Annotated[list, operator.add]
    research_result: str
    analysis_result: str

async def parallel_execution(state):
    """Execute independent agents in parallel"""
    research_task = asyncio.create_task(
        researcher.ainvoke({"messages": state["messages"]})
    )
    analysis_task = asyncio.create_task(
        analyst.ainvoke({"messages": state["messages"]})
    )

    research_result, analysis_result = await asyncio.gather(
        research_task, analysis_task
    )

    return {
        "research_result": research_result["messages"][-1].content,
        "analysis_result": analysis_result["messages"][-1].content
    }

# LangGraph fan-out pattern
graph = StateGraph(ParallelState)
graph.add_node("research", researcher)
graph.add_node("analysis", analyst)
graph.add_node("synthesis", writer)

# Parallel execution: fan-out from START to both nodes
graph.add_edge(START, "research")
graph.add_edge(START, "analysis")

# Both results converge into synthesis
graph.add_edge("research", "synthesis")
graph.add_edge("analysis", "synthesis")
graph.add_edge("synthesis", END)

Caching Strategy

import hashlib
import json
import time

class AgentCache:
    """Cache agent responses"""

    def __init__(self, ttl_seconds=3600):
        self.cache = {}
        self.ttl = ttl_seconds

    def get_cache_key(self, state: dict) -> str:
        """Generate cache key from state"""
        state_str = json.dumps(state, sort_keys=True, default=str)
        return hashlib.sha256(state_str.encode()).hexdigest()

    def get(self, state: dict):
        """Look up result in cache"""
        key = self.get_cache_key(state)
        if key in self.cache:
            entry = self.cache[key]
            if time.time() - entry["timestamp"] < self.ttl:
                return entry["result"]
            # Entry has expired: evict it
            del self.cache[key]
        return None

    def set(self, state: dict, result):
        """Store result in cache"""
        key = self.get_cache_key(state)
        self.cache[key] = {
            "result": result,
            "timestamp": time.time()
        }
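
A cache like this is typically consulted through a thin wrapper around the agent call. The sketch below works with any object exposing `get(state)` and `set(state, result)`; `DictCache` is a stand-in without TTL so the snippet runs on its own, and `cached_invoke` is a hypothetical helper name.

```python
import json

class DictCache:
    """Stand-in with the same get/set interface as a response cache, minus TTL."""

    def __init__(self):
        self._store = {}

    def _key(self, state: dict) -> str:
        return json.dumps(state, sort_keys=True, default=str)

    def get(self, state: dict):
        return self._store.get(self._key(state))

    def set(self, state: dict, result) -> None:
        self._store[self._key(state)] = result

def cached_invoke(cache, agent_fn, state: dict):
    """Return (result, was_cache_hit), calling agent_fn only on a miss."""
    hit = cache.get(state)
    if hit is not None:
        return hit, True
    result = agent_fn(state)
    cache.set(state, result)
    return result, False
```

Caching only makes sense for calls whose answer does not depend on fresh external data, so it is usually applied to deterministic or low-temperature steps rather than to every agent.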

Conclusion

Multi-agent orchestration is not merely about connecting multiple agents. It is about selecting the right pattern for the task and building robust failure handling and observability.

Key takeaways:

  1. Start with a single agent and transition to multi-agent as complexity grows
  2. Hierarchical pattern suits centralized control scenarios
  3. Pipeline pattern is optimal for fixed sequential workflows
  4. Swarm pattern fits complex scenarios requiring high autonomy
  5. Choose frameworks based on need: LangGraph (flexibility), CrewAI (rapid prototyping), AutoGen (conversation-based)

References