Skip to content
Published on

AI Agent 멀티에이전트 오케스트레이션 패턴: 계층형·파이프라인·스웜 아키텍처 실전 가이드

Authors
  • Name
    Twitter

AI Agent Multi-Agent Orchestration Patterns

왜 지금 멀티에이전트인가

2026년 에이전틱 AI가 기업 애플리케이션의 40%에 탑재될 전망이다(Gartner). 단일 범용 에이전트에서 도메인 특화 멀티에이전트 협업으로 패러다임이 전환되고 있다. NIST AI Agent Standards Initiative 발표로 보안과 상호운용성 표준화가 본격화되었다.

이 글에서는 4가지 주요 멀티에이전트 오케스트레이션 패턴을 분석하고, LangGraph, CrewAI, AutoGen 프레임워크별 구현 코드를 제공한다.


멀티에이전트 시스템 개요

단일 에이전트의 한계

단일 에이전트는 다음과 같은 한계를 가진다.

한계설명
컨텍스트 윈도우 포화복잡한 작업일수록 프롬프트가 길어지고 성능 저하
도구 과부하수십 개의 도구를 하나의 에이전트에 부여하면 선택 정확도 하락
단일 장애점에이전트 하나가 실패하면 전체 워크플로우 중단
전문성 부족범용 프롬프트로는 도메인별 최적 결과를 얻기 어려움
확장성 제약작업량 증가 시 수평 확장 불가

멀티에이전트가 해결하는 문제

멀티에이전트 시스템은 분업과 협업을 통해 위 한계를 극복한다.

  • 전문화: 각 에이전트가 특정 도메인에 특화
  • 병렬 처리: 독립적인 작업을 동시에 수행
  • 장애 격리: 하나의 에이전트 실패가 전체 시스템에 영향을 주지 않음
  • 동적 구성: 작업에 따라 에이전트 조합을 유연하게 변경

4가지 오케스트레이션 패턴

패턴 1: 단일 에이전트 (Single Agent)

가장 기본적인 패턴으로, 하나의 에이전트가 모든 작업을 처리한다.

from langchain.agents import create_tool_calling_agent
from langchain_openai import ChatOpenAI
from langchain.tools import tool

@tool
def search_web(query: str) -> str:
    """웹에서 정보를 검색한다."""
    # 검색 로직
    return f"Search results for: {query}"

@tool
def calculate(expression: str) -> str:
    """수학 계산을 수행한다."""
    return str(eval(expression))

@tool
def write_file(filename: str, content: str) -> str:
    """파일에 내용을 작성한다."""
    with open(filename, "w") as f:
        f.write(content)
    return f"File {filename} written successfully"

llm = ChatOpenAI(model="gpt-4o")
tools = [search_web, calculate, write_file]

agent = create_tool_calling_agent(llm, tools, prompt_template)

적합한 상황: 작업이 단순하고 도구 수가 5개 이하인 경우

패턴 2: 계층형 멀티에이전트 (Hierarchical Multi-Agent)

슈퍼바이저 에이전트가 하위 에이전트들에게 작업을 분배하고 결과를 종합한다.

from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Literal
import operator

class SupervisorState(TypedDict):
    messages: Annotated[list, operator.add]
    next_agent: str
    final_answer: str

llm = ChatOpenAI(model="gpt-4o")

# 하위 에이전트 정의
researcher = create_react_agent(
    llm,
    tools=[search_web],
    state_modifier="You are a research specialist. Find accurate information."
)

analyst = create_react_agent(
    llm,
    tools=[calculate],
    state_modifier="You are a data analyst. Analyze data and provide insights."
)

writer = create_react_agent(
    llm,
    tools=[write_file],
    state_modifier="You are a technical writer. Create clear documentation."
)

