Skip to content

필사 모드: AI Agent Multi-Agent Orchestration Patterns: A Practical Guide to Hierarchical, Pipeline, and Swarm Architectures

English
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

왜 지금 멀티에이전트인가

2026년 에이전틱 AI가 기업 애플리케이션의 40%에 탑재될 전망이다(Gartner). 단일 범용 에이전트에서 도메인 특화 멀티에이전트 협업으로 패러다임이 전환되고 있다. NIST AI Agent Standards Initiative 발표로 보안과 상호운용성 표준화가 본격화되었다.

이 글에서는 **4가지 주요 멀티에이전트 오케스트레이션 패턴**을 분석하고, LangGraph, CrewAI, AutoGen 프레임워크별 구현 코드를 제공한다.

멀티에이전트 시스템 개요

단일 에이전트의 한계

단일 에이전트는 다음과 같은 한계를 가진다.

| 한계 | 설명 |

| ------------------------ | ------------------------------------------------------------ |

| **컨텍스트 윈도우 포화** | 복잡한 작업일수록 프롬프트가 길어지고 성능 저하 |

| **도구 과부하** | 수십 개의 도구를 하나의 에이전트에 부여하면 선택 정확도 하락 |

| **단일 장애점** | 에이전트 하나가 실패하면 전체 워크플로우 중단 |

| **전문성 부족** | 범용 프롬프트로는 도메인별 최적 결과를 얻기 어려움 |

| **확장성 제약** | 작업량 증가 시 수평 확장 불가 |

멀티에이전트가 해결하는 문제

멀티에이전트 시스템은 **분업과 협업**을 통해 위 한계를 극복한다.

- **전문화**: 각 에이전트가 특정 도메인에 특화

- **병렬 처리**: 독립적인 작업을 동시에 수행

- **장애 격리**: 하나의 에이전트 실패가 전체 시스템에 영향을 주지 않음

- **동적 구성**: 작업에 따라 에이전트 조합을 유연하게 변경

4가지 오케스트레이션 패턴

패턴 1: 단일 에이전트 (Single Agent)

가장 기본적인 패턴으로, 하나의 에이전트가 모든 작업을 처리한다.

from langchain.agents import create_tool_calling_agent

from langchain_openai import ChatOpenAI

from langchain.tools import tool

@tool

def search_web(query: str) -> str:

"""웹에서 정보를 검색한다."""

검색 로직

return f"Search results for: {query}"

@tool

def calculate(expression: str) -> str:

"""수학 계산을 수행한다."""

return str(eval(expression))

@tool

def write_file(filename: str, content: str) -> str:

"""파일에 내용을 작성한다."""

with open(filename, "w") as f:

f.write(content)

return f"File {filename} written successfully"

llm = ChatOpenAI(model="gpt-4o")

tools = [search_web, calculate, write_file]

agent = create_tool_calling_agent(llm, tools, prompt_template)

**적합한 상황**: 작업이 단순하고 도구 수가 5개 이하인 경우

패턴 2: 계층형 멀티에이전트 (Hierarchical Multi-Agent)

**슈퍼바이저 에이전트**가 하위 에이전트들에게 작업을 분배하고 결과를 종합한다.

from langgraph.graph import StateGraph, START, END

from langgraph.prebuilt import create_react_agent

from langchain_openai import ChatOpenAI

from typing import TypedDict, Annotated, Literal

class SupervisorState(TypedDict):

messages: Annotated[list, operator.add]

next_agent: str

final_answer: str

llm = ChatOpenAI(model="gpt-4o")

하위 에이전트 정의

researcher = create_react_agent(

llm,

tools=[search_web],

state_modifier="You are a research specialist. Find accurate information."

)

analyst = create_react_agent(

llm,

tools=[calculate],

state_modifier="You are a data analyst. Analyze data and provide insights."

)

writer = create_react_agent(

llm,

tools=[write_file],

state_modifier="You are a technical writer. Create clear documentation."

)

슈퍼바이저 라우팅 로직

def supervisor_router(state: SupervisorState) -> Literal["researcher", "analyst", "writer", "__end__"]:

"""슈퍼바이저가 다음 에이전트를 결정한다."""

last_message = state["messages"][-1]

response = llm.invoke([

{"role": "system", "content": """You are a supervisor managing a team.

Route to: researcher (for information), analyst (for data), writer (for documentation).

Return __end__ when the task is complete."""},

{"role": "user", "content": last_message.content}

])

