필사 모드: AI Agent Multi-Agent Orchestration Patterns: A Practical Guide to Hierarchical, Pipeline, and Swarm Architectures
English왜 지금 멀티에이전트인가
2026년 에이전틱 AI가 기업 애플리케이션의 40%에 탑재될 전망이다(Gartner). 단일 범용 에이전트에서 도메인 특화 멀티에이전트 협업으로 패러다임이 전환되고 있다. NIST AI Agent Standards Initiative 발표로 보안과 상호운용성 표준화가 본격화되었다.
이 글에서는 **4가지 주요 멀티에이전트 오케스트레이션 패턴**을 분석하고, LangGraph, CrewAI, AutoGen 프레임워크별 구현 코드를 제공한다.
멀티에이전트 시스템 개요
단일 에이전트의 한계
단일 에이전트는 다음과 같은 한계를 가진다.
| 한계 | 설명 |
| ------------------------ | ------------------------------------------------------------ |
| **컨텍스트 윈도우 포화** | 복잡한 작업일수록 프롬프트가 길어지고 성능 저하 |
| **도구 과부하** | 수십 개의 도구를 하나의 에이전트에 부여하면 선택 정확도 하락 |
| **단일 장애점** | 에이전트 하나가 실패하면 전체 워크플로우 중단 |
| **전문성 부족** | 범용 프롬프트로는 도메인별 최적 결과를 얻기 어려움 |
| **확장성 제약** | 작업량 증가 시 수평 확장 불가 |
멀티에이전트가 해결하는 문제
멀티에이전트 시스템은 **분업과 협업**을 통해 위 한계를 극복한다.
- **전문화**: 각 에이전트가 특정 도메인에 특화
- **병렬 처리**: 독립적인 작업을 동시에 수행
- **장애 격리**: 하나의 에이전트 실패가 전체 시스템에 영향을 주지 않음
- **동적 구성**: 작업에 따라 에이전트 조합을 유연하게 변경
4가지 오케스트레이션 패턴
패턴 1: 단일 에이전트 (Single Agent)
가장 기본적인 패턴으로, 하나의 에이전트가 모든 작업을 처리한다.
from langchain.agents import create_tool_calling_agent
from langchain_openai import ChatOpenAI
from langchain.tools import tool
@tool
def search_web(query: str) -> str:
"""웹에서 정보를 검색한다."""
검색 로직
return f"Search results for: {query}"
@tool
def calculate(expression: str) -> str:
"""수학 계산을 수행한다."""
return str(eval(expression))
@tool
def write_file(filename: str, content: str) -> str:
"""파일에 내용을 작성한다."""
with open(filename, "w") as f:
f.write(content)
return f"File {filename} written successfully"
llm = ChatOpenAI(model="gpt-4o")
tools = [search_web, calculate, write_file]
agent = create_tool_calling_agent(llm, tools, prompt_template)
**적합한 상황**: 작업이 단순하고 도구 수가 5개 이하인 경우
패턴 2: 계층형 멀티에이전트 (Hierarchical Multi-Agent)
**슈퍼바이저 에이전트**가 하위 에이전트들에게 작업을 분배하고 결과를 종합한다.
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Literal
class SupervisorState(TypedDict):
messages: Annotated[list, operator.add]
next_agent: str
final_answer: str
llm = ChatOpenAI(model="gpt-4o")
하위 에이전트 정의
researcher = create_react_agent(
llm,
tools=[search_web],
state_modifier="You are a research specialist. Find accurate information."
)
analyst = create_react_agent(
llm,
tools=[calculate],
state_modifier="You are a data analyst. Analyze data and provide insights."
)
writer = create_react_agent(
llm,
tools=[write_file],
state_modifier="You are a technical writer. Create clear documentation."
)
슈퍼바이저 라우팅 로직
def supervisor_router(state: SupervisorState) -> Literal["researcher", "analyst", "writer", "__end__"]:
"""슈퍼바이저가 다음 에이전트를 결정한다."""
last_message = state["messages"][-1]
response = llm.invoke([
{"role": "system", "content": """You are a supervisor managing a team.
Route to: researcher (for information), analyst (for data), writer (for documentation).
Return __end__ when the task is complete."""},
{"role": "user", "content": last_message.content}
])
return response.content.strip()
그래프 구성
graph = StateGraph(SupervisorState)
graph.add_node("supervisor", supervisor_router)
graph.add_node("researcher", researcher)
graph.add_node("analyst", analyst)
graph.add_node("writer", writer)
graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", supervisor_router)
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyst", "supervisor")
graph.add_edge("writer", "supervisor")
app = graph.compile()
**적합한 상황**: 중앙 집중적 제어가 필요하고, 작업의 순서가 동적으로 결정되는 경우
패턴 3: 순차적 파이프라인 (Sequential Pipeline)
에이전트들이 정해진 순서대로 작업을 처리하고, 이전 에이전트의 출력이 다음 에이전트의 입력이 된다.
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
class PipelineState(TypedDict):
messages: Annotated[list, operator.add]
research_output: str
analysis_output: str
report_output: str
def research_node(state: PipelineState) -> PipelineState:
"""1단계: 정보 수집"""
result = researcher.invoke({"messages": state["messages"]})
return {"research_output": result["messages"][-1].content}
def analysis_node(state: PipelineState) -> PipelineState:
"""2단계: 분석"""
analysis_prompt = f"Analyze this research: {state['research_output']}"
result = analyst.invoke({"messages": [{"role": "user", "content": analysis_prompt}]})
return {"analysis_output": result["messages"][-1].content}
def report_node(state: PipelineState) -> PipelineState:
"""3단계: 보고서 작성"""
report_prompt = f"""Write a report based on:
Research: {state['research_output']}
Analysis: {state['analysis_output']}"""
result = writer.invoke({"messages": [{"role": "user", "content": report_prompt}]})
return {"report_output": result["messages"][-1].content}
파이프라인 그래프
pipeline = StateGraph(PipelineState)
pipeline.add_node("research", research_node)
pipeline.add_node("analysis", analysis_node)
pipeline.add_node("report", report_node)
pipeline.add_edge(START, "research")
pipeline.add_edge("research", "analysis")
pipeline.add_edge("analysis", "report")
pipeline.add_edge("report", END)
app = pipeline.compile()
**적합한 상황**: 작업 순서가 명확하고, 각 단계의 출력이 다음 단계의 입력으로 사용되는 경우
패턴 4: 분산형 스웜 (Decentralized Swarm)
에이전트들이 자율적으로 협업하며, 중앙 조율자 없이 작업을 수행한다.
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated, Literal
class SwarmState(TypedDict):
messages: Annotated[list, operator.add]
current_agent: str
task_board: dict # 공유 태스크 보드
def agent_handoff(state: SwarmState, agent_name: str, target: str) -> SwarmState:
"""에이전트 간 핸드오프"""
return {
"current_agent": target,
"messages": state["messages"] + [
{"role": "system", "content": f"Handoff from {agent_name} to {target}"}
]
}
def triage_agent(state: SwarmState) -> Literal["researcher", "analyst", "writer"]:
"""트리아지 에이전트: 작업을 적절한 에이전트에게 라우팅"""
last_message = state["messages"][-1]
if "search" in last_message.content.lower():
return "researcher"
elif "analyze" in last_message.content.lower():
return "analyst"
else:
return "writer"
def researcher_with_handoff(state: SwarmState):
"""리서처가 작업 후 다음 에이전트에게 핸드오프"""
result = researcher.invoke({"messages": state["messages"]})
분석이 필요하면 analyst에게, 아니면 종료
return agent_handoff(state, "researcher", "analyst")
def analyst_with_handoff(state: SwarmState):
"""분석가가 작업 후 다음 에이전트에게 핸드오프"""
result = analyst.invoke({"messages": state["messages"]})
return agent_handoff(state, "analyst", "writer")
스웜 그래프
swarm = StateGraph(SwarmState)
swarm.add_node("triage", triage_agent)
swarm.add_node("researcher", researcher_with_handoff)
swarm.add_node("analyst", analyst_with_handoff)
swarm.add_node("writer", writer)
swarm.add_edge(START, "triage")
swarm.add_conditional_edges("triage", triage_agent)
swarm.add_edge("researcher", "analyst")
swarm.add_edge("analyst", "writer")
swarm.add_edge("writer", END)
app = swarm.compile()
**적합한 상황**: 에이전트가 자율적으로 판단해야 하고, 유연한 협업이 필요한 경우
프레임워크 비교
LangGraph vs CrewAI vs AutoGen
| 특성 | LangGraph | CrewAI | AutoGen |
| --------------------- | ----------------------- | --------------------- | ---------------------- |
| **아키텍처** | 그래프 기반 상태 머신 | 역할 기반 에이전트 팀 | 대화 기반 멀티에이전트 |
| **유연성** | 매우 높음 (저수준 제어) | 중간 (추상화된 API) | 높음 (커스텀 가능) |
| **학습 곡선** | 높음 | 낮음 | 중간 |
| **상태 관리** | 내장 (체크포인트 지원) | 기본적 | 대화 히스토리 기반 |
| **Human-in-the-Loop** | 네이티브 지원 | 기본 지원 | 네이티브 지원 |
| **스트리밍** | 네이티브 지원 | 제한적 | 이벤트 기반 |
| **프로덕션 준비도** | 높음 | 중간 | 높음 |
| **커뮤니티 크기** | 대형 | 중형 | 대형 |
| **라이선스** | MIT | MIT | MIT |
CrewAI 구현 예시
from crewai import Agent, Task, Crew, Process
에이전트 정의
researcher = Agent(
role="Senior Research Analyst",
goal="Find comprehensive and accurate information about the given topic",
backstory="""You are an expert researcher with decades of experience
in gathering and synthesizing information from multiple sources.""",
verbose=True,
allow_delegation=True,
tools=[search_tool, scrape_tool]
)
analyst = Agent(
role="Data Analyst",
goal="Analyze research findings and extract actionable insights",
backstory="""You are a skilled data analyst who excels at finding
patterns and drawing meaningful conclusions from data.""",
verbose=True,
tools=[analysis_tool, chart_tool]
)
writer = Agent(
role="Technical Writer",
goal="Create clear and comprehensive reports",
backstory="""You are an experienced technical writer who can transform
complex analyses into readable documents.""",
verbose=True,
tools=[write_tool]
)
태스크 정의
research_task = Task(
description="Research the latest trends in AI agent orchestration",
expected_output="A comprehensive summary of findings with sources",
agent=researcher
)
analysis_task = Task(
description="Analyze the research findings and identify key patterns",
expected_output="An analytical report with data-driven insights",
agent=analyst,
context=[research_task] # 이전 태스크 결과 참조
)
report_task = Task(
description="Write a final report combining research and analysis",
expected_output="A polished report ready for stakeholders",
agent=writer,
context=[research_task, analysis_task]
)
크루 구성 및 실행
crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, report_task],
process=Process.sequential, # 또는 Process.hierarchical
verbose=True
)
result = crew.kickoff()
print(result)
AutoGen 구현 예시
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
에이전트 설정
config_list = [{"model": "gpt-4o", "api_key": "YOUR_API_KEY"}]
researcher = AssistantAgent(
name="Researcher",
system_message="""You are a research specialist.
Find accurate and relevant information.
When your research is complete, say RESEARCH_DONE.""",
llm_config={"config_list": config_list}
)
analyst = AssistantAgent(
name="Analyst",
system_message="""You are a data analyst.
Analyze the research findings and provide insights.
When analysis is complete, say ANALYSIS_DONE.""",
llm_config={"config_list": config_list}
)
writer = AssistantAgent(
name="Writer",
system_message="""You are a technical writer.
Create clear documentation based on research and analysis.
When the report is complete, say TERMINATE.""",
llm_config={"config_list": config_list}
)
user_proxy = UserProxyAgent(
name="Admin",
human_input_mode="NEVER",
code_execution_config={"work_dir": "output"},
is_termination_msg=lambda x: "TERMINATE" in x.get("content", "")
)
그룹 채팅 설정
group_chat = GroupChat(
agents=[user_proxy, researcher, analyst, writer],
messages=[],
max_round=20,
speaker_selection_method="round_robin"
)
manager = GroupChatManager(
groupchat=group_chat,
llm_config={"config_list": config_list}
)
실행
user_proxy.initiate_chat(
manager,
message="Research AI agent orchestration patterns and write a report."
)
슈퍼바이저 패턴 심화
동적 라우팅 구현
슈퍼바이저가 작업을 분석하고, 최적의 에이전트에게 라우팅하는 고급 구현이다.
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import Literal
class RouteDecision(BaseModel):
"""슈퍼바이저의 라우팅 결정"""
next_agent: Literal["researcher", "analyst", "writer", "FINISH"] = Field(
description="The next agent to route to"
)
reasoning: str = Field(
description="Why this agent was chosen"
)
task_description: str = Field(
description="Specific task for the chosen agent"
)
llm = ChatOpenAI(model="gpt-4o")
structured_llm = llm.with_structured_output(RouteDecision)
SUPERVISOR_PROMPT = """You are a supervisor managing a team of agents.
Based on the current state and conversation, decide:
1. Which agent should work next
2. What specific task they should perform
3. Whether the overall task is complete (FINISH)
Available agents:
- researcher: Searches for information and gathers data
- analyst: Analyzes data and provides insights
- writer: Creates reports and documentation
Current conversation:
{messages}
Task Board:
{task_board}
"""
def supervisor_node(state):
"""슈퍼바이저 노드: 동적 라우팅"""
decision = structured_llm.invoke(
SUPERVISOR_PROMPT.format(
messages=state["messages"],
task_board=state.get("task_board", "Empty")
)
)
return {
"next_agent": decision.next_agent,
"messages": state["messages"] + [
{"role": "system",
"content": f"Supervisor routed to {decision.next_agent}: {decision.task_description}"}
]
}
Human-in-the-Loop 통합
사람의 승인이 필요한 단계를 워크플로우에 삽입하는 패턴이다.
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
checkpointer = MemorySaver()
def human_approval_node(state):
"""사람의 승인을 대기하는 노드"""
이 노드에서 interrupt가 발생하면 실행이 중단됨
사람이 승인하면 resume으로 계속 진행
return {
"messages": state["messages"] + [
{"role": "system", "content": "Awaiting human approval..."}
],
"approval_status": "pending"
}
def check_approval(state) -> Literal["approved", "rejected"]:
"""승인 상태 확인"""
return state.get("approval_status", "pending")
그래프에 Human-in-the-Loop 추가
graph = StateGraph(SupervisorState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher)
graph.add_node("human_review", human_approval_node)
graph.add_node("writer", writer)
graph.add_edge(START, "supervisor")
graph.add_edge("supervisor", "researcher")
graph.add_edge("researcher", "human_review")
graph.add_conditional_edges(
"human_review",
check_approval,
{"approved": "writer", "rejected": "supervisor"}
)
graph.add_edge("writer", END)
체크포인터로 상태 저장 및 복원
app = graph.compile(checkpointer=checkpointer, interrupt_before=["human_review"])
실행 후 중단 시점에서 재개
config = {"configurable": {"thread_id": "review-thread-1"}}
result = app.invoke(initial_state, config)
사람이 승인 후 재개
app.invoke(None, config) # resume with approval
MCP 프로토콜 통합
Model Context Protocol (MCP) 이란
MCP는 Anthropic이 발표한 에이전트 간 상호운용성 프로토콜로, 에이전트가 외부 도구와 데이터 소스에 표준화된 방식으로 접근할 수 있게 한다.
MCP 서버 구현 예시
from mcp import Server, Tool
server = Server("analytics-server")
@server.tool()
async def query_database(query: str) -> str:
"""데이터베이스에서 SQL 쿼리를 실행한다."""
실제 DB 연결 및 쿼리 실행
result = await db.execute(query)
return str(result)
@server.tool()
async def generate_chart(data: str, chart_type: str) -> str:
"""데이터를 기반으로 차트를 생성한다."""
차트 생성 로직
return f"Chart generated: {chart_type}"
@server.resource("schema://tables")
async def list_tables() -> str:
"""사용 가능한 데이터베이스 테이블 목록"""
tables = await db.get_tables()
return "\n".join(tables)
서버 실행
async def main():
async with server.run_stdio() as running:
await running.wait()
asyncio.run(main())
MCP 클라이언트와 멀티에이전트 연동
from mcp import ClientSession, StdioServerParameters
from langchain_mcp_adapters.tools import load_mcp_tools
from langgraph.prebuilt import create_react_agent
MCP 서버 연결 설정
server_params = StdioServerParameters(
command="python",
args=["analytics_server.py"]
)
async def create_mcp_agent():
"""MCP 도구를 사용하는 에이전트 생성"""
async with ClientSession(*server_params) as session:
await session.initialize()
MCP 도구를 LangChain 도구로 변환
tools = await load_mcp_tools(session)
에이전트 생성
agent = create_react_agent(
ChatOpenAI(model="gpt-4o"),
tools,
state_modifier="You are a data analyst with access to database tools."
)
return agent
실전 사례: 고객 지원 멀티에이전트 시스템
아키텍처 설계
고객 지원 시스템을 계층형 멀티에이전트로 구현하는 실전 예시이다.
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Literal
class CustomerSupportState(TypedDict):
messages: Annotated[list, operator.add]
customer_id: str
issue_category: str
sentiment: str
resolution: str
escalated: bool
트리아지 에이전트
def triage_agent(state: CustomerSupportState) -> CustomerSupportState:
"""고객 문의를 분류하고 적절한 전문 에이전트에게 라우팅"""
llm = ChatOpenAI(model="gpt-4o")
response = llm.invoke([
{"role": "system", "content": """Classify the customer issue into one of:
- billing: Payment, invoice, subscription issues
- technical: Product bugs, errors, configuration
- general: General inquiries, feedback
Also assess sentiment: positive, neutral, negative, urgent"""},
{"role": "user", "content": state["messages"][-1].content}
])
분류 결과 파싱
return {
"issue_category": "technical", # 파싱 결과
"sentiment": "negative"
}
기술 지원 에이전트
def technical_support_agent(state: CustomerSupportState) -> CustomerSupportState:
"""기술 문제를 진단하고 해결책을 제시"""
llm = ChatOpenAI(model="gpt-4o")
response = llm.invoke([
{"role": "system", "content": """You are a technical support specialist.
Diagnose the issue and provide step-by-step solutions.
If the issue requires engineering escalation, set escalated=true."""},
{"role": "user", "content": str(state["messages"])}
])
return {
"resolution": response.content,
"messages": [{"role": "assistant", "content": response.content}]
}
빌링 지원 에이전트
def billing_support_agent(state: CustomerSupportState) -> CustomerSupportState:
"""결제 관련 문제를 처리"""
llm = ChatOpenAI(model="gpt-4o")
response = llm.invoke([
{"role": "system", "content": """You are a billing specialist.
Handle payment issues, refunds, and subscription changes."""},
{"role": "user", "content": str(state["messages"])}
])
return {
"resolution": response.content,
"messages": [{"role": "assistant", "content": response.content}]
}
에스컬레이션 에이전트
def escalation_agent(state: CustomerSupportState) -> CustomerSupportState:
"""복잡한 문제를 상위 레벨로 에스컬레이션"""
return {
"escalated": True,
"messages": [
{"role": "system",
"content": f"Issue escalated for customer {state['customer_id']}"}
]
}
라우팅 함수
def route_issue(state: CustomerSupportState) -> Literal["technical", "billing", "general"]:
return state["issue_category"]
def check_escalation(state: CustomerSupportState) -> Literal["escalate", "resolve"]:
if state.get("escalated"):
return "escalate"
return "resolve"
그래프 구성
workflow = StateGraph(CustomerSupportState)
workflow.add_node("triage", triage_agent)
workflow.add_node("technical", technical_support_agent)
workflow.add_node("billing", billing_support_agent)
workflow.add_node("escalation", escalation_agent)
workflow.add_edge(START, "triage")
workflow.add_conditional_edges("triage", route_issue, {
"technical": "technical",
"billing": "billing",
"general": "billing" # 일반 문의도 billing이 처리
})
workflow.add_conditional_edges("technical", check_escalation, {
"escalate": "escalation",
"resolve": END
})
workflow.add_edge("billing", END)
workflow.add_edge("escalation", END)
app = workflow.compile()
실패 처리 전략
재시도 및 폴백 패턴
from tenacity import retry, stop_after_attempt, wait_exponential
from langgraph.graph import StateGraph
logger = logging.getLogger(__name__)
class AgentWithRetry:
"""재시도 로직이 포함된 에이전트 래퍼"""
def __init__(self, agent, max_retries=3, fallback_agent=None):
self.agent = agent
self.max_retries = max_retries
self.fallback_agent = fallback_agent
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=30)
)
async def invoke_with_retry(self, state):
"""재시도 로직으로 에이전트 호출"""
try:
return await self.agent.ainvoke(state)
except Exception as e:
logger.error(f"Agent failed: {e}")
raise
async def invoke(self, state):
"""폴백 포함 에이전트 호출"""
try:
return await self.invoke_with_retry(state)
except Exception as e:
if self.fallback_agent:
logger.warning(f"Falling back to backup agent: {e}")
return await self.fallback_agent.ainvoke(state)
raise
서킷 브레이커 패턴
class CircuitBreaker:
"""서킷 브레이커 패턴"""
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.state = "closed" # closed, open, half-open
self.last_failure_time = None
def can_execute(self) -> bool:
if self.state == "closed":
return True
if self.state == "open":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "half-open"
return True
return False
return True # half-open
def record_success(self):
self.failure_count = 0
self.state = "closed"
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
데드 레터 큐 패턴
from datetime import datetime
class DeadLetterQueue:
"""처리 실패한 메시지를 저장하는 데드 레터 큐"""
def __init__(self, storage_path="dead_letters.json"):
self.storage_path = storage_path
self.messages = []
def add(self, message: dict, error: str, agent_name: str):
"""실패한 메시지를 큐에 추가"""
entry = {
"timestamp": datetime.now().isoformat(),
"agent": agent_name,
"message": message,
"error": str(error),
"retry_count": 0
}
self.messages.append(entry)
self._persist()
def retry_all(self, agent_registry: dict):
"""큐의 모든 메시지 재시도"""
for entry in self.messages:
agent = agent_registry.get(entry["agent"])
if agent:
try:
agent.invoke(entry["message"])
self.messages.remove(entry)
except Exception as e:
entry["retry_count"] += 1
entry["last_error"] = str(e)
self._persist()
def _persist(self):
with open(self.storage_path, "w") as f:
json.dump(self.messages, f, indent=2)
관찰 가능성 (Observability)
LangSmith 통합
LangSmith 추적 활성화
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "multi-agent-orchestration"
커스텀 메트릭 수집
from langsmith import Client
client = Client()
def track_agent_metrics(agent_name: str, duration: float, tokens: int, success: bool):
"""에이전트 실행 메트릭 추적"""
client.create_run(
name=f"agent-{agent_name}",
run_type="chain",
inputs={"agent": agent_name},
outputs={
"duration_ms": duration * 1000,
"total_tokens": tokens,
"success": success
}
)
OpenTelemetry 통합
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
트레이서 설정
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("multi-agent-system")
def traced_agent_node(agent_name: str):
"""OpenTelemetry 트레이싱이 포함된 에이전트 노드"""
def node_fn(state):
with tracer.start_as_current_span(f"agent.{agent_name}") as span:
span.set_attribute("agent.name", agent_name)
span.set_attribute("agent.input_messages", len(state["messages"]))
try:
result = agent.invoke(state)
span.set_attribute("agent.success", True)
return result
except Exception as e:
span.set_attribute("agent.success", False)
span.record_exception(e)
raise
return node_fn
프로덕션 배포 체크리스트
설계 단계
- [ ] 에이전트별 역할과 도구가 명확히 정의되었는가
- [ ] 에이전트 간 통신 프로토콜이 표준화되었는가
- [ ] 상태 관리 전략이 수립되었는가 (로컬 vs 분산)
- [ ] 장애 시나리오별 대응 전략이 있는가
- [ ] Human-in-the-Loop 필요 지점이 식별되었는가
구현 단계
- [ ] 각 에이전트에 적절한 모델이 할당되었는가 (비용 vs 성능)
- [ ] 도구 실행에 타임아웃이 설정되었는가
- [ ] 재시도 로직과 서킷 브레이커가 구현되었는가
- [ ] 데드 레터 큐로 실패한 작업을 추적하는가
- [ ] 입출력 검증 (guard rails)이 적용되었는가
배포 단계
- [ ] 관찰 가능성 파이프라인이 구성되었는가 (LangSmith / OTEL)
- [ ] 에이전트별 비용 추적이 가능한가
- [ ] 레이트 리밋이 적용되었는가
- [ ] 보안 감사 로그가 활성화되었는가
- [ ] 롤백 전략이 수립되었는가
운영 단계
- [ ] 에이전트 성능 대시보드가 구축되었는가
- [ ] 이상 탐지 알림이 설정되었는가
- [ ] 프롬프트 버전 관리가 적용되었는가
- [ ] A/B 테스트 프레임워크가 준비되었는가
- [ ] 정기적인 프롬프트 최적화 프로세스가 있는가
패턴 선택 가이드
의사결정 플로우차트
작업 유형 판단
|
├─ 단순 작업 (도구 5개 이하) ─────> 단일 에이전트
|
├─ 순서가 정해진 다단계 작업 ─────> 파이프라인
|
├─ 동적 라우팅이 필요한 작업 ─────> 계층형 (슈퍼바이저)
|
└─ 자율적 협업이 필요한 복잡 작업 ─> 스웜
패턴별 장단점 종합
| 패턴 | 장점 | 단점 | 복잡도 | 적합한 규모 |
| ----------------- | ------------------------ | ---------------------------- | --------- | ----------- |
| **단일 에이전트** | 구현 간단, 디버깅 용이 | 확장성 한계, 컨텍스트 포화 | 낮음 | 소규모 |
| **계층형** | 중앙 제어, 동적 라우팅 | 슈퍼바이저 병목, 단일 장애점 | 중간 | 중규모 |
| **파이프라인** | 예측 가능, 테스트 용이 | 유연성 부족, 순차 실행 지연 | 낮음-중간 | 중규모 |
| **스웜** | 높은 유연성, 자율적 협업 | 디버깅 어려움, 예측 불가 | 높음 | 대규모 |
보안 고려사항
에이전트 격리
class SandboxedAgent:
"""격리된 환경에서 실행되는 에이전트"""
def __init__(self, agent, allowed_tools: list, max_tokens: int = 4096):
self.agent = agent
self.allowed_tools = set(allowed_tools)
self.max_tokens = max_tokens
def invoke(self, state):
도구 접근 권한 검증
requested_tools = self._extract_tool_calls(state)
unauthorized = requested_tools - self.allowed_tools
if unauthorized:
raise PermissionError(
f"Agent attempted to use unauthorized tools: {unauthorized}"
)
토큰 사용량 제한
if self._estimate_tokens(state) > self.max_tokens:
raise ResourceError("Token limit exceeded")
return self.agent.invoke(state)
def _extract_tool_calls(self, state) -> set:
상태에서 도구 호출 추출
return set()
def _estimate_tokens(self, state) -> int:
토큰 사용량 추정
return 0
프롬프트 인젝션 방어
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, validator
class SafeAgentOutput(BaseModel):
"""에이전트 출력 검증 스키마"""
response: str
confidence: float
sources: list[str]
@validator("response")
def validate_response(cls, v):
금지된 패턴 검출
forbidden_patterns = [
"ignore previous instructions",
"system prompt",
"bypass",
"jailbreak"
]
for pattern in forbidden_patterns:
if pattern.lower() in v.lower():
raise ValueError(f"Suspicious pattern detected: {pattern}")
return v
parser = PydanticOutputParser(pydantic_object=SafeAgentOutput)
성능 최적화
병렬 실행 전략
from langgraph.graph import StateGraph, START, END
class ParallelState(TypedDict):
messages: Annotated[list, operator.add]
research_result: str
analysis_result: str
async def parallel_execution(state):
"""독립적인 에이전트를 병렬로 실행"""
research_task = asyncio.create_task(
researcher.ainvoke({"messages": state["messages"]})
)
analysis_task = asyncio.create_task(
analyst.ainvoke({"messages": state["messages"]})
)
research_result, analysis_result = await asyncio.gather(
research_task, analysis_task
)
return {
"research_result": research_result["messages"][-1].content,
"analysis_result": analysis_result["messages"][-1].content
}
LangGraph의 fan-out 패턴
graph = StateGraph(ParallelState)
graph.add_node("research", researcher)
graph.add_node("analysis", analyst)
graph.add_node("synthesis", writer)
병렬 실행: START에서 두 노드로 동시 분기
graph.add_edge(START, "research")
graph.add_edge(START, "analysis")
두 결과가 모두 완료되면 synthesis로
graph.add_edge("research", "synthesis")
graph.add_edge("analysis", "synthesis")
graph.add_edge("synthesis", END)
캐싱 전략
from functools import lru_cache
class AgentCache:
"""에이전트 응답 캐싱"""
def __init__(self, ttl_seconds=3600):
self.cache = {}
self.ttl = ttl_seconds
def get_cache_key(self, state: dict) -> str:
"""상태에서 캐시 키 생성"""
state_str = json.dumps(state, sort_keys=True, default=str)
return hashlib.sha256(state_str.encode()).hexdigest()
def get(self, state: dict):
"""캐시에서 결과 조회"""
key = self.get_cache_key(state)
if key in self.cache:
entry = self.cache[key]
if time.time() - entry["timestamp"] < self.ttl:
return entry["result"]
del self.cache[key]
return None
def set(self, state: dict, result):
"""결과를 캐시에 저장"""
key = self.get_cache_key(state)
self.cache[key] = {
"result": result,
"timestamp": time.time()
}
Conclusion
멀티에이전트 오케스트레이션은 단순히 여러 에이전트를 연결하는 것이 아니라, **작업의 특성에 맞는 패턴을 선택**하고 **견고한 장애 처리와 관찰 가능성**을 갖추는 것이 핵심이다.
핵심 정리:
1. **단일 에이전트**로 시작하고, 복잡도가 증가할 때 멀티에이전트로 전환
2. **계층형 패턴**은 중앙 제어가 필요한 경우에 적합
3. **파이프라인 패턴**은 순서가 정해진 워크플로우에 최적
4. **스웜 패턴**은 높은 자율성이 필요한 복잡한 시나리오에 적합
5. 프레임워크는 **LangGraph**(유연성), **CrewAI**(빠른 프로토타이핑), **AutoGen**(대화 기반)을 용도에 맞게 선택
References
- [LangGraph 공식 문서](https://langchain-ai.github.io/langgraph/)
- [CrewAI 공식 문서](https://docs.crewai.com/)
- [AutoGen 공식 문서](https://microsoft.github.io/autogen/)
- [Anthropic MCP 스펙](https://modelcontextprotocol.io/)
- [Gartner: Agentic AI Will Reshape Enterprise Applications](https://www.gartner.com/en/articles/intelligent-agent-in-ai)
- [NIST AI Agent Standards Initiative](https://www.nist.gov/artificial-intelligence)
- [LangChain Multi-Agent Architectures](https://blog.langchain.dev/multi-agent-architectures/)
- [OpenAI Swarm Framework](https://github.com/openai/swarm)
현재 단락 (1/742)
2026년 에이전틱 AI가 기업 애플리케이션의 40%에 탑재될 전망이다(Gartner). 단일 범용 에이전트에서 도메인 특화 멀티에이전트 협업으로 패러다임이 전환되고 있다. NIS...