Skip to content
Published on

LangChain 고급 패턴: LCEL과 LangGraph 완전 정복

Authors
  • Name
    Twitter

1. LCEL이란? Pipe 연산자와 Runnable 인터페이스

LCEL(LangChain Expression Language)은 LangChain에서 체인을 구성하기 위한 선언적 표현 언어다. 기존의 LLMChain, SequentialChain 등 레거시 방식을 대체하며, LangChain 공식 문서에서 체인 구축의 권장 패턴으로 자리 잡았다.

LCEL의 핵심 아이디어는 Pipe 연산자 | 를 통해 여러 컴포넌트를 연결하는 것이다. Python의 __or__ 매직 메서드를 오버로딩하여, Unix 파이프라인처럼 데이터가 왼쪽에서 오른쪽으로 흘러가는 직관적인 구조를 만든다.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_template("'{topic}'에 대해 간략히 설명해주세요.")
model = ChatOpenAI(model="gpt-4o")
output_parser = StrOutputParser()

# LCEL Pipe 연산자로 체인 구성
chain = prompt | model | output_parser

result = chain.invoke({"topic": "양자 컴퓨팅"})
print(result)

위 코드에서 prompt | model | output_parser는 내부적으로 RunnableSequence를 생성한다. 각 컴포넌트는 Runnable 인터페이스를 구현하고 있으며, 이 인터페이스는 모든 LCEL 컴포넌트의 기반이 되는 프로토콜이다.

Runnable 인터페이스의 주요 메서드

메서드설명반환 타입
invoke(input)단일 입력에 대해 동기 실행단일 출력
ainvoke(input)단일 입력에 대해 비동기 실행단일 출력 (awaitable)
batch(inputs)복수 입력에 대해 병렬 실행리스트
abatch(inputs)복수 입력에 대해 비동기 병렬 실행리스트 (awaitable)
stream(input)단일 입력에 대해 스트리밍 실행Iterator
astream(input)단일 입력에 대해 비동기 스트리밍AsyncIterator

모든 LangChain 컴포넌트 -- ChatModel, Retriever, OutputParser, Tool 등 -- 가 이 Runnable 인터페이스를 구현하기 때문에, 어떤 두 컴포넌트든 | 연산자로 연결할 수 있다. 이것이 LCEL의 합성 가능성(composability)의 근간이다.


2. 핵심 Runnable 타입 상세 분석

LCEL은 단순한 순차 체인 외에도, 병렬 실행, 조건 분기, 입력 전달, 커스텀 함수 래핑 등 다양한 패턴을 지원하는 Runnable 타입을 제공한다.

2.1 RunnableParallel

RunnableParallel은 여러 Runnable을 병렬로 실행하고, 각 결과를 딕셔너리 형태로 합산하여 반환한다. LCEL에서는 딕셔너리 리터럴을 사용하면 자동으로 RunnableParallel로 변환된다.

from langchain_core.runnables import RunnableParallel

# 방법 1: 딕셔너리 리터럴 (자동 변환)
chain = {
    "summary": prompt_summary | model | output_parser,
    "keywords": prompt_keywords | model | output_parser,
} | combine_results

# 방법 2: 명시적 RunnableParallel
parallel = RunnableParallel(
    summary=prompt_summary | model | output_parser,
    keywords=prompt_keywords | model | output_parser,
)
result = parallel.invoke({"text": "LangChain은 LLM 애플리케이션 프레임워크입니다."})
# result = {"summary": "...", "keywords": "..."}

RunnableParallel은 내부적으로 각 브랜치를 동시에 실행하므로, 서로 독립적인 LLM 호출을 병렬화하여 전체 지연 시간을 줄이는 데 효과적이다.

2.2 RunnableBranch

RunnableBranch는 조건에 따라 서로 다른 Runnable을 실행하는 라우팅 메커니즘이다. (condition, runnable) 튜플의 리스트와 기본(default) Runnable을 인자로 받는다.