# 슈퍼바이저 라우팅 로직
def supervisor_router(state: SupervisorState) -> Literal["researcher", "analyst", "writer", "__end__"]:
    """슈퍼바이저가 다음 에이전트를 결정한다."""
    last_message = state["messages"][-1]

    response = llm.invoke([
        {"role": "system", "content": """You are a supervisor managing a team.
        Route to: researcher (for information), analyst (for data), writer (for documentation).
        Return __end__ when the task is complete."""},
        {"role": "user", "content": last_message.content}
    ])

    return response.content.strip()

# 그래프 구성
graph = StateGraph(SupervisorState)
graph.add_node("supervisor", supervisor_router)
graph.add_node("researcher", researcher)
graph.add_node("analyst", analyst)
graph.add_node("writer", writer)

graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", supervisor_router)
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyst", "supervisor")
graph.add_edge("writer", "supervisor")

app = graph.compile()

적합한 상황: 중앙 집중적 제어가 필요하고, 작업의 순서가 동적으로 결정되는 경우

패턴 3: 순차적 파이프라인 (Sequential Pipeline)

에이전트들이 정해진 순서대로 작업을 처리하고, 이전 에이전트의 출력이 다음 에이전트의 입력이 된다.

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator

class PipelineState(TypedDict):
    messages: Annotated[list, operator.add]
    research_output: str
    analysis_output: str
    report_output: str

def research_node(state: PipelineState) -> PipelineState:
    """1단계: 정보 수집"""
    result = researcher.invoke({"messages": state["messages"]})
    return {"research_output": result["messages"][-1].content}

def analysis_node(state: PipelineState) -> PipelineState:
    """2단계: 분석"""
    analysis_prompt = f"Analyze this research: {state['research_output']}"
    result = analyst.invoke({"messages": [{"role": "user", "content": analysis_prompt}]})
    return {"analysis_output": result["messages"][-1].content}

def report_node(state: PipelineState) -> PipelineState:
    """3단계: 보고서 작성"""
    report_prompt = f"""Write a report based on:
    Research: {state['research_output']}
    Analysis: {state['analysis_output']}"""
    result = writer.invoke({"messages": [{"role": "user", "content": report_prompt}]})
    return {"report_output": result["messages"][-1].content}

# 파이프라인 그래프
pipeline = StateGraph(PipelineState)
pipeline.add_node("research", research_node)
pipeline.add_node("analysis", analysis_node)
pipeline.add_node("report", report_node)

pipeline.add_edge(START, "research")
pipeline.add_edge("research", "analysis")
pipeline.add_edge("analysis", "report")
pipeline.add_edge("report", END)

app = pipeline.compile()

적합한 상황: 작업 순서가 명확하고, 각 단계의 출력이 다음 단계의 입력으로 사용되는 경우

패턴 4: 분산형 스웜 (Decentralized Swarm)

에이전트들이 자율적으로 협업하며, 중앙 조율자 없이 작업을 수행한다.

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated, Literal
import operator

class SwarmState(TypedDict):
    messages: Annotated[list, operator.add]
    current_agent: str
    task_board: dict  # 공유 태스크 보드

def agent_handoff(state: SwarmState, agent_name: str, target: str) -> SwarmState:
    """에이전트 간 핸드오프"""
    return {
        "current_agent": target,
        "messages": state["messages"] + [
            {"role": "system", "content": f"Handoff from {agent_name} to {target}"}
        ]
    }

def triage_agent(state: SwarmState) -> Literal["researcher", "analyst", "writer"]:
    """트리아지 에이전트: 작업을 적절한 에이전트에게 라우팅"""
    last_message = state["messages"][-1]

    if "search" in last_message.content.lower():
        return "researcher"
    elif "analyze" in last_message.content.lower():
        return "analyst"
    else:
        return "writer"

def researcher_with_handoff(state: SwarmState):
    """리서처가 작업 후 다음 에이전트에게 핸드오프"""
    result = researcher.invoke({"messages": state["messages"]})
    # 분석이 필요하면 analyst에게, 아니면 종료
    return agent_handoff(state, "researcher", "analyst")

