- Authors
- Name
1. LangGraphとは
LangGraphはLangChainチームが開発した状態ベース(Stateful)エージェントオーケストレーションフレームワークだ。既存のLangChainのChain/Agentが線形的だったのに対し、LangGraphはグラフ構造で複雑なワークフローを表現する。
1.1 なぜLangGraphなのか?
| 特性 | LangChain Agent | LangGraph |
|---|---|---|
| フロー制御 | 単純ループ | DAG + 条件分岐 |
| 状態管理 | 限定的 | TypedDict基盤の明示的状態 |
| マルチエージェント | 困難 | ネイティブサポート |
| 人間の介入 | カスタム実装 | interrupt()内蔵 |
| ストリーミング | 基本 | トークン/イベント/状態ストリーミング |
| デバッグ | 困難 | LangGraph Studio |
2. 基本概念
2.1 StateGraph
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
# 1) 状態定義
class AgentState(TypedDict):
messages: Annotated[list, "add_messages"] # メッセージ蓄積
next_action: str
result: str
# 2) ノード関数定義
def classify_intent(state: AgentState) -> dict:
"""ユーザーの意図を分類"""
last_msg = state["messages"][-1].content
# LLMで意図分類
intent = llm.invoke(f"Classify intent: {last_msg}")
return {"next_action": intent.content}
def handle_question(state: AgentState) -> dict:
"""質問処理"""
answer = llm.invoke(state["messages"])
return {"result": answer.content}
def handle_task(state: AgentState) -> dict:
"""タスク実行"""
result = agent_executor.invoke(state["messages"])
return {"result": result}
# 3) グラフ構築
graph = StateGraph(AgentState)
graph.add_node("classify", classify_intent)
graph.add_node("question", handle_question)
graph.add_node("task", handle_task)
# 4) エッジ接続
graph.add_edge(START, "classify")
graph.add_conditional_edges(
"classify",
lambda state: state["next_action"],
{
"question": "question",
"task": "task",
}
)
graph.add_edge("question", END)
graph.add_edge("task", END)
# 5) コンパイル&実行
app = graph.compile()
result = app.invoke({"messages": [HumanMessage("What is Kubernetes?")]})
2.2 Tool使用エージェント
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
@tool
def search_docs(query: str) -> str:
"""Search internal documentation."""
# ベクトルDB検索
results = vectorstore.similarity_search(query, k=3)
return "\n".join([r.page_content for r in results])
@tool
def run_query(sql: str) -> str:
"""Execute a read-only SQL query."""
return db.execute(sql).fetchall()
@tool
def create_ticket(title: str, description: str) -> str:
"""Create a Jira ticket."""
return jira.create_issue(title=title, description=description)
llm = ChatOpenAI(model="gpt-4o")
# ReActエージェントを自動生成
agent = create_react_agent(
llm,
tools=[search_docs, run_query, create_ticket],
state_modifier="You are a helpful DevOps assistant."
)
# 実行
result = agent.invoke({
"messages": [HumanMessage("Check if order-service has errors and create a ticket")]
})
3. 上級パターン
3.1 マルチエージェントオーケストレーション
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
class SupervisorState(TypedDict):
messages: Annotated[list, "add_messages"]
next_agent: str
final_answer: str
# 専門エージェントたち
researcher = create_react_agent(llm, tools=[search_web, search_docs])
coder = create_react_agent(llm, tools=[run_code, read_file])
reviewer = create_react_agent(llm, tools=[analyze_code, lint_code])
def supervisor(state: SupervisorState) -> dict:
"""どのエージェントに委任するか決定"""
response = llm.invoke([
SystemMessage("You are a supervisor. Route to: researcher, coder, reviewer, or FINISH"),
*state["messages"]
])
return {"next_agent": response.content.strip()}
def run_researcher(state):
result = researcher.invoke({"messages": state["messages"]})
return {"messages": result["messages"]}
def run_coder(state):
result = coder.invoke({"messages": state["messages"]})
return {"messages": result["messages"]}
def run_reviewer(state):
result = reviewer.invoke({"messages": state["messages"]})
return {"messages": result["messages"]}
# グラフ構成
workflow = StateGraph(SupervisorState)
workflow.add_node("supervisor", supervisor)
workflow.add_node("researcher", run_researcher)
workflow.add_node("coder", run_coder)
workflow.add_node("reviewer", run_reviewer)
workflow.add_edge(START, "supervisor")
workflow.add_conditional_edges(
"supervisor",
lambda s: s["next_agent"],
{
"researcher": "researcher",
"coder": "coder",
"reviewer": "reviewer",
"FINISH": END,
}
)
# 各エージェント実行後supervisorに復帰
for agent_name in ["researcher", "coder", "reviewer"]:
workflow.add_edge(agent_name, "supervisor")
app = workflow.compile()
3.2 Human-in-the-Loop
from langgraph.types import interrupt, Command
def sensitive_action(state):
"""機密操作の前に人間の承認を要求"""
action = state["pending_action"]
# 実行を中断し人間の承認を待機
approval = interrupt({
"question": f"Approve this action?\n{action}",
"options": ["approve", "reject", "modify"]
})
if approval == "approve":
return execute_action(action)
elif approval == "reject":
return {"result": "Action cancelled by user"}
else:
return {"result": f"Action modified: {approval}"}
# チェックポインターとともにコンパイル
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer)
# 実行(interruptで停止)
config = {"configurable": {"thread_id": "user-123"}}
result = app.invoke({"messages": [...]}, config)
# result = {"__interrupt__": {...}}
# 人間が承認後に再開
app.invoke(Command(resume="approve"), config)
3.3 並列実行
from langgraph.graph import StateGraph
class ParallelState(TypedDict):
query: str
web_results: str
db_results: str
combined: str
def search_web_node(state):
return {"web_results": search_web(state["query"])}
def search_db_node(state):
return {"db_results": search_db(state["query"])}
def combine_results(state):
combined = f"Web: {state['web_results']}\nDB: {state['db_results']}"
return {"combined": combined}
graph = StateGraph(ParallelState)
graph.add_node("web", search_web_node)
graph.add_node("db", search_db_node)
graph.add_node("combine", combine_results)
# 並列実行: START → [web, db] → combine
graph.add_edge(START, "web")
graph.add_edge(START, "db")
graph.add_edge("web", "combine")
graph.add_edge("db", "combine")
graph.add_edge("combine", END)
app = graph.compile()
4. ストリーミング
# トークン単位ストリーミング
async for event in app.astream_events(
{"messages": [HumanMessage("Explain CQRS")]},
version="v2"
):
if event["event"] == "on_chat_model_stream":
print(event["data"]["chunk"].content, end="", flush=True)
# ノード単位ストリーミング
for chunk in app.stream({"messages": [HumanMessage("...")]}):
for node_name, output in chunk.items():
print(f"[{node_name}]: {output}")
5. メモリとチェックポインティング
from langgraph.checkpoint.postgres import PostgresSaver
# PostgreSQLチェックポインター(プロダクション)
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/langgraph"
)
app = workflow.compile(checkpointer=checkpointer)
# 会話の永続化(thread_idで区別)
config = {"configurable": {"thread_id": "user-456"}}
# 最初のメッセージ
app.invoke({"messages": [HumanMessage("Hi")]}, config)
# 2番目のメッセージ(以前の会話コンテキストを維持)
app.invoke({"messages": [HumanMessage("What did I just say?")]}, config)
6. LangGraph Platformデプロイ
6.1 langgraph.json
{
"dependencies": ["."],
"graphs": {
"agent": "./agent.py:app"
},
"env": ".env"
}
6.2 デプロイ
# LangGraph CLIインストール
pip install langgraph-cli
# ローカルテスト
langgraph dev
# Dockerビルド
langgraph build -t my-agent:latest
# LangGraph Cloudデプロイ
langgraph deploy --app my-agent
7. クイズ
Q1. LangGraphのStateGraphにおけるState(状態)の役割は?
ノード間で共有されるデータコンテナ。TypedDictで定義し、各ノードが状態を読み取り更新する。Annotated reducerでリスト蓄積などのマージ戦略を定義可能。
Q2. add_conditional_edgesの用途は?
条件分岐。現在の状態に応じて次のノードを動的に決定する。ルーティング関数が状態を受け取り、次のノード名を返す。
Q3. interrupt()の役割は?
Human-in-the-Loopの実装。ワークフローの実行を中断し、人間の入力(承認/拒否/修正)を待った後、Command(resume=...)で再開する。
Q4. マルチエージェントにおけるSupervisorパターンの利点は?
中央のSupervisorが全体のフローを制御し、(1)各エージェントの専門性を活用、(2)タスク順序の動的決定、(3)実行後の次のステップ判断が可能。
Q5. チェックポインター(Checkpointer)を使用する理由は?
(1)会話の永続化 — thread_idごとに状態を保存。(2)障害復旧 — 中断点から再開。(3)Human-in-the-Loop — interrupt後の状態保存。
Q6. LangGraphで並列実行はどのように実装するか?
同じソースノード(例:START)から複数のノードへエッジを接続すると自動的に並列実行される。結果をまとめるノードで、すべての先行ノードが完了するまで待機する。
Q7. create_react_agentと直接StateGraphを構成する場合の違いは?
create_react_agentはReAct(Reasoning + Acting)パターンのプリビルト。シンプルなツール使用エージェントに適している。複雑な分岐、マルチエージェント、カスタム状態が必要な場合は直接StateGraphを構成する。