return response.content.strip()

그래프 구성

graph = StateGraph(SupervisorState)

graph.add_node("supervisor", supervisor_router)

graph.add_node("researcher", researcher)

graph.add_node("analyst", analyst)

graph.add_node("writer", writer)

graph.add_edge(START, "supervisor")

graph.add_conditional_edges("supervisor", supervisor_router)

graph.add_edge("researcher", "supervisor")

graph.add_edge("analyst", "supervisor")

graph.add_edge("writer", "supervisor")

app = graph.compile()

**적합한 상황**: 중앙 집중적 제어가 필요하고, 작업의 순서가 동적으로 결정되는 경우

패턴 3: 순차적 파이프라인 (Sequential Pipeline)

에이전트들이 정해진 순서대로 작업을 처리하고, 이전 에이전트의 출력이 다음 에이전트의 입력이 된다.

from langgraph.graph import StateGraph, START, END

from typing import TypedDict, Annotated

class PipelineState(TypedDict):

messages: Annotated[list, operator.add]

research_output: str

analysis_output: str

report_output: str

def research_node(state: PipelineState) -> PipelineState:

"""1단계: 정보 수집"""

result = researcher.invoke({"messages": state["messages"]})

return {"research_output": result["messages"][-1].content}

def analysis_node(state: PipelineState) -> PipelineState:

"""2단계: 분석"""

analysis_prompt = f"Analyze this research: {state['research_output']}"

result = analyst.invoke({"messages": [{"role": "user", "content": analysis_prompt}]})

return {"analysis_output": result["messages"][-1].content}

def report_node(state: PipelineState) -> PipelineState:

"""3단계: 보고서 작성"""

report_prompt = f"""Write a report based on:

Research: {state['research_output']}

Analysis: {state['analysis_output']}"""

result = writer.invoke({"messages": [{"role": "user", "content": report_prompt}]})

return {"report_output": result["messages"][-1].content}

파이프라인 그래프

pipeline = StateGraph(PipelineState)

pipeline.add_node("research", research_node)

pipeline.add_node("analysis", analysis_node)

pipeline.add_node("report", report_node)

pipeline.add_edge(START, "research")

pipeline.add_edge("research", "analysis")

pipeline.add_edge("analysis", "report")

pipeline.add_edge("report", END)

app = pipeline.compile()

**적합한 상황**: 작업 순서가 명확하고, 각 단계의 출력이 다음 단계의 입력으로 사용되는 경우

패턴 4: 분산형 스웜 (Decentralized Swarm)

에이전트들이 자율적으로 협업하며, 중앙 조율자 없이 작업을 수행한다.

from langgraph.graph import StateGraph, START, END

from typing import TypedDict, Annotated, Literal

class SwarmState(TypedDict):

messages: Annotated[list, operator.add]

current_agent: str

task_board: dict # 공유 태스크 보드

def agent_handoff(state: SwarmState, agent_name: str, target: str) -> SwarmState:

"""에이전트 간 핸드오프"""

return {

"current_agent": target,

"messages": state["messages"] + [

{"role": "system", "content": f"Handoff from {agent_name} to {target}"}

]

}

def triage_agent(state: SwarmState) -> Literal["researcher", "analyst", "writer"]:

"""트리아지 에이전트: 작업을 적절한 에이전트에게 라우팅"""

last_message = state["messages"][-1]

if "search" in last_message.content.lower():

return "researcher"

elif "analyze" in last_message.content.lower():

return "analyst"

else:

return "writer"

def researcher_with_handoff(state: SwarmState):

"""리서처가 작업 후 다음 에이전트에게 핸드오프"""

result = researcher.invoke({"messages": state["messages"]})

분석이 필요하면 analyst에게, 아니면 종료

return agent_handoff(state, "researcher", "analyst")

def analyst_with_handoff(state: SwarmState):

"""분석가가 작업 후 다음 에이전트에게 핸드오프"""

result = analyst.invoke({"messages": state["messages"]})

return agent_handoff(state, "analyst", "writer")

스웜 그래프

swarm = StateGraph(SwarmState)

swarm.add_node("triage", triage_agent)

swarm.add_node("researcher", researcher_with_handoff)

swarm.add_node("analyst", analyst_with_handoff)