def analyst_with_handoff(state: SwarmState):
    """분석가가 작업 후 다음 에이전트에게 핸드오프"""
    result = analyst.invoke({"messages": state["messages"]})
    return agent_handoff(state, "analyst", "writer")

# 스웜 그래프
swarm = StateGraph(SwarmState)
swarm.add_node("triage", triage_agent)
swarm.add_node("researcher", researcher_with_handoff)
swarm.add_node("analyst", analyst_with_handoff)
swarm.add_node("writer", writer)

swarm.add_edge(START, "triage")
swarm.add_conditional_edges("triage", triage_agent)
swarm.add_edge("researcher", "analyst")
swarm.add_edge("analyst", "writer")
swarm.add_edge("writer", END)

app = swarm.compile()

적합한 상황: 에이전트가 자율적으로 판단해야 하고, 유연한 협업이 필요한 경우


프레임워크 비교

LangGraph vs CrewAI vs AutoGen

특성LangGraphCrewAIAutoGen
아키텍처그래프 기반 상태 머신역할 기반 에이전트 팀대화 기반 멀티에이전트
유연성매우 높음 (저수준 제어)중간 (추상화된 API)높음 (커스텀 가능)
학습 곡선높음낮음중간
상태 관리내장 (체크포인트 지원)기본적대화 히스토리 기반
Human-in-the-Loop네이티브 지원기본 지원네이티브 지원
스트리밍네이티브 지원제한적이벤트 기반
프로덕션 준비도높음중간높음
커뮤니티 크기대형중형대형
라이선스MITMITMIT

CrewAI 구현 예시

from crewai import Agent, Task, Crew, Process

# 에이전트 정의
researcher = Agent(
    role="Senior Research Analyst",
    goal="Find comprehensive and accurate information about the given topic",
    backstory="""You are an expert researcher with decades of experience
    in gathering and synthesizing information from multiple sources.""",
    verbose=True,
    allow_delegation=True,
    tools=[search_tool, scrape_tool]
)

analyst = Agent(
    role="Data Analyst",
    goal="Analyze research findings and extract actionable insights",
    backstory="""You are a skilled data analyst who excels at finding
    patterns and drawing meaningful conclusions from data.""",
    verbose=True,
    tools=[analysis_tool, chart_tool]
)

writer = Agent(
    role="Technical Writer",
    goal="Create clear and comprehensive reports",
    backstory="""You are an experienced technical writer who can transform
    complex analyses into readable documents.""",
    verbose=True,
    tools=[write_tool]
)

# 태스크 정의
research_task = Task(
    description="Research the latest trends in AI agent orchestration",
    expected_output="A comprehensive summary of findings with sources",
    agent=researcher
)

analysis_task = Task(
    description="Analyze the research findings and identify key patterns",
    expected_output="An analytical report with data-driven insights",
    agent=analyst,
    context=[research_task]  # 이전 태스크 결과 참조
)

report_task = Task(
    description="Write a final report combining research and analysis",
    expected_output="A polished report ready for stakeholders",
    agent=writer,
    context=[research_task, analysis_task]
)

# 크루 구성 및 실행
crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, analysis_task, report_task],
    process=Process.sequential,  # 또는 Process.hierarchical
    verbose=True
)

result = crew.kickoff()
print(result)

AutoGen 구현 예시

from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager

# 에이전트 설정
config_list = [{"model": "gpt-4o", "api_key": "YOUR_API_KEY"}]

researcher = AssistantAgent(
    name="Researcher",
    system_message="""You are a research specialist.
    Find accurate and relevant information.
    When your research is complete, say RESEARCH_DONE.""",
    llm_config={"config_list": config_list}
)

analyst = AssistantAgent(
    name="Analyst",
    system_message="""You are a data analyst.
    Analyze the research findings and provide insights.
    When analysis is complete, say ANALYSIS_DONE.""",
    llm_config={"config_list": config_list}
)

writer = AssistantAgent(
    name="Writer",
    system_message="""You are a technical writer.
    Create clear documentation based on research and analysis.
    When the report is complete, say TERMINATE.""",
    llm_config={"config_list": config_list}
)

