Skip to content

Split View: LangChain 고급 패턴: LCEL과 LangGraph 완전 정복

✨ Learn with Quiz
|

LangChain 고급 패턴: LCEL과 LangGraph 완전 정복

1. LCEL이란? Pipe 연산자와 Runnable 인터페이스

LCEL(LangChain Expression Language)은 LangChain에서 체인을 구성하기 위한 선언적 표현 언어다. 기존의 LLMChain, SequentialChain 등 레거시 방식을 대체하며, LangChain 공식 문서에서 체인 구축의 권장 패턴으로 자리 잡았다.

LCEL의 핵심 아이디어는 Pipe 연산자 | 를 통해 여러 컴포넌트를 연결하는 것이다. Python의 __or__ 매직 메서드를 오버로딩하여, Unix 파이프라인처럼 데이터가 왼쪽에서 오른쪽으로 흘러가는 직관적인 구조를 만든다.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_template("'{topic}'에 대해 간략히 설명해주세요.")
model = ChatOpenAI(model="gpt-4o")
output_parser = StrOutputParser()

# LCEL Pipe 연산자로 체인 구성
chain = prompt | model | output_parser

result = chain.invoke({"topic": "양자 컴퓨팅"})
print(result)

위 코드에서 prompt | model | output_parser는 내부적으로 RunnableSequence를 생성한다. 각 컴포넌트는 Runnable 인터페이스를 구현하고 있으며, 이 인터페이스는 모든 LCEL 컴포넌트의 기반이 되는 프로토콜이다.

Runnable 인터페이스의 주요 메서드

메서드설명반환 타입
invoke(input)단일 입력에 대해 동기 실행단일 출력
ainvoke(input)단일 입력에 대해 비동기 실행단일 출력 (awaitable)
batch(inputs)복수 입력에 대해 병렬 실행리스트
abatch(inputs)복수 입력에 대해 비동기 병렬 실행리스트 (awaitable)
stream(input)단일 입력에 대해 스트리밍 실행Iterator
astream(input)단일 입력에 대해 비동기 스트리밍AsyncIterator

모든 LangChain 컴포넌트 -- ChatModel, Retriever, OutputParser, Tool 등 -- 가 이 Runnable 인터페이스를 구현하기 때문에, 어떤 두 컴포넌트든 | 연산자로 연결할 수 있다. 이것이 LCEL의 합성 가능성(composability)의 근간이다.


2. 핵심 Runnable 타입 상세 분석

LCEL은 단순한 순차 체인 외에도, 병렬 실행, 조건 분기, 입력 전달, 커스텀 함수 래핑 등 다양한 패턴을 지원하는 Runnable 타입을 제공한다.

2.1 RunnableParallel

RunnableParallel은 여러 Runnable을 병렬로 실행하고, 각 결과를 딕셔너리 형태로 합산하여 반환한다. LCEL에서는 딕셔너리 리터럴을 사용하면 자동으로 RunnableParallel로 변환된다.

from langchain_core.runnables import RunnableParallel

# 방법 1: 딕셔너리 리터럴 (자동 변환)
chain = {
    "summary": prompt_summary | model | output_parser,
    "keywords": prompt_keywords | model | output_parser,
} | combine_results

# 방법 2: 명시적 RunnableParallel
parallel = RunnableParallel(
    summary=prompt_summary | model | output_parser,
    keywords=prompt_keywords | model | output_parser,
)
result = parallel.invoke({"text": "LangChain은 LLM 애플리케이션 프레임워크입니다."})
# result = {"summary": "...", "keywords": "..."}

RunnableParallel은 내부적으로 각 브랜치를 동시에 실행하므로, 서로 독립적인 LLM 호출을 병렬화하여 전체 지연 시간을 줄이는 데 효과적이다.

2.2 RunnableBranch

RunnableBranch는 조건에 따라 서로 다른 Runnable을 실행하는 라우팅 메커니즘이다. (condition, runnable) 튜플의 리스트와 기본(default) Runnable을 인자로 받는다.

from langchain_core.runnables import RunnableBranch

branch = RunnableBranch(
    (lambda x: "코드" in x["topic"], code_chain),
    (lambda x: "수학" in x["topic"], math_chain),
    general_chain,  # default
)

# "코드"가 topic에 포함되면 code_chain 실행, "수학"이면 math_chain, 그 외에는 general_chain
result = branch.invoke({"topic": "코드 리팩토링 방법"})

조건 함수는 순서대로 평가되며, 첫 번째로 True를 반환하는 조건에 해당하는 Runnable이 실행된다. 어떤 조건도 만족하지 않으면 default Runnable이 실행된다.

2.3 RunnablePassthrough

RunnablePassthrough는 입력을 그대로 다음 단계로 전달한다. 주로 RunnableParallel과 함께 사용하여, 원본 입력을 보존하면서 동시에 다른 처리를 수행할 때 유용하다.

from langchain_core.runnables import RunnablePassthrough

chain = RunnableParallel(
    context=retriever,                    # 검색 결과
    question=RunnablePassthrough(),       # 원본 질문 그대로 전달
) | prompt | model | output_parser

RunnablePassthrough.assign()을 사용하면 기존 입력에 새로운 키-값을 추가할 수 있다.

chain = RunnablePassthrough.assign(
    context=lambda x: retriever.invoke(x["question"])
)
# 입력: {"question": "LangChain이란?"}
# 출력: {"question": "LangChain이란?", "context": [Document(...), ...]}

2.4 RunnableLambda

RunnableLambda는 일반 Python 함수를 Runnable로 래핑한다. 이를 통해 임의의 로직을 LCEL 체인에 삽입할 수 있다.

from langchain_core.runnables import RunnableLambda

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# 함수를 Runnable로 변환
format_runnable = RunnableLambda(format_docs)

# 체인에서 사용
chain = retriever | format_runnable | prompt | model | output_parser

@chain 데코레이터를 사용하면 더욱 간결하게 표현할 수 있다.

from langchain_core.runnables import chain

@chain
def format_and_classify(input_dict):
    text = input_dict["text"]
    formatted = text.strip().lower()
    category = "technical" if "api" in formatted else "general"
    return {"formatted": formatted, "category": category}

3. invoke, batch, stream, ainvoke 메서드 분석

Runnable 인터페이스의 각 메서드는 서로 다른 실행 패턴을 지원한다. 공식 문서에서 정의하는 각 메서드의 동작을 살펴보자.

invoke -- 단일 동기 실행

가장 기본적인 실행 방식이다. 하나의 입력을 받아 하나의 출력을 반환한다.

result = chain.invoke({"topic": "머신러닝"})

batch -- 병렬 일괄 실행

여러 입력을 리스트로 전달하면 내부적으로 병렬 처리하여 결과 리스트를 반환한다. max_concurrency 파라미터로 동시 실행 수를 제한할 수 있다.

results = chain.batch(
    [{"topic": "AI"}, {"topic": "블록체인"}, {"topic": "양자컴퓨팅"}],
    config={"max_concurrency": 2}
)
# results = ["AI는...", "블록체인은...", "양자컴퓨팅은..."]

stream -- 실시간 스트리밍

LLM 응답을 토큰 단위로 스트리밍한다. 사용자 경험을 크게 개선하는 핵심 기능이다.

for chunk in chain.stream({"topic": "딥러닝"}):
    print(chunk, end="", flush=True)

