- Authors
- Name
- 왜 지금 멀티에이전트인가
- 멀티에이전트 시스템 개요
- 4가지 오케스트레이션 패턴
- 프레임워크 비교
- 슈퍼바이저 패턴 심화
- MCP 프로토콜 통합
- 실전 사례: 고객 지원 멀티에이전트 시스템
- 실패 처리 전략
- 관찰 가능성 (Observability)
- 프로덕션 배포 체크리스트
- 패턴 선택 가이드
- 보안 고려사항
- 성능 최적화
- 마무리
- 참고 자료

왜 지금 멀티에이전트인가
2026년 에이전틱 AI가 기업 애플리케이션의 40%에 탑재될 전망이다(Gartner). 단일 범용 에이전트에서 도메인 특화 멀티에이전트 협업으로 패러다임이 전환되고 있다. NIST AI Agent Standards Initiative 발표로 보안과 상호운용성 표준화가 본격화되었다.
이 글에서는 4가지 주요 멀티에이전트 오케스트레이션 패턴을 분석하고, LangGraph, CrewAI, AutoGen 프레임워크별 구현 코드를 제공한다.
멀티에이전트 시스템 개요
단일 에이전트의 한계
단일 에이전트는 다음과 같은 한계를 가진다.
| 한계 | 설명 |
|---|---|
| 컨텍스트 윈도우 포화 | 복잡한 작업일수록 프롬프트가 길어지고 성능 저하 |
| 도구 과부하 | 수십 개의 도구를 하나의 에이전트에 부여하면 선택 정확도 하락 |
| 단일 장애점 | 에이전트 하나가 실패하면 전체 워크플로우 중단 |
| 전문성 부족 | 범용 프롬프트로는 도메인별 최적 결과를 얻기 어려움 |
| 확장성 제약 | 작업량 증가 시 수평 확장 불가 |
멀티에이전트가 해결하는 문제
멀티에이전트 시스템은 분업과 협업을 통해 위 한계를 극복한다.
- 전문화: 각 에이전트가 특정 도메인에 특화
- 병렬 처리: 독립적인 작업을 동시에 수행
- 장애 격리: 하나의 에이전트 실패가 전체 시스템에 영향을 주지 않음
- 동적 구성: 작업에 따라 에이전트 조합을 유연하게 변경
4가지 오케스트레이션 패턴
패턴 1: 단일 에이전트 (Single Agent)
가장 기본적인 패턴으로, 하나의 에이전트가 모든 작업을 처리한다.
from langchain.agents import create_tool_calling_agent
from langchain_openai import ChatOpenAI
from langchain.tools import tool
@tool
def search_web(query: str) -> str:
"""웹에서 정보를 검색한다."""
# 검색 로직
return f"Search results for: {query}"
@tool
def calculate(expression: str) -> str:
"""수학 계산을 수행한다."""
return str(eval(expression))
@tool
def write_file(filename: str, content: str) -> str:
"""파일에 내용을 작성한다."""
with open(filename, "w") as f:
f.write(content)
return f"File {filename} written successfully"
llm = ChatOpenAI(model="gpt-4o")
tools = [search_web, calculate, write_file]
agent = create_tool_calling_agent(llm, tools, prompt_template)
적합한 상황: 작업이 단순하고 도구 수가 5개 이하인 경우
패턴 2: 계층형 멀티에이전트 (Hierarchical Multi-Agent)
슈퍼바이저 에이전트가 하위 에이전트들에게 작업을 분배하고 결과를 종합한다.
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Literal
import operator
class SupervisorState(TypedDict):
messages: Annotated[list, operator.add]
next_agent: str
final_answer: str
llm = ChatOpenAI(model="gpt-4o")
# 하위 에이전트 정의
researcher = create_react_agent(
llm,
tools=[search_web],
state_modifier="You are a research specialist. Find accurate information."
)
analyst = create_react_agent(
llm,
tools=[calculate],
state_modifier="You are a data analyst. Analyze data and provide insights."
)
writer = create_react_agent(
llm,
tools=[write_file],
state_modifier="You are a technical writer. Create clear documentation."
)
# 슈퍼바이저 라우팅 로직
def supervisor_router(state: SupervisorState) -> Literal["researcher", "analyst", "writer", "__end__"]:
"""슈퍼바이저가 다음 에이전트를 결정한다."""
last_message = state["messages"][-1]
response = llm.invoke([
{"role": "system", "content": """You are a supervisor managing a team.
Route to: researcher (for information), analyst (for data), writer (for documentation).
Return __end__ when the task is complete."""},
{"role": "user", "content": last_message.content}
])
return response.content.strip()
# 그래프 구성
graph = StateGraph(SupervisorState)
graph.add_node("supervisor", supervisor_router)
graph.add_node("researcher", researcher)
graph.add_node("analyst", analyst)
graph.add_node("writer", writer)
graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", supervisor_router)
graph.add_edge("researcher", "supervisor")
graph.add_edge("analyst", "supervisor")
graph.add_edge("writer", "supervisor")
app = graph.compile()
적합한 상황: 중앙 집중적 제어가 필요하고, 작업의 순서가 동적으로 결정되는 경우
패턴 3: 순차적 파이프라인 (Sequential Pipeline)
에이전트들이 정해진 순서대로 작업을 처리하고, 이전 에이전트의 출력이 다음 에이전트의 입력이 된다.
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator
class PipelineState(TypedDict):
messages: Annotated[list, operator.add]
research_output: str
analysis_output: str
report_output: str
def research_node(state: PipelineState) -> PipelineState:
"""1단계: 정보 수집"""
result = researcher.invoke({"messages": state["messages"]})
return {"research_output": result["messages"][-1].content}
def analysis_node(state: PipelineState) -> PipelineState:
"""2단계: 분석"""
analysis_prompt = f"Analyze this research: {state['research_output']}"
result = analyst.invoke({"messages": [{"role": "user", "content": analysis_prompt}]})
return {"analysis_output": result["messages"][-1].content}
def report_node(state: PipelineState) -> PipelineState:
"""3단계: 보고서 작성"""
report_prompt = f"""Write a report based on:
Research: {state['research_output']}
Analysis: {state['analysis_output']}"""
result = writer.invoke({"messages": [{"role": "user", "content": report_prompt}]})
return {"report_output": result["messages"][-1].content}
# 파이프라인 그래프
pipeline = StateGraph(PipelineState)
pipeline.add_node("research", research_node)
pipeline.add_node("analysis", analysis_node)
pipeline.add_node("report", report_node)
pipeline.add_edge(START, "research")
pipeline.add_edge("research", "analysis")
pipeline.add_edge("analysis", "report")
pipeline.add_edge("report", END)
app = pipeline.compile()
적합한 상황: 작업 순서가 명확하고, 각 단계의 출력이 다음 단계의 입력으로 사용되는 경우
패턴 4: 분산형 스웜 (Decentralized Swarm)
에이전트들이 자율적으로 협업하며, 중앙 조율자 없이 작업을 수행한다.
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated, Literal
import operator
class SwarmState(TypedDict):
messages: Annotated[list, operator.add]
current_agent: str
task_board: dict # 공유 태스크 보드
def agent_handoff(state: SwarmState, agent_name: str, target: str) -> SwarmState:
"""에이전트 간 핸드오프"""
return {
"current_agent": target,
"messages": state["messages"] + [
{"role": "system", "content": f"Handoff from {agent_name} to {target}"}
]
}
def triage_agent(state: SwarmState) -> Literal["researcher", "analyst", "writer"]:
"""트리아지 에이전트: 작업을 적절한 에이전트에게 라우팅"""
last_message = state["messages"][-1]
if "search" in last_message.content.lower():
return "researcher"
elif "analyze" in last_message.content.lower():
return "analyst"
else:
return "writer"
def researcher_with_handoff(state: SwarmState):
"""리서처가 작업 후 다음 에이전트에게 핸드오프"""
result = researcher.invoke({"messages": state["messages"]})
# 분석이 필요하면 analyst에게, 아니면 종료
return agent_handoff(state, "researcher", "analyst")
def analyst_with_handoff(state: SwarmState):
"""분석가가 작업 후 다음 에이전트에게 핸드오프"""
result = analyst.invoke({"messages": state["messages"]})
return agent_handoff(state, "analyst", "writer")
# 스웜 그래프
swarm = StateGraph(SwarmState)
swarm.add_node("triage", triage_agent)
swarm.add_node("researcher", researcher_with_handoff)
swarm.add_node("analyst", analyst_with_handoff)
swarm.add_node("writer", writer)
swarm.add_edge(START, "triage")
swarm.add_conditional_edges("triage", triage_agent)
swarm.add_edge("researcher", "analyst")
swarm.add_edge("analyst", "writer")
swarm.add_edge("writer", END)
app = swarm.compile()
적합한 상황: 에이전트가 자율적으로 판단해야 하고, 유연한 협업이 필요한 경우
프레임워크 비교
LangGraph vs CrewAI vs AutoGen
| 특성 | LangGraph | CrewAI | AutoGen |
|---|---|---|---|
| 아키텍처 | 그래프 기반 상태 머신 | 역할 기반 에이전트 팀 | 대화 기반 멀티에이전트 |
| 유연성 | 매우 높음 (저수준 제어) | 중간 (추상화된 API) | 높음 (커스텀 가능) |
| 학습 곡선 | 높음 | 낮음 | 중간 |
| 상태 관리 | 내장 (체크포인트 지원) | 기본적 | 대화 히스토리 기반 |
| Human-in-the-Loop | 네이티브 지원 | 기본 지원 | 네이티브 지원 |
| 스트리밍 | 네이티브 지원 | 제한적 | 이벤트 기반 |
| 프로덕션 준비도 | 높음 | 중간 | 높음 |
| 커뮤니티 크기 | 대형 | 중형 | 대형 |
| 라이선스 | MIT | MIT | MIT |
CrewAI 구현 예시
from crewai import Agent, Task, Crew, Process
# 에이전트 정의
researcher = Agent(
role="Senior Research Analyst",
goal="Find comprehensive and accurate information about the given topic",
backstory="""You are an expert researcher with decades of experience
in gathering and synthesizing information from multiple sources.""",
verbose=True,
allow_delegation=True,
tools=[search_tool, scrape_tool]
)
analyst = Agent(
role="Data Analyst",
goal="Analyze research findings and extract actionable insights",
backstory="""You are a skilled data analyst who excels at finding
patterns and drawing meaningful conclusions from data.""",
verbose=True,
tools=[analysis_tool, chart_tool]
)
writer = Agent(
role="Technical Writer",
goal="Create clear and comprehensive reports",
backstory="""You are an experienced technical writer who can transform
complex analyses into readable documents.""",
verbose=True,
tools=[write_tool]
)
# 태스크 정의
research_task = Task(
description="Research the latest trends in AI agent orchestration",
expected_output="A comprehensive summary of findings with sources",
agent=researcher
)
analysis_task = Task(
description="Analyze the research findings and identify key patterns",
expected_output="An analytical report with data-driven insights",
agent=analyst,
context=[research_task] # 이전 태스크 결과 참조
)
report_task = Task(
description="Write a final report combining research and analysis",
expected_output="A polished report ready for stakeholders",
agent=writer,
context=[research_task, analysis_task]
)
# 크루 구성 및 실행
crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, report_task],
process=Process.sequential, # 또는 Process.hierarchical
verbose=True
)
result = crew.kickoff()
print(result)
AutoGen 구현 예시
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
# 에이전트 설정
config_list = [{"model": "gpt-4o", "api_key": "YOUR_API_KEY"}]
researcher = AssistantAgent(
name="Researcher",
system_message="""You are a research specialist.
Find accurate and relevant information.
When your research is complete, say RESEARCH_DONE.""",
llm_config={"config_list": config_list}
)
analyst = AssistantAgent(
name="Analyst",
system_message="""You are a data analyst.
Analyze the research findings and provide insights.
When analysis is complete, say ANALYSIS_DONE.""",
llm_config={"config_list": config_list}
)
writer = AssistantAgent(
name="Writer",
system_message="""You are a technical writer.
Create clear documentation based on research and analysis.
When the report is complete, say TERMINATE.""",
llm_config={"config_list": config_list}
)
user_proxy = UserProxyAgent(
name="Admin",
human_input_mode="NEVER",
code_execution_config={"work_dir": "output"},
is_termination_msg=lambda x: "TERMINATE" in x.get("content", "")
)
# 그룹 채팅 설정
group_chat = GroupChat(
agents=[user_proxy, researcher, analyst, writer],
messages=[],
max_round=20,
speaker_selection_method="round_robin"
)
manager = GroupChatManager(
groupchat=group_chat,
llm_config={"config_list": config_list}
)
# 실행
user_proxy.initiate_chat(
manager,
message="Research AI agent orchestration patterns and write a report."
)
슈퍼바이저 패턴 심화
동적 라우팅 구현
슈퍼바이저가 작업을 분석하고, 최적의 에이전트에게 라우팅하는 고급 구현이다.
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import Literal
class RouteDecision(BaseModel):
"""슈퍼바이저의 라우팅 결정"""
next_agent: Literal["researcher", "analyst", "writer", "FINISH"] = Field(
description="The next agent to route to"
)
reasoning: str = Field(
description="Why this agent was chosen"
)
task_description: str = Field(
description="Specific task for the chosen agent"
)
llm = ChatOpenAI(model="gpt-4o")
structured_llm = llm.with_structured_output(RouteDecision)
SUPERVISOR_PROMPT = """You are a supervisor managing a team of agents.
Based on the current state and conversation, decide:
1. Which agent should work next
2. What specific task they should perform
3. Whether the overall task is complete (FINISH)
Available agents:
- researcher: Searches for information and gathers data
- analyst: Analyzes data and provides insights
- writer: Creates reports and documentation
Current conversation:
{messages}
Task Board:
{task_board}
"""
def supervisor_node(state):
"""슈퍼바이저 노드: 동적 라우팅"""
decision = structured_llm.invoke(
SUPERVISOR_PROMPT.format(
messages=state["messages"],
task_board=state.get("task_board", "Empty")
)
)
return {
"next_agent": decision.next_agent,
"messages": state["messages"] + [
{"role": "system",
"content": f"Supervisor routed to {decision.next_agent}: {decision.task_description}"}
]
}
Human-in-the-Loop 통합
사람의 승인이 필요한 단계를 워크플로우에 삽입하는 패턴이다.
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
checkpointer = MemorySaver()
def human_approval_node(state):
"""사람의 승인을 대기하는 노드"""
# 이 노드에서 interrupt가 발생하면 실행이 중단됨
# 사람이 승인하면 resume으로 계속 진행
return {
"messages": state["messages"] + [
{"role": "system", "content": "Awaiting human approval..."}
],
"approval_status": "pending"
}
def check_approval(state) -> Literal["approved", "rejected"]:
"""승인 상태 확인"""
return state.get("approval_status", "pending")
# 그래프에 Human-in-the-Loop 추가
graph = StateGraph(SupervisorState)
graph.add_node("supervisor", supervisor_node)
graph.add_node("researcher", researcher)
graph.add_node("human_review", human_approval_node)
graph.add_node("writer", writer)
graph.add_edge(START, "supervisor")
graph.add_edge("supervisor", "researcher")
graph.add_edge("researcher", "human_review")
graph.add_conditional_edges(
"human_review",
check_approval,
{"approved": "writer", "rejected": "supervisor"}
)
graph.add_edge("writer", END)
# 체크포인터로 상태 저장 및 복원
app = graph.compile(checkpointer=checkpointer, interrupt_before=["human_review"])
# 실행 후 중단 시점에서 재개
config = {"configurable": {"thread_id": "review-thread-1"}}
result = app.invoke(initial_state, config)
# 사람이 승인 후 재개
app.invoke(None, config) # resume with approval
MCP 프로토콜 통합
Model Context Protocol (MCP) 이란
MCP는 Anthropic이 발표한 에이전트 간 상호운용성 프로토콜로, 에이전트가 외부 도구와 데이터 소스에 표준화된 방식으로 접근할 수 있게 한다.
# MCP 서버 구현 예시
from mcp import Server, Tool
import asyncio
server = Server("analytics-server")
@server.tool()
async def query_database(query: str) -> str:
"""데이터베이스에서 SQL 쿼리를 실행한다."""
# 실제 DB 연결 및 쿼리 실행
result = await db.execute(query)
return str(result)
@server.tool()
async def generate_chart(data: str, chart_type: str) -> str:
"""데이터를 기반으로 차트를 생성한다."""
# 차트 생성 로직
return f"Chart generated: {chart_type}"
@server.resource("schema://tables")
async def list_tables() -> str:
"""사용 가능한 데이터베이스 테이블 목록"""
tables = await db.get_tables()
return "\n".join(tables)
# 서버 실행
async def main():
async with server.run_stdio() as running:
await running.wait()
asyncio.run(main())
MCP 클라이언트와 멀티에이전트 연동
from mcp import ClientSession, StdioServerParameters
from langchain_mcp_adapters.tools import load_mcp_tools
from langgraph.prebuilt import create_react_agent
# MCP 서버 연결 설정
server_params = StdioServerParameters(
command="python",
args=["analytics_server.py"]
)
async def create_mcp_agent():
"""MCP 도구를 사용하는 에이전트 생성"""
async with ClientSession(*server_params) as session:
await session.initialize()
# MCP 도구를 LangChain 도구로 변환
tools = await load_mcp_tools(session)
# 에이전트 생성
agent = create_react_agent(
ChatOpenAI(model="gpt-4o"),
tools,
state_modifier="You are a data analyst with access to database tools."
)
return agent
실전 사례: 고객 지원 멀티에이전트 시스템
아키텍처 설계
고객 지원 시스템을 계층형 멀티에이전트로 구현하는 실전 예시이다.
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Literal
import operator
class CustomerSupportState(TypedDict):
messages: Annotated[list, operator.add]
customer_id: str
issue_category: str
sentiment: str
resolution: str
escalated: bool
# 트리아지 에이전트
def triage_agent(state: CustomerSupportState) -> CustomerSupportState:
"""고객 문의를 분류하고 적절한 전문 에이전트에게 라우팅"""
llm = ChatOpenAI(model="gpt-4o")
response = llm.invoke([
{"role": "system", "content": """Classify the customer issue into one of:
- billing: Payment, invoice, subscription issues
- technical: Product bugs, errors, configuration
- general: General inquiries, feedback
Also assess sentiment: positive, neutral, negative, urgent"""},
{"role": "user", "content": state["messages"][-1].content}
])
# 분류 결과 파싱
return {
"issue_category": "technical", # 파싱 결과
"sentiment": "negative"
}
# 기술 지원 에이전트
def technical_support_agent(state: CustomerSupportState) -> CustomerSupportState:
"""기술 문제를 진단하고 해결책을 제시"""
llm = ChatOpenAI(model="gpt-4o")
response = llm.invoke([
{"role": "system", "content": """You are a technical support specialist.
Diagnose the issue and provide step-by-step solutions.
If the issue requires engineering escalation, set escalated=true."""},
{"role": "user", "content": str(state["messages"])}
])
return {
"resolution": response.content,
"messages": [{"role": "assistant", "content": response.content}]
}
# 빌링 지원 에이전트
def billing_support_agent(state: CustomerSupportState) -> CustomerSupportState:
"""결제 관련 문제를 처리"""
llm = ChatOpenAI(model="gpt-4o")
response = llm.invoke([
{"role": "system", "content": """You are a billing specialist.
Handle payment issues, refunds, and subscription changes."""},
{"role": "user", "content": str(state["messages"])}
])
return {
"resolution": response.content,
"messages": [{"role": "assistant", "content": response.content}]
}
# 에스컬레이션 에이전트
def escalation_agent(state: CustomerSupportState) -> CustomerSupportState:
"""복잡한 문제를 상위 레벨로 에스컬레이션"""
return {
"escalated": True,
"messages": [
{"role": "system",
"content": f"Issue escalated for customer {state['customer_id']}"}
]
}
# 라우팅 함수
def route_issue(state: CustomerSupportState) -> Literal["technical", "billing", "general"]:
return state["issue_category"]
def check_escalation(state: CustomerSupportState) -> Literal["escalate", "resolve"]:
if state.get("escalated"):
return "escalate"
return "resolve"
# 그래프 구성
workflow = StateGraph(CustomerSupportState)
workflow.add_node("triage", triage_agent)
workflow.add_node("technical", technical_support_agent)
workflow.add_node("billing", billing_support_agent)
workflow.add_node("escalation", escalation_agent)
workflow.add_edge(START, "triage")
workflow.add_conditional_edges("triage", route_issue, {
"technical": "technical",
"billing": "billing",
"general": "billing" # 일반 문의도 billing이 처리
})
workflow.add_conditional_edges("technical", check_escalation, {
"escalate": "escalation",
"resolve": END
})
workflow.add_edge("billing", END)
workflow.add_edge("escalation", END)
app = workflow.compile()
실패 처리 전략
재시도 및 폴백 패턴
from tenacity import retry, stop_after_attempt, wait_exponential
from langgraph.graph import StateGraph
import logging
logger = logging.getLogger(__name__)
class AgentWithRetry:
"""재시도 로직이 포함된 에이전트 래퍼"""
def __init__(self, agent, max_retries=3, fallback_agent=None):
self.agent = agent
self.max_retries = max_retries
self.fallback_agent = fallback_agent
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=30)
)
async def invoke_with_retry(self, state):
"""재시도 로직으로 에이전트 호출"""
try:
return await self.agent.ainvoke(state)
except Exception as e:
logger.error(f"Agent failed: {e}")
raise
async def invoke(self, state):
"""폴백 포함 에이전트 호출"""
try:
return await self.invoke_with_retry(state)
except Exception as e:
if self.fallback_agent:
logger.warning(f"Falling back to backup agent: {e}")
return await self.fallback_agent.ainvoke(state)
raise
# 서킷 브레이커 패턴
class CircuitBreaker:
"""서킷 브레이커 패턴"""
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.state = "closed" # closed, open, half-open
self.last_failure_time = None
def can_execute(self) -> bool:
if self.state == "closed":
return True
if self.state == "open":
import time
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "half-open"
return True
return False
return True # half-open
def record_success(self):
self.failure_count = 0
self.state = "closed"
def record_failure(self):
import time
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
데드 레터 큐 패턴
import json
from datetime import datetime
class DeadLetterQueue:
"""처리 실패한 메시지를 저장하는 데드 레터 큐"""
def __init__(self, storage_path="dead_letters.json"):
self.storage_path = storage_path
self.messages = []
def add(self, message: dict, error: str, agent_name: str):
"""실패한 메시지를 큐에 추가"""
entry = {
"timestamp": datetime.now().isoformat(),
"agent": agent_name,
"message": message,
"error": str(error),
"retry_count": 0
}
self.messages.append(entry)
self._persist()
def retry_all(self, agent_registry: dict):
"""큐의 모든 메시지 재시도"""
for entry in self.messages:
agent = agent_registry.get(entry["agent"])
if agent:
try:
agent.invoke(entry["message"])
self.messages.remove(entry)
except Exception as e:
entry["retry_count"] += 1
entry["last_error"] = str(e)
self._persist()
def _persist(self):
with open(self.storage_path, "w") as f:
json.dump(self.messages, f, indent=2)
관찰 가능성 (Observability)
LangSmith 통합
import os
# LangSmith 추적 활성화
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "multi-agent-orchestration"
# 커스텀 메트릭 수집
from langsmith import Client
client = Client()
def track_agent_metrics(agent_name: str, duration: float, tokens: int, success: bool):
"""에이전트 실행 메트릭 추적"""
client.create_run(
name=f"agent-{agent_name}",
run_type="chain",
inputs={"agent": agent_name},
outputs={
"duration_ms": duration * 1000,
"total_tokens": tokens,
"success": success
}
)
OpenTelemetry 통합
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# 트레이서 설정
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("multi-agent-system")
def traced_agent_node(agent_name: str):
"""OpenTelemetry 트레이싱이 포함된 에이전트 노드"""
def node_fn(state):
with tracer.start_as_current_span(f"agent.{agent_name}") as span:
span.set_attribute("agent.name", agent_name)
span.set_attribute("agent.input_messages", len(state["messages"]))
try:
result = agent.invoke(state)
span.set_attribute("agent.success", True)
return result
except Exception as e:
span.set_attribute("agent.success", False)
span.record_exception(e)
raise
return node_fn
프로덕션 배포 체크리스트
설계 단계
- 에이전트별 역할과 도구가 명확히 정의되었는가
- 에이전트 간 통신 프로토콜이 표준화되었는가
- 상태 관리 전략이 수립되었는가 (로컬 vs 분산)
- 장애 시나리오별 대응 전략이 있는가
- Human-in-the-Loop 필요 지점이 식별되었는가
구현 단계
- 각 에이전트에 적절한 모델이 할당되었는가 (비용 vs 성능)
- 도구 실행에 타임아웃이 설정되었는가
- 재시도 로직과 서킷 브레이커가 구현되었는가
- 데드 레터 큐로 실패한 작업을 추적하는가
- 입출력 검증 (guard rails)이 적용되었는가
배포 단계
- 관찰 가능성 파이프라인이 구성되었는가 (LangSmith / OTEL)
- 에이전트별 비용 추적이 가능한가
- 레이트 리밋이 적용되었는가
- 보안 감사 로그가 활성화되었는가
- 롤백 전략이 수립되었는가
운영 단계
- 에이전트 성능 대시보드가 구축되었는가
- 이상 탐지 알림이 설정되었는가
- 프롬프트 버전 관리가 적용되었는가
- A/B 테스트 프레임워크가 준비되었는가
- 정기적인 프롬프트 최적화 프로세스가 있는가
패턴 선택 가이드
의사결정 플로우차트
작업 유형 판단
|
├─ 단순 작업 (도구 5개 이하) ─────> 단일 에이전트
|
├─ 순서가 정해진 다단계 작업 ─────> 파이프라인
|
├─ 동적 라우팅이 필요한 작업 ─────> 계층형 (슈퍼바이저)
|
└─ 자율적 협업이 필요한 복잡 작업 ─> 스웜
패턴별 장단점 종합
| 패턴 | 장점 | 단점 | 복잡도 | 적합한 규모 |
|---|---|---|---|---|
| 단일 에이전트 | 구현 간단, 디버깅 용이 | 확장성 한계, 컨텍스트 포화 | 낮음 | 소규모 |
| 계층형 | 중앙 제어, 동적 라우팅 | 슈퍼바이저 병목, 단일 장애점 | 중간 | 중규모 |
| 파이프라인 | 예측 가능, 테스트 용이 | 유연성 부족, 순차 실행 지연 | 낮음-중간 | 중규모 |
| 스웜 | 높은 유연성, 자율적 협업 | 디버깅 어려움, 예측 불가 | 높음 | 대규모 |
보안 고려사항
에이전트 격리
class SandboxedAgent:
"""격리된 환경에서 실행되는 에이전트"""
def __init__(self, agent, allowed_tools: list, max_tokens: int = 4096):
self.agent = agent
self.allowed_tools = set(allowed_tools)
self.max_tokens = max_tokens
def invoke(self, state):
# 도구 접근 권한 검증
requested_tools = self._extract_tool_calls(state)
unauthorized = requested_tools - self.allowed_tools
if unauthorized:
raise PermissionError(
f"Agent attempted to use unauthorized tools: {unauthorized}"
)
# 토큰 사용량 제한
if self._estimate_tokens(state) > self.max_tokens:
raise ResourceError("Token limit exceeded")
return self.agent.invoke(state)
def _extract_tool_calls(self, state) -> set:
# 상태에서 도구 호출 추출
return set()
def _estimate_tokens(self, state) -> int:
# 토큰 사용량 추정
return 0
프롬프트 인젝션 방어
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, validator
class SafeAgentOutput(BaseModel):
"""에이전트 출력 검증 스키마"""
response: str
confidence: float
sources: list[str]
@validator("response")
def validate_response(cls, v):
# 금지된 패턴 검출
forbidden_patterns = [
"ignore previous instructions",
"system prompt",
"bypass",
"jailbreak"
]
for pattern in forbidden_patterns:
if pattern.lower() in v.lower():
raise ValueError(f"Suspicious pattern detected: {pattern}")
return v
parser = PydanticOutputParser(pydantic_object=SafeAgentOutput)
성능 최적화
병렬 실행 전략
from langgraph.graph import StateGraph, START, END
import asyncio
class ParallelState(TypedDict):
messages: Annotated[list, operator.add]
research_result: str
analysis_result: str
async def parallel_execution(state):
"""독립적인 에이전트를 병렬로 실행"""
research_task = asyncio.create_task(
researcher.ainvoke({"messages": state["messages"]})
)
analysis_task = asyncio.create_task(
analyst.ainvoke({"messages": state["messages"]})
)
research_result, analysis_result = await asyncio.gather(
research_task, analysis_task
)
return {
"research_result": research_result["messages"][-1].content,
"analysis_result": analysis_result["messages"][-1].content
}
# LangGraph의 fan-out 패턴
graph = StateGraph(ParallelState)
graph.add_node("research", researcher)
graph.add_node("analysis", analyst)
graph.add_node("synthesis", writer)
# 병렬 실행: START에서 두 노드로 동시 분기
graph.add_edge(START, "research")
graph.add_edge(START, "analysis")
# 두 결과가 모두 완료되면 synthesis로
graph.add_edge("research", "synthesis")
graph.add_edge("analysis", "synthesis")
graph.add_edge("synthesis", END)
캐싱 전략
from functools import lru_cache
import hashlib
import json
class AgentCache:
"""에이전트 응답 캐싱"""
def __init__(self, ttl_seconds=3600):
self.cache = {}
self.ttl = ttl_seconds
def get_cache_key(self, state: dict) -> str:
"""상태에서 캐시 키 생성"""
state_str = json.dumps(state, sort_keys=True, default=str)
return hashlib.sha256(state_str.encode()).hexdigest()
def get(self, state: dict):
"""캐시에서 결과 조회"""
key = self.get_cache_key(state)
if key in self.cache:
entry = self.cache[key]
import time
if time.time() - entry["timestamp"] < self.ttl:
return entry["result"]
del self.cache[key]
return None
def set(self, state: dict, result):
"""결과를 캐시에 저장"""
import time
key = self.get_cache_key(state)
self.cache[key] = {
"result": result,
"timestamp": time.time()
}
마무리
멀티에이전트 오케스트레이션은 단순히 여러 에이전트를 연결하는 것이 아니라, 작업의 특성에 맞는 패턴을 선택하고 견고한 장애 처리와 관찰 가능성을 갖추는 것이 핵심이다.
핵심 정리:
- 단일 에이전트로 시작하고, 복잡도가 증가할 때 멀티에이전트로 전환
- 계층형 패턴은 중앙 제어가 필요한 경우에 적합
- 파이프라인 패턴은 순서가 정해진 워크플로우에 최적
- 스웜 패턴은 높은 자율성이 필요한 복잡한 시나리오에 적합
- 프레임워크는 LangGraph(유연성), CrewAI(빠른 프로토타이핑), AutoGen(대화 기반)을 용도에 맞게 선택