Advanced LangChain Patterns: Mastering LCEL and LangGraph

1. What Is LCEL? Pipe Operator and Runnable Interface

LCEL (LangChain Expression Language) is a declarative expression language for composing chains in LangChain. It replaces legacy approaches such as LLMChain and SequentialChain, and has established itself as the recommended pattern for building chains in the official LangChain documentation.

The core idea of LCEL is connecting multiple components through the Pipe operator |. By overloading Python's __or__ magic method, it creates an intuitive structure where data flows from left to right, similar to Unix pipelines.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_template("'{topic}'에 대해 간략히 설명해주세요.")
model = ChatOpenAI(model="gpt-4o")
output_parser = StrOutputParser()

# Composing a chain with the LCEL Pipe operator
chain = prompt | model | output_parser

result = chain.invoke({"topic": "양자 컴퓨팅"})
print(result)

In the code above, prompt | model | output_parser internally creates a RunnableSequence. Each component implements the Runnable interface, which is the foundational protocol underlying all LCEL components.
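
The composition mechanism can be illustrated without LangChain. Below is a minimal sketch (illustrative only, not the library's actual implementation) of how overloading __or__ produces left-to-right data flow:

```python
class Pipeable:
    """Toy stand-in for a Runnable, showing how | composes steps."""
    def __init__(self, func):
        self.func = func

    def invoke(self, value):
        return self.func(value)

    def __or__(self, other):
        # left | right -> a new Pipeable that runs left first, then right
        return Pipeable(lambda value: other.invoke(self.invoke(value)))

to_prompt = Pipeable(lambda d: f"Explain {d['topic']} briefly.")
fake_model = Pipeable(lambda prompt: prompt.upper())

chain = to_prompt | fake_model
print(chain.invoke({"topic": "LCEL"}))  # EXPLAIN LCEL BRIEFLY.
```

Real LCEL builds a RunnableSequence rather than nesting lambdas, but the left-to-right reading order is the same.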

Key Methods of the Runnable Interface

Method            Description                                     Return Type
invoke(input)     Synchronous execution for a single input        Single output
ainvoke(input)    Asynchronous execution for a single input       Single output (awaitable)
batch(inputs)     Parallel execution for multiple inputs          List
abatch(inputs)    Async parallel execution for multiple inputs    List (awaitable)
stream(input)     Streaming execution for a single input          Iterator
astream(input)    Async streaming for a single input              AsyncIterator

Since all LangChain components -- ChatModel, Retriever, OutputParser, Tool, etc. -- implement this Runnable interface, any two components can be connected using the | operator. This is the foundation of LCEL's composability.
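
A hedged sketch of this protocol (a simplified stand-in, not LangChain's real base class) shows how batch and stream defaults can be derived from a single invoke:

```python
from concurrent.futures import ThreadPoolExecutor

class MiniRunnable:
    """Illustrative subset of the Runnable protocol."""
    def __init__(self, func):
        self.func = func

    def invoke(self, value):
        return self.func(value)

    def batch(self, values, max_concurrency=4):
        # Default batch: map invoke over the inputs in a thread pool
        with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
            return list(pool.map(self.invoke, values))

    def stream(self, value):
        # Default stream: yield the whole result as one chunk
        yield self.invoke(value)

double = MiniRunnable(lambda x: x * 2)
print(double.batch([1, 2, 3]))  # [2, 4, 6]
print(list(double.stream(5)))   # [10]
```

LangChain's real Runnable works the same way in spirit: components that can do better (e.g., a ChatModel that streams tokens natively) override these defaults.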


2. Detailed Analysis of Core Runnable Types

Beyond simple sequential chains, LCEL provides Runnable types that support various patterns including parallel execution, conditional branching, input passthrough, and custom function wrapping.

2.1 RunnableParallel

RunnableParallel executes multiple Runnables in parallel and returns their results combined into a dictionary. In LCEL, using a dictionary literal automatically converts it into a RunnableParallel.

from langchain_core.runnables import RunnableParallel

# Method 1: Dictionary literal (automatic conversion)
chain = {
    "summary": prompt_summary | model | output_parser,
    "keywords": prompt_keywords | model | output_parser,
} | combine_results

# Method 2: Explicit RunnableParallel
parallel = RunnableParallel(
    summary=prompt_summary | model | output_parser,
    keywords=prompt_keywords | model | output_parser,
)
result = parallel.invoke({"text": "LangChain은 LLM 애플리케이션 프레임워크입니다."})
# result = {"summary": "...", "keywords": "..."}

RunnableParallel internally executes each branch concurrently, making it effective for parallelizing independent LLM calls to reduce overall latency.
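
The fan-out behavior can be modeled in plain Python. The sketch below (an illustrative simplification, not the library code) runs each branch on the same input in a thread pool and gathers the results into a dictionary:

```python
from concurrent.futures import ThreadPoolExecutor

def run_parallel(branches, value):
    """Run every branch on the same input concurrently; collect a dict of results."""
    with ThreadPoolExecutor() as pool:
        futures = {key: pool.submit(fn, value) for key, fn in branches.items()}
        return {key: fut.result() for key, fut in futures.items()}

result = run_parallel({"upper": str.upper, "length": len}, "langchain")
print(result)  # {'upper': 'LANGCHAIN', 'length': 9}
```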

2.2 RunnableBranch

RunnableBranch is a routing mechanism that executes different Runnables based on conditions. Its constructor takes one or more (condition, runnable) pairs followed by a default Runnable.

from langchain_core.runnables import RunnableBranch

branch = RunnableBranch(
    (lambda x: "코드" in x["topic"], code_chain),
    (lambda x: "수학" in x["topic"], math_chain),
    general_chain,  # default
)

# If "코드" is in the topic, code_chain runs; if "수학", math_chain runs; otherwise general_chain
result = branch.invoke({"topic": "코드 리팩토링 방법"})

Condition functions are evaluated in order, and the Runnable corresponding to the first condition that returns True is executed. If no condition is satisfied, the default Runnable is executed.
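
The first-match dispatch rule can be captured in a few lines of plain Python (a sketch of the semantics, not RunnableBranch's implementation):

```python
def run_branch(branches, default, value):
    """The first condition that returns True wins; otherwise run the default."""
    for condition, handler in branches:
        if condition(value):
            return handler(value)
    return default(value)

result = run_branch(
    [
        (lambda x: "code" in x, lambda x: "code expert"),
        (lambda x: "math" in x, lambda x: "math expert"),
    ],
    lambda x: "general expert",
    "refactor this code",
)
print(result)  # code expert
```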

2.3 RunnablePassthrough

RunnablePassthrough passes the input as-is to the next step. It is primarily used with RunnableParallel to preserve the original input while simultaneously performing other processing.

from langchain_core.runnables import RunnablePassthrough

chain = RunnableParallel(
    context=retriever,                    # Search results
    question=RunnablePassthrough(),       # Pass the original question as-is
) | prompt | model | output_parser

Using RunnablePassthrough.assign(), you can add new key-value pairs to the existing input.

chain = RunnablePassthrough.assign(
    context=lambda x: retriever.invoke(x["question"])
)
# Input: {"question": "LangChain이란?"}
# Output: {"question": "LangChain이란?", "context": [Document(...), ...]}
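
The assign semantics boil down to a dictionary merge: existing keys are preserved, computed keys are added. A minimal model (illustrative only, not RunnablePassthrough's implementation) looks like this:

```python
def assign(input_dict, **new_fields):
    """Keep every existing key and add fields computed from the whole input."""
    updates = {key: fn(input_dict) for key, fn in new_fields.items()}
    return {**input_dict, **updates}

out = assign(
    {"question": "What is LCEL?"},
    context=lambda x: f"docs for: {x['question']}",
)
print(out)  # {'question': 'What is LCEL?', 'context': 'docs for: What is LCEL?'}
```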

2.4 RunnableLambda

RunnableLambda wraps a regular Python function as a Runnable. This allows you to insert arbitrary logic into an LCEL chain.

from langchain_core.runnables import RunnableLambda

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# Convert function to Runnable
format_runnable = RunnableLambda(format_docs)

# Use in a chain
chain = retriever | format_runnable | prompt | model | output_parser

Using the @chain decorator allows for even more concise expression.

from langchain_core.runnables import chain

@chain
def format_and_classify(input_dict):
    text = input_dict["text"]
    formatted = text.strip().lower()
    category = "technical" if "api" in formatted else "general"
    return {"formatted": formatted, "category": category}

3. Analysis of invoke, batch, stream, and ainvoke Methods

Each method of the Runnable interface supports different execution patterns. Let us examine the behavior of each method as defined in the official documentation.

invoke -- Single Synchronous Execution

This is the most basic execution method. It takes a single input and returns a single output.

result = chain.invoke({"topic": "머신러닝"})

batch -- Parallel Batch Execution

When multiple inputs are passed as a list, they are processed in parallel internally and a list of results is returned. You can limit the number of concurrent executions with the max_concurrency parameter.

results = chain.batch(
    [{"topic": "AI"}, {"topic": "블록체인"}, {"topic": "양자컴퓨팅"}],
    config={"max_concurrency": 2}
)
# results = ["AI는...", "블록체인은...", "양자컴퓨팅은..."]

stream -- Real-time Streaming

Streams LLM responses token by token. This is a key feature that significantly improves user experience.

for chunk in chain.stream({"topic": "딥러닝"}):
    print(chunk, end="", flush=True)

ainvoke / astream -- Asynchronous Execution

These are asynchronous versions that can be used in FastAPI and asyncio environments. They are used with the await keyword.

import asyncio

async def main():
    result = await chain.ainvoke({"topic": "강화학습"})
    print(result)

    async for chunk in chain.astream({"topic": "강화학습"}):
        print(chunk, end="", flush=True)

asyncio.run(main())

4. Streaming Implementation: astream_events, astream_log

While a simple stream() only streams the final output, in complex chains you may want to monitor intermediate step results in real time. LangChain provides astream_events and astream_log for this purpose.

astream_events

astream_events streams all events that occur during chain execution as StreamEvent objects. Each event includes the event type (on_chain_start, on_llm_stream, on_chain_end, etc.), event name, and data.

async for event in chain.astream_events(
    {"topic": "트랜스포머"},
    version="v2"
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        # LLM token streaming
        print(event["data"]["chunk"].content, end="", flush=True)
    elif kind == "on_chain_start":
        print(f"\n--- Chain started: {event['name']} ---")
    elif kind == "on_chain_end":
        print(f"\n--- Chain ended: {event['name']} ---")

astream_events is especially powerful because it exposes intermediate results that a plain stream() call would hide. For example, in a RAG chain you can monitor in real time as the Retriever fetches documents, the prompt is generated, and the LLM responds.

astream_log

astream_log streams execution process logs in JSON Patch format. You can filter logs for specific components using the include_names or include_tags parameters.

async for log_patch in chain.astream_log(
    {"topic": "어텐션 메커니즘"},
    include_names=["ChatOpenAI"],
):
    for op in log_patch.ops:
        print(op)

Generally, astream_events is more intuitive and convenient to use, so the official documentation recommends astream_events first.


5. Building a RAG Chain with LCEL -- Example

The true power of LCEL shines in real-world patterns. Here is how to implement a RAG (Retrieval-Augmented Generation) chain with LCEL.

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_community.vectorstores import FAISS

# 1. Vector store and Retriever setup
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = FAISS.from_texts(
    ["LangChain은 LLM 애플리케이션 프레임워크다.",
     "LCEL은 LangChain Expression Language의 약자이다.",
     "LangGraph는 상태 기반 에이전트 프레임워크다."],
    embeddings
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# 2. Document formatting function
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# 3. Prompt template
prompt = ChatPromptTemplate.from_template("""
다음 컨텍스트를 기반으로 질문에 답변하세요.

컨텍스트:
{context}

질문: {question}

답변:
""")

# 4. Model and parser
model = ChatOpenAI(model="gpt-4o", temperature=0)
output_parser = StrOutputParser()

# 5. LCEL RAG chain composition
rag_chain = (
    RunnablePassthrough.assign(
        context=lambda x: format_docs(retriever.invoke(x["question"]))
    )
    | prompt
    | model
    | output_parser
)

# 6. Execute
result = rag_chain.invoke({"question": "LCEL이 무엇인가요?"})
print(result)

This chain operates with the following flow:

  1. The input {"question": "LCEL이 무엇인가요?"} is passed to RunnablePassthrough.assign().
  2. The question is preserved as-is, and context is assigned the formatted string of documents retrieved by the Retriever.
  3. The prompt template combines context and question into the final prompt.
  4. The model processes the prompt, and the output_parser extracts the final string.
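
The same flow can be modeled with plain functions. The sketch below (a data-flow illustration with stand-in retrieve/generate callables, not the real chain) mirrors the four steps:

```python
def rag_flow(inputs, retrieve, generate):
    """The four steps above as plain functions."""
    # Steps 1-2: keep the question, add formatted context
    context = "\n\n".join(retrieve(inputs["question"]))
    state = {**inputs, "context": context}
    # Step 3: fill the prompt template
    prompt = f"Context:\n{state['context']}\n\nQuestion: {state['question']}"
    # Step 4: model + output parser stand-in
    return generate(prompt)

answer = rag_flow(
    {"question": "What is LCEL?"},
    retrieve=lambda q: ["LCEL is the LangChain Expression Language."],
    generate=lambda prompt: prompt,  # identity "model" for illustration
)
print(answer)
```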

6. LangGraph Basics: StateGraph, Node, Edge

LangGraph is a state-based graph execution framework created by the LangChain team, designed for implementing complex agent workflows. While LCEL excels at linear chains, LangGraph is suited for scenarios requiring cyclic structures, conditional branching, and state management.

Core Concepts

  • StateGraph: A graph where nodes communicate through shared state. It is initialized with a state definition (TypedDict) as an argument.
  • Node: A function that takes state as input and returns state updates. Registered to the graph with add_node().
  • Edge: Connections between nodes. Defined with add_edge(), they determine the execution flow.
  • START / END: Special nodes representing the entry and exit points of the graph.

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from operator import add

# 1. State definition
class AgentState(TypedDict):
    messages: Annotated[list[str], add]   # add reducer: accumulates lists
    current_step: str

# 2. Node function definitions
def analyze_node(state: AgentState):
    return {
        "messages": ["분석 완료: 데이터를 처리했습니다."],
        "current_step": "analyze"
    }

def summarize_node(state: AgentState):
    return {
        "messages": ["요약 완료: 결과를 정리했습니다."],
        "current_step": "summarize"
    }

# 3. Graph construction
workflow = StateGraph(AgentState)
workflow.add_node("analyze", analyze_node)
workflow.add_node("summarize", summarize_node)

# 4. Edge definition
workflow.add_edge(START, "analyze")
workflow.add_edge("analyze", "summarize")
workflow.add_edge("summarize", END)

# 5. Compile and execute
graph = workflow.compile()
result = graph.invoke({
    "messages": ["시작"],
    "current_step": ""
})
print(result)
# {"messages": ["시작", "분석 완료: ...", "요약 완료: ..."], "current_step": "summarize"}

In the state definition, Annotated[list[str], add] specifies a reducer function. The add reducer accumulates (appends) the list returned by a node to the existing list. Without a reducer, existing values are overwritten.
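
The reducer rule can be modeled directly. The following sketch (a simplification of LangGraph's state merging, not its actual code) applies a node's update to the state, combining values through a reducer where one is declared and overwriting otherwise:

```python
from operator import add

def apply_update(state, update, reducers):
    """Merge a node's partial update into the state, honoring per-key reducers."""
    merged = dict(state)
    for key, value in update.items():
        if key in reducers:
            merged[key] = reducers[key](merged[key], value)  # combine old + new
        else:
            merged[key] = value                               # overwrite
    return merged

state = {"messages": ["start"], "current_step": ""}
state = apply_update(
    state,
    {"messages": ["analyzed"], "current_step": "analyze"},
    reducers={"messages": add},
)
print(state)  # {'messages': ['start', 'analyzed'], 'current_step': 'analyze'}
```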

The graph must be converted to a CompiledStateGraph by calling .compile() before it can be executed. The compiled graph implements LCEL's Runnable interface, so invoke(), stream(), ainvoke(), and other methods are all available.


7. Branching with Conditional Edges (Router Pattern)

LangGraph's add_conditional_edges() is a mechanism that dynamically determines which node to execute next based on a node's execution result. This is a core feature for implementing agent decision-making logic.

from langgraph.graph import StateGraph, START, END
from typing import TypedDict

class RouterState(TypedDict):
    query: str
    category: str
    result: str

def classify_node(state: RouterState):
    query = state["query"]
    if "코드" in query or "프로그래밍" in query:
        category = "code"
    elif "수학" in query or "계산" in query:
        category = "math"
    else:
        category = "general"
    return {"category": category}

def code_expert(state: RouterState):
    return {"result": f"코드 전문가가 답변합니다: {state['query']}"}

def math_expert(state: RouterState):
    return {"result": f"수학 전문가가 답변합니다: {state['query']}"}

def general_expert(state: RouterState):
    return {"result": f"일반 전문가가 답변합니다: {state['query']}"}

# Routing function
def route_query(state: RouterState) -> str:
    return state["category"]

# Graph construction
workflow = StateGraph(RouterState)
workflow.add_node("classify", classify_node)
workflow.add_node("code", code_expert)
workflow.add_node("math", math_expert)
workflow.add_node("general", general_expert)

workflow.add_edge(START, "classify")

# Conditional Edge: routing after the classify node
workflow.add_conditional_edges(
    "classify",           # Source node
    route_query,          # Routing function
    {                     # Return value to target node mapping
        "code": "code",
        "math": "math",
        "general": "general",
    }
)

workflow.add_edge("code", END)
workflow.add_edge("math", END)
workflow.add_edge("general", END)

graph = workflow.compile()
result = graph.invoke({"query": "Python 코드 리팩토링 방법", "category": "", "result": ""})
print(result["result"])  # "코드 전문가가 답변합니다: ..."

add_conditional_edges() takes three arguments:

  1. source: The name of the node where branching begins
  2. path: A routing function that takes the state and returns the next node name (or key)
  3. path_map (optional): A dictionary that maps the routing function's return values to actual node names

If the string returned by the routing function is not in the path_map, a runtime error occurs, so all possible cases must be mapped without omission.


8. Checkpointing and State Persistence

LangGraph's Checkpointer is a persistence layer that durably stores graph execution state. With a Checkpointer, state snapshots are saved at each execution step (superstep), allowing you to pause execution and resume later, or revert to a specific point in time.

Thread and Checkpoint

  • Thread: A unique identifier for storing and retrieving state. Specified through thread_id.
  • Checkpoint: A snapshot of the graph state at a specific point in time. Represented as a StateSnapshot object.

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import TypedDict, Annotated
from operator import add

class ConversationState(TypedDict):
    messages: Annotated[list[str], add]

def chat_node(state: ConversationState):
    last_message = state["messages"][-1]
    response = f"'{last_message}'에 대한 응답입니다."
    return {"messages": [response]}

workflow = StateGraph(ConversationState)
workflow.add_node("chat", chat_node)
workflow.add_edge(START, "chat")
workflow.add_edge("chat", END)

# Apply Checkpointer
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

# Execute with a Thread ID
config = {"configurable": {"thread_id": "user-session-1"}}

# First conversation
graph.invoke({"messages": ["안녕하세요"]}, config)

# Second conversation (same thread_id -- previous state is preserved)
result = graph.invoke({"messages": ["LangGraph에 대해 알려주세요"]}, config)
print(result["messages"])
# ["안녕하세요", "'안녕하세요'에 대한 응답입니다.",
#  "LangGraph에 대해 알려주세요", "'LangGraph에 대해 알려주세요'에 대한 응답입니다."]
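
The thread mechanism amounts to a keyed state store: load by thread_id before a run, save after. A toy model (illustrative only; this is not the InMemorySaver API) makes the behavior concrete:

```python
class MiniCheckpointer:
    """Toy per-thread state store modeling checkpoint persistence."""
    def __init__(self):
        self._threads = {}

    def load(self, thread_id):
        return self._threads.get(thread_id, {"messages": []})

    def save(self, thread_id, state):
        self._threads[thread_id] = state

def invoke_with_memory(saver, thread_id, new_message):
    # Restore the saved state, append this turn and a reply, persist again
    state = saver.load(thread_id)
    messages = state["messages"] + [new_message, f"reply to {new_message!r}"]
    state = {"messages": messages}
    saver.save(thread_id, state)
    return state

saver = MiniCheckpointer()
invoke_with_memory(saver, "user-session-1", "hello")
result = invoke_with_memory(saver, "user-session-1", "tell me more")
print(result["messages"])  # both turns plus both replies, four entries total
```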

State Inspection and Time Travel

# Inspect current state
snapshot = graph.get_state(config)
print(snapshot.values)   # Current state values
print(snapshot.next)     # Next node to be executed

# Inspect state history
for state in graph.get_state_history(config):
    print(state.config["configurable"]["checkpoint_id"], state.values)

You can also fork execution by reverting to a specific checkpoint.

# Re-execute from a specific checkpoint
fork_config = {
    "configurable": {
        "thread_id": "user-session-1",
        "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"
    }
}
graph.invoke(None, config=fork_config)

Types of Checkpointers

Implementation    Use Case                  Package
InMemorySaver     Development/Testing       Included in langgraph by default
SqliteSaver       Local workflows           langgraph-checkpoint-sqlite
PostgresSaver     Production deployment     langgraph-checkpoint-postgres

In production environments, use a durable checkpointer such as PostgresSaver: InMemorySaver keeps state only in process memory, so it is lost on restart.


9. Human-in-the-Loop Pattern (interrupt, resume)

LangGraph's interrupt feature supports a pattern where graph execution is paused at a specific point, waits for human input, and then resumes. A Checkpointer must be configured for this to work.

The interrupt Function

The interrupt() function, when called inside a node, immediately halts graph execution and returns the passed value to the caller. When execution is resumed with Command(resume=value), it returns to the interrupt() call point and the resume value is returned.

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command
from typing import TypedDict

class ApprovalState(TypedDict):
    action: str
    approved: bool
    result: str

def plan_node(state: ApprovalState):
    return {"action": "중요 데이터베이스 마이그레이션 실행"}

def approval_node(state: ApprovalState):
    # Pause execution -- wait for human approval
    response = interrupt({
        "question": "다음 작업을 승인하시겠습니까?",
        "action": state["action"]
    })
    return {"approved": response == "승인"}

def execute_node(state: ApprovalState):
    if state["approved"]:
        return {"result": "작업이 성공적으로 실행되었습니다."}
    return {"result": "작업이 취소되었습니다."}

workflow = StateGraph(ApprovalState)
workflow.add_node("plan", plan_node)
workflow.add_node("approval", approval_node)
workflow.add_node("execute", execute_node)

workflow.add_edge(START, "plan")
workflow.add_edge("plan", "approval")
workflow.add_edge("approval", "execute")
workflow.add_edge("execute", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "migration-1"}}

# Step 1: Execute graph -> pause at approval node
result = graph.invoke(
    {"action": "", "approved": False, "result": ""},
    config
)
# result contains __interrupt__ information

# Step 2: Human reviews and approves
final_result = graph.invoke(
    Command(resume="승인"),
    config
)
print(final_result["result"])  # "작업이 성공적으로 실행되었습니다."

Important Notes When Using interrupt

These are important rules emphasized in the official documentation.

  1. Do not wrap in try/except: Since interrupt() internally raises an exception to halt execution, a bare try/except will catch it and prevent normal operation.
  2. Do not change the order of interrupt calls: Resume uses index-based matching, so the interrupt order must not vary based on conditions.
  3. Do not perform non-idempotent operations before interrupt: When resuming, the node is re-executed from the beginning, so code before the interrupt is also re-executed.
  4. Only pass JSON-serializable values: Functions, class instances, etc. cannot be passed.
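
Rule 1 follows from how interrupt is implemented: it raises a special exception to unwind out of the node. A toy model (illustrative only, not LangGraph's internals) captures the pause/resume shape:

```python
class Interrupt(Exception):
    """Toy pause signal carrying a payload for the human."""
    def __init__(self, payload):
        self.payload = payload

def approval_step(resume_value=None):
    # First call: no resume value yet, so raise to pause execution
    if resume_value is None:
        raise Interrupt({"question": "Approve this action?"})
    # Resumed call: the injected value stands in for interrupt()'s return
    return {"approved": resume_value == "approve"}

# First run pauses and surfaces the payload to the caller
try:
    approval_step()
except Interrupt as pause:
    payload = pause.payload

# Resume with the human's answer
result = approval_step(resume_value="approve")
print(result)  # {'approved': True}
```

This also shows why a blanket try/except inside the node would break the pattern: it would swallow the pause signal before it reaches the runtime.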

10. Building Multi-Agent Systems with LangGraph

LangGraph is well-suited for building Multi-Agent systems where multiple agents collaborate. The most representative pattern is the Supervisor pattern, where a central Supervisor agent distributes tasks and aggregates results.

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from operator import add

class MultiAgentState(TypedDict):
    query: str
    messages: Annotated[list[str], add]
    next_agent: str
    final_answer: str

# Supervisor: decides which agent to assign the task to
def supervisor_node(state: MultiAgentState):
    query = state["query"].lower()
    if "검색" in query or "찾아" in query:
        return {"next_agent": "researcher", "messages": ["Supervisor: 연구 에이전트에게 할당"]}
    elif "작성" in query or "써줘" in query:
        return {"next_agent": "writer", "messages": ["Supervisor: 작성 에이전트에게 할당"]}
    else:
        return {"next_agent": "analyst", "messages": ["Supervisor: 분석 에이전트에게 할당"]}

# Specialized agents
def researcher_node(state: MultiAgentState):
    return {
        "messages": ["Researcher: 관련 자료를 검색하여 수집했습니다."],
        "final_answer": f"검색 결과: '{state['query']}'에 대한 자료를 찾았습니다."
    }

def writer_node(state: MultiAgentState):
    return {
        "messages": ["Writer: 요청에 맞게 콘텐츠를 작성했습니다."],
        "final_answer": f"작성 결과: '{state['query']}'에 대한 글을 작성했습니다."
    }

def analyst_node(state: MultiAgentState):
    return {
        "messages": ["Analyst: 데이터를 분석하여 인사이트를 도출했습니다."],
        "final_answer": f"분석 결과: '{state['query']}'에 대한 분석을 완료했습니다."
    }

def route_to_agent(state: MultiAgentState) -> str:
    return state["next_agent"]

# Graph construction
workflow = StateGraph(MultiAgentState)
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("researcher", researcher_node)
workflow.add_node("writer", writer_node)
workflow.add_node("analyst", analyst_node)

workflow.add_edge(START, "supervisor")
workflow.add_conditional_edges(
    "supervisor",
    route_to_agent,
    {
        "researcher": "researcher",
        "writer": "writer",
        "analyst": "analyst",
    }
)
workflow.add_edge("researcher", END)
workflow.add_edge("writer", END)
workflow.add_edge("analyst", END)

graph = workflow.compile()

result = graph.invoke({
    "query": "최신 AI 트렌드를 검색해줘",
    "messages": [],
    "next_agent": "",
    "final_answer": ""
})
print(result["final_answer"])

The LangChain team also provides a separate langgraph-supervisor library for implementing the Supervisor pattern more concisely. However, the official documentation notes that building directly with StateGraph enables finer-grained control over context engineering.

Extended Multi-Agent Patterns

  • Hierarchical structure: Placing Sub-Supervisors under a Supervisor to organize large-scale agent organizations
  • Tool sharing: Multiple agents share common tools while also maintaining their own specialized tools
  • Memory sharing: Sharing long-term memory between agents through the Store interface

11. Introduction to LangGraph Studio

LangGraph Studio is the first agent IDE (Integrated Development Environment) developed by the LangChain team. It is a desktop application that lets you visually inspect agent workflows built with LangGraph, interact with them in real time, and debug them.

Key Features

  • Visual graph rendering: Visually represents StateGraph nodes, edges, and conditional branches
  • Real-time execution tracking: Observe each node's input/output and state changes in real time
  • Interactive debugging: Pause execution at a specific node, modify the state, and re-execute
  • Human-in-the-Loop testing: Provide direct input at interrupt points to verify workflows
  • LangSmith integration: Integration with Tracing, Evaluation, and Prompt Engineering

Configuration

To use LangGraph Studio, a langgraph.json configuration file is required at the project root.

{
  "graphs": {
    "my_agent": "./agent.py:graph"
  },
  "dependencies": ["langchain", "langgraph", "langchain-openai"],
  "env": ".env"
}

This file specifies the location of the agent graph, required dependencies, and the environment variables file. LangGraph Studio is currently available on Apple Silicon Macs and can be accessed with a LangSmith account (including free tier).


12. Practical Example: Document Summarization + QA Composite Workflow

Finally, let us build a practical workflow that combines LCEL and LangGraph. This workflow receives a document, summarizes it, and then answers user questions about it.

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import TypedDict, Annotated
from operator import add

# State definition
class DocWorkflowState(TypedDict):
    document: str
    summary: str
    questions: Annotated[list[str], add]
    answers: Annotated[list[str], add]
    current_mode: str  # "summarize" | "qa" | "done"

# LLM and chain preparation
llm = ChatOpenAI(model="gpt-4o", temperature=0)

summary_prompt = ChatPromptTemplate.from_template(
    "다음 문서를 3줄로 요약하세요:\n\n{document}"
)
summary_chain = summary_prompt | llm | StrOutputParser()

qa_prompt = ChatPromptTemplate.from_template(
    "다음 문서를 참고하여 질문에 답변하세요.\n\n"
    "문서:\n{document}\n\n"
    "요약:\n{summary}\n\n"
    "질문: {question}\n\n답변:"
)
qa_chain = qa_prompt | llm | StrOutputParser()

# Node definitions
def summarize_node(state: DocWorkflowState):
    summary = summary_chain.invoke({"document": state["document"]})
    return {"summary": summary, "current_mode": "qa"}

def ask_question_node(state: DocWorkflowState):
    # Human-in-the-Loop: receive a question from the user
    user_input = interrupt({
        "message": "문서에 대해 궁금한 점을 질문하세요. '종료'를 입력하면 끝납니다.",
        "summary": state["summary"]
    })
    if user_input == "종료":
        return {"current_mode": "done"}
    return {"questions": [user_input], "current_mode": "qa"}

def answer_node(state: DocWorkflowState):
    question = state["questions"][-1]
    answer = qa_chain.invoke({
        "document": state["document"],
        "summary": state["summary"],
        "question": question,
    })
    return {"answers": [answer]}

# Routing functions
def route_after_answer(state: DocWorkflowState) -> str:
    return state["current_mode"]

def route_after_question(state: DocWorkflowState) -> str:
    if state["current_mode"] == "done":
        return "done"
    return "answer"

# Graph construction
workflow = StateGraph(DocWorkflowState)
workflow.add_node("summarize", summarize_node)
workflow.add_node("ask_question", ask_question_node)
workflow.add_node("answer", answer_node)

workflow.add_edge(START, "summarize")
workflow.add_edge("summarize", "ask_question")

workflow.add_conditional_edges(
    "ask_question",
    route_after_question,
    {"answer": "answer", "done": END}
)

# After answering, loop back to the question step (cyclic structure)
workflow.add_edge("answer", "ask_question")

# Compile
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

# Execute
config = {"configurable": {"thread_id": "doc-workflow-1"}}

# Step 1: Pass document -> generate summary -> wait for question (interrupt)
result = graph.invoke(
    {
        "document": "LangGraph는 LangChain 팀이 개발한 상태 기반 에이전트 프레임워크로...",
        "summary": "",
        "questions": [],
        "answers": [],
        "current_mode": "summarize",
    },
    config
)

# Step 2: User question -> answer -> wait for next question
result = graph.invoke(Command(resume="LangGraph의 주요 장점은?"), config)

# Step 3: Additional question
result = graph.invoke(Command(resume="LCEL과의 차이점은?"), config)

# Step 4: Terminate
result = graph.invoke(Command(resume="종료"), config)

This workflow leverages all of the following core LangGraph features:

  • StateGraph: State management for the entire workflow
  • Conditional Edge: Dynamic branching based on user input
  • Cyclic structure: A loop that returns to the question step after answering
  • Checkpointer: Persistent storage of conversation state
  • interrupt / Command: Human-in-the-Loop pattern
  • LCEL chains: LLM chains for summarization and QA

This is the architecture pattern recommended in the LangChain ecosystem: compose individual LLM chains with LCEL, and manage the overall workflow flow and state with LangGraph.