ainvoke / astream -- 비동기 실행

FastAPI, asyncio 환경에서 사용할 수 있는 비동기 버전이다. await 키워드와 함께 사용한다.

import asyncio

async def main():
    result = await chain.ainvoke({"topic": "강화학습"})
    print(result)

    async for chunk in chain.astream({"topic": "강화학습"}):
        print(chunk, end="", flush=True)

asyncio.run(main())

4. Streaming 구현: astream_events, astream_log

단순한 stream()은 최종 출력만 스트리밍하지만, 복잡한 체인에서는 중간 단계의 결과도 실시간으로 확인하고 싶을 때가 있다. LangChain은 이를 위해 astream_eventsastream_log를 제공한다.

astream_events

astream_events는 체인 실행 과정에서 발생하는 모든 이벤트를 StreamEvent 객체로 스트리밍한다. 각 이벤트에는 이벤트 종류(on_chain_start, on_llm_stream, on_chain_end 등), 이벤트 이름, 데이터가 포함된다.

async for event in chain.astream_events(
    {"topic": "트랜스포머"},
    version="v2"
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        # LLM 토큰 스트리밍
        print(event["data"]["chunk"].content, end="", flush=True)
    elif kind == "on_chain_start":
        print(f"\n--- 체인 시작: {event['name']} ---")
    elif kind == "on_chain_end":
        print(f"\n--- 체인 종료: {event['name']} ---")

astream_events는 중간 단계가 최종 입력에서만 동작하는 경우에도 중간 결과를 스트리밍할 수 있다는 점에서 매우 강력하다. 예를 들어, RAG 체인에서 Retriever가 문서를 가져오는 과정, Prompt가 생성되는 과정, LLM이 응답하는 과정을 모두 실시간으로 모니터링할 수 있다.

astream_log

astream_log는 실행 과정의 로그를 JSON Patch 형태로 스트리밍한다. include_namesinclude_tags 파라미터를 통해 특정 컴포넌트의 로그만 필터링할 수 있다.

async for log_patch in chain.astream_log(
    {"topic": "어텐션 메커니즘"},
    include_names=["ChatOpenAI"],
):
    for op in log_patch.ops:
        print(op)

일반적으로 astream_events가 더 직관적이고 사용하기 편리하므로, 공식 문서에서도 astream_events를 먼저 권장하고 있다.


5. LCEL로 RAG 체인 구축 예제

LCEL의 진정한 위력은 실전 패턴에서 발휘된다. RAG(Retrieval-Augmented Generation) 체인을 LCEL로 구현하면 다음과 같다.

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_community.vectorstores import FAISS

# 1. 벡터 스토어 및 Retriever 구성
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = FAISS.from_texts(
    ["LangChain은 LLM 애플리케이션 프레임워크다.",
     "LCEL은 LangChain Expression Language의 약자이다.",
     "LangGraph는 상태 기반 에이전트 프레임워크다."],
    embeddings
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# 2. 문서 포맷팅 함수
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# 3. Prompt 템플릿
prompt = ChatPromptTemplate.from_template("""
다음 컨텍스트를 기반으로 질문에 답변하세요.

컨텍스트:
{context}

질문: {question}

답변:
""")

# 4. 모델 및 파서
model = ChatOpenAI(model="gpt-4o", temperature=0)
output_parser = StrOutputParser()

# 5. LCEL RAG 체인 구성
rag_chain = (
    RunnablePassthrough.assign(
        context=lambda x: format_docs(retriever.invoke(x["question"]))
    )
    | prompt
    | model
    | output_parser
)

# 6. 실행
result = rag_chain.invoke({"question": "LCEL이 무엇인가요?"})
print(result)

이 체인은 다음과 같은 흐름으로 동작한다.

  1. 입력 {"question": "LCEL이 무엇인가요?"}RunnablePassthrough.assign()에 전달된다.
  2. question은 그대로 유지되고, context에는 Retriever가 가져온 문서를 포맷한 문자열이 할당된다.
  3. promptcontextquestion을 결합하여 프롬프트를 생성한다.
  4. model이 프롬프트를 처리하고, output_parser가 최종 문자열을 추출한다.

6. LangGraph 기초: StateGraph, Node, Edge

LangGraph는 LangChain 팀이 만든 상태 기반 그래프 실행 프레임워크로, 복잡한 에이전트 워크플로우를 구현하기 위해 설계되었다. LCEL이 선형적인 체인에 강점을 보인다면, LangGraph는 순환(cyclic) 구조, 조건 분기, 상태 관리가 필요한 시나리오에 적합하다.

핵심 개념

  • StateGraph: 노드 간에 공유 상태(State)를 통해 통신하는 그래프. 상태 정의(TypedDict)를 인자로 받아 초기화한다.
  • Node: 상태를 입력으로 받아 상태 업데이트를 반환하는 함수. add_node()로 그래프에 등록한다.
  • Edge: 노드 간의 연결. add_edge()로 정의하며, 실행 흐름을 결정한다.
  • START / END: 그래프의 시작점과 종료점을 나타내는 특수 노드.
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from operator import add

# 1. 상태 정의
class AgentState(TypedDict):
    messages: Annotated[list[str], add]   # add reducer: 리스트를 누적
    current_step: str

# 2. 노드 함수 정의
def analyze_node(state: AgentState):
    return {
        "messages": ["분석 완료: 데이터를 처리했습니다."],
        "current_step": "analyze"
    }

def summarize_node(state: AgentState):
    return {
        "messages": ["요약 완료: 결과를 정리했습니다."],
        "current_step": "summarize"
    }

# 3. 그래프 구성
workflow = StateGraph(AgentState)
workflow.add_node("analyze", analyze_node)
workflow.add_node("summarize", summarize_node)

# 4. 엣지 정의
workflow.add_edge(START, "analyze")
workflow.add_edge("analyze", "summarize")
workflow.add_edge("summarize", END)

# 5. 컴파일 및 실행
graph = workflow.compile()
result = graph.invoke({
    "messages": ["시작"],
    "current_step": ""
})
print(result)
# {"messages": ["시작", "분석 완료: ...", "요약 완료: ..."], "current_step": "summarize"}

상태 정의에서 Annotated[list[str], add]reducer 함수를 지정하는 것이다. add reducer는 노드가 반환하는 리스트를 기존 리스트에 누적(append)한다. Reducer를 지정하지 않으면 기존 값을 덮어쓴다.

그래프는 반드시 .compile()을 호출하여 CompiledStateGraph로 변환해야 실행할 수 있으며, 컴파일된 그래프는 LCEL의 Runnable 인터페이스를 구현하므로 invoke(), stream(), ainvoke() 등을 모두 사용할 수 있다.


7. Conditional Edge로 분기 처리 (라우터 패턴)

LangGraph의 add_conditional_edges()는 노드의 실행 결과에 따라 다음으로 실행할 노드를 동적으로 결정하는 메커니즘이다. 이는 에이전트의 의사결정 로직을 구현하는 핵심 기능이다.

from langgraph.graph import StateGraph, START, END

class RouterState(TypedDict):
    query: str
    category: str
    result: str

def classify_node(state: RouterState):
    query = state["query"]
    if "코드" in query or "프로그래밍" in query:
        category = "code"
    elif "수학" in query or "계산" in query:
        category = "math"
    else:
        category = "general"
    return {"category": category}

def code_expert(state: RouterState):
    return {"result": f"코드 전문가가 답변합니다: {state['query']}"}

def math_expert(state: RouterState):
    return {"result": f"수학 전문가가 답변합니다: {state['query']}"}

def general_expert(state: RouterState):
    return {"result": f"일반 전문가가 답변합니다: {state['query']}"}

# 라우팅 함수
def route_query(state: RouterState) -> str:
    return state["category"]

# 그래프 구성
workflow = StateGraph(RouterState)
workflow.add_node("classify", classify_node)
workflow.add_node("code", code_expert)
workflow.add_node("math", math_expert)
workflow.add_node("general", general_expert)

workflow.add_edge(START, "classify")

# Conditional Edge: classify 노드 이후 라우팅
workflow.add_conditional_edges(
    "classify",           # 소스 노드
    route_query,          # 라우팅 함수
    {                     # 반환값 -> 대상 노드 매핑
        "code": "code",
        "math": "math",
        "general": "general",
    }
)

workflow.add_edge("code", END)
workflow.add_edge("math", END)
workflow.add_edge("general", END)

graph = workflow.compile()
result = graph.invoke({"query": "Python 코드 리팩토링 방법", "category": "", "result": ""})
print(result["result"])  # "코드 전문가가 답변합니다: ..."

add_conditional_edges()는 세 개의 인자를 받는다.

  1. source: 분기가 시작되는 노드 이름
  2. path: 상태를 받아 다음 노드 이름(또는 키)을 반환하는 라우팅 함수
  3. path_map (선택): 라우팅 함수의 반환값을 실제 노드 이름에 매핑하는 딕셔너리

라우팅 함수가 반환하는 문자열이 path_map에 없으면 런타임 에러가 발생하므로, 모든 경우의 수를 빠짐없이 매핑해야 한다.


8. Checkpointing과 State Persistence

LangGraph의 Checkpointer는 그래프 실행 상태를 영속적으로 저장하는 퍼시스턴스 레이어다. Checkpointer를 사용하면 그래프의 각 실행 단계(superstep)마다 상태 스냅샷이 저장되어, 실행을 일시 정지하고 나중에 재개하거나, 특정 시점으로 되돌아갈 수 있다.

Thread와 Checkpoint

  • Thread: 상태를 저장하고 검색하는 고유 식별자. thread_id를 통해 지정한다.
  • Checkpoint: 특정 시점의 그래프 상태 스냅샷. StateSnapshot 객체로 표현된다.
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import TypedDict, Annotated
from operator import add

class ConversationState(TypedDict):
    messages: Annotated[list[str], add]

def chat_node(state: ConversationState):
    last_message = state["messages"][-1]
    response = f"'{last_message}'에 대한 응답입니다."
    return {"messages": [response]}

workflow = StateGraph(ConversationState)
workflow.add_node("chat", chat_node)
workflow.add_edge(START, "chat")
workflow.add_edge("chat", END)

# Checkpointer 적용
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

# Thread ID를 지정하여 실행
config = {"configurable": {"thread_id": "user-session-1"}}

# 첫 번째 대화
graph.invoke({"messages": ["안녕하세요"]}, config)

# 두 번째 대화 (같은 thread_id -- 이전 상태가 유지됨)
result = graph.invoke({"messages": ["LangGraph에 대해 알려주세요"]}, config)
print(result["messages"])
# ["안녕하세요", "'안녕하세요'에 대한 응답입니다.",
#  "LangGraph에 대해 알려주세요", "'LangGraph에 대해 알려주세요'에 대한 응답입니다."]

상태 조회 및 타임 트래블

# 현재 상태 조회
snapshot = graph.get_state(config)
print(snapshot.values)   # 현재 상태 값
print(snapshot.next)     # 다음 실행될 노드

# 상태 히스토리 조회
for state in graph.get_state_history(config):
    print(state.config["configurable"]["checkpoint_id"], state.values)

특정 checkpoint로 되돌아가서 실행을 포크(fork)할 수도 있다.

# 특정 checkpoint에서 재실행
fork_config = {
    "configurable": {
        "thread_id": "user-session-1",
        "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"
    }
}
graph.invoke(None, config=fork_config)

Checkpointer 종류

구현체용도패키지
InMemorySaver개발/테스트langgraph 기본 포함
SqliteSaver로컬 워크플로우langgraph-checkpoint-sqlite
PostgresSaver프로덕션 배포langgraph-checkpoint-postgres

프로덕션 환경에서는 반드시 PostgresSaver와 같은 내구성 있는 checkpointer를 사용해야 한다.


9. Human-in-the-Loop 패턴 (interrupt, resume)

LangGraph의 interrupt 기능은 그래프 실행을 특정 지점에서 일시 정지하고, 사람의 입력을 받은 후 재개하는 패턴을 지원한다. 이를 위해서는 반드시 Checkpointer가 설정되어 있어야 한다.

interrupt 함수

interrupt() 함수는 노드 내부에서 호출하면 그래프 실행을 즉시 중단하고, 전달한 값을 호출자에게 반환한다. 이후 Command(resume=value)로 실행을 재개하면, interrupt() 호출 지점으로 돌아가 resume 값이 반환된다.

from langgraph.types import interrupt, Command

class ApprovalState(TypedDict):
    action: str
    approved: bool
    result: str

def plan_node(state: ApprovalState):
    return {"action": "중요 데이터베이스 마이그레이션 실행"}

def approval_node(state: ApprovalState):
    # 실행 일시 정지 -- 사람의 승인을 기다림
    response = interrupt({
        "question": "다음 작업을 승인하시겠습니까?",
        "action": state["action"]
    })
    return {"approved": response == "승인"}

def execute_node(state: ApprovalState):
    if state["approved"]:
        return {"result": "작업이 성공적으로 실행되었습니다."}
    return {"result": "작업이 취소되었습니다."}

workflow = StateGraph(ApprovalState)
workflow.add_node("plan", plan_node)
workflow.add_node("approval", approval_node)
workflow.add_node("execute", execute_node)

workflow.add_edge(START, "plan")
workflow.add_edge("plan", "approval")
workflow.add_edge("approval", "execute")
workflow.add_edge("execute", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "migration-1"}}

# 1단계: 그래프 실행 -> approval 노드에서 일시 정지
result = graph.invoke(
    {"action": "", "approved": False, "result": ""},
    config
)
# result에 __interrupt__ 정보가 포함됨

# 2단계: 사람이 검토 후 승인
final_result = graph.invoke(
    Command(resume="승인"),
    config
)
print(final_result["result"])  # "작업이 성공적으로 실행되었습니다."

interrupt 사용 시 주의사항

공식 문서에서 강조하는 중요한 규칙들이다.

  1. try/except로 감싸지 말 것: interrupt()는 내부적으로 예외를 발생시켜 실행을 중단하므로, bare try/except가 이를 잡아버리면 정상 동작하지 않는다.
  2. interrupt 호출 순서를 바꾸지 말 것: Resume 시 인덱스 기반으로 매칭하므로, 조건에 따라 interrupt 순서가 달라지면 안 된다.
  3. interrupt 전에 비멱등(non-idempotent) 작업을 하지 말 것: Resume 시 노드가 처음부터 다시 실행되므로, interrupt 이전의 코드도 재실행된다.
  4. JSON 직렬화 가능한 값만 전달할 것: 함수, 클래스 인스턴스 등은 전달할 수 없다.

10. LangGraph로 Multi-Agent 시스템 구축

LangGraph는 여러 에이전트가 협력하는 Multi-Agent 시스템을 구축하는 데 적합하다. 대표적인 패턴은 Supervisor 패턴으로, 중앙의 Supervisor 에이전트가 작업을 분배하고 결과를 취합한다.

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated, Literal
from operator import add

class MultiAgentState(TypedDict):
    query: str
    messages: Annotated[list[str], add]
    next_agent: str
    final_answer: str

# Supervisor: 어떤 에이전트에게 작업을 할당할지 결정
def supervisor_node(state: MultiAgentState):
    query = state["query"].lower()
    if "검색" in query or "찾아" in query:
        return {"next_agent": "researcher", "messages": ["Supervisor: 연구 에이전트에게 할당"]}
    elif "작성" in query or "써줘" in query:
        return {"next_agent": "writer", "messages": ["Supervisor: 작성 에이전트에게 할당"]}
    else:
        return {"next_agent": "analyst", "messages": ["Supervisor: 분석 에이전트에게 할당"]}

# 전문 에이전트들
def researcher_node(state: MultiAgentState):
    return {
        "messages": ["Researcher: 관련 자료를 검색하여 수집했습니다."],
        "final_answer": f"검색 결과: '{state['query']}'에 대한 자료를 찾았습니다."
    }

def writer_node(state: MultiAgentState):
    return {
        "messages": ["Writer: 요청에 맞게 콘텐츠를 작성했습니다."],
        "final_answer": f"작성 결과: '{state['query']}'에 대한 글을 작성했습니다."
    }

def analyst_node(state: MultiAgentState):
    return {
        "messages": ["Analyst: 데이터를 분석하여 인사이트를 도출했습니다."],
        "final_answer": f"분석 결과: '{state['query']}'에 대한 분석을 완료했습니다."
    }

def route_to_agent(state: MultiAgentState) -> str:
    return state["next_agent"]

# 그래프 구성
workflow = StateGraph(MultiAgentState)
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("researcher", researcher_node)
workflow.add_node("writer", writer_node)
workflow.add_node("analyst", analyst_node)

workflow.add_edge(START, "supervisor")
workflow.add_conditional_edges(
    "supervisor",
    route_to_agent,
    {
        "researcher": "researcher",
        "writer": "writer",
        "analyst": "analyst",
    }
)
workflow.add_edge("researcher", END)
workflow.add_edge("writer", END)
workflow.add_edge("analyst", END)

graph = workflow.compile()

result = graph.invoke({
    "query": "최신 AI 트렌드를 검색해줘",
    "messages": [],
    "next_agent": "",
    "final_answer": ""
})
print(result["final_answer"])

LangChain 팀은 langgraph-supervisor 라이브러리도 별도로 제공하고 있으며, 이를 통해 더 간결하게 Supervisor 패턴을 구현할 수 있다. 하지만 공식 문서에서는 직접 StateGraph로 구현하는 것이 context engineering에 대한 더 세밀한 제어를 가능하게 한다고 안내하고 있다.

Multi-Agent의 확장 패턴

  • 계층적 구조: Supervisor 아래에 Sub-Supervisor를 두어 대규모 에이전트 조직을 구성
  • 도구 공유: 여러 에이전트가 공통 도구를 공유하되, 각자의 전문 도구도 보유
  • 메모리 공유: Store 인터페이스를 통해 에이전트 간 장기 메모리 공유

11. LangGraph Studio 소개

LangGraph Studio는 LangChain 팀이 개발한 최초의 **에이전트 IDE(Integrated Development Environment)**다. LangGraph로 구축한 에이전트 워크플로우를 시각적으로 확인하고, 실시간으로 상호작용하며, 디버깅할 수 있는 데스크톱 애플리케이션이다.

주요 기능

  • 시각적 그래프 렌더링: StateGraph의 노드, 엣지, 조건 분기를 시각적으로 표현
  • 실시간 실행 추적: 각 노드의 입출력과 상태 변화를 실시간으로 관찰
  • 대화형 디버깅: 특정 노드에서 실행을 멈추고, 상태를 수정한 후 재실행
  • Human-in-the-Loop 테스트: interrupt 지점에서 직접 입력을 제공하여 워크플로우 검증
  • LangSmith 통합: Tracing, Evaluation, Prompt Engineering과의 연동

설정 방법

LangGraph Studio를 사용하려면 프로젝트 루트에 langgraph.json 설정 파일이 필요하다.

{
  "graphs": {
    "my_agent": "./agent.py:graph"
  },
  "dependencies": ["langchain", "langgraph", "langchain-openai"],
  "env": ".env"
}

이 파일은 에이전트 그래프의 위치, 필요한 의존성, 환경 변수 파일을 지정한다. LangGraph Studio는 현재 Apple Silicon Mac에서 사용 가능하며, LangSmith 계정(무료 포함)으로 접근할 수 있다.


12. 실전 예제: 문서 요약 + QA 복합 워크플로우

마지막으로, LCEL과 LangGraph를 결합한 실전 워크플로우를 구축해 보자. 이 워크플로우는 문서를 받아서 요약한 뒤, 사용자의 질문에 답변하는 복합 시스템이다.

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import TypedDict, Annotated
from operator import add

# 상태 정의
class DocWorkflowState(TypedDict):
    document: str
    summary: str
    questions: Annotated[list[str], add]
    answers: Annotated[list[str], add]
    current_mode: str  # "summarize" | "qa" | "done"

# LLM 및 체인 준비
llm = ChatOpenAI(model="gpt-4o", temperature=0)

summary_prompt = ChatPromptTemplate.from_template(
    "다음 문서를 3줄로 요약하세요:\n\n{document}"
)
summary_chain = summary_prompt | llm | StrOutputParser()

qa_prompt = ChatPromptTemplate.from_template(
    "다음 문서를 참고하여 질문에 답변하세요.\n\n"
    "문서:\n{document}\n\n"
    "요약:\n{summary}\n\n"
    "질문: {question}\n\n답변:"
)
qa_chain = qa_prompt | llm | StrOutputParser()

# 노드 정의
def summarize_node(state: DocWorkflowState):
    summary = summary_chain.invoke({"document": state["document"]})
    return {"summary": summary, "current_mode": "qa"}

def ask_question_node(state: DocWorkflowState):
    # Human-in-the-Loop: 사용자에게 질문을 받음
    user_input = interrupt({
        "message": "문서에 대해 궁금한 점을 질문하세요. '종료'를 입력하면 끝납니다.",
        "summary": state["summary"]
    })
    if user_input == "종료":
        return {"current_mode": "done"}
    return {"questions": [user_input], "current_mode": "qa"}

def answer_node(state: DocWorkflowState):
    question = state["questions"][-1]
    answer = qa_chain.invoke({
        "document": state["document"],
        "summary": state["summary"],
        "question": question,
    })
    return {"answers": [answer]}

# 라우팅 함수
def route_after_answer(state: DocWorkflowState) -> str:
    return state["current_mode"]

def route_after_question(state: DocWorkflowState) -> str:
    if state["current_mode"] == "done":
        return "done"
    return "answer"

# 그래프 구성
workflow = StateGraph(DocWorkflowState)
workflow.add_node("summarize", summarize_node)
workflow.add_node("ask_question", ask_question_node)
workflow.add_node("answer", answer_node)

workflow.add_edge(START, "summarize")
workflow.add_edge("summarize", "ask_question")

workflow.add_conditional_edges(
    "ask_question",
    route_after_question,
    {"answer": "answer", "done": END}
)

# 답변 후 다시 질문 단계로 돌아감 (순환 구조)
workflow.add_edge("answer", "ask_question")

# 컴파일
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

# 실행
config = {"configurable": {"thread_id": "doc-workflow-1"}}

# 1단계: 문서 전달 -> 요약 생성 -> 질문 대기 (interrupt)
result = graph.invoke(
    {
        "document": "LangGraph는 LangChain 팀이 개발한 상태 기반 에이전트 프레임워크로...",
        "summary": "",
        "questions": [],
        "answers": [],
        "current_mode": "summarize",
    },
    config
)

# 2단계: 사용자 질문 -> 답변 -> 다시 질문 대기
result = graph.invoke(Command(resume="LangGraph의 주요 장점은?"), config)

# 3단계: 추가 질문
result = graph.invoke(Command(resume="LCEL과의 차이점은?"), config)

# 4단계: 종료
result = graph.invoke(Command(resume="종료"), config)

이 워크플로우는 다음과 같은 LangGraph의 핵심 기능을 모두 활용한다.

  • StateGraph: 전체 워크플로우의 상태 관리
  • Conditional Edge: 사용자 입력에 따른 동적 분기
  • 순환 구조(Cycle): 답변 후 다시 질문으로 돌아가는 반복 구조
  • Checkpointer: 대화 상태의 영속적 저장
  • interrupt / Command: Human-in-the-Loop 패턴
  • LCEL 체인: 요약 및 QA를 위한 LLM 체인

이처럼 LCEL로 개별 LLM 체인을 구성하고, LangGraph로 전체 워크플로우의 흐름과 상태를 관리하는 것이 LangChain 생태계에서 권장하는 아키텍처 패턴이다.


13. References

Advanced LangChain Patterns: Mastering LCEL and LangGraph

1. What Is LCEL? Pipe Operator and Runnable Interface

LCEL (LangChain Expression Language) is a declarative expression language for composing chains in LangChain. It replaces legacy approaches such as LLMChain and SequentialChain, and has established itself as the recommended pattern for building chains in the official LangChain documentation.

The core idea of LCEL is connecting multiple components through the Pipe operator |. By overloading Python's __or__ magic method, it creates an intuitive structure where data flows from left to right, similar to Unix pipelines.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_template("'{topic}'에 대해 간략히 설명해주세요.")
model = ChatOpenAI(model="gpt-4o")
output_parser = StrOutputParser()

# Composing a chain with the LCEL Pipe operator
chain = prompt | model | output_parser

result = chain.invoke({"topic": "양자 컴퓨팅"})
print(result)

In the code above, prompt | model | output_parser internally creates a RunnableSequence. Each component implements the Runnable interface, which is the foundational protocol underlying all LCEL components.

Key Methods of the Runnable Interface

MethodDescriptionReturn Type
invoke(input)Synchronous execution for a single inputSingle output
ainvoke(input)Asynchronous execution for a single inputSingle output (awaitable)
batch(inputs)Parallel execution for multiple inputsList
abatch(inputs)Async parallel execution for multiple inputsList (awaitable)
stream(input)Streaming execution for a single inputIterator
astream(input)Async streaming for a single inputAsyncIterator

Since all LangChain components -- ChatModel, Retriever, OutputParser, Tool, etc. -- implement this Runnable interface, any two components can be connected using the | operator. This is the foundation of LCEL's composability.


2. Detailed Analysis of Core Runnable Types

Beyond simple sequential chains, LCEL provides Runnable types that support various patterns including parallel execution, conditional branching, input passthrough, and custom function wrapping.

2.1 RunnableParallel

RunnableParallel executes multiple Runnables in parallel and returns their results combined into a dictionary. In LCEL, using a dictionary literal automatically converts it into a RunnableParallel.

from langchain_core.runnables import RunnableParallel

# Method 1: Dictionary literal (automatic conversion)
chain = {
    "summary": prompt_summary | model | output_parser,
    "keywords": prompt_keywords | model | output_parser,
} | combine_results

# Method 2: Explicit RunnableParallel
parallel = RunnableParallel(
    summary=prompt_summary | model | output_parser,
    keywords=prompt_keywords | model | output_parser,
)
result = parallel.invoke({"text": "LangChain은 LLM 애플리케이션 프레임워크입니다."})
# result = {"summary": "...", "keywords": "..."}

RunnableParallel internally executes each branch concurrently, making it effective for parallelizing independent LLM calls to reduce overall latency.

2.2 RunnableBranch

RunnableBranch is a routing mechanism that executes different Runnables based on conditions. It takes a list of (condition, runnable) tuples and a default Runnable as arguments.

from langchain_core.runnables import RunnableBranch

branch = RunnableBranch(
    (lambda x: "코드" in x["topic"], code_chain),
    (lambda x: "수학" in x["topic"], math_chain),
    general_chain,  # default
)

# If "코드" is in the topic, code_chain runs; if "수학", math_chain runs; otherwise general_chain
result = branch.invoke({"topic": "코드 리팩토링 방법"})

Condition functions are evaluated in order, and the Runnable corresponding to the first condition that returns True is executed. If no condition is satisfied, the default Runnable is executed.

2.3 RunnablePassthrough

RunnablePassthrough passes the input as-is to the next step. It is primarily used with RunnableParallel to preserve the original input while simultaneously performing other processing.

from langchain_core.runnables import RunnablePassthrough

chain = RunnableParallel(
    context=retriever,                    # Search results
    question=RunnablePassthrough(),       # Pass the original question as-is
) | prompt | model | output_parser

Using RunnablePassthrough.assign(), you can add new key-value pairs to the existing input.

chain = RunnablePassthrough.assign(
    context=lambda x: retriever.invoke(x["question"])
)
# Input: {"question": "LangChain이란?"}
# Output: {"question": "LangChain이란?", "context": [Document(...), ...]}

2.4 RunnableLambda

RunnableLambda wraps a regular Python function as a Runnable. This allows you to insert arbitrary logic into an LCEL chain.

from langchain_core.runnables import RunnableLambda

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# Convert function to Runnable
format_runnable = RunnableLambda(format_docs)

# Use in a chain
chain = retriever | format_runnable | prompt | model | output_parser

Using the @chain decorator allows for even more concise expression.

from langchain_core.runnables import chain

@chain
def format_and_classify(input_dict):
    text = input_dict["text"]
    formatted = text.strip().lower()
    category = "technical" if "api" in formatted else "general"
    return {"formatted": formatted, "category": category}

3. Analysis of invoke, batch, stream, and ainvoke Methods

Each method of the Runnable interface supports different execution patterns. Let us examine the behavior of each method as defined in the official documentation.

invoke -- Single Synchronous Execution

This is the most basic execution method. It takes a single input and returns a single output.

result = chain.invoke({"topic": "머신러닝"})

batch -- Parallel Batch Execution

When multiple inputs are passed as a list, they are processed in parallel internally and a list of results is returned. You can limit the number of concurrent executions with the max_concurrency parameter.

results = chain.batch(
    [{"topic": "AI"}, {"topic": "블록체인"}, {"topic": "양자컴퓨팅"}],
    config={"max_concurrency": 2}
)
# results = ["AI는...", "블록체인은...", "양자컴퓨팅은..."]

stream -- Real-time Streaming

Streams LLM responses token by token. This is a key feature that significantly improves user experience.

for chunk in chain.stream({"topic": "딥러닝"}):
    print(chunk, end="", flush=True)

ainvoke / astream -- Asynchronous Execution

These are asynchronous versions that can be used in FastAPI and asyncio environments. They are used with the await keyword.

import asyncio

async def main():
    result = await chain.ainvoke({"topic": "강화학습"})
    print(result)

    async for chunk in chain.astream({"topic": "강화학습"}):
        print(chunk, end="", flush=True)

asyncio.run(main())

4. Streaming Implementation: astream_events, astream_log

While a simple stream() only streams the final output, in complex chains you may want to monitor intermediate step results in real time. LangChain provides astream_events and astream_log for this purpose.

astream_events

astream_events streams all events that occur during chain execution as StreamEvent objects. Each event includes the event type (on_chain_start, on_llm_stream, on_chain_end, etc.), event name, and data.

async for event in chain.astream_events(
    {"topic": "트랜스포머"},
    version="v2"
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        # LLM token streaming
        print(event["data"]["chunk"].content, end="", flush=True)
    elif kind == "on_chain_start":
        print(f"\n--- Chain started: {event['name']} ---")
    elif kind == "on_chain_end":
        print(f"\n--- Chain ended: {event['name']} ---")

astream_events is extremely powerful in that it can stream intermediate results even when intermediate steps only operate on the final input. For example, in a RAG chain, you can monitor in real time the process of the Retriever fetching documents, the Prompt being generated, and the LLM responding.

astream_log

astream_log streams execution process logs in JSON Patch format. You can filter logs for specific components using the include_names or include_tags parameters.

async for log_patch in chain.astream_log(
    {"topic": "어텐션 메커니즘"},
    include_names=["ChatOpenAI"],
):
    for op in log_patch.ops:
        print(op)

Generally, astream_events is more intuitive and convenient to use, so the official documentation recommends astream_events first.


5. Building a RAG Chain with LCEL -- Example

The true power of LCEL shines in real-world patterns. Here is how to implement a RAG (Retrieval-Augmented Generation) chain with LCEL.

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_community.vectorstores import FAISS

# 1. Vector store and Retriever setup
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = FAISS.from_texts(
    ["LangChain은 LLM 애플리케이션 프레임워크다.",
     "LCEL은 LangChain Expression Language의 약자이다.",
     "LangGraph는 상태 기반 에이전트 프레임워크다."],
    embeddings
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# 2. Document formatting function
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# 3. Prompt template
prompt = ChatPromptTemplate.from_template("""
다음 컨텍스트를 기반으로 질문에 답변하세요.

컨텍스트:
{context}

질문: {question}

답변:
""")

# 4. Model and parser
model = ChatOpenAI(model="gpt-4o", temperature=0)
output_parser = StrOutputParser()

# 5. LCEL RAG chain composition
rag_chain = (
    RunnablePassthrough.assign(
        context=lambda x: format_docs(retriever.invoke(x["question"]))
    )
    | prompt
    | model
    | output_parser
)

# 6. Execute
result = rag_chain.invoke({"question": "LCEL이 무엇인가요?"})
print(result)

This chain operates with the following flow:

  1. The input {"question": "LCEL이 무엇인가요?"} is passed to RunnablePassthrough.assign().
  2. The question is preserved as-is, and context is assigned the formatted string of documents retrieved by the Retriever.
  3. The prompt combines context and question to generate the prompt.
  4. The model processes the prompt, and the output_parser extracts the final string.

6. LangGraph Basics: StateGraph, Node, Edge

LangGraph is a state-based graph execution framework created by the LangChain team, designed for implementing complex agent workflows. While LCEL excels at linear chains, LangGraph is suited for scenarios requiring cyclic structures, conditional branching, and state management.

Core Concepts

  • StateGraph: A graph where nodes communicate through shared state. It is initialized with a state definition (TypedDict) as an argument.
  • Node: A function that takes state as input and returns state updates. Registered to the graph with add_node().
  • Edge: Connections between nodes. Defined with add_edge(), they determine the execution flow.
  • START / END: Special nodes representing the entry and exit points of the graph.
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from operator import add

# 1. State definition
class AgentState(TypedDict):
    messages: Annotated[list[str], add]   # add reducer: accumulates lists
    current_step: str

# 2. Node function definitions
def analyze_node(state: AgentState):
    return {
        "messages": ["분석 완료: 데이터를 처리했습니다."],
        "current_step": "analyze"
    }

def summarize_node(state: AgentState):
    return {
        "messages": ["요약 완료: 결과를 정리했습니다."],
        "current_step": "summarize"
    }

# 3. Graph construction
workflow = StateGraph(AgentState)
workflow.add_node("analyze", analyze_node)
workflow.add_node("summarize", summarize_node)

# 4. Edge definition
workflow.add_edge(START, "analyze")
workflow.add_edge("analyze", "summarize")
workflow.add_edge("summarize", END)

# 5. Compile and execute
graph = workflow.compile()
result = graph.invoke({
    "messages": ["시작"],
    "current_step": ""
})
print(result)
# {"messages": ["시작", "분석 완료: ...", "요약 완료: ..."], "current_step": "summarize"}

In the state definition, Annotated[list[str], add] specifies a reducer function. The add reducer accumulates (appends) the list returned by a node to the existing list. Without a reducer, existing values are overwritten.

The graph must be converted to a CompiledStateGraph by calling .compile() before it can be executed. The compiled graph implements LCEL's Runnable interface, so invoke(), stream(), ainvoke(), and other methods are all available.


7. Branching with Conditional Edges (Router Pattern)

LangGraph's add_conditional_edges() is a mechanism that dynamically determines which node to execute next based on a node's execution result. This is a core feature for implementing agent decision-making logic.

from langgraph.graph import StateGraph, START, END

class RouterState(TypedDict):
    query: str
    category: str
    result: str

def classify_node(state: RouterState):
    query = state["query"]
    if "코드" in query or "프로그래밍" in query:
        category = "code"
    elif "수학" in query or "계산" in query:
        category = "math"
    else:
        category = "general"
    return {"category": category}

def code_expert(state: RouterState):
    return {"result": f"코드 전문가가 답변합니다: {state['query']}"}

def math_expert(state: RouterState):
    return {"result": f"수학 전문가가 답변합니다: {state['query']}"}

def general_expert(state: RouterState):
    return {"result": f"일반 전문가가 답변합니다: {state['query']}"}

# Routing function
def route_query(state: RouterState) -> str:
    return state["category"]

# Graph construction
workflow = StateGraph(RouterState)
workflow.add_node("classify", classify_node)
workflow.add_node("code", code_expert)
workflow.add_node("math", math_expert)
workflow.add_node("general", general_expert)

workflow.add_edge(START, "classify")

# Conditional Edge: routing after the classify node
workflow.add_conditional_edges(
    "classify",           # Source node
    route_query,          # Routing function
    {                     # Return value to target node mapping
        "code": "code",
        "math": "math",
        "general": "general",
    }
)

workflow.add_edge("code", END)
workflow.add_edge("math", END)
workflow.add_edge("general", END)

graph = workflow.compile()
result = graph.invoke({"query": "Python 코드 리팩토링 방법", "category": "", "result": ""})
print(result["result"])  # "코드 전문가가 답변합니다: ..."

add_conditional_edges() takes three arguments:

  1. source: The name of the node where branching begins
  2. path: A routing function that takes the state and returns the next node name (or key)
  3. path_map (optional): A dictionary that maps the routing function's return values to actual node names

If the string returned by the routing function is not in the path_map, a runtime error occurs, so all possible cases must be mapped without omission.


8. Checkpointing and State Persistence

LangGraph's Checkpointer is a persistence layer that durably stores graph execution state. With a Checkpointer, state snapshots are saved at each execution step (superstep), allowing you to pause execution and resume later, or revert to a specific point in time.

Thread and Checkpoint

  • Thread: A unique identifier for storing and retrieving state. Specified through thread_id.
  • Checkpoint: A snapshot of the graph state at a specific point in time. Represented as a StateSnapshot object.
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import TypedDict, Annotated
from operator import add

class ConversationState(TypedDict):
    messages: Annotated[list[str], add]

def chat_node(state: ConversationState):
    last_message = state["messages"][-1]
    response = f"'{last_message}'에 대한 응답입니다."
    return {"messages": [response]}

workflow = StateGraph(ConversationState)
workflow.add_node("chat", chat_node)
workflow.add_edge(START, "chat")
workflow.add_edge("chat", END)

# Apply Checkpointer
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

# Execute with a Thread ID
config = {"configurable": {"thread_id": "user-session-1"}}

# First conversation
graph.invoke({"messages": ["안녕하세요"]}, config)

# Second conversation (same thread_id -- previous state is preserved)
result = graph.invoke({"messages": ["LangGraph에 대해 알려주세요"]}, config)
print(result["messages"])
# ["안녕하세요", "'안녕하세요'에 대한 응답입니다.",
#  "LangGraph에 대해 알려주세요", "'LangGraph에 대해 알려주세요'에 대한 응답입니다."]

State Inspection and Time Travel

# Inspect current state
snapshot = graph.get_state(config)
print(snapshot.values)   # Current state values
print(snapshot.next)     # Next node to be executed

# Inspect state history
for state in graph.get_state_history(config):
    print(state.config["configurable"]["checkpoint_id"], state.values)

You can also fork execution by reverting to a specific checkpoint.

# Re-execute from a specific checkpoint
fork_config = {
    "configurable": {
        "thread_id": "user-session-1",
        "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"
    }
}
graph.invoke(None, config=fork_config)

Types of Checkpointers

ImplementationUse CasePackage
InMemorySaverDevelopment/TestingIncluded in langgraph by default
SqliteSaverLocal workflowslanggraph-checkpoint-sqlite
PostgresSaverProduction deploymentlanggraph-checkpoint-postgres

In production environments, you must use a durable checkpointer such as PostgresSaver.


9. Human-in-the-Loop Pattern (interrupt, resume)

LangGraph's interrupt feature supports a pattern where graph execution is paused at a specific point, waits for human input, and then resumes. A Checkpointer must be configured for this to work.

The interrupt Function

The interrupt() function, when called inside a node, immediately halts graph execution and returns the passed value to the caller. When execution is resumed with Command(resume=value), it returns to the interrupt() call point and the resume value is returned.

from langgraph.types import interrupt, Command

class ApprovalState(TypedDict):
    action: str
    approved: bool
    result: str

def plan_node(state: ApprovalState):
    return {"action": "중요 데이터베이스 마이그레이션 실행"}

def approval_node(state: ApprovalState):
    # Pause execution -- wait for human approval
    response = interrupt({
        "question": "다음 작업을 승인하시겠습니까?",
        "action": state["action"]
    })
    return {"approved": response == "승인"}

def execute_node(state: ApprovalState):
    if state["approved"]:
        return {"result": "작업이 성공적으로 실행되었습니다."}
    return {"result": "작업이 취소되었습니다."}

workflow = StateGraph(ApprovalState)
workflow.add_node("plan", plan_node)
workflow.add_node("approval", approval_node)
workflow.add_node("execute", execute_node)

workflow.add_edge(START, "plan")
workflow.add_edge("plan", "approval")
workflow.add_edge("approval", "execute")
workflow.add_edge("execute", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "migration-1"}}

# Step 1: Execute graph -> pause at approval node
result = graph.invoke(
    {"action": "", "approved": False, "result": ""},
    config
)
# result contains __interrupt__ information

# Step 2: Human reviews and approves
final_result = graph.invoke(
    Command(resume="승인"),
    config
)
print(final_result["result"])  # "작업이 성공적으로 실행되었습니다."

Important Notes When Using interrupt

These are important rules emphasized in the official documentation.

  1. Do not wrap in try/except: Since interrupt() internally raises an exception to halt execution, a bare try/except will catch it and prevent normal operation.
  2. Do not change the order of interrupt calls: Resume uses index-based matching, so the interrupt order must not vary based on conditions.
  3. Do not perform non-idempotent operations before interrupt: When resuming, the node is re-executed from the beginning, so code before the interrupt is also re-executed.
  4. Only pass JSON-serializable values: Functions, class instances, etc. cannot be passed.

10. Building Multi-Agent Systems with LangGraph

LangGraph is well-suited for building Multi-Agent systems where multiple agents collaborate. The most representative pattern is the Supervisor pattern, where a central Supervisor agent distributes tasks and aggregates results.

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated, Literal
from operator import add

class MultiAgentState(TypedDict):
    query: str
    messages: Annotated[list[str], add]
    next_agent: str
    final_answer: str

# Supervisor: decides which agent to assign the task to
def supervisor_node(state: MultiAgentState):
    query = state["query"].lower()
    if "검색" in query or "찾아" in query:
        return {"next_agent": "researcher", "messages": ["Supervisor: 연구 에이전트에게 할당"]}
    elif "작성" in query or "써줘" in query:
        return {"next_agent": "writer", "messages": ["Supervisor: 작성 에이전트에게 할당"]}
    else:
        return {"next_agent": "analyst", "messages": ["Supervisor: 분석 에이전트에게 할당"]}

# Specialized agents
def researcher_node(state: MultiAgentState):
    return {
        "messages": ["Researcher: 관련 자료를 검색하여 수집했습니다."],
        "final_answer": f"검색 결과: '{state['query']}'에 대한 자료를 찾았습니다."
    }

def writer_node(state: MultiAgentState):
    return {
        "messages": ["Writer: 요청에 맞게 콘텐츠를 작성했습니다."],
        "final_answer": f"작성 결과: '{state['query']}'에 대한 글을 작성했습니다."
    }

def analyst_node(state: MultiAgentState):
    return {
        "messages": ["Analyst: 데이터를 분석하여 인사이트를 도출했습니다."],
        "final_answer": f"분석 결과: '{state['query']}'에 대한 분석을 완료했습니다."
    }

def route_to_agent(state: MultiAgentState) -> str:
    return state["next_agent"]

# Graph construction
workflow = StateGraph(MultiAgentState)
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("researcher", researcher_node)
workflow.add_node("writer", writer_node)
workflow.add_node("analyst", analyst_node)

workflow.add_edge(START, "supervisor")
workflow.add_conditional_edges(
    "supervisor",
    route_to_agent,
    {
        "researcher": "researcher",
        "writer": "writer",
        "analyst": "analyst",
    }
)
workflow.add_edge("researcher", END)
workflow.add_edge("writer", END)
workflow.add_edge("analyst", END)

graph = workflow.compile()

result = graph.invoke({
    "query": "최신 AI 트렌드를 검색해줘",
    "messages": [],
    "next_agent": "",
    "final_answer": ""
})
print(result["final_answer"])

The LangChain team also provides a separate langgraph-supervisor library for implementing the Supervisor pattern more concisely. However, the official documentation notes that building directly with StateGraph enables finer-grained control over context engineering.

Extended Multi-Agent Patterns

  • Hierarchical structure: Placing Sub-Supervisors under a Supervisor to organize large-scale agent organizations
  • Tool sharing: Multiple agents share common tools while also maintaining their own specialized tools
  • Memory sharing: Sharing long-term memory between agents through the Store interface

11. Introduction to LangGraph Studio

LangGraph Studio is the first agent IDE (Integrated Development Environment) developed by the LangChain team. It is a desktop application that lets you visually inspect agent workflows built with LangGraph, interact with them in real time, and debug them.

Key Features

  • Visual graph rendering: Visually represents StateGraph nodes, edges, and conditional branches
  • Real-time execution tracking: Observe each node's input/output and state changes in real time
  • Interactive debugging: Pause execution at a specific node, modify the state, and re-execute
  • Human-in-the-Loop testing: Provide direct input at interrupt points to verify workflows
  • LangSmith integration: Integration with Tracing, Evaluation, and Prompt Engineering

Configuration

To use LangGraph Studio, a langgraph.json configuration file is required at the project root.

{
  "graphs": {
    "my_agent": "./agent.py:graph"
  },
  "dependencies": ["langchain", "langgraph", "langchain-openai"],
  "env": ".env"
}

This file specifies the location of the agent graph, required dependencies, and the environment variables file. LangGraph Studio is currently available on Apple Silicon Macs and can be accessed with a LangSmith account (including free tier).


12. Practical Example: Document Summarization + QA Composite Workflow

Finally, let us build a practical workflow that combines LCEL and LangGraph. This workflow receives a document, summarizes it, and then answers user questions about it.

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import TypedDict, Annotated
from operator import add

# State definition
class DocWorkflowState(TypedDict):
    document: str
    summary: str
    questions: Annotated[list[str], add]
    answers: Annotated[list[str], add]
    current_mode: str  # "summarize" | "qa" | "done"

# LLM and chain preparation
llm = ChatOpenAI(model="gpt-4o", temperature=0)

summary_prompt = ChatPromptTemplate.from_template(
    "다음 문서를 3줄로 요약하세요:\n\n{document}"
)
summary_chain = summary_prompt | llm | StrOutputParser()

qa_prompt = ChatPromptTemplate.from_template(
    "다음 문서를 참고하여 질문에 답변하세요.\n\n"
    "문서:\n{document}\n\n"
    "요약:\n{summary}\n\n"
    "질문: {question}\n\n답변:"
)
qa_chain = qa_prompt | llm | StrOutputParser()

# Node definitions
def summarize_node(state: DocWorkflowState):
    summary = summary_chain.invoke({"document": state["document"]})
    return {"summary": summary, "current_mode": "qa"}

def ask_question_node(state: DocWorkflowState):
    # Human-in-the-Loop: receive a question from the user
    user_input = interrupt({
        "message": "문서에 대해 궁금한 점을 질문하세요. '종료'를 입력하면 끝납니다.",
        "summary": state["summary"]
    })
    if user_input == "종료":
        return {"current_mode": "done"}
    return {"questions": [user_input], "current_mode": "qa"}

def answer_node(state: DocWorkflowState):
    question = state["questions"][-1]
    answer = qa_chain.invoke({
        "document": state["document"],
        "summary": state["summary"],
        "question": question,
    })
    return {"answers": [answer]}

# Routing functions
def route_after_answer(state: DocWorkflowState) -> str:
    return state["current_mode"]

def route_after_question(state: DocWorkflowState) -> str:
    if state["current_mode"] == "done":
        return "done"
    return "answer"

# Graph construction
workflow = StateGraph(DocWorkflowState)
workflow.add_node("summarize", summarize_node)
workflow.add_node("ask_question", ask_question_node)
workflow.add_node("answer", answer_node)

workflow.add_edge(START, "summarize")
workflow.add_edge("summarize", "ask_question")

workflow.add_conditional_edges(
    "ask_question",
    route_after_question,
    {"answer": "answer", "done": END}
)

# After answering, loop back to the question step (cyclic structure)
workflow.add_edge("answer", "ask_question")

# Compile
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

# Execute
config = {"configurable": {"thread_id": "doc-workflow-1"}}

# Step 1: Pass document -> generate summary -> wait for question (interrupt)
result = graph.invoke(
    {
        "document": "LangGraph는 LangChain 팀이 개발한 상태 기반 에이전트 프레임워크로...",
        "summary": "",
        "questions": [],
        "answers": [],
        "current_mode": "summarize",
    },
    config
)

# Step 2: User question -> answer -> wait for next question
result = graph.invoke(Command(resume="LangGraph의 주요 장점은?"), config)

# Step 3: Additional question
result = graph.invoke(Command(resume="LCEL과의 차이점은?"), config)

# Step 4: Terminate
result = graph.invoke(Command(resume="종료"), config)

This workflow leverages all of the following core LangGraph features:

  • StateGraph: State management for the entire workflow
  • Conditional Edge: Dynamic branching based on user input
  • Cyclic structure: A loop that returns to the question step after answering
  • Checkpointer: Persistent storage of conversation state
  • interrupt / Command: Human-in-the-Loop pattern
  • LCEL chains: LLM chains for summarization and QA

This is the architecture pattern recommended in the LangChain ecosystem: compose individual LLM chains with LCEL, and manage the overall workflow flow and state with LangGraph.


13. References

Quiz

Q1: What is the main topic covered in "Advanced LangChain Patterns: Mastering LCEL and LangGraph"?

A systematic analysis of the LCEL (LangChain Expression Language) Runnable interface and LangGraph StateGraph, based on the official LangChain documentation.

Q2: What Is LCEL? Pipe Operator and Runnable Interface? LCEL (LangChain Expression Language) is a declarative expression language for composing chains in LangChain. It replaces legacy approaches such as LLMChain and SequentialChain, and has established itself as the recommended pattern for building chains in the official LangChain doc...

Q3: Explain the core concept of Detailed Analysis of Core Runnable Types. Beyond simple sequential chains, LCEL provides Runnable types that support various patterns including parallel execution, conditional branching, input passthrough, and custom function wrapping.

Q4: What are the key aspects of Analysis of invoke, batch, stream, and ainvoke Methods?

Each method of the Runnable interface supports different execution patterns. Let us examine the behavior of each method as defined in the official documentation. invoke -- Single Synchronous Execution This is the most basic execution method.

Q5: How does Streaming Implementation: astreamevents, astreamlog work? While a simple stream() only streams the final output, in complex chains you may want to monitor intermediate step results in real time. LangChain provides astream_events and astream_log for this purpose.