swarm.add_node("writer", writer)

swarm.add_edge(START, "triage")

swarm.add_conditional_edges("triage", triage_agent)

swarm.add_edge("researcher", "analyst")

swarm.add_edge("analyst", "writer")

swarm.add_edge("writer", END)

app = swarm.compile()

**적합한 상황**: 에이전트가 자율적으로 판단해야 하고, 유연한 협업이 필요한 경우

프레임워크 비교

LangGraph vs CrewAI vs AutoGen

| 특성 | LangGraph | CrewAI | AutoGen |

| --------------------- | ----------------------- | --------------------- | ---------------------- |

| **아키텍처** | 그래프 기반 상태 머신 | 역할 기반 에이전트 팀 | 대화 기반 멀티에이전트 |

| **유연성** | 매우 높음 (저수준 제어) | 중간 (추상화된 API) | 높음 (커스텀 가능) |

| **학습 곡선** | 높음 | 낮음 | 중간 |

| **상태 관리** | 내장 (체크포인트 지원) | 기본적 | 대화 히스토리 기반 |

| **Human-in-the-Loop** | 네이티브 지원 | 기본 지원 | 네이티브 지원 |

| **스트리밍** | 네이티브 지원 | 제한적 | 이벤트 기반 |

| **프로덕션 준비도** | 높음 | 중간 | 높음 |

| **커뮤니티 크기** | 대형 | 중형 | 대형 |

| **라이선스** | MIT | MIT | MIT |

CrewAI 구현 예시

from crewai import Agent, Task, Crew, Process

에이전트 정의

researcher = Agent(

role="Senior Research Analyst",

goal="Find comprehensive and accurate information about the given topic",

backstory="""You are an expert researcher with decades of experience

in gathering and synthesizing information from multiple sources.""",

verbose=True,

allow_delegation=True,

tools=[search_tool, scrape_tool]

)

analyst = Agent(

role="Data Analyst",

goal="Analyze research findings and extract actionable insights",

backstory="""You are a skilled data analyst who excels at finding

patterns and drawing meaningful conclusions from data.""",

verbose=True,

tools=[analysis_tool, chart_tool]

)

writer = Agent(

role="Technical Writer",

goal="Create clear and comprehensive reports",

backstory="""You are an experienced technical writer who can transform

complex analyses into readable documents.""",

verbose=True,

tools=[write_tool]

)

태스크 정의

research_task = Task(

description="Research the latest trends in AI agent orchestration",

expected_output="A comprehensive summary of findings with sources",

agent=researcher

)

analysis_task = Task(

description="Analyze the research findings and identify key patterns",

expected_output="An analytical report with data-driven insights",

agent=analyst,

context=[research_task] # 이전 태스크 결과 참조

)

report_task = Task(

description="Write a final report combining research and analysis",

expected_output="A polished report ready for stakeholders",

agent=writer,

context=[research_task, analysis_task]

)

크루 구성 및 실행

crew = Crew(

agents=[researcher, analyst, writer],

tasks=[research_task, analysis_task, report_task],

process=Process.sequential, # 또는 Process.hierarchical

verbose=True

)

result = crew.kickoff()

print(result)

AutoGen 구현 예시

from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager

에이전트 설정

config_list = [{"model": "gpt-4o", "api_key": "YOUR_API_KEY"}]

researcher = AssistantAgent(

name="Researcher",

system_message="""You are a research specialist.

Find accurate and relevant information.

When your research is complete, say RESEARCH_DONE.""",

llm_config={"config_list": config_list}

)

analyst = AssistantAgent(

name="Analyst",

system_message="""You are a data analyst.

Analyze the research findings and provide insights.

When analysis is complete, say ANALYSIS_DONE.""",

llm_config={"config_list": config_list}

)

writer = AssistantAgent(

name="Writer",

system_message="""You are a technical writer.

Create clear documentation based on research and analysis.

When the report is complete, say TERMINATE.""",

llm_config={"config_list": config_list}

)

user_proxy = UserProxyAgent(

name="Admin",

human_input_mode="NEVER",

code_execution_config={"work_dir": "output"},

is_termination_msg=lambda x: "TERMINATE" in x.get("content", "")

)

그룹 채팅 설정

group_chat = GroupChat(

agents=[user_proxy, researcher, analyst, writer],

messages=[],

max_round=20,

speaker_selection_method="round_robin"

)