user_proxy = UserProxyAgent(
    name="Admin",
    human_input_mode="NEVER",
    code_execution_config={"work_dir": "output"},
    is_termination_msg=lambda x: "TERMINATE" in x.get("content", "")
)

# 그룹 채팅 설정
group_chat = GroupChat(
    agents=[user_proxy, researcher, analyst, writer],
    messages=[],
    max_round=20,
    speaker_selection_method="round_robin"
)

manager = GroupChatManager(
    groupchat=group_chat,
    llm_config={"config_list": config_list}
)

# 실행
user_proxy.initiate_chat(
    manager,
    message="Research AI agent orchestration patterns and write a report."
)

슈퍼바이저 패턴 심화

동적 라우팅 구현

슈퍼바이저가 작업을 분석하고, 최적의 에이전트에게 라우팅하는 고급 구현이다.

from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import Literal

class RouteDecision(BaseModel):
    """슈퍼바이저의 라우팅 결정"""
    next_agent: Literal["researcher", "analyst", "writer", "FINISH"] = Field(
        description="The next agent to route to"
    )
    reasoning: str = Field(
        description="Why this agent was chosen"
    )
    task_description: str = Field(
        description="Specific task for the chosen agent"
    )

llm = ChatOpenAI(model="gpt-4o")
structured_llm = llm.with_structured_output(RouteDecision)

SUPERVISOR_PROMPT = """You are a supervisor managing a team of agents.
Based on the current state and conversation, decide:
1. Which agent should work next
2. What specific task they should perform
3. Whether the overall task is complete (FINISH)

Available agents:
- researcher: Searches for information and gathers data
- analyst: Analyzes data and provides insights
- writer: Creates reports and documentation

Current conversation:
{messages}

Task Board:
{task_board}
"""

def supervisor_node(state):
    """슈퍼바이저 노드: 동적 라우팅"""
    decision = structured_llm.invoke(
        SUPERVISOR_PROMPT.format(
            messages=state["messages"],
            task_board=state.get("task_board", "Empty")
        )
    )

    return {
        "next_agent": decision.next_agent,
        "messages": state["messages"] + [
            {"role": "system",
             "content": f"Supervisor routed to {decision.next_agent}: {decision.task_description}"}
        ]
    }

Human-in-the-Loop 통합

사람의 승인이 필요한 단계를 워크플로우에 삽입하는 패턴이다.

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END

checkpointer = MemorySaver()

def human_approval_node(state):
    """사람의 승인을 대기하는 노드"""
    # 이 노드에서 interrupt가 발생하면 실행이 중단됨
    # 사람이 승인하면 resume으로 계속 진행
    return {
        "messages": state["messages"] + [
            {"role": "system", "content": "Awaiting human approval..."}
        ],
        "approval_status": "pending"
    }

def check_approval(state) -> Literal["approved", "rejected"]:
    """승인 상태 확인"""
    return state.get("approval_status", "pending")

# 그래프에 Human-in-the-Loop 추가
graph = StateGraph(SupervisorState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher)
graph.add_node("human_review", human_approval_node)
graph.add_node("writer", writer)

graph.add_edge(START, "supervisor")
graph.add_edge("supervisor", "researcher")
graph.add_edge("researcher", "human_review")
graph.add_conditional_edges(
    "human_review",
    check_approval,
    {"approved": "writer", "rejected": "supervisor"}
)
graph.add_edge("writer", END)

# 체크포인터로 상태 저장 및 복원
app = graph.compile(checkpointer=checkpointer, interrupt_before=["human_review"])

# 실행 후 중단 시점에서 재개
config = {"configurable": {"thread_id": "review-thread-1"}}
result = app.invoke(initial_state, config)

# 사람이 승인 후 재개
app.invoke(None, config)  # resume with approval

MCP 프로토콜 통합

Model Context Protocol (MCP) 이란

