- Authors
- Name
- 1. LCELとは?パイプ演算子とRunnableインターフェース
- 2. コアRunnableタイプの詳細分析
- 3. invoke、batch、stream、ainvokeメソッドの分析
- 4. ストリーミングの実装:astream_events、astream_log
- 5. LCELでRAGチェーンを構築する例
- 6. LangGraphの基礎:StateGraph、Node、Edge
- 7. Conditional Edgeによる分岐処理(ルーターパターン)
- 8. チェックポイントと状態の永続化
- 9. Human-in-the-Loopパターン(interrupt、resume)
- 10. LangGraphでマルチエージェントシステムを構築する
- 11. LangGraph Studioの紹介
- 12. 実践例:ドキュメント要約 + QA複合ワークフロー
- 13. References
1. LCELとは?パイプ演算子とRunnableインターフェース
LCEL(LangChain Expression Language)は、LangChainでチェーンを構成するための宣言的な表現言語である。従来のLLMChain、SequentialChainなどのレガシー方式を置き換え、LangChain公式ドキュメントにおいてチェーン構築の推奨パターンとして定着した。
LCELの核心的なアイデアは、パイプ演算子 | を用いて複数のコンポーネントを接続することである。Pythonの__or__マジックメソッドをオーバーロードし、Unixパイプラインのようにデータが左から右へ流れる直感的な構造を作り出す。
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_template("'{topic}'에 대해 간략히 설명해주세요.")
model = ChatOpenAI(model="gpt-4o")
output_parser = StrOutputParser()
# LCEL パイプ演算子でチェーンを構成
chain = prompt | model | output_parser
result = chain.invoke({"topic": "양자 컴퓨팅"})
print(result)
上記のコードでprompt | model | output_parserは内部的にRunnableSequenceを生成する。各コンポーネントはRunnableインターフェースを実装しており、このインターフェースは全てのLCELコンポーネントの基盤となるプロトコルである。
Runnableインターフェースの主要メソッド
| メソッド | 説明 | 戻り値の型 |
|---|---|---|
invoke(input) | 単一入力に対する同期実行 | 単一出力 |
ainvoke(input) | 単一入力に対する非同期実行 | 単一出力 (awaitable) |
batch(inputs) | 複数入力に対する並列実行 | リスト |
abatch(inputs) | 複数入力に対する非同期並列実行 | リスト (awaitable) |
stream(input) | 単一入力に対するストリーミング実行 | Iterator |
astream(input) | 単一入力に対する非同期ストリーミング | AsyncIterator |
全てのLangChainコンポーネント -- ChatModel、Retriever、OutputParser、Toolなど -- がこのRunnableインターフェースを実装しているため、どの2つのコンポーネントでも|演算子で接続できる。これがLCELの合成可能性(composability)の基盤である。
2. コアRunnableタイプの詳細分析
LCELは単純な順次チェーン以外にも、並列実行、条件分岐、入力パススルー、カスタム関数のラッピングなど、様々なパターンをサポートするRunnableタイプを提供する。
2.1 RunnableParallel
RunnableParallelは複数のRunnableを並列に実行し、各結果をディクショナリ形式にまとめて返す。LCELではディクショナリリテラルを使用すると自動的にRunnableParallelに変換される。
from langchain_core.runnables import RunnableParallel
# 方法1:ディクショナリリテラル(自動変換)
chain = {
"summary": prompt_summary | model | output_parser,
"keywords": prompt_keywords | model | output_parser,
} | combine_results
# 方法2:明示的なRunnableParallel
parallel = RunnableParallel(
summary=prompt_summary | model | output_parser,
keywords=prompt_keywords | model | output_parser,
)
result = parallel.invoke({"text": "LangChain은 LLM 애플리케이션 프레임워크입니다."})
# result = {"summary": "...", "keywords": "..."}
RunnableParallelは内部的に各ブランチを同時に実行するため、互いに独立したLLM呼び出しを並列化して全体のレイテンシを削減するのに効果的である。
2.2 RunnableBranch
RunnableBranchは条件に応じて異なるRunnableを実行するルーティングメカニズムである。(condition, runnable)タプルのリストとデフォルトのRunnableを引数として受け取る。
from langchain_core.runnables import RunnableBranch
branch = RunnableBranch(
(lambda x: "코드" in x["topic"], code_chain),
(lambda x: "수학" in x["topic"], math_chain),
general_chain, # default
)
# "코드"がtopicに含まれればcode_chainを実行、"수학"ならmath_chain、それ以外はgeneral_chain
result = branch.invoke({"topic": "코드 리팩토링 방법"})
条件関数は順番に評価され、最初にTrueを返す条件に対応するRunnableが実行される。どの条件も満たさない場合、デフォルトのRunnableが実行される。
2.3 RunnablePassthrough
RunnablePassthroughは入力をそのまま次のステップに渡す。主にRunnableParallelと組み合わせて使用し、元の入力を保持しながら同時に他の処理を行う場合に便利である。
from langchain_core.runnables import RunnablePassthrough
chain = RunnableParallel(
context=retriever, # 検索結果
question=RunnablePassthrough(), # 元の質問をそのまま渡す
) | prompt | model | output_parser
RunnablePassthrough.assign()を使用すると、既存の入力に新しいキーと値のペアを追加できる。
chain = RunnablePassthrough.assign(
context=lambda x: retriever.invoke(x["question"])
)
# 入力: {"question": "LangChain이란?"}
# 出力: {"question": "LangChain이란?", "context": [Document(...), ...]}
2.4 RunnableLambda
RunnableLambdaは通常のPython関数をRunnableとしてラップする。これにより、任意のロジックをLCELチェーンに挿入できる。
from langchain_core.runnables import RunnableLambda
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
# 関数をRunnableに変換
format_runnable = RunnableLambda(format_docs)
# チェーンで使用
chain = retriever | format_runnable | prompt | model | output_parser
@chainデコレータを使用すると、より簡潔に表現できる。
from langchain_core.runnables import chain
@chain
def format_and_classify(input_dict):
text = input_dict["text"]
formatted = text.strip().lower()
category = "technical" if "api" in formatted else "general"
return {"formatted": formatted, "category": category}
3. invoke、batch、stream、ainvokeメソッドの分析
Runnableインターフェースの各メソッドは、それぞれ異なる実行パターンをサポートする。公式ドキュメントで定義されている各メソッドの動作を見ていこう。
invoke -- 単一同期実行
最も基本的な実行方式である。1つの入力を受け取り、1つの出力を返す。
result = chain.invoke({"topic": "머신러닝"})
batch -- 並列バッチ実行
複数の入力をリストで渡すと、内部的に並列処理されて結果のリストが返される。max_concurrencyパラメータで同時実行数を制限できる。
results = chain.batch(
[{"topic": "AI"}, {"topic": "블록체인"}, {"topic": "양자컴퓨팅"}],
config={"max_concurrency": 2}
)
# results = ["AI는...", "블록체인은...", "양자컴퓨팅은..."]
stream -- リアルタイムストリーミング
LLMの応答をトークン単位でストリーミングする。ユーザー体験を大幅に向上させるコア機能である。
for chunk in chain.stream({"topic": "딥러닝"}):
print(chunk, end="", flush=True)
ainvoke / astream -- 非同期実行
FastAPIやasyncio環境で使用できる非同期バージョンである。awaitキーワードとともに使用する。
import asyncio
async def main():
result = await chain.ainvoke({"topic": "강화학습"})
print(result)
async for chunk in chain.astream({"topic": "강화학습"}):
print(chunk, end="", flush=True)
asyncio.run(main())
4. ストリーミングの実装:astream_events、astream_log
単純なstream()は最終出力のみをストリーミングするが、複雑なチェーンでは中間ステップの結果もリアルタイムで確認したい場合がある。LangChainはこのためにastream_eventsとastream_logを提供している。
astream_events
astream_eventsは、チェーン実行中に発生する全てのイベントをStreamEventオブジェクトとしてストリーミングする。各イベントには、イベントの種類(on_chain_start、on_llm_stream、on_chain_endなど)、イベント名、データが含まれる。
async for event in chain.astream_events(
{"topic": "트랜스포머"},
version="v2"
):
kind = event["event"]
if kind == "on_chat_model_stream":
# LLMトークンストリーミング
print(event["data"]["chunk"].content, end="", flush=True)
elif kind == "on_chain_start":
print(f"\n--- チェーン開始: {event['name']} ---")
elif kind == "on_chain_end":
print(f"\n--- チェーン終了: {event['name']} ---")
astream_eventsは、中間ステップが最終入力でのみ動作する場合でも中間結果をストリーミングできるという点で非常に強力である。例えば、RAGチェーンにおいてRetrieverがドキュメントを取得するプロセス、Promptが生成されるプロセス、LLMが応答するプロセスを全てリアルタイムでモニタリングできる。
astream_log
astream_logは実行プロセスのログをJSON Patch形式でストリーミングする。include_namesやinclude_tagsパラメータを使って、特定のコンポーネントのログのみをフィルタリングできる。
async for log_patch in chain.astream_log(
{"topic": "어텐션 메커니즘"},
include_names=["ChatOpenAI"],
):
for op in log_patch.ops:
print(op)
一般的にastream_eventsの方がより直感的で使いやすいため、公式ドキュメントでもastream_eventsを先に推奨している。
5. LCELでRAGチェーンを構築する例
LCELの真の威力は実践的なパターンで発揮される。RAG(Retrieval-Augmented Generation)チェーンをLCELで実装すると以下のようになる。
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_community.vectorstores import FAISS
# 1. ベクトルストアとRetrieverの構成
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = FAISS.from_texts(
["LangChain은 LLM 애플리케이션 프레임워크다.",
"LCEL은 LangChain Expression Language의 약자이다.",
"LangGraph는 상태 기반 에이전트 프레임워크다."],
embeddings
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# 2. ドキュメントフォーマット関数
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
# 3. プロンプトテンプレート
prompt = ChatPromptTemplate.from_template("""
다음 컨텍스트를 기반으로 질문에 답변하세요.
컨텍스트:
{context}
질문: {question}
답변:
""")
# 4. モデルとパーサー
model = ChatOpenAI(model="gpt-4o", temperature=0)
output_parser = StrOutputParser()
# 5. LCEL RAGチェーンの構成
rag_chain = (
RunnablePassthrough.assign(
context=lambda x: format_docs(retriever.invoke(x["question"]))
)
| prompt
| model
| output_parser
)
# 6. 実行
result = rag_chain.invoke({"question": "LCEL이 무엇인가요?"})
print(result)
このチェーンは以下のフローで動作する:
- 入力
{"question": "LCEL이 무엇인가요?"}がRunnablePassthrough.assign()に渡される。 questionはそのまま保持され、contextにはRetrieverが取得したドキュメントをフォーマットした文字列が割り当てられる。promptがcontextとquestionを組み合わせてプロンプトを生成する。modelがプロンプトを処理し、output_parserが最終的な文字列を抽出する。
6. LangGraphの基礎:StateGraph、Node、Edge
LangGraphはLangChainチームが作成した状態ベースのグラフ実行フレームワークで、複雑なエージェントワークフローを実装するために設計されている。LCELが線形的なチェーンに強みを持つのに対し、LangGraphは循環構造、条件分岐、状態管理が必要なシナリオに適している。
コアコンセプト
- StateGraph:ノード間で共有状態(State)を通じて通信するグラフ。状態定義(TypedDict)を引数として受け取り初期化する。
- Node:状態を入力として受け取り、状態の更新を返す関数。
add_node()でグラフに登録する。 - Edge:ノード間の接続。
add_edge()で定義し、実行フローを決定する。 - START / END:グラフのエントリーポイントと終了ポイントを表す特殊ノード。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from operator import add
# 1. 状態定義
class AgentState(TypedDict):
messages: Annotated[list[str], add] # add reducer:リストを累積
current_step: str
# 2. ノード関数の定義
def analyze_node(state: AgentState):
return {
"messages": ["분석 완료: 데이터를 처리했습니다."],
"current_step": "analyze"
}
def summarize_node(state: AgentState):
return {
"messages": ["요약 완료: 결과를 정리했습니다."],
"current_step": "summarize"
}
# 3. グラフの構築
workflow = StateGraph(AgentState)
workflow.add_node("analyze", analyze_node)
workflow.add_node("summarize", summarize_node)
# 4. エッジの定義
workflow.add_edge(START, "analyze")
workflow.add_edge("analyze", "summarize")
workflow.add_edge("summarize", END)
# 5. コンパイルと実行
graph = workflow.compile()
result = graph.invoke({
"messages": ["시작"],
"current_step": ""
})
print(result)
# {"messages": ["시작", "분석 완료: ...", "요약 완료: ..."], "current_step": "summarize"}
状態定義におけるAnnotated[list[str], add]はreducer関数を指定するものである。add reducerはノードが返すリストを既存のリストに累積(append)する。Reducerを指定しない場合、既存の値は上書きされる。
グラフは必ず.compile()を呼び出してCompiledStateGraphに変換してから実行する必要がある。コンパイルされたグラフはLCELのRunnableインターフェースを実装しているため、invoke()、stream()、ainvoke()などを全て使用できる。
7. Conditional Edgeによる分岐処理(ルーターパターン)
LangGraphのadd_conditional_edges()は、ノードの実行結果に応じて次に実行するノードを動的に決定するメカニズムである。これはエージェントの意思決定ロジックを実装するコア機能である。
from langgraph.graph import StateGraph, START, END
class RouterState(TypedDict):
query: str
category: str
result: str
def classify_node(state: RouterState):
query = state["query"]
if "코드" in query or "프로그래밍" in query:
category = "code"
elif "수학" in query or "계산" in query:
category = "math"
else:
category = "general"
return {"category": category}
def code_expert(state: RouterState):
return {"result": f"코드 전문가가 답변합니다: {state['query']}"}
def math_expert(state: RouterState):
return {"result": f"수학 전문가가 답변합니다: {state['query']}"}
def general_expert(state: RouterState):
return {"result": f"일반 전문가가 답변합니다: {state['query']}"}
# ルーティング関数
def route_query(state: RouterState) -> str:
return state["category"]
# グラフの構築
workflow = StateGraph(RouterState)
workflow.add_node("classify", classify_node)
workflow.add_node("code", code_expert)
workflow.add_node("math", math_expert)
workflow.add_node("general", general_expert)
workflow.add_edge(START, "classify")
# Conditional Edge:classifyノード後のルーティング
workflow.add_conditional_edges(
"classify", # ソースノード
route_query, # ルーティング関数
{ # 戻り値からターゲットノードへのマッピング
"code": "code",
"math": "math",
"general": "general",
}
)
workflow.add_edge("code", END)
workflow.add_edge("math", END)
workflow.add_edge("general", END)
graph = workflow.compile()
result = graph.invoke({"query": "Python 코드 리팩토링 방법", "category": "", "result": ""})
print(result["result"]) # "코드 전문가가 답변합니다: ..."
add_conditional_edges()は3つの引数を受け取る:
- source:分岐が始まるノードの名前
- path:状態を受け取り、次のノード名(またはキー)を返すルーティング関数
- path_map(オプション):ルーティング関数の戻り値を実際のノード名にマッピングするディクショナリ
ルーティング関数が返す文字列がpath_mapにない場合、ランタイムエラーが発生するため、全てのケースを漏れなくマッピングする必要がある。
8. チェックポイントと状態の永続化
LangGraphのCheckpointerは、グラフの実行状態を永続的に保存するパーシステンスレイヤーである。Checkpointerを使用すると、各実行ステップ(superstep)ごとに状態のスナップショットが保存され、実行を一時停止して後で再開したり、特定の時点に巻き戻したりできる。
ThreadとCheckpoint
- Thread:状態を保存・検索するための一意の識別子。
thread_idで指定する。 - Checkpoint:特定時点のグラフ状態のスナップショット。
StateSnapshotオブジェクトとして表現される。
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import TypedDict, Annotated
from operator import add
class ConversationState(TypedDict):
messages: Annotated[list[str], add]
def chat_node(state: ConversationState):
last_message = state["messages"][-1]
response = f"'{last_message}'에 대한 응답입니다."
return {"messages": [response]}
workflow = StateGraph(ConversationState)
workflow.add_node("chat", chat_node)
workflow.add_edge(START, "chat")
workflow.add_edge("chat", END)
# Checkpointerの適用
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)
# Thread IDを指定して実行
config = {"configurable": {"thread_id": "user-session-1"}}
# 最初の会話
graph.invoke({"messages": ["안녕하세요"]}, config)
# 2回目の会話(同じthread_id -- 前の状態が保持される)
result = graph.invoke({"messages": ["LangGraph에 대해 알려주세요"]}, config)
print(result["messages"])
# ["안녕하세요", "'안녕하세요'에 대한 응답입니다.",
# "LangGraph에 대해 알려주세요", "'LangGraph에 대해 알려주세요'에 대한 응답입니다."]
状態の照会とタイムトラベル
# 現在の状態を照会
snapshot = graph.get_state(config)
print(snapshot.values) # 現在の状態値
print(snapshot.next) # 次に実行されるノード
# 状態履歴の照会
for state in graph.get_state_history(config):
print(state.config["configurable"]["checkpoint_id"], state.values)
特定のcheckpointに戻って実行をフォーク(fork)することもできる。
# 特定のcheckpointから再実行
fork_config = {
"configurable": {
"thread_id": "user-session-1",
"checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"
}
}
graph.invoke(None, config=fork_config)
Checkpointerの種類
| 実装 | 用途 | パッケージ |
|---|---|---|
InMemorySaver | 開発・テスト | langgraphにデフォルトで含まれる |
SqliteSaver | ローカルワークフロー | langgraph-checkpoint-sqlite |
PostgresSaver | プロダクション環境 | langgraph-checkpoint-postgres |
プロダクション環境では、PostgresSaverのような耐久性のあるcheckpointerを必ず使用すべきである。
9. Human-in-the-Loopパターン(interrupt、resume)
LangGraphのinterrupt機能は、グラフの実行を特定のポイントで一時停止し、人間の入力を受けた後に再開するパターンをサポートする。これを機能させるには、必ずCheckpointerが設定されている必要がある。
interrupt関数
interrupt()関数はノード内部で呼び出すと、グラフの実行を即座に中断し、渡された値を呼び出し元に返す。その後Command(resume=value)で実行を再開すると、interrupt()の呼び出し地点に戻り、resume値が返される。
from langgraph.types import interrupt, Command
class ApprovalState(TypedDict):
action: str
approved: bool
result: str
def plan_node(state: ApprovalState):
return {"action": "중요 데이터베이스 마이그레이션 실행"}
def approval_node(state: ApprovalState):
# 実行を一時停止 -- 人間の承認を待つ
response = interrupt({
"question": "다음 작업을 승인하시겠습니까?",
"action": state["action"]
})
return {"approved": response == "승인"}
def execute_node(state: ApprovalState):
if state["approved"]:
return {"result": "작업이 성공적으로 실행되었습니다."}
return {"result": "작업이 취소되었습니다."}
workflow = StateGraph(ApprovalState)
workflow.add_node("plan", plan_node)
workflow.add_node("approval", approval_node)
workflow.add_node("execute", execute_node)
workflow.add_edge(START, "plan")
workflow.add_edge("plan", "approval")
workflow.add_edge("approval", "execute")
workflow.add_edge("execute", END)
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "migration-1"}}
# ステップ1:グラフを実行 -> approvalノードで一時停止
result = graph.invoke(
{"action": "", "approved": False, "result": ""},
config
)
# resultに__interrupt__情報が含まれる
# ステップ2:人間がレビューして承認
final_result = graph.invoke(
Command(resume="승인"),
config
)
print(final_result["result"]) # "작업이 성공적으로 실행되었습니다."
interrupt使用時の注意事項
公式ドキュメントで強調されている重要なルールである。
- try/exceptで囲まないこと:
interrupt()は内部的に例外を発生させて実行を中断するため、裸のtry/exceptがこれをキャッチすると正常に動作しない。 - interruptの呼び出し順序を変えないこと:Resume時にインデックスベースでマッチングされるため、条件によってinterruptの順序が変わってはならない。
- interruptの前に非冪等(non-idempotent)な操作を行わないこと:Resume時にノードは最初から再実行されるため、interrupt前のコードも再実行される。
- JSONシリアライズ可能な値のみを渡すこと:関数やクラスインスタンスなどは渡せない。
10. LangGraphでマルチエージェントシステムを構築する
LangGraphは複数のエージェントが協力するマルチエージェントシステムの構築に適している。代表的なパターンはSupervisorパターンで、中央のSupervisorエージェントがタスクを分配し、結果を集約する。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated, Literal
from operator import add
class MultiAgentState(TypedDict):
query: str
messages: Annotated[list[str], add]
next_agent: str
final_answer: str
# Supervisor:どのエージェントにタスクを割り当てるかを決定
def supervisor_node(state: MultiAgentState):
query = state["query"].lower()
if "검색" in query or "찾아" in query:
return {"next_agent": "researcher", "messages": ["Supervisor: 연구 에이전트에게 할당"]}
elif "작성" in query or "써줘" in query:
return {"next_agent": "writer", "messages": ["Supervisor: 작성 에이전트에게 할당"]}
else:
return {"next_agent": "analyst", "messages": ["Supervisor: 분석 에이전트에게 할당"]}
# 専門エージェント
def researcher_node(state: MultiAgentState):
return {
"messages": ["Researcher: 관련 자료를 검색하여 수집했습니다."],
"final_answer": f"검색 결과: '{state['query']}'에 대한 자료를 찾았습니다."
}
def writer_node(state: MultiAgentState):
return {
"messages": ["Writer: 요청에 맞게 콘텐츠를 작성했습니다."],
"final_answer": f"작성 결과: '{state['query']}'에 대한 글을 작성했습니다."
}
def analyst_node(state: MultiAgentState):
return {
"messages": ["Analyst: 데이터를 분석하여 인사이트를 도출했습니다."],
"final_answer": f"분석 결과: '{state['query']}'에 대한 분석을 완료했습니다."
}
def route_to_agent(state: MultiAgentState) -> str:
return state["next_agent"]
# グラフの構築
workflow = StateGraph(MultiAgentState)
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("researcher", researcher_node)
workflow.add_node("writer", writer_node)
workflow.add_node("analyst", analyst_node)
workflow.add_edge(START, "supervisor")
workflow.add_conditional_edges(
"supervisor",
route_to_agent,
{
"researcher": "researcher",
"writer": "writer",
"analyst": "analyst",
}
)
workflow.add_edge("researcher", END)
workflow.add_edge("writer", END)
workflow.add_edge("analyst", END)
graph = workflow.compile()
result = graph.invoke({
"query": "최신 AI 트렌드를 검색해줘",
"messages": [],
"next_agent": "",
"final_answer": ""
})
print(result["final_answer"])
LangChainチームはlanggraph-supervisorライブラリも別途提供しており、これを使ってSupervisorパターンをより簡潔に実装できる。しかし、公式ドキュメントではStateGraphで直接実装する方がcontext engineeringに対するよりきめ細かい制御が可能であると案内している。
マルチエージェントの拡張パターン
- 階層的構造:Supervisorの下にSub-Supervisorを配置して大規模なエージェント組織を構成
- ツールの共有:複数のエージェントが共通のツールを共有しつつ、各自の専門ツールも保持
- メモリの共有:
Storeインターフェースを通じてエージェント間で長期メモリを共有
11. LangGraph Studioの紹介
LangGraph Studioは、LangChainチームが開発した初の**エージェントIDE(統合開発環境)**である。LangGraphで構築したエージェントワークフローを視覚的に確認し、リアルタイムで対話し、デバッグできるデスクトップアプリケーションである。
主要機能
- ビジュアルグラフレンダリング:StateGraphのノード、エッジ、条件分岐を視覚的に表現
- リアルタイム実行トラッキング:各ノードの入出力と状態変化をリアルタイムで観察
- インタラクティブデバッグ:特定のノードで実行を停止し、状態を修正して再実行
- Human-in-the-Loopテスト:interruptポイントで直接入力を提供してワークフローを検証
- LangSmith統合:Tracing、Evaluation、Prompt Engineeringとの連携
設定方法
LangGraph Studioを使用するには、プロジェクトルートにlanggraph.json設定ファイルが必要である。
{
"graphs": {
"my_agent": "./agent.py:graph"
},
"dependencies": ["langchain", "langgraph", "langchain-openai"],
"env": ".env"
}
このファイルはエージェントグラフの場所、必要な依存関係、環境変数ファイルを指定する。LangGraph Studioは現在Apple Silicon Macで使用可能で、LangSmithアカウント(無料を含む)でアクセスできる。
12. 実践例:ドキュメント要約 + QA複合ワークフロー
最後に、LCELとLangGraphを組み合わせた実践的なワークフローを構築してみよう。このワークフローはドキュメントを受け取って要約した後、ユーザーの質問に回答する複合システムである。
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import TypedDict, Annotated
from operator import add
# 状態定義
class DocWorkflowState(TypedDict):
document: str
summary: str
questions: Annotated[list[str], add]
answers: Annotated[list[str], add]
current_mode: str # "summarize" | "qa" | "done"
# LLMとチェーンの準備
llm = ChatOpenAI(model="gpt-4o", temperature=0)
summary_prompt = ChatPromptTemplate.from_template(
"다음 문서를 3줄로 요약하세요:\n\n{document}"
)
summary_chain = summary_prompt | llm | StrOutputParser()
qa_prompt = ChatPromptTemplate.from_template(
"다음 문서를 참고하여 질문에 답변하세요.\n\n"
"문서:\n{document}\n\n"
"요약:\n{summary}\n\n"
"질문: {question}\n\n답변:"
)
qa_chain = qa_prompt | llm | StrOutputParser()
# ノードの定義
def summarize_node(state: DocWorkflowState):
summary = summary_chain.invoke({"document": state["document"]})
return {"summary": summary, "current_mode": "qa"}
def ask_question_node(state: DocWorkflowState):
# Human-in-the-Loop:ユーザーから質問を受け取る
user_input = interrupt({
"message": "문서에 대해 궁금한 점을 질문하세요. '종료'를 입력하면 끝납니다.",
"summary": state["summary"]
})
if user_input == "종료":
return {"current_mode": "done"}
return {"questions": [user_input], "current_mode": "qa"}
def answer_node(state: DocWorkflowState):
question = state["questions"][-1]
answer = qa_chain.invoke({
"document": state["document"],
"summary": state["summary"],
"question": question,
})
return {"answers": [answer]}
# ルーティング関数
def route_after_answer(state: DocWorkflowState) -> str:
return state["current_mode"]
def route_after_question(state: DocWorkflowState) -> str:
if state["current_mode"] == "done":
return "done"
return "answer"
# グラフの構築
workflow = StateGraph(DocWorkflowState)
workflow.add_node("summarize", summarize_node)
workflow.add_node("ask_question", ask_question_node)
workflow.add_node("answer", answer_node)
workflow.add_edge(START, "summarize")
workflow.add_edge("summarize", "ask_question")
workflow.add_conditional_edges(
"ask_question",
route_after_question,
{"answer": "answer", "done": END}
)
# 回答後、再び質問ステップに戻る(循環構造)
workflow.add_edge("answer", "ask_question")
# コンパイル
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)
# 実行
config = {"configurable": {"thread_id": "doc-workflow-1"}}
# ステップ1:ドキュメントを渡す -> 要約を生成 -> 質問待ち(interrupt)
result = graph.invoke(
{
"document": "LangGraph는 LangChain 팀이 개발한 상태 기반 에이전트 프레임워크로...",
"summary": "",
"questions": [],
"answers": [],
"current_mode": "summarize",
},
config
)
# ステップ2:ユーザーの質問 -> 回答 -> 再び質問待ち
result = graph.invoke(Command(resume="LangGraph의 주요 장점은?"), config)
# ステップ3:追加の質問
result = graph.invoke(Command(resume="LCEL과의 차이점은?"), config)
# ステップ4:終了
result = graph.invoke(Command(resume="종료"), config)
このワークフローはLangGraphの以下の全てのコア機能を活用している:
- StateGraph:ワークフロー全体の状態管理
- Conditional Edge:ユーザー入力に基づく動的分岐
- 循環構造(Cycle):回答後に再び質問に戻るループ構造
- Checkpointer:会話状態の永続的な保存
- interrupt / Command:Human-in-the-Loopパターン
- LCELチェーン:要約とQAのためのLLMチェーン
このように、LCELで個々のLLMチェーンを構成し、LangGraphでワークフロー全体のフローと状態を管理するのが、LangChainエコシステムで推奨されるアーキテクチャパターンである。
13. References
- LangChain Expression Language (LCEL) 公式ドキュメント
- LangChain Runnables API Reference
- LangChain Streaming 公式ガイド
- How to Stream Runnables
- LangGraph Overview 公式ドキュメント
- LangGraph Graph API 公式ドキュメント
- LangGraph Persistence 公式ドキュメント
- LangGraph Interrupts 公式ドキュメント
- LangGraph StateGraph API Reference
- LangGraph Checkpoint Reference
- LangGraph Supervisor ライブラリ
- LangGraph Studio 公式ドキュメント
- LangGraph GitHub Repository
- LangChain 公式ブログ - LCEL
- LangChain 公式ブログ - LangGraph