manager = GroupChatManager(

groupchat=group_chat,

llm_config={"config_list": config_list}

)

실행

user_proxy.initiate_chat(

manager,

message="Research AI agent orchestration patterns and write a report."

)

슈퍼바이저 패턴 심화

동적 라우팅 구현

슈퍼바이저가 작업을 분석하고, 최적의 에이전트에게 라우팅하는 고급 구현이다.

from langgraph.graph import StateGraph, START, END

from langchain_openai import ChatOpenAI

from pydantic import BaseModel, Field

from typing import Literal

class RouteDecision(BaseModel):

"""슈퍼바이저의 라우팅 결정"""

next_agent: Literal["researcher", "analyst", "writer", "FINISH"] = Field(

description="The next agent to route to"

)

reasoning: str = Field(

description="Why this agent was chosen"

)

task_description: str = Field(

description="Specific task for the chosen agent"

)

llm = ChatOpenAI(model="gpt-4o")

structured_llm = llm.with_structured_output(RouteDecision)

SUPERVISOR_PROMPT = """You are a supervisor managing a team of agents.

Based on the current state and conversation, decide:

1. Which agent should work next

2. What specific task they should perform

3. Whether the overall task is complete (FINISH)

Available agents:

- researcher: Searches for information and gathers data

- analyst: Analyzes data and provides insights

- writer: Creates reports and documentation

Current conversation:

{messages}

Task Board:

{task_board}

"""

def supervisor_node(state):

"""슈퍼바이저 노드: 동적 라우팅"""

decision = structured_llm.invoke(

SUPERVISOR_PROMPT.format(

messages=state["messages"],

task_board=state.get("task_board", "Empty")

)

)

return {

"next_agent": decision.next_agent,

"messages": state["messages"] + [

{"role": "system",

"content": f"Supervisor routed to {decision.next_agent}: {decision.task_description}"}

]

}

Human-in-the-Loop 통합

사람의 승인이 필요한 단계를 워크플로우에 삽입하는 패턴이다.

from langgraph.checkpoint.memory import MemorySaver

from langgraph.graph import StateGraph, START, END

checkpointer = MemorySaver()

def human_approval_node(state):

"""사람의 승인을 대기하는 노드"""

이 노드에서 interrupt가 발생하면 실행이 중단됨

사람이 승인하면 resume으로 계속 진행

return {

"messages": state["messages"] + [

{"role": "system", "content": "Awaiting human approval..."}

],

"approval_status": "pending"

}

def check_approval(state) -> Literal["approved", "rejected"]:

"""승인 상태 확인"""

return state.get("approval_status", "pending")

그래프에 Human-in-the-Loop 추가

graph = StateGraph(SupervisorState)

graph.add_node("supervisor", supervisor_node)

graph.add_node("researcher", researcher)

graph.add_node("human_review", human_approval_node)

graph.add_node("writer", writer)

graph.add_edge(START, "supervisor")

graph.add_edge("supervisor", "researcher")

graph.add_edge("researcher", "human_review")

graph.add_conditional_edges(

"human_review",

check_approval,

{"approved": "writer", "rejected": "supervisor"}

)

graph.add_edge("writer", END)

체크포인터로 상태 저장 및 복원

app = graph.compile(checkpointer=checkpointer, interrupt_before=["human_review"])

실행 후 중단 시점에서 재개

config = {"configurable": {"thread_id": "review-thread-1"}}

result = app.invoke(initial_state, config)

사람이 승인 후 재개

app.invoke(None, config) # resume with approval

MCP 프로토콜 통합

Model Context Protocol (MCP) 이란

MCP는 Anthropic이 발표한 에이전트 간 상호운용성 프로토콜로, 에이전트가 외부 도구와 데이터 소스에 표준화된 방식으로 접근할 수 있게 한다.

MCP 서버 구현 예시

from mcp import Server, Tool

server = Server("analytics-server")

@server.tool()

async def query_database(query: str) -> str:

"""데이터베이스에서 SQL 쿼리를 실행한다."""

실제 DB 연결 및 쿼리 실행

result = await db.execute(query)

return str(result)

@server.tool()

async def generate_chart(data: str, chart_type: str) -> str:

"""데이터를 기반으로 차트를 생성한다."""

차트 생성 로직

return f"Chart generated: {chart_type}"

@server.resource("schema://tables")