MCP는 Anthropic이 발표한 에이전트 간 상호운용성 프로토콜로, 에이전트가 외부 도구와 데이터 소스에 표준화된 방식으로 접근할 수 있게 한다.

# MCP 서버 구현 예시
from mcp import Server, Tool
import asyncio

server = Server("analytics-server")

@server.tool()
async def query_database(query: str) -> str:
    """데이터베이스에서 SQL 쿼리를 실행한다."""
    # 실제 DB 연결 및 쿼리 실행
    result = await db.execute(query)
    return str(result)

@server.tool()
async def generate_chart(data: str, chart_type: str) -> str:
    """데이터를 기반으로 차트를 생성한다."""
    # 차트 생성 로직
    return f"Chart generated: {chart_type}"

@server.resource("schema://tables")
async def list_tables() -> str:
    """사용 가능한 데이터베이스 테이블 목록"""
    tables = await db.get_tables()
    return "\n".join(tables)

# 서버 실행
async def main():
    async with server.run_stdio() as running:
        await running.wait()

asyncio.run(main())

MCP 클라이언트와 멀티에이전트 연동

from mcp import ClientSession, StdioServerParameters
from langchain_mcp_adapters.tools import load_mcp_tools
from langgraph.prebuilt import create_react_agent

# MCP 서버 연결 설정
server_params = StdioServerParameters(
    command="python",
    args=["analytics_server.py"]
)

async def create_mcp_agent():
    """MCP 도구를 사용하는 에이전트 생성"""
    async with ClientSession(*server_params) as session:
        await session.initialize()

        # MCP 도구를 LangChain 도구로 변환
        tools = await load_mcp_tools(session)

        # 에이전트 생성
        agent = create_react_agent(
            ChatOpenAI(model="gpt-4o"),
            tools,
            state_modifier="You are a data analyst with access to database tools."
        )

        return agent

실전 사례: 고객 지원 멀티에이전트 시스템

아키텍처 설계

고객 지원 시스템을 계층형 멀티에이전트로 구현하는 실전 예시이다.

from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Literal
import operator

class CustomerSupportState(TypedDict):
    messages: Annotated[list, operator.add]
    customer_id: str
    issue_category: str
    sentiment: str
    resolution: str
    escalated: bool

# 트리아지 에이전트
def triage_agent(state: CustomerSupportState) -> CustomerSupportState:
    """고객 문의를 분류하고 적절한 전문 에이전트에게 라우팅"""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke([
        {"role": "system", "content": """Classify the customer issue into one of:
        - billing: Payment, invoice, subscription issues
        - technical: Product bugs, errors, configuration
        - general: General inquiries, feedback
        Also assess sentiment: positive, neutral, negative, urgent"""},
        {"role": "user", "content": state["messages"][-1].content}
    ])
    # 분류 결과 파싱
    return {
        "issue_category": "technical",  # 파싱 결과
        "sentiment": "negative"
    }

# 기술 지원 에이전트
def technical_support_agent(state: CustomerSupportState) -> CustomerSupportState:
    """기술 문제를 진단하고 해결책을 제시"""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke([
        {"role": "system", "content": """You are a technical support specialist.
        Diagnose the issue and provide step-by-step solutions.
        If the issue requires engineering escalation, set escalated=true."""},
        {"role": "user", "content": str(state["messages"])}
    ])
    return {
        "resolution": response.content,
        "messages": [{"role": "assistant", "content": response.content}]
    }

# 빌링 지원 에이전트
def billing_support_agent(state: CustomerSupportState) -> CustomerSupportState:
    """결제 관련 문제를 처리"""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke([
        {"role": "system", "content": """You are a billing specialist.
        Handle payment issues, refunds, and subscription changes."""},
        {"role": "user", "content": str(state["messages"])}
    ])
    return {
        "resolution": response.content,
        "messages": [{"role": "assistant", "content": response.content}]
    }