from langchain_core.runnables import RunnableBranch

branch = RunnableBranch(
    (lambda x: "코드" in x["topic"], code_chain),
    (lambda x: "수학" in x["topic"], math_chain),
    general_chain,  # default
)

# "코드"가 topic에 포함되면 code_chain 실행, "수학"이면 math_chain, 그 외에는 general_chain
result = branch.invoke({"topic": "코드 리팩토링 방법"})

조건 함수는 순서대로 평가되며, 첫 번째로 True를 반환하는 조건에 해당하는 Runnable이 실행된다. 어떤 조건도 만족하지 않으면 default Runnable이 실행된다.

2.3 RunnablePassthrough

RunnablePassthrough는 입력을 그대로 다음 단계로 전달한다. 주로 RunnableParallel과 함께 사용하여, 원본 입력을 보존하면서 동시에 다른 처리를 수행할 때 유용하다.

from langchain_core.runnables import RunnablePassthrough

chain = RunnableParallel(
    context=retriever,                    # 검색 결과
    question=RunnablePassthrough(),       # 원본 질문 그대로 전달
) | prompt | model | output_parser

RunnablePassthrough.assign()을 사용하면 기존 입력에 새로운 키-값을 추가할 수 있다.

chain = RunnablePassthrough.assign(
    context=lambda x: retriever.invoke(x["question"])
)
# 입력: {"question": "LangChain이란?"}
# 출력: {"question": "LangChain이란?", "context": [Document(...), ...]}

2.4 RunnableLambda

RunnableLambda는 일반 Python 함수를 Runnable로 래핑한다. 이를 통해 임의의 로직을 LCEL 체인에 삽입할 수 있다.

from langchain_core.runnables import RunnableLambda

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# 함수를 Runnable로 변환
format_runnable = RunnableLambda(format_docs)

# 체인에서 사용
chain = retriever | format_runnable | prompt | model | output_parser

@chain 데코레이터를 사용하면 더욱 간결하게 표현할 수 있다.

from langchain_core.runnables import chain

@chain
def format_and_classify(input_dict):
    text = input_dict["text"]
    formatted = text.strip().lower()
    category = "technical" if "api" in formatted else "general"
    return {"formatted": formatted, "category": category}

3. invoke, batch, stream, ainvoke 메서드 분석

Runnable 인터페이스의 각 메서드는 서로 다른 실행 패턴을 지원한다. 공식 문서에서 정의하는 각 메서드의 동작을 살펴보자.

invoke -- 단일 동기 실행

가장 기본적인 실행 방식이다. 하나의 입력을 받아 하나의 출력을 반환한다.

result = chain.invoke({"topic": "머신러닝"})

batch -- 병렬 일괄 실행

여러 입력을 리스트로 전달하면 내부적으로 병렬 처리하여 결과 리스트를 반환한다. max_concurrency 파라미터로 동시 실행 수를 제한할 수 있다.

results = chain.batch(
    [{"topic": "AI"}, {"topic": "블록체인"}, {"topic": "양자컴퓨팅"}],
    config={"max_concurrency": 2}
)
# results = ["AI는...", "블록체인은...", "양자컴퓨팅은..."]

stream -- 실시간 스트리밍

LLM 응답을 토큰 단위로 스트리밍한다. 사용자 경험을 크게 개선하는 핵심 기능이다.

for chunk in chain.stream({"topic": "딥러닝"}):
    print(chunk, end="", flush=True)

ainvoke / astream -- 비동기 실행

FastAPI, asyncio 환경에서 사용할 수 있는 비동기 버전이다. await 키워드와 함께 사용한다.

import asyncio

async def main():
    result = await chain.ainvoke({"topic": "강화학습"})
    print(result)

    async for chunk in chain.astream({"topic": "강화학습"}):
        print(chunk, end="", flush=True)

asyncio.run(main())

4. Streaming 구현: astream_events, astream_log