async def list_tables() -> str:

"""사용 가능한 데이터베이스 테이블 목록"""

tables = await db.get_tables()

return "\n".join(tables)

서버 실행

async def main():

async with server.run_stdio() as running:

await running.wait()

asyncio.run(main())

MCP 클라이언트와 멀티에이전트 연동

from mcp import ClientSession, StdioServerParameters

from langchain_mcp_adapters.tools import load_mcp_tools

from langgraph.prebuilt import create_react_agent

MCP 서버 연결 설정

server_params = StdioServerParameters(

command="python",

args=["analytics_server.py"]

)

async def create_mcp_agent():

"""MCP 도구를 사용하는 에이전트 생성"""

async with ClientSession(*server_params) as session:

await session.initialize()

MCP 도구를 LangChain 도구로 변환

tools = await load_mcp_tools(session)

에이전트 생성

agent = create_react_agent(

ChatOpenAI(model="gpt-4o"),

tools,

state_modifier="You are a data analyst with access to database tools."

)

return agent

실전 사례: 고객 지원 멀티에이전트 시스템

아키텍처 설계

고객 지원 시스템을 계층형 멀티에이전트로 구현하는 실전 예시이다.

from langgraph.graph import StateGraph, START, END

from langchain_openai import ChatOpenAI

from typing import TypedDict, Annotated, Literal

class CustomerSupportState(TypedDict):

messages: Annotated[list, operator.add]

customer_id: str

issue_category: str

sentiment: str

resolution: str

escalated: bool

트리아지 에이전트

def triage_agent(state: CustomerSupportState) -> CustomerSupportState:

"""고객 문의를 분류하고 적절한 전문 에이전트에게 라우팅"""

llm = ChatOpenAI(model="gpt-4o")

response = llm.invoke([

{"role": "system", "content": """Classify the customer issue into one of:

- billing: Payment, invoice, subscription issues

- technical: Product bugs, errors, configuration

- general: General inquiries, feedback

Also assess sentiment: positive, neutral, negative, urgent"""},

{"role": "user", "content": state["messages"][-1].content}

])

분류 결과 파싱

return {

"issue_category": "technical", # 파싱 결과

"sentiment": "negative"

}

기술 지원 에이전트

def technical_support_agent(state: CustomerSupportState) -> CustomerSupportState:

"""기술 문제를 진단하고 해결책을 제시"""

llm = ChatOpenAI(model="gpt-4o")

response = llm.invoke([

{"role": "system", "content": """You are a technical support specialist.

Diagnose the issue and provide step-by-step solutions.

If the issue requires engineering escalation, set escalated=true."""},

{"role": "user", "content": str(state["messages"])}

])

return {

"resolution": response.content,

"messages": [{"role": "assistant", "content": response.content}]

}

빌링 지원 에이전트

def billing_support_agent(state: CustomerSupportState) -> CustomerSupportState:

"""결제 관련 문제를 처리"""

llm = ChatOpenAI(model="gpt-4o")

response = llm.invoke([

{"role": "system", "content": """You are a billing specialist.

Handle payment issues, refunds, and subscription changes."""},

{"role": "user", "content": str(state["messages"])}

])

return {

"resolution": response.content,

"messages": [{"role": "assistant", "content": response.content}]

}

에스컬레이션 에이전트

def escalation_agent(state: CustomerSupportState) -> CustomerSupportState:

"""복잡한 문제를 상위 레벨로 에스컬레이션"""

return {

"escalated": True,

"messages": [

{"role": "system",

"content": f"Issue escalated for customer {state['customer_id']}"}

]

}

라우팅 함수

def route_issue(state: CustomerSupportState) -> Literal["technical", "billing", "general"]:

return state["issue_category"]

def check_escalation(state: CustomerSupportState) -> Literal["escalate", "resolve"]:

if state.get("escalated"):

return "escalate"

return "resolve"

그래프 구성

workflow = StateGraph(CustomerSupportState)

workflow.add_node("triage", triage_agent)

workflow.add_node("technical", technical_support_agent)

workflow.add_node("billing", billing_support_agent)

workflow.add_node("escalation", escalation_agent)

workflow.add_edge(START, "triage")

workflow.add_conditional_edges("triage", route_issue, {

"technical": "technical",

"billing": "billing",

"general": "billing" # 일반 문의도 billing이 처리

})