# 에스컬레이션 에이전트
def escalation_agent(state: CustomerSupportState) -> CustomerSupportState:
    """복잡한 문제를 상위 레벨로 에스컬레이션"""
    return {
        "escalated": True,
        "messages": [
            {"role": "system",
             "content": f"Issue escalated for customer {state['customer_id']}"}
        ]
    }

# 라우팅 함수
def route_issue(state: CustomerSupportState) -> Literal["technical", "billing", "general"]:
    return state["issue_category"]

def check_escalation(state: CustomerSupportState) -> Literal["escalate", "resolve"]:
    if state.get("escalated"):
        return "escalate"
    return "resolve"

# 그래프 구성
workflow = StateGraph(CustomerSupportState)
workflow.add_node("triage", triage_agent)
workflow.add_node("technical", technical_support_agent)
workflow.add_node("billing", billing_support_agent)
workflow.add_node("escalation", escalation_agent)

workflow.add_edge(START, "triage")
workflow.add_conditional_edges("triage", route_issue, {
    "technical": "technical",
    "billing": "billing",
    "general": "billing"  # 일반 문의도 billing이 처리
})
workflow.add_conditional_edges("technical", check_escalation, {
    "escalate": "escalation",
    "resolve": END
})
workflow.add_edge("billing", END)
workflow.add_edge("escalation", END)

app = workflow.compile()

실패 처리 전략

재시도 및 폴백 패턴

from tenacity import retry, stop_after_attempt, wait_exponential
from langgraph.graph import StateGraph
import logging

logger = logging.getLogger(__name__)

class AgentWithRetry:
    """재시도 로직이 포함된 에이전트 래퍼"""

    def __init__(self, agent, max_retries=3, fallback_agent=None):
        self.agent = agent
        self.max_retries = max_retries
        self.fallback_agent = fallback_agent

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=30)
    )
    async def invoke_with_retry(self, state):
        """재시도 로직으로 에이전트 호출"""
        try:
            return await self.agent.ainvoke(state)
        except Exception as e:
            logger.error(f"Agent failed: {e}")
            raise

    async def invoke(self, state):
        """폴백 포함 에이전트 호출"""
        try:
            return await self.invoke_with_retry(state)
        except Exception as e:
            if self.fallback_agent:
                logger.warning(f"Falling back to backup agent: {e}")
                return await self.fallback_agent.ainvoke(state)
            raise

# 서킷 브레이커 패턴
class CircuitBreaker:
    """서킷 브레이커 패턴"""

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = "closed"  # closed, open, half-open
        self.last_failure_time = None

    def can_execute(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            import time
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half-open"
                return True
            return False
        return True  # half-open

    def record_success(self):
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self):
        import time
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"

데드 레터 큐 패턴

import json
from datetime import datetime

class DeadLetterQueue:
    """처리 실패한 메시지를 저장하는 데드 레터 큐"""

    def __init__(self, storage_path="dead_letters.json"):
        self.storage_path = storage_path
        self.messages = []

    def add(self, message: dict, error: str, agent_name: str):
        """실패한 메시지를 큐에 추가"""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "agent": agent_name,
            "message": message,
            "error": str(error),
            "retry_count": 0
        }
        self.messages.append(entry)
        self._persist()

    def retry_all(self, agent_registry: dict):
        """큐의 모든 메시지 재시도"""
        for entry in self.messages:
            agent = agent_registry.get(entry["agent"])
            if agent:
                try:
                    agent.invoke(entry["message"])
                    self.messages.remove(entry)
                except Exception as e:
                    entry["retry_count"] += 1
                    entry["last_error"] = str(e)
        self._persist()

    def _persist(self):
        with open(self.storage_path, "w") as f:
            json.dump(self.messages, f, indent=2)

관찰 가능성 (Observability)

LangSmith 통합

import os

# LangSmith 추적 활성화
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "multi-agent-orchestration"

# 커스텀 메트릭 수집
from langsmith import Client

client = Client()