단순한 stream()은 최종 출력만 스트리밍하지만, 복잡한 체인에서는 중간 단계의 결과도 실시간으로 확인하고 싶을 때가 있다. LangChain은 이를 위해 astream_eventsastream_log를 제공한다.

astream_events

astream_events는 체인 실행 과정에서 발생하는 모든 이벤트를 StreamEvent 객체로 스트리밍한다. 각 이벤트에는 이벤트 종류(on_chain_start, on_llm_stream, on_chain_end 등), 이벤트 이름, 데이터가 포함된다.

async for event in chain.astream_events(
    {"topic": "트랜스포머"},
    version="v2"
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        # LLM 토큰 스트리밍
        print(event["data"]["chunk"].content, end="", flush=True)
    elif kind == "on_chain_start":
        print(f"\n--- 체인 시작: {event['name']} ---")
    elif kind == "on_chain_end":
        print(f"\n--- 체인 종료: {event['name']} ---")

astream_events는 중간 단계가 최종 입력에서만 동작하는 경우에도 중간 결과를 스트리밍할 수 있다는 점에서 매우 강력하다. 예를 들어, RAG 체인에서 Retriever가 문서를 가져오는 과정, Prompt가 생성되는 과정, LLM이 응답하는 과정을 모두 실시간으로 모니터링할 수 있다.

astream_log

astream_log는 실행 과정의 로그를 JSON Patch 형태로 스트리밍한다. include_namesinclude_tags 파라미터를 통해 특정 컴포넌트의 로그만 필터링할 수 있다.

async for log_patch in chain.astream_log(
    {"topic": "어텐션 메커니즘"},
    include_names=["ChatOpenAI"],
):
    for op in log_patch.ops:
        print(op)

일반적으로 astream_events가 더 직관적이고 사용하기 편리하므로, 공식 문서에서도 astream_events를 먼저 권장하고 있다.


5. LCEL로 RAG 체인 구축 예제

LCEL의 진정한 위력은 실전 패턴에서 발휘된다. RAG(Retrieval-Augmented Generation) 체인을 LCEL로 구현하면 다음과 같다.

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_community.vectorstores import FAISS

# 1. 벡터 스토어 및 Retriever 구성
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = FAISS.from_texts(
    ["LangChain은 LLM 애플리케이션 프레임워크다.",
     "LCEL은 LangChain Expression Language의 약자이다.",
     "LangGraph는 상태 기반 에이전트 프레임워크다."],
    embeddings
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# 2. 문서 포맷팅 함수
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# 3. Prompt 템플릿
prompt = ChatPromptTemplate.from_template("""
다음 컨텍스트를 기반으로 질문에 답변하세요.

컨텍스트:
{context}

질문: {question}

답변:
""")

# 4. 모델 및 파서
model = ChatOpenAI(model="gpt-4o", temperature=0)
output_parser = StrOutputParser()

# 5. LCEL RAG 체인 구성
rag_chain = (
    RunnablePassthrough.assign(
        context=lambda x: format_docs(retriever.invoke(x["question"]))
    )
    | prompt
    | model
    | output_parser
)

# 6. 실행
result = rag_chain.invoke({"question": "LCEL이 무엇인가요?"})
print(result)

이 체인은 다음과 같은 흐름으로 동작한다.

  1. 입력 {"question": "LCEL이 무엇인가요?"}RunnablePassthrough.assign()에 전달된다.
  2. question은 그대로 유지되고, context에는 Retriever가 가져온 문서를 포맷한 문자열이 할당된다.
  3. promptcontextquestion을 결합하여 프롬프트를 생성한다.
  4. model이 프롬프트를 처리하고, output_parser가 최종 문자열을 추출한다.

6. LangGraph 기초: StateGraph, Node, Edge

LangGraph는 LangChain 팀이 만든 상태 기반 그래프 실행 프레임워크로, 복잡한 에이전트 워크플로우를 구현하기 위해 설계되었다. LCEL이 선형적인 체인에 강점을 보인다면, LangGraph는 순환(cyclic) 구조, 조건 분기, 상태 관리가 필요한 시나리오에 적합하다.

핵심 개념

  • StateGraph: 노드 간에 공유 상태(State)를 통해 통신하는 그래프. 상태 정의(TypedDict)를 인자로 받아 초기화한다.
  • Node: 상태를 입력으로 받아 상태 업데이트를 반환하는 함수. add_node()로 그래프에 등록한다.
  • Edge: 노드 간의 연결. add_edge()로 정의하며, 실행 흐름을 결정한다.
  • START / END: 그래프의 시작점과 종료점을 나타내는 특수 노드.
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from operator import add

# 1. 상태 정의
class AgentState(TypedDict):
    messages: Annotated[list[str], add]   # add reducer: 리스트를 누적
    current_step: str

# 2. 노드 함수 정의
def analyze_node(state: AgentState):
    return {
        "messages": ["분석 완료: 데이터를 처리했습니다."],
        "current_step": "analyze"
    }

def summarize_node(state: AgentState):
    return {
        "messages": ["요약 완료: 결과를 정리했습니다."],
        "current_step": "summarize"
    }

# 3. 그래프 구성
workflow = StateGraph(AgentState)
workflow.add_node("analyze", analyze_node)
workflow.add_node("summarize", summarize_node)

# 4. 엣지 정의
workflow.add_edge(START, "analyze")
workflow.add_edge("analyze", "summarize")
workflow.add_edge("summarize", END)

# 5. 컴파일 및 실행
graph = workflow.compile()
result = graph.invoke({
    "messages": ["시작"],
    "current_step": ""
})
print(result)
# {"messages": ["시작", "분석 완료: ...", "요약 완료: ..."], "current_step": "summarize"}

상태 정의에서 Annotated[list[str], add]reducer 함수를 지정하는 것이다. add reducer는 노드가 반환하는 리스트를 기존 리스트에 누적(append)한다. Reducer를 지정하지 않으면 기존 값을 덮어쓴다.

그래프는 반드시 .compile()을 호출하여 CompiledStateGraph로 변환해야 실행할 수 있으며, 컴파일된 그래프는 LCEL의 Runnable 인터페이스를 구현하므로 invoke(), stream(), ainvoke() 등을 모두 사용할 수 있다.


7. Conditional Edge로 분기 처리 (라우터 패턴)

LangGraph의 add_conditional_edges()는 노드의 실행 결과에 따라 다음으로 실행할 노드를 동적으로 결정하는 메커니즘이다. 이는 에이전트의 의사결정 로직을 구현하는 핵심 기능이다.

from langgraph.graph import StateGraph, START, END

class RouterState(TypedDict):
    query: str
    category: str
    result: str

def classify_node(state: RouterState):
    query = state["query"]
    if "코드" in query or "프로그래밍" in query:
        category = "code"
    elif "수학" in query or "계산" in query:
        category = "math"
    else:
        category = "general"
    return {"category": category}

def code_expert(state: RouterState):
    return {"result": f"코드 전문가가 답변합니다: {state['query']}"}

def math_expert(state: RouterState):
    return {"result": f"수학 전문가가 답변합니다: {state['query']}"}

def general_expert(state: RouterState):
    return {"result": f"일반 전문가가 답변합니다: {state['query']}"}

# 라우팅 함수
def route_query(state: RouterState) -> str:
    return state["category"]

# 그래프 구성
workflow = StateGraph(RouterState)
workflow.add_node("classify", classify_node)
workflow.add_node("code", code_expert)
workflow.add_node("math", math_expert)
workflow.add_node("general", general_expert)

workflow.add_edge(START, "classify")

# Conditional Edge: classify 노드 이후 라우팅
workflow.add_conditional_edges(
    "classify",           # 소스 노드
    route_query,          # 라우팅 함수
    {                     # 반환값 -> 대상 노드 매핑
        "code": "code",
        "math": "math",
        "general": "general",
    }
)

workflow.add_edge("code", END)
workflow.add_edge("math", END)
workflow.add_edge("general", END)

graph = workflow.compile()
result = graph.invoke({"query": "Python 코드 리팩토링 방법", "category": "", "result": ""})
print(result["result"])  # "코드 전문가가 답변합니다: ..."

add_conditional_edges()는 세 개의 인자를 받는다.

  1. source: 분기가 시작되는 노드 이름
  2. path: 상태를 받아 다음 노드 이름(또는 키)을 반환하는 라우팅 함수
  3. path_map (선택): 라우팅 함수의 반환값을 실제 노드 이름에 매핑하는 딕셔너리

라우팅 함수가 반환하는 문자열이 path_map에 없으면 런타임 에러가 발생하므로, 모든 경우의 수를 빠짐없이 매핑해야 한다.


8. Checkpointing과 State Persistence

LangGraph의 Checkpointer는 그래프 실행 상태를 영속적으로 저장하는 퍼시스턴스 레이어다. Checkpointer를 사용하면 그래프의 각 실행 단계(superstep)마다 상태 스냅샷이 저장되어, 실행을 일시 정지하고 나중에 재개하거나, 특정 시점으로 되돌아갈 수 있다.

Thread와 Checkpoint

  • Thread: 상태를 저장하고 검색하는 고유 식별자. thread_id를 통해 지정한다.
  • Checkpoint: 특정 시점의 그래프 상태 스냅샷. StateSnapshot 객체로 표현된다.
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import TypedDict, Annotated
from operator import add

class ConversationState(TypedDict):
    messages: Annotated[list[str], add]

def chat_node(state: ConversationState):
    last_message = state["messages"][-1]
    response = f"'{last_message}'에 대한 응답입니다."
    return {"messages": [response]}

workflow = StateGraph(ConversationState)
workflow.add_node("chat", chat_node)
workflow.add_edge(START, "chat")
workflow.add_edge("chat", END)

# Checkpointer 적용
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

# Thread ID를 지정하여 실행
config = {"configurable": {"thread_id": "user-session-1"}}

# 첫 번째 대화
graph.invoke({"messages": ["안녕하세요"]}, config)

# 두 번째 대화 (같은 thread_id -- 이전 상태가 유지됨)
result = graph.invoke({"messages": ["LangGraph에 대해 알려주세요"]}, config)
print(result["messages"])
# ["안녕하세요", "'안녕하세요'에 대한 응답입니다.",
#  "LangGraph에 대해 알려주세요", "'LangGraph에 대해 알려주세요'에 대한 응답입니다."]

상태 조회 및 타임 트래블

# 현재 상태 조회
snapshot = graph.get_state(config)
print(snapshot.values)   # 현재 상태 값
print(snapshot.next)     # 다음 실행될 노드

# 상태 히스토리 조회
for state in graph.get_state_history(config):
    print(state.config["configurable"]["checkpoint_id"], state.values)

특정 checkpoint로 되돌아가서 실행을 포크(fork)할 수도 있다.

# 특정 checkpoint에서 재실행
fork_config = {
    "configurable": {
        "thread_id": "user-session-1",
        "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"
    }
}
graph.invoke(None, config=fork_config)

Checkpointer 종류

구현체용도패키지
InMemorySaver개발/테스트langgraph 기본 포함
SqliteSaver로컬 워크플로우langgraph-checkpoint-sqlite
PostgresSaver프로덕션 배포langgraph-checkpoint-postgres

프로덕션 환경에서는 반드시 PostgresSaver와 같은 내구성 있는 checkpointer를 사용해야 한다.


9. Human-in-the-Loop 패턴 (interrupt, resume)

LangGraph의 interrupt 기능은 그래프 실행을 특정 지점에서 일시 정지하고, 사람의 입력을 받은 후 재개하는 패턴을 지원한다. 이를 위해서는 반드시 Checkpointer가 설정되어 있어야 한다.

interrupt 함수

interrupt() 함수는 노드 내부에서 호출하면 그래프 실행을 즉시 중단하고, 전달한 값을 호출자에게 반환한다. 이후 Command(resume=value)로 실행을 재개하면, interrupt() 호출 지점으로 돌아가 resume 값이 반환된다.

from langgraph.types import interrupt, Command

class ApprovalState(TypedDict):
    action: str
    approved: bool
    result: str

def plan_node(state: ApprovalState):
    return {"action": "중요 데이터베이스 마이그레이션 실행"}

def approval_node(state: ApprovalState):
    # 실행 일시 정지 -- 사람의 승인을 기다림
    response = interrupt({
        "question": "다음 작업을 승인하시겠습니까?",
        "action": state["action"]
    })
    return {"approved": response == "승인"}

def execute_node(state: ApprovalState):
    if state["approved"]:
        return {"result": "작업이 성공적으로 실행되었습니다."}
    return {"result": "작업이 취소되었습니다."}

workflow = StateGraph(ApprovalState)
workflow.add_node("plan", plan_node)
workflow.add_node("approval", approval_node)
workflow.add_node("execute", execute_node)

workflow.add_edge(START, "plan")
workflow.add_edge("plan", "approval")
workflow.add_edge("approval", "execute")
workflow.add_edge("execute", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "migration-1"}}

# 1단계: 그래프 실행 -> approval 노드에서 일시 정지
result = graph.invoke(
    {"action": "", "approved": False, "result": ""},
    config
)
# result에 __interrupt__ 정보가 포함됨

# 2단계: 사람이 검토 후 승인
final_result = graph.invoke(
    Command(resume="승인"),
    config
)
print(final_result["result"])  # "작업이 성공적으로 실행되었습니다."

interrupt 사용 시 주의사항

공식 문서에서 강조하는 중요한 규칙들이다.

  1. try/except로 감싸지 말 것: interrupt()는 내부적으로 예외를 발생시켜 실행을 중단하므로, bare try/except가 이를 잡아버리면 정상 동작하지 않는다.
  2. interrupt 호출 순서를 바꾸지 말 것: Resume 시 인덱스 기반으로 매칭하므로, 조건에 따라 interrupt 순서가 달라지면 안 된다.
  3. interrupt 전에 비멱등(non-idempotent) 작업을 하지 말 것: Resume 시 노드가 처음부터 다시 실행되므로, interrupt 이전의 코드도 재실행된다.
  4. JSON 직렬화 가능한 값만 전달할 것: 함수, 클래스 인스턴스 등은 전달할 수 없다.

10. LangGraph로 Multi-Agent 시스템 구축

LangGraph는 여러 에이전트가 협력하는 Multi-Agent 시스템을 구축하는 데 적합하다. 대표적인 패턴은 Supervisor 패턴으로, 중앙의 Supervisor 에이전트가 작업을 분배하고 결과를 취합한다.

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated, Literal
from operator import add

class MultiAgentState(TypedDict):
    query: str
    messages: Annotated[list[str], add]
    next_agent: str
    final_answer: str

# Supervisor: 어떤 에이전트에게 작업을 할당할지 결정
def supervisor_node(state: MultiAgentState):
    query = state["query"].lower()
    if "검색" in query or "찾아" in query:
        return {"next_agent": "researcher", "messages": ["Supervisor: 연구 에이전트에게 할당"]}
    elif "작성" in query or "써줘" in query:
        return {"next_agent": "writer", "messages": ["Supervisor: 작성 에이전트에게 할당"]}
    else:
        return {"next_agent": "analyst", "messages": ["Supervisor: 분석 에이전트에게 할당"]}

# 전문 에이전트들
def researcher_node(state: MultiAgentState):
    return {
        "messages": ["Researcher: 관련 자료를 검색하여 수집했습니다."],
        "final_answer": f"검색 결과: '{state['query']}'에 대한 자료를 찾았습니다."
    }

def writer_node(state: MultiAgentState):
    return {
        "messages": ["Writer: 요청에 맞게 콘텐츠를 작성했습니다."],
        "final_answer": f"작성 결과: '{state['query']}'에 대한 글을 작성했습니다."
    }

def analyst_node(state: MultiAgentState):
    return {
        "messages": ["Analyst: 데이터를 분석하여 인사이트를 도출했습니다."],
        "final_answer": f"분석 결과: '{state['query']}'에 대한 분석을 완료했습니다."
    }

def route_to_agent(state: MultiAgentState) -> str:
    return state["next_agent"]

# 그래프 구성
workflow = StateGraph(MultiAgentState)
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("researcher", researcher_node)
workflow.add_node("writer", writer_node)
workflow.add_node("analyst", analyst_node)

workflow.add_edge(START, "supervisor")
workflow.add_conditional_edges(
    "supervisor",
    route_to_agent,
    {
        "researcher": "researcher",
        "writer": "writer",
        "analyst": "analyst",
    }
)
workflow.add_edge("researcher", END)
workflow.add_edge("writer", END)
workflow.add_edge("analyst", END)

graph = workflow.compile()

result = graph.invoke({
    "query": "최신 AI 트렌드를 검색해줘",
    "messages": [],
    "next_agent": "",
    "final_answer": ""
})
print(result["final_answer"])

LangChain 팀은 langgraph-supervisor 라이브러리도 별도로 제공하고 있으며, 이를 통해 더 간결하게 Supervisor 패턴을 구현할 수 있다. 하지만 공식 문서에서는 직접 StateGraph로 구현하는 것이 context engineering에 대한 더 세밀한 제어를 가능하게 한다고 안내하고 있다.

Multi-Agent의 확장 패턴

  • 계층적 구조: Supervisor 아래에 Sub-Supervisor를 두어 대규모 에이전트 조직을 구성
  • 도구 공유: 여러 에이전트가 공통 도구를 공유하되, 각자의 전문 도구도 보유
  • 메모리 공유: Store 인터페이스를 통해 에이전트 간 장기 메모리 공유

11. LangGraph Studio 소개

LangGraph Studio는 LangChain 팀이 개발한 최초의 **에이전트 IDE(Integrated Development Environment)**다. LangGraph로 구축한 에이전트 워크플로우를 시각적으로 확인하고, 실시간으로 상호작용하며, 디버깅할 수 있는 데스크톱 애플리케이션이다.

주요 기능

  • 시각적 그래프 렌더링: StateGraph의 노드, 엣지, 조건 분기를 시각적으로 표현
  • 실시간 실행 추적: 각 노드의 입출력과 상태 변화를 실시간으로 관찰
  • 대화형 디버깅: 특정 노드에서 실행을 멈추고, 상태를 수정한 후 재실행
  • Human-in-the-Loop 테스트: interrupt 지점에서 직접 입력을 제공하여 워크플로우 검증
  • LangSmith 통합: Tracing, Evaluation, Prompt Engineering과의 연동

설정 방법

LangGraph Studio를 사용하려면 프로젝트 루트에 langgraph.json 설정 파일이 필요하다.

{
  "graphs": {
    "my_agent": "./agent.py:graph"
  },
  "dependencies": ["langchain", "langgraph", "langchain-openai"],
  "env": ".env"
}

이 파일은 에이전트 그래프의 위치, 필요한 의존성, 환경 변수 파일을 지정한다. LangGraph Studio는 현재 Apple Silicon Mac에서 사용 가능하며, LangSmith 계정(무료 포함)으로 접근할 수 있다.


12. 실전 예제: 문서 요약 + QA 복합 워크플로우

마지막으로, LCEL과 LangGraph를 결합한 실전 워크플로우를 구축해 보자. 이 워크플로우는 문서를 받아서 요약한 뒤, 사용자의 질문에 답변하는 복합 시스템이다.

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import TypedDict, Annotated
from operator import add

# 상태 정의
class DocWorkflowState(TypedDict):
    document: str
    summary: str
    questions: Annotated[list[str], add]
    answers: Annotated[list[str], add]
    current_mode: str  # "summarize" | "qa" | "done"

# LLM 및 체인 준비
llm = ChatOpenAI(model="gpt-4o", temperature=0)

summary_prompt = ChatPromptTemplate.from_template(
    "다음 문서를 3줄로 요약하세요:\n\n{document}"
)
summary_chain = summary_prompt | llm | StrOutputParser()

qa_prompt = ChatPromptTemplate.from_template(
    "다음 문서를 참고하여 질문에 답변하세요.\n\n"
    "문서:\n{document}\n\n"
    "요약:\n{summary}\n\n"
    "질문: {question}\n\n답변:"
)
qa_chain = qa_prompt | llm | StrOutputParser()

# 노드 정의
def summarize_node(state: DocWorkflowState):
    summary = summary_chain.invoke({"document": state["document"]})
    return {"summary": summary, "current_mode": "qa"}

def ask_question_node(state: DocWorkflowState):
    # Human-in-the-Loop: 사용자에게 질문을 받음
    user_input = interrupt({
        "message": "문서에 대해 궁금한 점을 질문하세요. '종료'를 입력하면 끝납니다.",
        "summary": state["summary"]
    })
    if user_input == "종료":
        return {"current_mode": "done"}
    return {"questions": [user_input], "current_mode": "qa"}

def answer_node(state: DocWorkflowState):
    question = state["questions"][-1]
    answer = qa_chain.invoke({
        "document": state["document"],
        "summary": state["summary"],
        "question": question,
    })
    return {"answers": [answer]}

# 라우팅 함수
def route_after_answer(state: DocWorkflowState) -> str:
    return state["current_mode"]

def route_after_question(state: DocWorkflowState) -> str:
    if state["current_mode"] == "done":
        return "done"
    return "answer"

# 그래프 구성
workflow = StateGraph(DocWorkflowState)
workflow.add_node("summarize", summarize_node)
workflow.add_node("ask_question", ask_question_node)
workflow.add_node("answer", answer_node)

workflow.add_edge(START, "summarize")
workflow.add_edge("summarize", "ask_question")

workflow.add_conditional_edges(
    "ask_question",
    route_after_question,
    {"answer": "answer", "done": END}
)

# 답변 후 다시 질문 단계로 돌아감 (순환 구조)
workflow.add_edge("answer", "ask_question")

# 컴파일
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

# 실행
config = {"configurable": {"thread_id": "doc-workflow-1"}}

# 1단계: 문서 전달 -> 요약 생성 -> 질문 대기 (interrupt)
result = graph.invoke(
    {
        "document": "LangGraph는 LangChain 팀이 개발한 상태 기반 에이전트 프레임워크로...",
        "summary": "",
        "questions": [],
        "answers": [],
        "current_mode": "summarize",
    },
    config
)

# 2단계: 사용자 질문 -> 답변 -> 다시 질문 대기
result = graph.invoke(Command(resume="LangGraph의 주요 장점은?"), config)

# 3단계: 추가 질문
result = graph.invoke(Command(resume="LCEL과의 차이점은?"), config)

# 4단계: 종료
result = graph.invoke(Command(resume="종료"), config)

이 워크플로우는 다음과 같은 LangGraph의 핵심 기능을 모두 활용한다.

  • StateGraph: 전체 워크플로우의 상태 관리
  • Conditional Edge: 사용자 입력에 따른 동적 분기
  • 순환 구조(Cycle): 답변 후 다시 질문으로 돌아가는 반복 구조
  • Checkpointer: 대화 상태의 영속적 저장
  • interrupt / Command: Human-in-the-Loop 패턴
  • LCEL 체인: 요약 및 QA를 위한 LLM 체인

이처럼 LCEL로 개별 LLM 체인을 구성하고, LangGraph로 전체 워크플로우의 흐름과 상태를 관리하는 것이 LangChain 생태계에서 권장하는 아키텍처 패턴이다.


13. References