workflow.add_conditional_edges("technical", check_escalation, {

"escalate": "escalation",

"resolve": END

})

workflow.add_edge("billing", END)

workflow.add_edge("escalation", END)

app = workflow.compile()

실패 처리 전략

재시도 및 폴백 패턴

from tenacity import retry, stop_after_attempt, wait_exponential

from langgraph.graph import StateGraph

logger = logging.getLogger(__name__)

class AgentWithRetry:

"""재시도 로직이 포함된 에이전트 래퍼"""

def __init__(self, agent, max_retries=3, fallback_agent=None):

self.agent = agent

self.max_retries = max_retries

self.fallback_agent = fallback_agent

@retry(

stop=stop_after_attempt(3),

wait=wait_exponential(multiplier=1, min=2, max=30)

)

async def invoke_with_retry(self, state):

"""재시도 로직으로 에이전트 호출"""

try:

return await self.agent.ainvoke(state)

except Exception as e:

logger.error(f"Agent failed: {e}")

raise

async def invoke(self, state):

"""폴백 포함 에이전트 호출"""

try:

return await self.invoke_with_retry(state)

except Exception as e:

if self.fallback_agent:

logger.warning(f"Falling back to backup agent: {e}")

return await self.fallback_agent.ainvoke(state)

raise

서킷 브레이커 패턴

class CircuitBreaker:

"""서킷 브레이커 패턴"""

def __init__(self, failure_threshold=5, recovery_timeout=60):

self.failure_count = 0

self.failure_threshold = failure_threshold

self.recovery_timeout = recovery_timeout

self.state = "closed" # closed, open, half-open

self.last_failure_time = None

def can_execute(self) -> bool:

if self.state == "closed":

return True

if self.state == "open":

if time.time() - self.last_failure_time > self.recovery_timeout:

self.state = "half-open"

return True

return False

return True # half-open

def record_success(self):

self.failure_count = 0

self.state = "closed"

def record_failure(self):

self.failure_count += 1

self.last_failure_time = time.time()

if self.failure_count >= self.failure_threshold:

self.state = "open"

데드 레터 큐 패턴

from datetime import datetime

class DeadLetterQueue:

"""처리 실패한 메시지를 저장하는 데드 레터 큐"""

def __init__(self, storage_path="dead_letters.json"):

self.storage_path = storage_path

self.messages = []

def add(self, message: dict, error: str, agent_name: str):

"""실패한 메시지를 큐에 추가"""

entry = {

"timestamp": datetime.now().isoformat(),

"agent": agent_name,

"message": message,

"error": str(error),

"retry_count": 0

}

self.messages.append(entry)

self._persist()

def retry_all(self, agent_registry: dict):

"""큐의 모든 메시지 재시도"""

for entry in self.messages:

agent = agent_registry.get(entry["agent"])

if agent:

try:

agent.invoke(entry["message"])

self.messages.remove(entry)

except Exception as e:

entry["retry_count"] += 1

entry["last_error"] = str(e)

self._persist()

def _persist(self):

with open(self.storage_path, "w") as f:

json.dump(self.messages, f, indent=2)

관찰 가능성 (Observability)

LangSmith 통합

LangSmith 추적 활성화

os.environ["LANGCHAIN_TRACING_V2"] = "true"

os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"

os.environ["LANGCHAIN_PROJECT"] = "multi-agent-orchestration"

커스텀 메트릭 수집

from langsmith import Client

client = Client()

def track_agent_metrics(agent_name: str, duration: float, tokens: int, success: bool):

"""에이전트 실행 메트릭 추적"""

client.create_run(

name=f"agent-{agent_name}",

run_type="chain",

inputs={"agent": agent_name},

outputs={

"duration_ms": duration * 1000,

"total_tokens": tokens,

"success": success

}

)

OpenTelemetry 통합

from opentelemetry import trace

from opentelemetry.sdk.trace import TracerProvider

from opentelemetry.sdk.trace.export import BatchSpanProcessor

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

트레이서 설정

provider = TracerProvider()

processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))

provider.add_span_processor(processor)

trace.set_tracer_provider(provider)

tracer = trace.get_tracer("multi-agent-system")

def traced_agent_node(agent_name: str):

"""OpenTelemetry 트레이싱이 포함된 에이전트 노드"""

def node_fn(state):

with tracer.start_as_current_span(f"agent.{agent_name}") as span:

span.set_attribute("agent.name", agent_name)

span.set_attribute("agent.input_messages", len(state["messages"]))

try:

result = agent.invoke(state)

span.set_attribute("agent.success", True)

return result

except Exception as e:

span.set_attribute("agent.success", False)

span.record_exception(e)

raise

return node_fn

프로덕션 배포 체크리스트

설계 단계

- [ ] 에이전트별 역할과 도구가 명확히 정의되었는가

- [ ] 에이전트 간 통신 프로토콜이 표준화되었는가

- [ ] 상태 관리 전략이 수립되었는가 (로컬 vs 분산)

- [ ] 장애 시나리오별 대응 전략이 있는가

- [ ] Human-in-the-Loop 필요 지점이 식별되었는가

구현 단계

- [ ] 각 에이전트에 적절한 모델이 할당되었는가 (비용 vs 성능)

- [ ] 도구 실행에 타임아웃이 설정되었는가

- [ ] 재시도 로직과 서킷 브레이커가 구현되었는가

- [ ] 데드 레터 큐로 실패한 작업을 추적하는가

- [ ] 입출력 검증 (guard rails)이 적용되었는가

배포 단계

- [ ] 관찰 가능성 파이프라인이 구성되었는가 (LangSmith / OTEL)

- [ ] 에이전트별 비용 추적이 가능한가

- [ ] 레이트 리밋이 적용되었는가

- [ ] 보안 감사 로그가 활성화되었는가

- [ ] 롤백 전략이 수립되었는가

운영 단계

- [ ] 에이전트 성능 대시보드가 구축되었는가

- [ ] 이상 탐지 알림이 설정되었는가

- [ ] 프롬프트 버전 관리가 적용되었는가

- [ ] A/B 테스트 프레임워크가 준비되었는가

- [ ] 정기적인 프롬프트 최적화 프로세스가 있는가

패턴 선택 가이드

의사결정 플로우차트

작업 유형 판단

|

├─ 단순 작업 (도구 5개 이하) ─────> 단일 에이전트

|

├─ 순서가 정해진 다단계 작업 ─────> 파이프라인

|

├─ 동적 라우팅이 필요한 작업 ─────> 계층형 (슈퍼바이저)

|

└─ 자율적 협업이 필요한 복잡 작업 ─> 스웜

패턴별 장단점 종합

| 패턴 | 장점 | 단점 | 복잡도 | 적합한 규모 |

| ----------------- | ------------------------ | ---------------------------- | --------- | ----------- |

| **단일 에이전트** | 구현 간단, 디버깅 용이 | 확장성 한계, 컨텍스트 포화 | 낮음 | 소규모 |

| **계층형** | 중앙 제어, 동적 라우팅 | 슈퍼바이저 병목, 단일 장애점 | 중간 | 중규모 |

| **파이프라인** | 예측 가능, 테스트 용이 | 유연성 부족, 순차 실행 지연 | 낮음-중간 | 중규모 |

| **스웜** | 높은 유연성, 자율적 협업 | 디버깅 어려움, 예측 불가 | 높음 | 대규모 |

보안 고려사항

에이전트 격리

class SandboxedAgent:

"""격리된 환경에서 실행되는 에이전트"""

def __init__(self, agent, allowed_tools: list, max_tokens: int = 4096):

self.agent = agent

self.allowed_tools = set(allowed_tools)

self.max_tokens = max_tokens

def invoke(self, state):

도구 접근 권한 검증

requested_tools = self._extract_tool_calls(state)

unauthorized = requested_tools - self.allowed_tools

if unauthorized:

raise PermissionError(

f"Agent attempted to use unauthorized tools: {unauthorized}"

)

토큰 사용량 제한

if self._estimate_tokens(state) > self.max_tokens:

raise ResourceError("Token limit exceeded")

return self.agent.invoke(state)

def _extract_tool_calls(self, state) -> set:

상태에서 도구 호출 추출

return set()

def _estimate_tokens(self, state) -> int:

토큰 사용량 추정

return 0

프롬프트 인젝션 방어

from langchain.output_parsers import PydanticOutputParser

from pydantic import BaseModel, validator

class SafeAgentOutput(BaseModel):

"""에이전트 출력 검증 스키마"""

response: str

confidence: float

sources: list[str]

@validator("response")

def validate_response(cls, v):

