Skip to content
Published on

LangGraphエージェントワークフロー実践ガイド:マルチエージェントオーケストレーションからプロダクションデプロイまで

Authors
  • Name
    Twitter

1. LangGraphとは

LangGraphはLangChainチームが開発した状態ベース(Stateful)エージェントオーケストレーションフレームワークだ。既存のLangChainのChain/Agentが線形的だったのに対し、LangGraphはグラフ構造で複雑なワークフローを表現する。

1.1 なぜLangGraphなのか?

特性LangChain AgentLangGraph
フロー制御単純ループDAG + 条件分岐
状態管理限定的TypedDict基盤の明示的状態
マルチエージェント困難ネイティブサポート
人間の介入カスタム実装interrupt()内蔵
ストリーミング基本トークン/イベント/状態ストリーミング
デバッグ困難LangGraph Studio

2. 基本概念

2.1 StateGraph

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END

# 1) 状態定義
class AgentState(TypedDict):
    messages: Annotated[list, "add_messages"]  # メッセージ蓄積
    next_action: str
    result: str

# 2) ノード関数定義
def classify_intent(state: AgentState) -> dict:
    """ユーザーの意図を分類"""
    last_msg = state["messages"][-1].content
    # LLMで意図分類
    intent = llm.invoke(f"Classify intent: {last_msg}")
    return {"next_action": intent.content}

def handle_question(state: AgentState) -> dict:
    """質問処理"""
    answer = llm.invoke(state["messages"])
    return {"result": answer.content}

def handle_task(state: AgentState) -> dict:
    """タスク実行"""
    result = agent_executor.invoke(state["messages"])
    return {"result": result}

# 3) グラフ構築
graph = StateGraph(AgentState)
graph.add_node("classify", classify_intent)
graph.add_node("question", handle_question)
graph.add_node("task", handle_task)

# 4) エッジ接続
graph.add_edge(START, "classify")
graph.add_conditional_edges(
    "classify",
    lambda state: state["next_action"],
    {
        "question": "question",
        "task": "task",
    }
)
graph.add_edge("question", END)
graph.add_edge("task", END)

# 5) コンパイル&実行
app = graph.compile()
result = app.invoke({"messages": [HumanMessage("What is Kubernetes?")]})

2.2 Tool使用エージェント

from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent

@tool
def search_docs(query: str) -> str:
    """Search internal documentation."""
    # ベクトルDB検索
    results = vectorstore.similarity_search(query, k=3)
    return "\n".join([r.page_content for r in results])

@tool
def run_query(sql: str) -> str:
    """Execute a read-only SQL query."""
    return db.execute(sql).fetchall()

@tool
def create_ticket(title: str, description: str) -> str:
    """Create a Jira ticket."""
    return jira.create_issue(title=title, description=description)

llm = ChatOpenAI(model="gpt-4o")

# ReActエージェントを自動生成
agent = create_react_agent(
    llm,
    tools=[search_docs, run_query, create_ticket],
    state_modifier="You are a helpful DevOps assistant."
)

# 実行
result = agent.invoke({
    "messages": [HumanMessage("Check if order-service has errors and create a ticket")]
})

3. 上級パターン

3.1 マルチエージェントオーケストレーション

from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent

class SupervisorState(TypedDict):
    messages: Annotated[list, "add_messages"]
    next_agent: str
    final_answer: str

# 専門エージェントたち
researcher = create_react_agent(llm, tools=[search_web, search_docs])
coder = create_react_agent(llm, tools=[run_code, read_file])
reviewer = create_react_agent(llm, tools=[analyze_code, lint_code])

def supervisor(state: SupervisorState) -> dict:
    """どのエージェントに委任するか決定"""
    response = llm.invoke([
        SystemMessage("You are a supervisor. Route to: researcher, coder, reviewer, or FINISH"),
        *state["messages"]
    ])
    return {"next_agent": response.content.strip()}

def run_researcher(state):
    result = researcher.invoke({"messages": state["messages"]})
    return {"messages": result["messages"]}

def run_coder(state):
    result = coder.invoke({"messages": state["messages"]})
    return {"messages": result["messages"]}

def run_reviewer(state):
    result = reviewer.invoke({"messages": state["messages"]})
    return {"messages": result["messages"]}

# グラフ構成
workflow = StateGraph(SupervisorState)
workflow.add_node("supervisor", supervisor)
workflow.add_node("researcher", run_researcher)
workflow.add_node("coder", run_coder)
workflow.add_node("reviewer", run_reviewer)

workflow.add_edge(START, "supervisor")
workflow.add_conditional_edges(
    "supervisor",
    lambda s: s["next_agent"],
    {
        "researcher": "researcher",
        "coder": "coder",
        "reviewer": "reviewer",
        "FINISH": END,
    }
)
# 各エージェント実行後supervisorに復帰
for agent_name in ["researcher", "coder", "reviewer"]:
    workflow.add_edge(agent_name, "supervisor")