def track_agent_metrics(agent_name: str, duration: float, tokens: int, success: bool):
    """에이전트 실행 메트릭 추적"""
    client.create_run(
        name=f"agent-{agent_name}",
        run_type="chain",
        inputs={"agent": agent_name},
        outputs={
            "duration_ms": duration * 1000,
            "total_tokens": tokens,
            "success": success
        }
    )

OpenTelemetry 통합

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# 트레이서 설정
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("multi-agent-system")

def traced_agent_node(agent_name: str):
    """OpenTelemetry 트레이싱이 포함된 에이전트 노드"""
    def node_fn(state):
        with tracer.start_as_current_span(f"agent.{agent_name}") as span:
            span.set_attribute("agent.name", agent_name)
            span.set_attribute("agent.input_messages", len(state["messages"]))

            try:
                result = agent.invoke(state)
                span.set_attribute("agent.success", True)
                return result
            except Exception as e:
                span.set_attribute("agent.success", False)
                span.record_exception(e)
                raise

    return node_fn

프로덕션 배포 체크리스트

설계 단계

  • 에이전트별 역할과 도구가 명확히 정의되었는가
  • 에이전트 간 통신 프로토콜이 표준화되었는가
  • 상태 관리 전략이 수립되었는가 (로컬 vs 분산)
  • 장애 시나리오별 대응 전략이 있는가
  • Human-in-the-Loop 필요 지점이 식별되었는가

구현 단계

  • 각 에이전트에 적절한 모델이 할당되었는가 (비용 vs 성능)
  • 도구 실행에 타임아웃이 설정되었는가
  • 재시도 로직과 서킷 브레이커가 구현되었는가
  • 데드 레터 큐로 실패한 작업을 추적하는가
  • 입출력 검증 (guard rails)이 적용되었는가

배포 단계

  • 관찰 가능성 파이프라인이 구성되었는가 (LangSmith / OTEL)
  • 에이전트별 비용 추적이 가능한가
  • 레이트 리밋이 적용되었는가
  • 보안 감사 로그가 활성화되었는가
  • 롤백 전략이 수립되었는가

운영 단계

  • 에이전트 성능 대시보드가 구축되었는가
  • 이상 탐지 알림이 설정되었는가
  • 프롬프트 버전 관리가 적용되었는가
  • A/B 테스트 프레임워크가 준비되었는가
  • 정기적인 프롬프트 최적화 프로세스가 있는가

패턴 선택 가이드

의사결정 플로우차트

작업 유형 판단
  |
  ├─ 단순 작업 (도구 5개 이하) ─────> 단일 에이전트
  |
  ├─ 순서가 정해진 다단계 작업 ─────> 파이프라인
  |
  ├─ 동적 라우팅이 필요한 작업 ─────> 계층형 (슈퍼바이저)
  |
  └─ 자율적 협업이 필요한 복잡 작업 ─> 스웜

패턴별 장단점 종합

패턴장점단점복잡도적합한 규모
단일 에이전트구현 간단, 디버깅 용이확장성 한계, 컨텍스트 포화낮음소규모
계층형중앙 제어, 동적 라우팅슈퍼바이저 병목, 단일 장애점중간중규모
파이프라인예측 가능, 테스트 용이유연성 부족, 순차 실행 지연낮음-중간중규모
스웜높은 유연성, 자율적 협업디버깅 어려움, 예측 불가높음대규모

보안 고려사항

에이전트 격리

class SandboxedAgent:
    """격리된 환경에서 실행되는 에이전트"""

    def __init__(self, agent, allowed_tools: list, max_tokens: int = 4096):
        self.agent = agent
        self.allowed_tools = set(allowed_tools)
        self.max_tokens = max_tokens

    def invoke(self, state):
        # 도구 접근 권한 검증
        requested_tools = self._extract_tool_calls(state)
        unauthorized = requested_tools - self.allowed_tools
        if unauthorized:
            raise PermissionError(
                f"Agent attempted to use unauthorized tools: {unauthorized}"
            )

        # 토큰 사용량 제한
        if self._estimate_tokens(state) > self.max_tokens:
            raise ResourceError("Token limit exceeded")

        return self.agent.invoke(state)

    def _extract_tool_calls(self, state) -> set:
        # 상태에서 도구 호출 추출
        return set()

    def _estimate_tokens(self, state) -> int:
        # 토큰 사용량 추정
        return 0

