- Published on
マルチターン会話管理とコンテキスト最適化: LLMチャットボットのMemoryパターン·会話要約·Sliding Window戦略
- Authors
- Name
- はじめに
- コンテキストウィンドウの制限とコスト分析
- メモリパターン比較分析
- Sliding Window戦略の深掘り
- 会話要約技法
- LangChain / LlamaIndex 実践実装
- ベクトルDBベースの永続メモリ
- コンテキストドリフトとHallucination対応
- プロダクションアーキテクチャパターン
- 運用時の注意事項
- メモリパターン選択ガイド
- まとめ
- 参考資料

はじめに
LLMベースのチャットボットにおける最も根本的な課題は、マルチターン会話でコンテキストを効果的に管理することである。LLMは本質的にステートレス(Stateless)であるため、API呼び出しのたびに全会話履歴を一緒に送信する必要がある。しかし、コンテキストウィンドウは有限であり、トークンコストは会話の長さに比例して増加する。
GPT-4oの128Kトークン、Claudeの200Kトークンという大型コンテキストウィンドウがあるが、実務では数百ターンの顧客サポート会話や長時間のテクニカルサポートセッションでこの制限に容易に到達する。さらに「Lost in the Middle」現象により、長いコンテキストの中間部分はモデルが適切に活用できない問題もある。
この記事では、LLMチャットボットの多様なメモリパターン(Buffer、Summary、Vector Store)、Sliding Window戦略、会話要約技法、トークンコスト最適化、そしてプロダクション環境でのアーキテクチャパターンを実践コードとともに解説する。
コンテキストウィンドウの制限とコスト分析
主要LLMコンテキストウィンドウ比較
| モデル | コンテキストウィンドウ | 入力コスト (1Mトークン) | 出力コスト (1Mトークン) | 備考 |
|---|---|---|---|---|
| GPT-4o | 128K | 2.50 USD | 10.00 USD | 汎用 |
| GPT-4o-mini | 128K | 0.15 USD | 0.60 USD | 軽量 |
| Claude 3.5 Sonnet | 200K | 3.00 USD | 15.00 USD | 長いコンテキスト |
| Gemini 1.5 Pro | 2M | 1.25 USD | 5.00 USD | 最大ウィンドウ |
| Llama 3.1 405B | 128K | セルフホスト | セルフホスト | オープンソース |
トークンバジェット設計
会話ごとのトークンバジェットを設計する際は、システムプロンプト、会話履歴、応答スペースを分離して管理する必要がある。
import tiktoken
class TokenBudgetManager:
"""トークンバジェット管理クラス"""
def __init__(self, model: str = "gpt-4o", max_context: int = 128000):
self.encoding = tiktoken.encoding_for_model(model)
self.max_context = max_context
# バジェット配分: システム15%、会話履歴60%、応答25%
self.system_budget = int(max_context * 0.15)
self.history_budget = int(max_context * 0.60)
self.response_budget = int(max_context * 0.25)
def count_tokens(self, text: str) -> int:
"""テキストのトークン数を計算"""
return len(self.encoding.encode(text))
def count_message_tokens(self, messages: list[dict]) -> int:
"""メッセージリストの合計トークン数を計算"""
total = 0
for msg in messages:
total += self.count_tokens(msg["content"])
total += 4 # メッセージメタデータオーバーヘッド
total += 2 # 開始/終了トークン
return total
def get_available_history_tokens(self, system_tokens: int) -> int:
"""会話履歴に使用可能なトークン数を返す"""
used = system_tokens + self.response_budget
return self.max_context - used
def should_summarize(self, history_tokens: int) -> bool:
"""会話履歴がバジェットの80%を超えた場合、要約を推奨"""
return history_tokens > self.history_budget * 0.8
# 使用例
budget = TokenBudgetManager(model="gpt-4o")
system_prompt = "あなたはカスタマーサポート専門AIです..."
system_tokens = budget.count_tokens(system_prompt)
print(f"システムプロンプト: {system_tokens} トークン")
print(f"会話履歴バジェット: {budget.history_budget} トークン")
print(f"応答バジェット: {budget.response_budget} トークン")
コスト増加シミュレーション
import matplotlib.pyplot as plt
import numpy as np
def calculate_cost_per_turn(turns: int, avg_tokens_per_turn: int = 200,
input_cost_per_1m: float = 2.50) -> float:
"""ターン数に応じた累積入力コストを計算"""
# 毎ターン全履歴を送信すると仮定
total_tokens = 0
cumulative_cost = 0
for t in range(1, turns + 1):
total_tokens = t * avg_tokens_per_turn # 現在のターンの入力トークン
turn_cost = (total_tokens / 1_000_000) * input_cost_per_1m
cumulative_cost += turn_cost
return cumulative_cost
# メモリ戦略別コスト比較
turns = np.arange(1, 101)
cost_no_memory = [calculate_cost_per_turn(t) for t in turns]
# Sliding Window(直近20ターンのみ保持)
cost_sliding = [calculate_cost_per_turn(min(t, 20)) for t in turns]
# Summary Memory(要約により1/5に圧縮)
cost_summary = [calculate_cost_per_turn(t, avg_tokens_per_turn=40) for t in turns]
print(f"100ターン会話コスト(メモリなし): ${cost_no_memory[-1]:.4f}")
print(f"100ターン会話コスト(Sliding Window): ${cost_sliding[-1]:.4f}")
print(f"100ターン会話コスト(Summary): ${cost_summary[-1]:.4f}")
メモリパターン比較分析
パターン別特性比較
| メモリパターン | トークン使用量 | 情報保存 | レイテンシ | 実装複雑度 | 適したケース |
|---|---|---|---|---|---|
| Buffer Memory | O(n) 線形増加 | 100% | 低 | 低 | 短い会話 |
| Window Memory | O(k) 固定 | 直近kターン | 低 | 低 | 一般チャットボット |
| Summary Memory | O(1) 固定 | 要約のみ | 中 | 中 | 長い会話 |
| Summary Buffer | O(k+1) | 要約 + 直近 | 中 | 中 | バランス型 |
| Vector Store | O(k) 検索 | 意味ベース | 高 | 高 | 知識集約型 |
| Entity Memory | O(e) エンティティ数 | エンティティ別 | 中 | 高 | パーソナライズ |
1. Buffer Memory - 全履歴保存
最もシンプルなパターンで、全会話履歴をそのまま保持する。
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
# Buffer Memory: 全会話をそのまま保存
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
memory = ConversationBufferMemory(return_messages=True)
chain = ConversationChain(llm=llm, memory=memory, verbose=True)
# 会話進行
response1 = chain.predict(input="こんにちは、サーバー障害が発生しました")
response2 = chain.predict(input="ログを確認したところOOMエラーがあります")
response3 = chain.predict(input="現在のメモリ使用量はどう確認しますか?")
# メモリに保存された全履歴を確認
for msg in memory.chat_memory.messages:
role = "User" if msg.type == "human" else "AI"
print(f"[{role}] {msg.content[:80]}...")
制限: 会話が長くなるほどトークン使用量が線形に増加し、コストとレイテンシが急増する。
2. Sliding Window Memory - 直近Nターンのみ保持
固定サイズのウィンドウを維持しながら古い会話を削除する。
from langchain.memory import ConversationBufferWindowMemory
# 直近10メッセージ(5ペア)のみ保持するSliding Window
window_memory = ConversationBufferWindowMemory(
k=10, # 直近10メッセージを保持
return_messages=True
)
chain = ConversationChain(llm=llm, memory=window_memory, verbose=True)
# トークンベースWindow実装(カスタム)
class TokenWindowMemory:
"""トークン数ベースで会話履歴を管理するメモリ"""
def __init__(self, max_tokens: int = 4000, model: str = "gpt-4o"):
self.max_tokens = max_tokens
self.encoding = tiktoken.encoding_for_model(model)
self.messages: list[dict] = []
def add_message(self, role: str, content: str):
self.messages.append({"role": role, "content": content})
self._trim()
def _trim(self):
"""トークン制限を超えた場合、最も古いメッセージから削除"""
while self._total_tokens() > self.max_tokens and len(self.messages) > 2:
# 最初のシステムメッセージは保持
self.messages.pop(0 if self.messages[0]["role"] != "system" else 1)
def _total_tokens(self) -> int:
return sum(
len(self.encoding.encode(m["content"])) + 4
for m in self.messages
)
def get_messages(self) -> list[dict]:
return self.messages.copy()
# 使用例
token_window = TokenWindowMemory(max_tokens=4000)
token_window.add_message("system", "あなたはテクニカルサポート専門家です。")
token_window.add_message("user", "Dockerコンテナが再起動を繰り返しています。")
token_window.add_message("assistant", "OOMKilled状態かどうか確認しましょう...")
print(f"現在のトークン使用量: {token_window._total_tokens()}")
3. Summary Memory - 会話要約による圧縮
LLMを使用して以前の会話を要約し、要約文をコンテキストとして活用する。
from langchain.memory import ConversationSummaryMemory
# Summary Memory: LLMで会話を自動要約
summary_memory = ConversationSummaryMemory(
llm=ChatOpenAI(model="gpt-4o-mini", temperature=0), # 要約用軽量モデル
return_messages=True
)
# Summary Buffer Memory: 要約 + 直近会話の結合
from langchain.memory import ConversationSummaryBufferMemory
summary_buffer = ConversationSummaryBufferMemory(
llm=ChatOpenAI(model="gpt-4o-mini", temperature=0),
max_token_limit=2000, # この制限を超えると古いメッセージを要約
return_messages=True
)
# カスタムProgressive Summarization実装
class ProgressiveSummarizer:
"""段階的要約: 会話が蓄積されるにつれ段階的に要約を実行"""
def __init__(self, llm, summarize_threshold: int = 10):
self.llm = llm
self.summarize_threshold = summarize_threshold
self.summary = ""
self.recent_messages: list[dict] = []
self.turn_count = 0
async def add_exchange(self, user_msg: str, ai_msg: str):
self.recent_messages.append({"role": "user", "content": user_msg})
self.recent_messages.append({"role": "assistant", "content": ai_msg})
self.turn_count += 1
if self.turn_count % self.summarize_threshold == 0:
await self._summarize()
async def _summarize(self):
"""直近の会話を既存の要約に統合"""
messages_text = "\n".join(
f"{m['role']}: {m['content']}" for m in self.recent_messages
)
prompt = f"""以前の要約:
{self.summary if self.summary else '(なし)'}
最近の会話:
{messages_text}
上記の以前の要約と最近の会話を統合し、核心情報を保存する
簡潔な要約を作成してください。ユーザーの名前、好み、未解決の問題を
必ず含めてください。"""
response = await self.llm.ainvoke(prompt)
self.summary = response.content
self.recent_messages = self.recent_messages[-4:] # 直近2ターンのみ保持
def get_context(self) -> str:
parts = []
if self.summary:
parts.append(f"[会話要約]\n{self.summary}")
if self.recent_messages:
recent = "\n".join(
f"{m['role']}: {m['content']}" for m in self.recent_messages
)
parts.append(f"[最近の会話]\n{recent}")
return "\n\n".join(parts)
4. Vector Store Memory - 意味ベース検索
会話履歴をベクトル埋め込みとして保存し、現在の質問と意味的に類似した過去の会話を検索する。
from langchain.memory import VectorStoreRetrieverMemory
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
# Vector Storeベースのメモリ設定
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(
collection_name="conversation_memory",
embedding_function=embeddings,
persist_directory="./chroma_memory"
)
retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 5} # 最も関連性の高い5つの会話を検索
)
vector_memory = VectorStoreRetrieverMemory(
retriever=retriever,
memory_key="relevant_history",
input_key="input"
)
# 会話を保存
vector_memory.save_context(
{"input": "プロジェクトAのデプロイスケジュールはどうなっていますか?"},
{"output": "プロジェクトAは3月15日にステージング、3月20日にプロダクションデプロイ予定です。"}
)
vector_memory.save_context(
{"input": "データベースマイグレーションはいつですか?"},
{"output": "DBマイグレーションは3月18日午前2時に実施予定です。"}
)
# 関連会話を検索
relevant = vector_memory.load_memory_variables(
{"input": "プロジェクトAデプロイ前に確認すべき事項は?"}
)
print(relevant["relevant_history"])
Sliding Window戦略の深掘り
適応型Sliding Window
固定サイズではなく、会話の重要度に応じて動的にウィンドウを調整する戦略である。
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import hashlib
@dataclass
class ConversationTurn:
role: str
content: str
timestamp: datetime
importance: float = 0.5 # 0.0 ~ 1.0
tokens: int = 0
turn_id: str = ""
def __post_init__(self):
if not self.turn_id:
self.turn_id = hashlib.md5(
f"{self.timestamp}{self.content[:50]}".encode()
).hexdigest()[:8]
class AdaptiveSlidingWindow:
"""重要度ベースの適応型スライディングウィンドウ"""
def __init__(self, max_tokens: int = 8000, min_turns: int = 4):
self.max_tokens = max_tokens
self.min_turns = min_turns # 最低保持ターン数
self.turns: list[ConversationTurn] = []
self.archived: list[ConversationTurn] = []
def add_turn(self, turn: ConversationTurn):
self.turns.append(turn)
self._optimize()
def _calculate_importance(self, turn: ConversationTurn, index: int) -> float:
"""ターンの重要度を多次元で計算"""
score = turn.importance
# 最近のターンほど高い重み
recency = index / max(len(self.turns) - 1, 1)
score += recency * 0.3
# 質問を含むターンは重要度上昇
if "?" in turn.content or "どう" in turn.content:
score += 0.2
# エラー/障害関連キーワード
critical_keywords = ["エラー", "障害", "失敗", "緊急", "error", "fail"]
if any(kw in turn.content.lower() for kw in critical_keywords):
score += 0.3
return min(score, 1.0)
def _optimize(self):
"""トークン制限内で重要なターンを優先保持"""
total_tokens = sum(t.tokens for t in self.turns)
if total_tokens <= self.max_tokens:
return
# 重要度スコア計算
scored = [
(i, self._calculate_importance(t, i), t)
for i, t in enumerate(self.turns)
]
# 直近min_turnsは必ず保持
protected = self.turns[-self.min_turns:]
candidates = scored[:-self.min_turns]
# 重要度の低い順にソートして削除
candidates.sort(key=lambda x: x[1])
while total_tokens > self.max_tokens and candidates:
_, _, turn = candidates.pop(0)
self.archived.append(turn)
self.turns.remove(turn)
total_tokens -= turn.tokens
def get_context(self) -> list[dict]:
return [
{"role": t.role, "content": t.content}
for t in self.turns
]
時間ベースウィンドウとトークンベースウィンドウの比較
class TimeBasedWindow:
"""時間ベースのスライディングウィンドウ - 直近N分以内の会話のみ保持"""
def __init__(self, window_minutes: int = 30):
self.window_minutes = window_minutes
self.messages: list[dict] = []
def add_message(self, role: str, content: str):
self.messages.append({
"role": role,
"content": content,
"timestamp": datetime.now()
})
self._cleanup()
def _cleanup(self):
cutoff = datetime.now() - timedelta(minutes=self.window_minutes)
self.messages = [
m for m in self.messages
if m["timestamp"] > cutoff
]
def get_messages(self) -> list[dict]:
return [
{"role": m["role"], "content": m["content"]}
for m in self.messages
]
class HybridWindow:
"""トークン + 時間ハイブリッドウィンドウ"""
def __init__(self, max_tokens: int = 4000, max_minutes: int = 60):
self.max_tokens = max_tokens
self.max_minutes = max_minutes
self.token_window = TokenWindowMemory(max_tokens=max_tokens)
self.time_window = TimeBasedWindow(window_minutes=max_minutes)
def add_message(self, role: str, content: str):
self.token_window.add_message(role, content)
self.time_window.add_message(role, content)
def get_messages(self) -> list[dict]:
# 両ウィンドウの共通部分を使用(より厳格なフィルタリング)
token_msgs = set(
m["content"] for m in self.token_window.get_messages()
)
time_msgs = self.time_window.get_messages()
return [m for m in time_msgs if m["content"] in token_msgs]
会話要約技法
要約戦略比較
| 戦略 | 要約タイミング | トークン節約率 | 情報損失 | 追加コスト |
|---|---|---|---|---|
| 毎ターン要約 | 毎交換後 | 80-90% | 中 | 高 |
| 閾値要約 | Nターンごと | 60-80% | 低 | 中 |
| 階層的要約 | 段階別 | 70-85% | 非常に低 | 中 |
| 選択的要約 | 重要度ベース | 50-70% | 最小 | 低 |
階層的要約システム実装
from enum import Enum
from typing import Any
class MemoryTier(Enum):
SHORT_TERM = "short_term" # 直近会話の原文
MID_TERM = "mid_term" # セッション要約
LONG_TERM = "long_term" # 核心事実/好み
class HierarchicalMemory:
"""3階層メモリアーキテクチャ"""
def __init__(self, llm, short_term_limit: int = 10,
mid_term_limit: int = 5):
self.llm = llm
self.short_term_limit = short_term_limit
self.mid_term_limit = mid_term_limit
self.short_term: list[dict] = [] # 直近の原文メッセージ
self.mid_term: list[str] = [] # セッション要約
self.long_term: dict[str, Any] = { # 永久保存情報
"user_name": None,
"preferences": [],
"key_facts": [],
"unresolved_issues": []
}
async def add_exchange(self, user_msg: str, ai_msg: str):
# 1. 短期メモリに追加
self.short_term.append({"role": "user", "content": user_msg})
self.short_term.append({"role": "assistant", "content": ai_msg})
# 2. 短期メモリが制限を超えた場合、中期に昇格
if len(self.short_term) > self.short_term_limit * 2:
await self._promote_to_mid_term()
# 3. 中期メモリが制限を超えた場合、長期に抽出
if len(self.mid_term) > self.mid_term_limit:
await self._extract_to_long_term()
async def _promote_to_mid_term(self):
"""短期 -> 中期: 古いメッセージを要約して昇格"""
old_messages = self.short_term[:-6] # 直近3ターンを除外
self.short_term = self.short_term[-6:]
text = "\n".join(f"{m['role']}: {m['content']}" for m in old_messages)
prompt = f"次の会話を3-4文で要約してください:\n\n{text}"
response = await self.llm.ainvoke(prompt)
self.mid_term.append(response.content)
async def _extract_to_long_term(self):
"""中期 -> 長期: 核心事実を抽出して永久保存"""
summaries = "\n\n".join(self.mid_term[:-2])
self.mid_term = self.mid_term[-2:]
prompt = f"""次の会話要約から核心情報をJSONで抽出してください:
{summaries}
抽出項目:
- user_preferences: ユーザーの好み
- key_facts: 核心事実
- unresolved_issues: 未解決の問題"""
response = await self.llm.ainvoke(prompt)
# JSONパース後long_termにマージ(本番ではエラー処理が必要)
import json
try:
extracted = json.loads(response.content)
self.long_term["preferences"].extend(
extracted.get("user_preferences", [])
)
self.long_term["key_facts"].extend(
extracted.get("key_facts", [])
)
self.long_term["unresolved_issues"] = extracted.get(
"unresolved_issues", []
)
except json.JSONDecodeError:
pass # パース失敗時は無視
def build_context(self) -> str:
"""全コンテキストを組み立てて返す"""
parts = []
# 長期メモリ(常に含む)
if any(self.long_term.values()):
lt = self.long_term
facts = "\n".join(f"- {f}" for f in lt["key_facts"][-10:])
prefs = ", ".join(lt["preferences"][-5:])
issues = "\n".join(f"- {i}" for i in lt["unresolved_issues"])
parts.append(
f"[ユーザープロフィール]\n名前: {lt['user_name']}\n"
f"好み: {prefs}\n核心事実:\n{facts}\n"
f"未解決の問題:\n{issues}"
)
# 中期メモリ(セッション要約)
if self.mid_term:
parts.append(
"[以前の会話要約]\n" + "\n---\n".join(self.mid_term)
)
# 短期メモリ(直近の原文)
if self.short_term:
recent = "\n".join(
f"{m['role']}: {m['content']}" for m in self.short_term
)
parts.append(f"[最近の会話]\n{recent}")
return "\n\n".join(parts)
LangChain / LlamaIndex 実践実装
LangChain LCELベースのメモリ実装
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import RedisChatMessageHistory
# LCELベースのチェーン構成
prompt = ChatPromptTemplate.from_messages([
("system", "あなたは親切なテクニカルサポート専門家です。"
"以前の会話内容を参考に一貫した応答を提供してください。"),
MessagesPlaceholder(variable_name="history"),
("human", "{input}")
])
chain = prompt | ChatOpenAI(model="gpt-4o", temperature=0.7)
# Redisベースの永続セッション管理
def get_session_history(session_id: str):
return RedisChatMessageHistory(
session_id=session_id,
url="redis://localhost:6379"
)
# メッセージ履歴が統合されたチェーン
chain_with_history = RunnableWithMessageHistory(
chain,
get_session_history,
input_messages_key="input",
history_messages_key="history"
)
# セッション別会話
config = {"configurable": {"session_id": "user-123-session-456"}}
response = chain_with_history.invoke(
{"input": "KubernetesのPodがCrashLoopBackOff状態です"},
config=config
)
print(response.content)
LlamaIndex ChatMemoryBuffer実装
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.chat_engine import SimpleChatEngine
from llama_index.llms.openai import OpenAI
# LlamaIndexメモリバッファ設定
memory = ChatMemoryBuffer.from_defaults(token_limit=4000)
llm = OpenAI(model="gpt-4o", temperature=0.7)
chat_engine = SimpleChatEngine.from_defaults(
llm=llm,
memory=memory,
system_prompt="あなたはDevOpsエンジニア専門チャットボットです。"
)
# 会話進行
response1 = chat_engine.chat("CI/CDパイプラインが失敗しました")
response2 = chat_engine.chat("エラーログをお見せします: connection timeout")
response3 = chat_engine.chat("先ほどお話しした問題の解決方法は?")
# メモリ状態確認
print(f"メモリ内メッセージ数: {len(memory.get_all())}")
ベクトルDBベースの永続メモリ
Pineconeを活用した長期メモリアーキテクチャ
from pinecone import Pinecone
from langchain_openai import OpenAIEmbeddings
from datetime import datetime
import json
import uuid
class PersistentConversationMemory:
"""Pineconeベースの永続会話メモリ"""
def __init__(self, index_name: str = "conversation-memory"):
self.pc = Pinecone()
self.index = self.pc.Index(index_name)
self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
def store_exchange(self, user_id: str, session_id: str,
user_msg: str, ai_msg: str,
metadata: dict = None):
"""会話交換をベクトルDBに保存"""
exchange_text = f"User: {user_msg}\nAssistant: {ai_msg}"
embedding = self.embeddings.embed_query(exchange_text)
record_metadata = {
"user_id": user_id,
"session_id": session_id,
"user_message": user_msg[:500],
"ai_message": ai_msg[:500],
"timestamp": datetime.now().isoformat(),
"type": "exchange"
}
if metadata:
record_metadata.update(metadata)
self.index.upsert(vectors=[{
"id": str(uuid.uuid4()),
"values": embedding,
"metadata": record_metadata
}])
def recall(self, user_id: str, query: str,
top_k: int = 5) -> list[dict]:
"""現在の質問に関連する過去の会話を検索"""
query_embedding = self.embeddings.embed_query(query)
results = self.index.query(
vector=query_embedding,
top_k=top_k,
filter={"user_id": {"$eq": user_id}},
include_metadata=True
)
return [
{
"user_message": match.metadata["user_message"],
"ai_message": match.metadata["ai_message"],
"timestamp": match.metadata["timestamp"],
"relevance": match.score
}
for match in results.matches
]
def build_memory_context(self, user_id: str, query: str) -> str:
"""検索された過去の会話をコンテキスト文字列に組み立て"""
memories = self.recall(user_id, query)
if not memories:
return ""
lines = ["[関連する過去の会話]"]
for m in memories:
lines.append(f"({m['timestamp'][:10]}) "
f"User: {m['user_message']}")
lines.append(f" AI: {m['ai_message']}")
lines.append("")
return "\n".join(lines)
コンテキストドリフトとHallucination対応
問題パターンと検出
会話が長くなると、2つの主要な問題が発生する。
- コンテキストドリフト: 初期会話の意図から徐々に離れる現象
- 古いコンテキストベースのHallucination: 要約過程で歪んだ情報による幻覚
class ContextDriftDetector:
"""コンテキストドリフトを検出するモジュール"""
def __init__(self, embeddings, drift_threshold: float = 0.3):
self.embeddings = embeddings
self.drift_threshold = drift_threshold
self.initial_topic_embedding = None
self.recent_embeddings: list[list[float]] = []
def set_initial_topic(self, first_message: str):
"""会話の初期トピックを設定"""
self.initial_topic_embedding = self.embeddings.embed_query(
first_message
)
def check_drift(self, current_message: str) -> dict:
"""現在のメッセージが初期トピックからどれだけ逸脱したか測定"""
current_embedding = self.embeddings.embed_query(current_message)
self.recent_embeddings.append(current_embedding)
if self.initial_topic_embedding is None:
self.set_initial_topic(current_message)
return {"drifted": False, "similarity": 1.0}
similarity = self._cosine_similarity(
self.initial_topic_embedding, current_embedding
)
return {
"drifted": similarity < self.drift_threshold,
"similarity": similarity,
"suggestion": (
"会話トピックが大きく変更されました。"
"新しいセッションを開始するか、コンテキストをリセットすることを推奨します。"
if similarity < self.drift_threshold else None
)
}
@staticmethod
def _cosine_similarity(a: list[float], b: list[float]) -> float:
import numpy as np
a, b = np.array(a), np.array(b)
return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
要約精度の検証
class SummaryValidator:
"""会話要約の精度を検証"""
def __init__(self, llm):
self.llm = llm
async def validate_summary(self, original_messages: list[dict],
summary: str) -> dict:
"""原本会話に対する要約の忠実度を検証"""
original_text = "\n".join(
f"{m['role']}: {m['content']}" for m in original_messages
)
prompt = f"""原本会話と要約を比較して以下を評価してください:
1. 核心情報保存率 (0-100)
2. 歪んだ情報の有無
3. 欠落した重要情報
原本会話:
{original_text}
要約:
{summary}
JSON形式で応答してください。"""
response = await self.llm.ainvoke(prompt)
try:
result = json.loads(response.content)
return result
except json.JSONDecodeError:
return {"error": "検証結果のパースに失敗"}
プロダクションアーキテクチャパターン
全体アーキテクチャ
# docker-compose.yml - プロダクション会話メモリスタック
version: '3.8'
services:
chat-api:
image: chat-service:latest
ports:
- '8000:8000'
environment:
- REDIS_URL=redis://redis:6379
- PINECONE_API_KEY=pk-xxx
- OPENAI_API_KEY=sk-xxx
depends_on:
- redis
- postgres
redis:
image: redis:7-alpine
ports:
- '6379:6379'
volumes:
- redis-data:/data
command: redis-server --appendonly yes
postgres:
image: pgvector/pgvector:pg16
environment:
POSTGRES_DB: chatbot
POSTGRES_USER: admin
POSTGRES_PASSWORD: secure-password
volumes:
- pg-data:/var/lib/postgresql/data
ports:
- '5432:5432'
volumes:
redis-data:
pg-data:
FastAPIベースの会話サーバー
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import redis.asyncio as redis
import json
app = FastAPI(title="Multi-Turn Chat API")
# Redis接続
redis_client = redis.from_url("redis://localhost:6379", decode_responses=True)
class ChatRequest(BaseModel):
user_id: str
session_id: str
message: str
class ChatResponse(BaseModel):
reply: str
session_id: str
turn_count: int
tokens_used: int
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
"""マルチターン会話エンドポイント"""
session_key = f"session:{request.user_id}:{request.session_id}"
# 1. セッション履歴をロード
history_raw = await redis_client.lrange(session_key, 0, -1)
history = [json.loads(h) for h in history_raw]
# 2. メモリ管理 (Sliding Window + Summary)
manager = SessionMemoryManager(max_turns=20, summary_threshold=15)
context = await manager.prepare_context(history, request.message)
# 3. LLM呼び出し
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
messages = context + [{"role": "user", "content": request.message}]
response = await llm.ainvoke(messages)
# 4. 履歴を保存
await redis_client.rpush(
session_key,
json.dumps({"role": "user", "content": request.message})
)
await redis_client.rpush(
session_key,
json.dumps({"role": "assistant", "content": response.content})
)
# 5. TTL設定(24時間)
await redis_client.expire(session_key, 86400)
turn_count = len(history) // 2 + 1
return ChatResponse(
reply=response.content,
session_id=request.session_id,
turn_count=turn_count,
tokens_used=response.response_metadata.get("token_usage", {}).get(
"total_tokens", 0
)
)
運用時の注意事項
モニタリングチェックリスト
- トークン使用量モニタリング: セッションあたりの平均/最大トークン消費を追跡し、異常な急増時にアラートを設定する。
- 要約品質検証: 定期的に要約結果をサンプリングして情報損失の有無を確認する。
- コンテキストドリフト追跡: セッションが長くなる際のトピック逸脱率をモニタリングする。
- レイテンシ分析: メモリ検索/要約段階のレイテンシが全体応答時間に与える影響を測定する。
- コスト追跡: メモリ管理用LLM呼び出し(要約等)のコストを別途追跡する。
よくある障害ケースと復旧手順
class MemoryRecoveryHandler:
"""メモリ関連障害復旧ハンドラ"""
async def handle_token_overflow(self, session_id: str):
"""トークン制限超過時の緊急対処"""
# 1. 直近5ターンのみ残して緊急要約
# 2. 要約失敗時は直近3ターンのみ保持して残りを破棄
# 3. ユーザーにコンテキスト縮小を通知
pass
async def handle_summary_failure(self, session_id: str):
"""要約LLM呼び出し失敗時"""
# 1. リトライ(最大3回、バックオフ)
# 2. フォールバック: 単純メッセージ数ベースウィンドウに切り替え
# 3. 要約なしで直近会話のみで進行
pass
async def handle_vector_db_failure(self, session_id: str):
"""ベクトルDB接続失敗時"""
# 1. ローカルキャッシュから直近会話を提供
# 2. Redis短期メモリにフォールバック
# 3. ベクトル検索なしで基本会話を進行
pass
async def handle_context_drift(self, session_id: str, drift_score: float):
"""コンテキストドリフト検出時"""
# 1. ユーザーにトピック変更を通知
# 2. 新規セッション開始を提案
# 3. 現在のトピック基準でコンテキストを再構成
pass
パフォーマンス最適化のヒント
# Redisメモリ使用量モニタリング
redis-cli INFO memory | grep used_memory_human
# セッション別メモリサイズ確認
redis-cli DEBUG OBJECT "session:user-123:session-456"
# 期限切れセッションのクリーンアップ
redis-cli --scan --pattern "session:*" | while read key; do
ttl=$(redis-cli TTL "$key")
if [ "$ttl" -eq "-1" ]; then
echo "No TTL set for $key"
fi
done
メモリパターン選択ガイド
ユースケース別推奨
| ユースケース | 推奨パターン | 理由 |
|---|---|---|
| シンプルなFAQボット | Buffer Window (k=5) | 短い会話、最小コスト |
| カスタマーサポートチャットボット | Summary Buffer + Entity | 長い会話、顧客情報追跡 |
| テクニカルサポートエージェント | Hierarchical + Vector | 過去の問題検索が必要 |
| パーソナルアシスタントボット | Full Hierarchical | 長期記憶、パーソナライズ |
| コードレビューボット | Token Window | コードコンテキスト最大化 |
意思決定フローチャート
会話の長さは?
|
+-- 5ターン以下 --> Buffer Memory
|
+-- 5〜30ターン --> パーソナライズが必要?
| |
| +-- No --> Sliding Window
| +-- Yes --> Summary Buffer + Entity
|
+-- 30ターン以上 --> 過去の会話検索が必要?
|
+-- No --> Hierarchical Memory
+-- Yes --> Hierarchical + Vector Store
まとめ
マルチターン会話管理は、LLMチャットボットの品質を決定する核心要素である。単純にすべての会話をコンテキストに入れる方式は、コストとパフォーマンスの面で持続可能ではない。Buffer、Summary、Vector Storeなど多様なメモリパターンを理解し、ユースケースに合った戦略を選択する必要がある。
Sliding Windowは最も実用的な基本戦略であり、ここに会話要約とベクトル検索を組み合わせることで、長い会話でも高い品質を維持できる。階層的メモリアーキテクチャは人間の記憶構造を模倣し、短期/中期/長期記憶を分離管理することで、トークン効率と情報保存のバランスを最適化する。
プロダクション環境では、Redisを活用したセッション管理、Pinecone/ChromaなどのベクトルDBを活用した永続メモリ、そして障害復旧戦略が必須である。トークン使用量と要約品質を継続的にモニタリングし、コンテキストドリフトへの対応策を準備しておこう。
参考資料
- LangChain Conversational Memory - Pinecone
- Context Window Management - Redis Blog
- Context Window Management Strategies for Long-Context AI Agents - Maxim AI
- AI Agent Memory Architecture - IBM
- LLM Chat History Summarization Guide - Mem0
- Top Techniques to Manage Context Length in LLMs - Agenta
- LangChain Memory Tutorial - Aurelio AI