app = workflow.compile()

3.2 Human-in-the-Loop

from langgraph.types import interrupt, Command

def sensitive_action(state):
    """機密操作の前に人間の承認を要求"""
    action = state["pending_action"]

    # 実行を中断し人間の承認を待機
    approval = interrupt({
        "question": f"Approve this action?\n{action}",
        "options": ["approve", "reject", "modify"]
    })

    if approval == "approve":
        return execute_action(action)
    elif approval == "reject":
        return {"result": "Action cancelled by user"}
    else:
        return {"result": f"Action modified: {approval}"}

# チェックポインターとともにコンパイル
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer)

# 実行(interruptで停止)
config = {"configurable": {"thread_id": "user-123"}}
result = app.invoke({"messages": [...]}, config)
# result = {"__interrupt__": {...}}

# 人間が承認後に再開
app.invoke(Command(resume="approve"), config)

3.3 並列実行

from langgraph.graph import StateGraph

class ParallelState(TypedDict):
    query: str
    web_results: str
    db_results: str
    combined: str

def search_web_node(state):
    return {"web_results": search_web(state["query"])}

def search_db_node(state):
    return {"db_results": search_db(state["query"])}

def combine_results(state):
    combined = f"Web: {state['web_results']}\nDB: {state['db_results']}"
    return {"combined": combined}

graph = StateGraph(ParallelState)
graph.add_node("web", search_web_node)
graph.add_node("db", search_db_node)
graph.add_node("combine", combine_results)

# 並列実行: START → [web, db] → combine
graph.add_edge(START, "web")
graph.add_edge(START, "db")
graph.add_edge("web", "combine")
graph.add_edge("db", "combine")
graph.add_edge("combine", END)

app = graph.compile()

4. ストリーミング

# トークン単位ストリーミング
async for event in app.astream_events(
    {"messages": [HumanMessage("Explain CQRS")]},
    version="v2"
):
    if event["event"] == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="", flush=True)

# ノード単位ストリーミング
for chunk in app.stream({"messages": [HumanMessage("...")]}):
    for node_name, output in chunk.items():
        print(f"[{node_name}]: {output}")

5. メモリとチェックポインティング

from langgraph.checkpoint.postgres import PostgresSaver

# PostgreSQLチェックポインター(プロダクション)
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/langgraph"
)

app = workflow.compile(checkpointer=checkpointer)

# 会話の永続化(thread_idで区別)
config = {"configurable": {"thread_id": "user-456"}}

# 最初のメッセージ
app.invoke({"messages": [HumanMessage("Hi")]}, config)

# 2番目のメッセージ(以前の会話コンテキストを維持)
app.invoke({"messages": [HumanMessage("What did I just say?")]}, config)

6. LangGraph Platformデプロイ

6.1 langgraph.json

{
  "dependencies": ["."],
  "graphs": {
    "agent": "./agent.py:app"
  },
  "env": ".env"
}

6.2 デプロイ

# LangGraph CLIインストール
pip install langgraph-cli

# ローカルテスト
langgraph dev

# Dockerビルド
langgraph build -t my-agent:latest

# LangGraph Cloudデプロイ
langgraph deploy --app my-agent

7. クイズ

Q1. LangGraphのStateGraphにおけるState(状態)の役割は?

ノード間で共有されるデータコンテナ。TypedDictで定義し、各ノードが状態を読み取り更新する。Annotated reducerでリスト蓄積などのマージ戦略を定義可能。

Q2. add_conditional_edgesの用途は?

条件分岐。現在の状態に応じて次のノードを動的に決定する。ルーティング関数が状態を受け取り、次のノード名を返す。

Q3. interrupt()の役割は?

Human-in-the-Loopの実装。ワークフローの実行を中断し、人間の入力(承認/拒否/修正)を待った後、Command(resume=...)で再開する。

Q4. マルチエージェントにおけるSupervisorパターンの利点は?

中央のSupervisorが全体のフローを制御し、(1)各エージェントの専門性を活用、(2)タスク順序の動的決定、(3)実行後の次のステップ判断が可能。

Q5. チェックポインター(Checkpointer)を使用する理由は?

(1)会話の永続化 — thread_idごとに状態を保存。(2)障害復旧 — 中断点から再開。(3)Human-in-the-Loop — interrupt後の状態保存。

Q6. LangGraphで並列実行はどのように実装するか?

同じソースノード(例:START)から複数のノードへエッジを接続すると自動的に並列実行される。結果をまとめるノードで、すべての先行ノードが完了するまで待機する。

Q7. create_react_agentと直接StateGraphを構成する場合の違いは?

create_react_agentReAct(Reasoning + Acting)パターンのプリビルト。シンプルなツール使用エージェントに適している。複雑な分岐、マルチエージェント、カスタム状態が必要な場合は直接StateGraphを構成する。