- Published on
チャットボット マルチターン対話メモリ管理ガイド:LangChain・LangGraphで実装するコンテキスト維持戦略
- Authors
- Name
- はじめに
- マルチターン対話の中核課題
- LangChainメモリタイプ
- メモリタイプ比較
- LangGraphステートベースエージェント
- 永続メモリの実装
- コンテキスト圧縮技法
- RAG連携メモリ
- セッション管理パターン
- トラブルシューティング
- 運用ノート
- プロダクションチェックリスト
- 参考資料

はじめに
チャットボットを構築する際、最も基本的でありながら困難な問題はマルチターン対話でコンテキストを維持することである。単純な質疑応答(シングルターン)は各リクエストを独立に処理すればよいが、実際の対話は前の内容に基づいて展開される。「それはいくらですか?」という質問に答えるには、「それ」が何を指すのか前の対話から把握する必要がある。
LLMのコンテキストウィンドウは有限である。GPT-4oの128Kトークンでも数百ターンの対話をすべて収容するのは困難で、トークンコストも急激に増加する。そのため、どの情報をどれだけ維持するかというメモリ管理戦略が不可欠である。
本記事では、LangChainの各種メモリタイプを比較し、LangGraphを活用したステートベースエージェントの構築、データベースを活用した永続メモリ、RAG連携まで、プロダクションレベルのマルチターン対話システム実装方法を解説する。
マルチターン対話の中核課題
コンテキストウィンドウの限界
LLMは1回のAPI呼び出しで処理できるトークン数に制限がある。対話が長くなるほど初期の対話内容が切り捨てられるか、コストが急増する。
# 問題シナリオ: 対話が長くなると初期コンテキストが失われる
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# 100ターンの対話をシミュレーション
messages = []
for i in range(100):
messages.append(HumanMessage(content=f"Turn {i}: これは{i}番目の質問です。"))
messages.append(AIMessage(content=f"Turn {i}: はい、{i}番目の回答です。"))
# すべてのメッセージを送信するとトークン上限超過のリスク
# 解決策: メモリ管理戦略を適用
print(f"総メッセージ数: {len(messages)}")
関連性の低下問題
対話が進むにつれ、初期の対話内容の関連性は低下する。すべての対話を同じ重みで送信するのは非効率的である。
| 問題 | 説明 | 影響 |
|---|---|---|
| トークン上限超過 | 長い対話がコンテキストウィンドウを超える | APIエラーまたは初期対話の欠落 |
| コスト増加 | 不要な過去の対話も毎回送信 | トークンコストが急増 |
| 関連性の希釈 | 核心情報が不要な対話に埋もれる | 応答品質の低下 |
| 遅延時間の増加 | 長いプロンプトの処理に時間がかかる | ユーザー体験の悪化 |
| ハルシネーション増加 | 過剰なコンテキストからの誤った推論 | 信頼性の低下 |
LangChainメモリタイプ
ConversationBufferMemory
最もシンプルなメモリタイプで、すべての対話内容をそのまま保存する。
from langchain_openai import ChatOpenAI
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
# Buffer Memory: すべての対話内容をそのまま保持
memory = ConversationBufferMemory(return_messages=True)
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True,
)
# 対話を進行
response1 = conversation.predict(input="こんにちは、私は田中です。Pythonエンジニアです。")
print(f"AI: {response1}")
response2 = conversation.predict(input="一番好きなフレームワークはFastAPIです。")
print(f"AI: {response2}")
response3 = conversation.predict(input="私の名前は何でしたっけ?")
print(f"AI: {response3}")
# AIは前の対話を記憶して「田中」と回答
# メモリ内容の確認
print("\n=== Memory Contents ===")
for msg in memory.chat_memory.messages:
print(f" {type(msg).__name__}: {msg.content[:80]}...")
ConversationBufferWindowMemory
直近N件の対話のみを保持するスライディングウィンドウ方式である。
from langchain.memory import ConversationBufferWindowMemory
# 直近5ターンのみ保持
memory = ConversationBufferWindowMemory(
k=5,
return_messages=True,
)
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True,
)
# 10ターンの対話を進行
for i in range(10):
response = conversation.predict(input=f"これは{i+1}番目のメッセージです。")
print(f"Turn {i+1}: {response[:50]}...")
# k=5なので6番目のメッセージから1番目のメッセージが削除される
print(f"\nメモリに保存されたメッセージ数: {len(memory.chat_memory.messages)}")
ConversationSummaryMemory
対話内容をLLMで要約して保存する。長い対話でも核心情報を維持できる。
from langchain.memory import ConversationSummaryMemory
# Summary Memory: LLMが対話を要約して保存
memory = ConversationSummaryMemory(
llm=llm,
return_messages=True,
)
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True,
)
# 複数ターンの対話
conversation.predict(input="こんにちは、東京在住のバックエンドエンジニアです。")
conversation.predict(input="主にPythonとGoを使い、Kubernetes環境で仕事をしています。")
conversation.predict(input="最近はLangChainでチャットボットを開発しています。")
conversation.predict(input="RAGパイプラインの構築が主な業務です。")
# 要約内容の確認
print("\n=== Summary ===")
print(memory.buffer)
# 全対話ではなく要約文が保存される
ConversationSummaryBufferMemory
要約とバッファを組み合わせたハイブリッド方式である。最近の対話は原文のまま保持し、古い対話は要約する。
from langchain.memory import ConversationSummaryBufferMemory
# Summary + Buffer ハイブリッド: 最近の対話は原文、過去の対話は要約
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=300, # この上限を超えると古い対話を要約
return_messages=True,
)
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True,
)
# 対話を進行
conversation.predict(input="プロジェクトAはECプラットフォームです。")
conversation.predict(input="技術スタックはNext.js、FastAPI、PostgreSQLです。")
conversation.predict(input="現在決済モジュールを実装中です。")
conversation.predict(input="PG連携でのwebhook処理が大変です。")
conversation.predict(input="テスト環境の構築も必要です。")
# メモリ状態の確認
print("\n=== Moving Summary ===")
print(memory.moving_summary_buffer)
print(f"\n現在のバッファメッセージ数: {len(memory.chat_memory.messages)}")
EntityMemory
対話からエンティティ(人物、場所、概念など)を抽出して管理する。
from langchain.memory import ConversationEntityMemory
from langchain.memory.prompt import ENTITY_MEMORY_CONVERSATION_TEMPLATE
# Entity Memory: 対話からエンティティを抽出・更新
memory = ConversationEntityMemory(
llm=llm,
return_messages=True,
)
conversation = ConversationChain(
llm=llm,
memory=memory,
prompt=ENTITY_MEMORY_CONVERSATION_TEMPLATE,
verbose=True,
)
# エンティティを含む対話
conversation.predict(
input="佐藤さんはチームのシニアエンジニアです。Python専門家で経験5年です。"
)
conversation.predict(
input="鈴木さんはプロダクトマネージャーで、佐藤さんと一緒にレコメンドシステムプロジェクトを進めています。"
)
conversation.predict(
input="佐藤さんが最近MLflowを導入して実験管理を始めました。"
)
# エンティティ情報の確認
print("\n=== Entity Store ===")
for entity, info in memory.entity_store.store.items():
print(f" {entity}: {info}")
メモリタイプ比較
| メモリタイプ | メリット | デメリット | 適した場面 |
|---|---|---|---|
| Buffer | 全対話を保存、実装が簡単 | トークンコスト増加、ウィンドウ超過リスク | 短い対話、プロトタイプ |
| BufferWindow | コスト予測可能、最新情報を維持 | 古い情報が失われる | カスタマーサポート、FAQボット |
| Summary | 長い対話でも核心を維持 | 要約時の情報損失、追加LLM呼び出しコスト | 長期対話、サポート履歴 |
| SummaryBuffer | 最近の原文+過去の要約を結合 | 設定が複雑、要約品質に依存 | 技術サポート、プロジェクト対話 |
| Entity | 核心エンティティの追跡 | エンティティ抽出エラーの可能性、追加コスト | CRMボット、人物管理システム |
LangGraphステートベースエージェント
LangGraph基本構造
LangGraphは対話をステートグラフ(State Graph)としてモデル化する。各ノードは処理ステップ、エッジは状態遷移を表す。
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
# ステートベースの対話グラフ定義
def chatbot_node(state: MessagesState):
"""メインチャットボットノード"""
system_message = SystemMessage(
content="あなたは親切なAIアシスタントです。前の対話の文脈を考慮して回答してください。"
)
messages = [system_message] + state["messages"]
response = llm.invoke(messages)
return {"messages": [response]}
# グラフ構成
graph_builder = StateGraph(MessagesState)
graph_builder.add_node("chatbot", chatbot_node)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)
# メモリチェックポインターの追加(対話状態の永続化)
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)
# セッション別対話(thread_idでセッションを区別)
config = {"configurable": {"thread_id": "user-session-001"}}
# 最初のメッセージ
response1 = graph.invoke(
{"messages": [HumanMessage(content="こんにちは、データエンジニアです。")]},
config=config,
)
print(f"AI: {response1['messages'][-1].content}")
# 2番目のメッセージ(前の対話が自動的に維持される)
response2 = graph.invoke(
{"messages": [HumanMessage(content="私の職業は何でしたっけ?")]},
config=config,
)
print(f"AI: {response2['messages'][-1].content}")
条件付きルーティングとツール使用
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
@tool
def search_knowledge_base(query: str) -> str:
"""ナレッジベースから情報を検索します。"""
# 実際にはベクトルDB検索などを実行
knowledge = {
"返品ポリシー": "購入後14日以内であれば全額返金可能です。",
"配送期間": "注文後2-3営業日以内に配送されます。",
"会員ランク": "ブロンズ、シルバー、ゴールド、プラチナの4段階です。",
}
for key, value in knowledge.items():
if key in query:
return value
return "関連情報が見つかりませんでした。"
@tool
def get_order_status(order_id: str) -> str:
"""注文状況を照会します。"""
# 実際にはDB照会
return f"注文{order_id}: 配送中(到着予定日: 2026-03-13)"
# ツールバインド
tools = [search_knowledge_base, get_order_status]
llm = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools(tools)
def assistant_node(state: MessagesState):
"""アシスタントノード: LLM呼び出しとツール使用判断"""
system_msg = SystemMessage(
content="あなたはECカスタマーサポートチャットボットです。必要に応じてツールを使用してください。"
)
messages = [system_msg] + state["messages"]
response = llm.invoke(messages)
return {"messages": [response]}
# グラフ構成
graph_builder = StateGraph(MessagesState)
graph_builder.add_node("assistant", assistant_node)
graph_builder.add_node("tools", ToolNode(tools))
graph_builder.add_edge(START, "assistant")
graph_builder.add_conditional_edges("assistant", tools_condition)
graph_builder.add_edge("tools", "assistant")
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)
# 対話実行
config = {"configurable": {"thread_id": "customer-123"}}
response = graph.invoke(
{"messages": [HumanMessage(content="注文番号ORD-2026-0311の配送状況を教えてください。")]},
config=config,
)
print(f"AI: {response['messages'][-1].content}")
永続メモリの実装
Redisベースのセッション管理
import redis
import json
from datetime import datetime, timedelta
from langchain_core.messages import HumanMessage, AIMessage, messages_from_dict, messages_to_dict
class RedisSessionMemory:
"""Redisベースの対話セッションメモリ"""
def __init__(self, redis_url="redis://localhost:6379", ttl_hours=24):
self.redis = redis.from_url(redis_url)
self.ttl = timedelta(hours=ttl_hours)
def _key(self, session_id: str) -> str:
return f"chat:session:{session_id}"
def save_messages(self, session_id: str, messages: list):
"""メッセージリストをRedisに保存"""
key = self._key(session_id)
data = {
"messages": messages_to_dict(messages),
"updated_at": datetime.now().isoformat(),
}
self.redis.setex(key, self.ttl, json.dumps(data, ensure_ascii=False))
def load_messages(self, session_id: str) -> list:
"""Redisからメッセージリストを読み込み"""
key = self._key(session_id)
data = self.redis.get(key)
if data is None:
return []
parsed = json.loads(data)
return messages_from_dict(parsed["messages"])
def add_message(self, session_id: str, message):
"""単一メッセージの追加"""
messages = self.load_messages(session_id)
messages.append(message)
self.save_messages(session_id, messages)
def clear_session(self, session_id: str):
"""セッション削除"""
self.redis.delete(self._key(session_id))
def get_session_info(self, session_id: str) -> dict:
"""セッションメタ情報の照会"""
key = self._key(session_id)
data = self.redis.get(key)
if data is None:
return {"exists": False}
parsed = json.loads(data)
return {
"exists": True,
"message_count": len(parsed["messages"]),
"updated_at": parsed["updated_at"],
"ttl_seconds": self.redis.ttl(key),
}
# 使用例
session_memory = RedisSessionMemory(redis_url="redis://localhost:6379")
session_id = "user-abc-123"
session_memory.add_message(session_id, HumanMessage(content="こんにちは"))
session_memory.add_message(session_id, AIMessage(content="こんにちは!何をお手伝いしましょうか?"))
messages = session_memory.load_messages(session_id)
print(f"保存されたメッセージ数: {len(messages)}")
PostgreSQLベースの長期メモリ
from sqlalchemy import create_engine, Column, String, Text, DateTime, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import json
Base = declarative_base()
class ConversationHistory(Base):
"""対話履歴テーブル"""
__tablename__ = "conversation_history"
id = Column(Integer, primary_key=True, autoincrement=True)
session_id = Column(String(255), index=True, nullable=False)
user_id = Column(String(255), index=True, nullable=False)
role = Column(String(50), nullable=False) # human, ai, system
content = Column(Text, nullable=False)
metadata_json = Column(Text, default="{}")
created_at = Column(DateTime, default=datetime.utcnow)
class ConversationSummaryStore(Base):
"""対話要約テーブル"""
__tablename__ = "conversation_summaries"
id = Column(Integer, primary_key=True, autoincrement=True)
session_id = Column(String(255), unique=True, nullable=False)
user_id = Column(String(255), index=True, nullable=False)
summary = Column(Text, nullable=False)
entity_data = Column(Text, default="{}")
message_count = Column(Integer, default=0)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class PostgresMemoryManager:
"""PostgreSQLベースの対話メモリマネージャー"""
def __init__(self, database_url: str):
self.engine = create_engine(database_url)
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
def save_message(self, session_id: str, user_id: str, role: str, content: str):
"""メッセージ保存"""
session = self.Session()
try:
msg = ConversationHistory(
session_id=session_id,
user_id=user_id,
role=role,
content=content,
)
session.add(msg)
session.commit()
finally:
session.close()
def get_recent_messages(self, session_id: str, limit: int = 20):
"""最近のメッセージ照会"""
session = self.Session()
try:
messages = (
session.query(ConversationHistory)
.filter(ConversationHistory.session_id == session_id)
.order_by(ConversationHistory.created_at.desc())
.limit(limit)
.all()
)
return list(reversed(messages))
finally:
session.close()
def save_summary(self, session_id: str, user_id: str, summary: str,
entity_data: dict, message_count: int):
"""対話要約の保存/更新"""
session = self.Session()
try:
existing = (
session.query(ConversationSummaryStore)
.filter(ConversationSummaryStore.session_id == session_id)
.first()
)
if existing:
existing.summary = summary
existing.entity_data = json.dumps(entity_data, ensure_ascii=False)
existing.message_count = message_count
else:
new_summary = ConversationSummaryStore(
session_id=session_id,
user_id=user_id,
summary=summary,
entity_data=json.dumps(entity_data, ensure_ascii=False),
message_count=message_count,
)
session.add(new_summary)
session.commit()
finally:
session.close()
# 使用例
db_url = "postgresql://chatbot:password@localhost:5432/chatbot_db"
memory_manager = PostgresMemoryManager(db_url)
コンテキスト圧縮技法
対話要約+最近メッセージの結合
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
class HybridMemoryManager:
"""要約+最近バッファを結合したハイブリッドメモリ"""
def __init__(self, llm, max_buffer_messages=10):
self.llm = llm
self.max_buffer_messages = max_buffer_messages
self.summary = ""
self.buffer = []
def add_exchange(self, human_msg: str, ai_msg: str):
"""対話交換の追加"""
self.buffer.append(HumanMessage(content=human_msg))
self.buffer.append(AIMessage(content=ai_msg))
# バッファが上限を超えたら古いメッセージを要約に統合
if len(self.buffer) > self.max_buffer_messages * 2:
self._compress()
def _compress(self):
"""古いメッセージを要約に統合"""
# 前半を要約対象として選定
to_summarize = self.buffer[: self.max_buffer_messages]
self.buffer = self.buffer[self.max_buffer_messages:]
# 要約生成
conversation_text = "\n".join(
f"{'Human' if isinstance(m, HumanMessage) else 'AI'}: {m.content}"
for m in to_summarize
)
summary_prompt = f"""以下は前の対話の要約と新しい対話です。統合された要約を作成してください。
前の要約: {self.summary if self.summary else 'なし'}
新しい対話:
{conversation_text}
核心情報とコンテキストを維持しながら簡潔に要約してください:"""
response = self.llm.invoke([HumanMessage(content=summary_prompt)])
self.summary = response.content
def get_context_messages(self) -> list:
"""現在のコンテキストメッセージを返却"""
messages = []
if self.summary:
messages.append(SystemMessage(
content=f"前の対話の要約: {self.summary}"
))
messages.extend(self.buffer)
return messages
def get_stats(self) -> dict:
"""メモリ統計を返却"""
return {
"summary_length": len(self.summary),
"buffer_messages": len(self.buffer),
"has_summary": bool(self.summary),
}
# 使用例
llm = ChatOpenAI(model="gpt-4o", temperature=0)
hybrid_memory = HybridMemoryManager(llm=llm, max_buffer_messages=6)
# 対話シミュレーション
exchanges = [
("プロジェクトの日程はどうなっていますか?", "現在3月末のデプロイを目標にしています。"),
("バックエンド開発は誰が担当ですか?", "佐藤シニアエンジニアが担当しています。"),
("フロントエンドは?", "鈴木エンジニアがReactで開発中です。"),
("テスト計画はありますか?", "QAチームが4月第1週に統合テストを実施します。"),
("デプロイ環境は何ですか?", "AWS EKSベースのKubernetes環境です。"),
("CI/CDパイプラインは?", "GitHub ActionsとArgoCDを使用しています。"),
("モニタリングはどうしていますか?", "GrafanaとPrometheusでメトリクスを収集しています。"),
]
for human, ai in exchanges:
hybrid_memory.add_exchange(human, ai)
stats = hybrid_memory.get_stats()
print(f"メモリ状態: {stats}")
RAG連携メモリ
対話ベースRAGパイプライン
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.messages import HumanMessage, SystemMessage
from langchain.text_splitter import RecursiveCharacterTextSplitter
class RAGEnhancedMemory:
"""RAGで強化された対話メモリ"""
def __init__(self, llm, embeddings, collection_name="chat_memory"):
self.llm = llm
self.embeddings = embeddings
self.vectorstore = Chroma(
collection_name=collection_name,
embedding_function=embeddings,
)
self.recent_messages = []
self.max_recent = 10
def add_exchange(self, session_id: str, human_msg: str, ai_msg: str):
"""対話をベクトルストアに保存"""
# 最近メッセージバッファに追加
self.recent_messages.append(("human", human_msg))
self.recent_messages.append(("ai", ai_msg))
# ベクトルストアに対話エンベディングを保存
exchange_text = f"ユーザー: {human_msg}\nAI: {ai_msg}"
self.vectorstore.add_texts(
texts=[exchange_text],
metadatas=[{"session_id": session_id, "type": "exchange"}],
)
# 最近メッセージの上限を維持
if len(self.recent_messages) > self.max_recent * 2:
self.recent_messages = self.recent_messages[-self.max_recent * 2:]
def retrieve_relevant_context(self, query: str, k: int = 3) -> list:
"""クエリに関連する過去の対話を検索"""
results = self.vectorstore.similarity_search(query, k=k)
return [doc.page_content for doc in results]
def generate_response(self, session_id: str, user_input: str) -> str:
"""RAGベースの応答生成"""
# 関連する過去の対話を検索
relevant_context = self.retrieve_relevant_context(user_input)
# コンテキスト構成
context_parts = []
if relevant_context:
context_parts.append("関連する前の対話:")
for ctx in relevant_context:
context_parts.append(f" - {ctx}")
system_content = "あなたは前の対話コンテキストを活用して回答するAIアシスタントです。"
if context_parts:
system_content += "\n\n" + "\n".join(context_parts)
messages = [SystemMessage(content=system_content)]
# 最近のメッセージを追加
for role, content in self.recent_messages[-6:]:
if role == "human":
messages.append(HumanMessage(content=content))
else:
from langchain_core.messages import AIMessage
messages.append(AIMessage(content=content))
messages.append(HumanMessage(content=user_input))
response = self.llm.invoke(messages)
# 対話を保存
self.add_exchange(session_id, user_input, response.content)
return response.content
# 使用例
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
embeddings = OpenAIEmbeddings()
rag_memory = RAGEnhancedMemory(llm=llm, embeddings=embeddings)
セッション管理パターン
マルチテナントセッション管理
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime
import uuid
@dataclass
class ChatSession:
"""チャットセッション"""
session_id: str
user_id: str
created_at: datetime = field(default_factory=datetime.now)
last_active: datetime = field(default_factory=datetime.now)
metadata: dict = field(default_factory=dict)
is_active: bool = True
class SessionManager:
"""マルチテナントセッションマネージャー"""
def __init__(self, max_sessions_per_user: int = 5):
self.sessions: dict = {} # session_id -> ChatSession
self.user_sessions: dict = {} # user_id -> list of session_ids
self.max_sessions_per_user = max_sessions_per_user
def create_session(self, user_id: str, metadata: Optional[dict] = None) -> str:
"""新規セッション作成"""
# ユーザーごとのセッション数制限を確認
user_session_ids = self.user_sessions.get(user_id, [])
active_sessions = [
sid for sid in user_session_ids
if sid in self.sessions and self.sessions[sid].is_active
]
if len(active_sessions) >= self.max_sessions_per_user:
# 最も古いセッションを無効化
oldest = min(
active_sessions,
key=lambda sid: self.sessions[sid].last_active,
)
self.sessions[oldest].is_active = False
session_id = str(uuid.uuid4())
session = ChatSession(
session_id=session_id,
user_id=user_id,
metadata=metadata or {},
)
self.sessions[session_id] = session
if user_id not in self.user_sessions:
self.user_sessions[user_id] = []
self.user_sessions[user_id].append(session_id)
return session_id
def get_session(self, session_id: str) -> Optional[ChatSession]:
"""セッション照会"""
session = self.sessions.get(session_id)
if session and session.is_active:
session.last_active = datetime.now()
return session
return None
def list_user_sessions(self, user_id: str) -> list:
"""ユーザーセッション一覧照会"""
session_ids = self.user_sessions.get(user_id, [])
return [
self.sessions[sid]
for sid in session_ids
if sid in self.sessions and self.sessions[sid].is_active
]
def close_session(self, session_id: str):
"""セッション終了"""
if session_id in self.sessions:
self.sessions[session_id].is_active = False
# 使用例
session_mgr = SessionManager(max_sessions_per_user=3)
session_id = session_mgr.create_session("user-001", {"channel": "web"})
print(f"Created session: {session_id}")
トラブルシューティング
メモリ肥大化(Memory Bloat)の解決
対話が長くなるとメモリ使用量が急激に増加し、パフォーマンスが低下する場合がある。
class MemoryBloatGuard:
"""メモリ肥大化防止ガード"""
def __init__(self, max_messages=100, max_token_estimate=50000):
self.max_messages = max_messages
self.max_token_estimate = max_token_estimate
def check_and_trim(self, messages: list) -> tuple:
"""メモリ状態の確認とトリミング"""
total_chars = sum(len(m.content) for m in messages)
estimated_tokens = total_chars // 4 # 大まかなトークン推定
warnings = []
trimmed = messages
if len(messages) > self.max_messages:
warnings.append(
f"メッセージ数超過: {len(messages)} > {self.max_messages}"
)
# システムメッセージを保持、残りから古い部分を削除
system_msgs = [m for m in messages if isinstance(m, SystemMessage)]
non_system = [m for m in messages if not isinstance(m, SystemMessage)]
trimmed = system_msgs + non_system[-(self.max_messages - len(system_msgs)):]
if estimated_tokens > self.max_token_estimate:
warnings.append(
f"トークン推定超過: {estimated_tokens} > {self.max_token_estimate}"
)
return trimmed, warnings
# 使用
guard = MemoryBloatGuard(max_messages=50, max_token_estimate=30000)
コンテキスト混乱(Context Confusion)の防止
長い対話でAIが前の対話内容を混同し、誤った情報を提供する問題を解決する。
class ContextClarityChecker:
"""コンテキスト明確性チェッカー"""
def __init__(self, llm):
self.llm = llm
def check_ambiguity(self, user_input: str, recent_messages: list) -> dict:
"""ユーザー入力の曖昧性チェック"""
# 代名詞や指示語が含まれる場合を確認
ambiguous_patterns = ["それ", "これ", "あれ", "そこ", "ここ", "あそこ"]
has_ambiguity = any(p in user_input for p in ambiguous_patterns)
if not has_ambiguity:
return {"is_ambiguous": False, "resolved_input": user_input}
# LLMで曖昧性を解消
context_text = "\n".join(
f"{type(m).__name__}: {m.content}" for m in recent_messages[-6:]
)
clarification_prompt = f"""前の対話コンテキスト:
{context_text}
ユーザーの新しい入力: {user_input}
この入力の代名詞や指示語が何を指しているか明確にしてください。
代名詞を実際の対象に置き換えた文を返してください。"""
response = self.llm.invoke([HumanMessage(content=clarification_prompt)])
return {
"is_ambiguous": True,
"original_input": user_input,
"resolved_input": response.content,
}
運用ノート
パフォーマンス最適化のヒント
- メモリタイプの選択: 対話の長さに応じて適切なメモリタイプを選択する。10ターン以下ならBuffer、それ以上ならSummaryBufferを推奨
- ベクトルストアインデキシング: RAGメモリ使用時は適切なインデックス(HNSW、IVF)を設定して検索パフォーマンスを最適化
- Redis TTL管理: セッション有効期限を適切に設定してメモリリークを防止
- 非同期要約: 対話要約は応答返却後に非同期で実行し、レイテンシを削減
セキュリティ考慮事項
- セッションIDにUUID v4を使用して予測不可能性を保証
- 対話内容を暗号化して保存(AES-256)
- ユーザー間のセッション分離を徹底
- PII(個人識別情報)のマスキング処理を適用
- 対話履歴の保持期間ポリシーを策定
プロダクションチェックリスト
- [ ] 対話の長さに適したメモリタイプの選択完了
- [ ] セッション管理システムの実装(作成、照会、有効期限、削除)
- [ ] 永続ストレージ(Redis/PostgreSQL)の連携と障害対応
- [ ] メモリ肥大化防止ロジックの実装(最大メッセージ数、トークン制限)
- [ ] コンテキスト圧縮(要約)パイプラインの構築
- [ ] マルチテナントセッション分離テストの合格
- [ ] PIIマスキングと対話暗号化の適用
- [ ] セッション有効期限ポリシーとTTLの設定
- [ ] 対話履歴のバックアップと保持ポリシーの策定
- [ ] モニタリングダッシュボードの構築(セッション数、メモリ使用量、応答時間)
- [ ] 負荷テストの実施(同時セッション数、対話長さ別パフォーマンス)
- [ ] 障害復旧シナリオテスト(Redisダウン、DB接続失敗)