금지된 패턴 검출

forbidden_patterns = [

"ignore previous instructions",

"system prompt",

"bypass",

"jailbreak"

]

for pattern in forbidden_patterns:

if pattern.lower() in v.lower():

raise ValueError(f"Suspicious pattern detected: {pattern}")

return v

parser = PydanticOutputParser(pydantic_object=SafeAgentOutput)

성능 최적화

병렬 실행 전략

from langgraph.graph import StateGraph, START, END

class ParallelState(TypedDict):

messages: Annotated[list, operator.add]

research_result: str

analysis_result: str

async def parallel_execution(state):

"""독립적인 에이전트를 병렬로 실행"""

research_task = asyncio.create_task(

researcher.ainvoke({"messages": state["messages"]})

)

analysis_task = asyncio.create_task(

analyst.ainvoke({"messages": state["messages"]})

)

research_result, analysis_result = await asyncio.gather(

research_task, analysis_task

)

return {

"research_result": research_result["messages"][-1].content,

"analysis_result": analysis_result["messages"][-1].content

}

LangGraph의 fan-out 패턴

graph = StateGraph(ParallelState)

graph.add_node("research", researcher)

graph.add_node("analysis", analyst)

graph.add_node("synthesis", writer)

병렬 실행: START에서 두 노드로 동시 분기

graph.add_edge(START, "research")

graph.add_edge(START, "analysis")

두 결과가 모두 완료되면 synthesis로

graph.add_edge("research", "synthesis")

graph.add_edge("analysis", "synthesis")

graph.add_edge("synthesis", END)

캐싱 전략

from functools import lru_cache

class AgentCache:

"""에이전트 응답 캐싱"""

def __init__(self, ttl_seconds=3600):

self.cache = {}

self.ttl = ttl_seconds

def get_cache_key(self, state: dict) -> str:

"""상태에서 캐시 키 생성"""

state_str = json.dumps(state, sort_keys=True, default=str)

return hashlib.sha256(state_str.encode()).hexdigest()

def get(self, state: dict):

"""캐시에서 결과 조회"""

key = self.get_cache_key(state)

if key in self.cache:

entry = self.cache[key]

if time.time() - entry["timestamp"] < self.ttl:

return entry["result"]

del self.cache[key]

return None

def set(self, state: dict, result):

"""결과를 캐시에 저장"""

key = self.get_cache_key(state)

self.cache[key] = {

"result": result,

"timestamp": time.time()

}

Conclusion

멀티에이전트 오케스트레이션은 단순히 여러 에이전트를 연결하는 것이 아니라, **작업의 특성에 맞는 패턴을 선택**하고 **견고한 장애 처리와 관찰 가능성**을 갖추는 것이 핵심이다.

핵심 정리:

1. **단일 에이전트**로 시작하고, 복잡도가 증가할 때 멀티에이전트로 전환

2. **계층형 패턴**은 중앙 제어가 필요한 경우에 적합

3. **파이프라인 패턴**은 순서가 정해진 워크플로우에 최적

4. **스웜 패턴**은 높은 자율성이 필요한 복잡한 시나리오에 적합

5. 프레임워크는 **LangGraph**(유연성), **CrewAI**(빠른 프로토타이핑), **AutoGen**(대화 기반)을 용도에 맞게 선택

References

- [LangGraph 공식 문서](https://langchain-ai.github.io/langgraph/)

- [CrewAI 공식 문서](https://docs.crewai.com/)

- [AutoGen 공식 문서](https://microsoft.github.io/autogen/)

- [Anthropic MCP 스펙](https://modelcontextprotocol.io/)

- [Gartner: Agentic AI Will Reshape Enterprise Applications](https://www.gartner.com/en/articles/intelligent-agent-in-ai)

- [NIST AI Agent Standards Initiative](https://www.nist.gov/artificial-intelligence)

- [LangChain Multi-Agent Architectures](https://blog.langchain.dev/multi-agent-architectures/)

- [OpenAI Swarm Framework](https://github.com/openai/swarm)

현재 단락 (1/742)

2026년 에이전틱 AI가 기업 애플리케이션의 40%에 탑재될 전망이다(Gartner). 단일 범용 에이전트에서 도메인 특화 멀티에이전트 협업으로 패러다임이 전환되고 있다. NIS...

작성 글자: 0원문 글자: 24,120작성 단락: 0/742