프롬프트 인젝션 방어

from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, validator

class SafeAgentOutput(BaseModel):
    """에이전트 출력 검증 스키마"""
    response: str
    confidence: float
    sources: list[str]

    @validator("response")
    def validate_response(cls, v):
        # 금지된 패턴 검출
        forbidden_patterns = [
            "ignore previous instructions",
            "system prompt",
            "bypass",
            "jailbreak"
        ]
        for pattern in forbidden_patterns:
            if pattern.lower() in v.lower():
                raise ValueError(f"Suspicious pattern detected: {pattern}")
        return v

parser = PydanticOutputParser(pydantic_object=SafeAgentOutput)

성능 최적화

병렬 실행 전략

from langgraph.graph import StateGraph, START, END
import asyncio

class ParallelState(TypedDict):
    messages: Annotated[list, operator.add]
    research_result: str
    analysis_result: str

async def parallel_execution(state):
    """독립적인 에이전트를 병렬로 실행"""
    research_task = asyncio.create_task(
        researcher.ainvoke({"messages": state["messages"]})
    )
    analysis_task = asyncio.create_task(
        analyst.ainvoke({"messages": state["messages"]})
    )

    research_result, analysis_result = await asyncio.gather(
        research_task, analysis_task
    )

    return {
        "research_result": research_result["messages"][-1].content,
        "analysis_result": analysis_result["messages"][-1].content
    }

# LangGraph의 fan-out 패턴
graph = StateGraph(ParallelState)
graph.add_node("research", researcher)
graph.add_node("analysis", analyst)
graph.add_node("synthesis", writer)

# 병렬 실행: START에서 두 노드로 동시 분기
graph.add_edge(START, "research")
graph.add_edge(START, "analysis")

# 두 결과가 모두 완료되면 synthesis로
graph.add_edge("research", "synthesis")
graph.add_edge("analysis", "synthesis")
graph.add_edge("synthesis", END)

캐싱 전략

from functools import lru_cache
import hashlib
import json

class AgentCache:
    """에이전트 응답 캐싱"""

    def __init__(self, ttl_seconds=3600):
        self.cache = {}
        self.ttl = ttl_seconds

    def get_cache_key(self, state: dict) -> str:
        """상태에서 캐시 키 생성"""
        state_str = json.dumps(state, sort_keys=True, default=str)
        return hashlib.sha256(state_str.encode()).hexdigest()

    def get(self, state: dict):
        """캐시에서 결과 조회"""
        key = self.get_cache_key(state)
        if key in self.cache:
            entry = self.cache[key]
            import time
            if time.time() - entry["timestamp"] < self.ttl:
                return entry["result"]
            del self.cache[key]
        return None

    def set(self, state: dict, result):
        """결과를 캐시에 저장"""
        import time
        key = self.get_cache_key(state)
        self.cache[key] = {
            "result": result,
            "timestamp": time.time()
        }

마무리

멀티에이전트 오케스트레이션은 단순히 여러 에이전트를 연결하는 것이 아니라, 작업의 특성에 맞는 패턴을 선택하고 견고한 장애 처리와 관찰 가능성을 갖추는 것이 핵심이다.

핵심 정리:

  1. 단일 에이전트로 시작하고, 복잡도가 증가할 때 멀티에이전트로 전환
  2. 계층형 패턴은 중앙 제어가 필요한 경우에 적합
  3. 파이프라인 패턴은 순서가 정해진 워크플로우에 최적
  4. 스웜 패턴은 높은 자율성이 필요한 복잡한 시나리오에 적합
  5. 프레임워크는 LangGraph(유연성), CrewAI(빠른 프로토타이핑), AutoGen(대화 기반)을 용도에 맞게 선택

참고 자료