Multi-Turn Conversation Management and Context Optimization: LLM Chatbot Memory Patterns, Conversation Summarization, and Sliding Window Strategies
- Introduction
- Context Window Limitations and Cost Analysis
- Memory Pattern Comparative Analysis
- Deep Dive into Sliding Window Strategies
- Conversation Summarization Techniques
- LangChain / LlamaIndex Production Implementation
- Vector DB-Based Persistent Memory
- Context Drift and Hallucination Mitigation
- Production Architecture Patterns
- Operational Notes
- Memory Pattern Selection Guide
- Conclusion
- References

Introduction
The most fundamental challenge in LLM-based chatbots is effectively managing context in multi-turn conversations. Because LLMs are inherently stateless, the entire conversation history must be resent with every API call. Context windows, however, are finite, and since each turn resends the full history, cumulative token costs grow roughly quadratically with conversation length.
Even with large context windows like GPT-4o's 128K tokens or Claude's 200K tokens, production environments can easily reach these limits during hundreds of turns of customer support conversations or extended technical support sessions. Moreover, the "Lost in the Middle" phenomenon means models cannot effectively utilize information buried in the middle of long contexts.
This article covers various LLM chatbot memory patterns (Buffer, Summary, Vector Store), Sliding Window strategies, conversation summarization techniques, token cost optimization, and production architecture patterns with practical code examples.
Context Window Limitations and Cost Analysis
Major LLM Context Window Comparison
| Model | Context Window | Input Cost (1M tokens) | Output Cost (1M tokens) | Notes |
|---|---|---|---|---|
| GPT-4o | 128K | 2.50 USD | 10.00 USD | General purpose |
| GPT-4o-mini | 128K | 0.15 USD | 0.60 USD | Lightweight |
| Claude 3.5 Sonnet | 200K | 3.00 USD | 15.00 USD | Long context |
| Gemini 1.5 Pro | 2M | 1.25 USD | 5.00 USD | Largest window |
| Llama 3.1 405B | 128K | Self-hosted | Self-hosted | Open source |
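To make the table concrete, here is a small helper (the `PRICES` dict and model keys are illustrative, and list prices change over time) that estimates the cost of a single request from the per-million-token rates above:

```python
# USD per 1M tokens (input, output) - figures from the table above; verify current pricing
PRICES = {
    "gpt-4o": (2.50, 10.00),
    "gpt-4o-mini": (0.15, 0.60),
    "claude-3.5-sonnet": (3.00, 15.00),
    "gemini-1.5-pro": (1.25, 5.00),
}

def call_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Return the USD cost of one request given token counts."""
    in_rate, out_rate = PRICES[model]
    return input_tokens / 1e6 * in_rate + output_tokens / 1e6 * out_rate

# A 10K-token history with a 500-token reply on GPT-4o
print(f"{call_cost('gpt-4o', 10_000, 500):.4f}")  # → 0.0300
```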
Token Budget Design
When designing per-conversation token budgets, you need to separately manage system prompts, conversation history, and response space.
```python
import tiktoken

class TokenBudgetManager:
    """Token budget management class"""

    def __init__(self, model: str = "gpt-4o", max_context: int = 128000):
        self.encoding = tiktoken.encoding_for_model(model)
        self.max_context = max_context
        # Budget allocation: System 15%, History 60%, Response 25%
        self.system_budget = int(max_context * 0.15)
        self.history_budget = int(max_context * 0.60)
        self.response_budget = int(max_context * 0.25)

    def count_tokens(self, text: str) -> int:
        """Count tokens in text"""
        return len(self.encoding.encode(text))

    def count_message_tokens(self, messages: list[dict]) -> int:
        """Count total tokens in message list"""
        total = 0
        for msg in messages:
            total += self.count_tokens(msg["content"])
            total += 4  # Message metadata overhead
        total += 2  # Start/end tokens
        return total

    def get_available_history_tokens(self, system_tokens: int) -> int:
        """Return available tokens for conversation history"""
        used = system_tokens + self.response_budget
        return self.max_context - used

    def should_summarize(self, history_tokens: int) -> bool:
        """Recommend summarization when history exceeds 80% of budget"""
        return history_tokens > self.history_budget * 0.8

# Usage example
budget = TokenBudgetManager(model="gpt-4o")
system_prompt = "You are a customer support AI specialist..."
system_tokens = budget.count_tokens(system_prompt)
print(f"System prompt: {system_tokens} tokens")
print(f"History budget: {budget.history_budget} tokens")
print(f"Response budget: {budget.response_budget} tokens")
```
Cost Escalation Simulation
```python
import numpy as np

def calculate_cost_per_turn(turns: int, avg_tokens_per_turn: int = 200,
                            input_cost_per_1m: float = 2.50) -> float:
    """Calculate cumulative input cost by turn count"""
    # Assumes the entire history is sent with each turn
    cumulative_cost = 0
    for t in range(1, turns + 1):
        total_tokens = t * avg_tokens_per_turn  # Input tokens for current turn
        turn_cost = (total_tokens / 1_000_000) * input_cost_per_1m
        cumulative_cost += turn_cost
    return cumulative_cost

def sliding_window_cost(turns: int, window: int = 20,
                        avg_tokens_per_turn: int = 200,
                        input_cost_per_1m: float = 2.50) -> float:
    """Cumulative cost when each turn sends at most `window` turns of history"""
    return sum(
        (min(t, window) * avg_tokens_per_turn / 1_000_000) * input_cost_per_1m
        for t in range(1, turns + 1)
    )

# Cost comparison by memory strategy
turns = np.arange(1, 101)
cost_no_memory = [calculate_cost_per_turn(t) for t in turns]
# Sliding Window (keep only the last 20 turns; cost per turn is capped, not cumulative cost)
cost_sliding = [sliding_window_cost(t, window=20) for t in turns]
# Summary Memory (1/5 compression via summarization)
cost_summary = [calculate_cost_per_turn(t, avg_tokens_per_turn=40) for t in turns]

print(f"100-turn cost (no memory): ${cost_no_memory[-1]:.4f}")
print(f"100-turn cost (Sliding Window): ${cost_sliding[-1]:.4f}")
print(f"100-turn cost (Summary): ${cost_summary[-1]:.4f}")
```
Memory Pattern Comparative Analysis
Pattern Characteristics Comparison
| Memory Pattern | Token Usage | Info Retention | Latency | Implementation Complexity | Best For |
|---|---|---|---|---|---|
| Buffer Memory | O(n) linear | 100% | Low | Low | Short conversations |
| Window Memory | O(k) fixed | Last k turns | Low | Low | General chatbots |
| Summary Memory | O(1) fixed | Summary only | Medium | Medium | Long conversations |
| Summary Buffer | O(k+1) | Summary + recent | Medium | Medium | Balanced |
| Vector Store | O(k) search | Semantic-based | High | High | Knowledge-intensive |
| Entity Memory | O(e) entities | Per-entity | Medium | High | Personalization |
1. Buffer Memory - Full History Storage
The simplest pattern that retains the entire conversation history as-is.
```python
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain

# Buffer Memory: stores all conversations as-is
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
memory = ConversationBufferMemory(return_messages=True)
chain = ConversationChain(llm=llm, memory=memory, verbose=True)

# Conversation flow
response1 = chain.predict(input="Hello, we have a server outage")
response2 = chain.predict(input="Checking the logs, I see OOM errors")
response3 = chain.predict(input="How can I check current memory usage?")

# Review the entire history stored in memory
for msg in memory.chat_memory.messages:
    role = "User" if msg.type == "human" else "AI"
    print(f"[{role}] {msg.content[:80]}...")
```
Limitation: As conversations grow longer, token usage increases linearly, causing costs and latency to spike.
2. Sliding Window Memory - Keep Only Recent N Turns
Maintains a fixed-size window while removing older conversations.
```python
import tiktoken
from langchain.memory import ConversationBufferWindowMemory

# Sliding Window keeping only the last 10 exchanges (20 messages)
window_memory = ConversationBufferWindowMemory(
    k=10,  # Keep last 10 user/AI exchanges
    return_messages=True
)
chain = ConversationChain(llm=llm, memory=window_memory, verbose=True)

# Token-based Window implementation (custom)
class TokenWindowMemory:
    """Memory that manages conversation history based on token count"""

    def __init__(self, max_tokens: int = 4000, model: str = "gpt-4o"):
        self.max_tokens = max_tokens
        self.encoding = tiktoken.encoding_for_model(model)
        self.messages: list[dict] = []

    def add_message(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})
        self._trim()

    def _trim(self):
        """Remove oldest messages when the token limit is exceeded"""
        while self._total_tokens() > self.max_tokens and len(self.messages) > 2:
            # Preserve the first system message
            self.messages.pop(0 if self.messages[0]["role"] != "system" else 1)

    def _total_tokens(self) -> int:
        return sum(
            len(self.encoding.encode(m["content"])) + 4
            for m in self.messages
        )

    def get_messages(self) -> list[dict]:
        return self.messages.copy()

# Usage example
token_window = TokenWindowMemory(max_tokens=4000)
token_window.add_message("system", "You are a technical support expert.")
token_window.add_message("user", "My Docker container keeps restarting.")
token_window.add_message("assistant", "Let me check if it is in OOMKilled state...")
print(f"Current token usage: {token_window._total_tokens()}")
```
3. Summary Memory - Compression Through Conversation Summarization
Uses an LLM to summarize previous conversations and leverages the summary as context.
```python
from langchain.memory import ConversationSummaryMemory
from langchain.memory import ConversationSummaryBufferMemory

# Summary Memory: automatically summarizes conversations with an LLM
summary_memory = ConversationSummaryMemory(
    llm=ChatOpenAI(model="gpt-4o-mini", temperature=0),  # Lightweight model for summarization
    return_messages=True
)

# Summary Buffer Memory: combines summary + recent conversations
summary_buffer = ConversationSummaryBufferMemory(
    llm=ChatOpenAI(model="gpt-4o-mini", temperature=0),
    max_token_limit=2000,  # Summarizes older messages when this limit is exceeded
    return_messages=True
)

# Custom Progressive Summarization implementation
class ProgressiveSummarizer:
    """Progressive summarization: performs staged summarization as conversations accumulate"""

    def __init__(self, llm, summarize_threshold: int = 10):
        self.llm = llm
        self.summarize_threshold = summarize_threshold
        self.summary = ""
        self.recent_messages: list[dict] = []
        self.turn_count = 0

    async def add_exchange(self, user_msg: str, ai_msg: str):
        self.recent_messages.append({"role": "user", "content": user_msg})
        self.recent_messages.append({"role": "assistant", "content": ai_msg})
        self.turn_count += 1
        if self.turn_count % self.summarize_threshold == 0:
            await self._summarize()

    async def _summarize(self):
        """Integrate the recent conversation into the existing summary"""
        messages_text = "\n".join(
            f"{m['role']}: {m['content']}" for m in self.recent_messages
        )
        prompt = f"""Previous summary:
{self.summary if self.summary else '(none)'}

Recent conversation:
{messages_text}

Create a concise summary that integrates the previous summary with
the recent conversation, preserving key information. Include user names,
preferences, and unresolved issues."""
        response = await self.llm.ainvoke(prompt)
        self.summary = response.content
        self.recent_messages = self.recent_messages[-4:]  # Keep only last 2 turns

    def get_context(self) -> str:
        parts = []
        if self.summary:
            parts.append(f"[Conversation Summary]\n{self.summary}")
        if self.recent_messages:
            recent = "\n".join(
                f"{m['role']}: {m['content']}" for m in self.recent_messages
            )
            parts.append(f"[Recent Conversation]\n{recent}")
        return "\n\n".join(parts)
```
4. Vector Store Memory - Semantic-Based Retrieval
Stores conversation history as vector embeddings and retrieves past conversations semantically similar to the current question.
```python
from langchain.memory import VectorStoreRetrieverMemory
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

# Vector Store-based memory setup
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(
    collection_name="conversation_memory",
    embedding_function=embeddings,
    persist_directory="./chroma_memory"
)
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}  # Retrieve top 5 most relevant conversations
)
vector_memory = VectorStoreRetrieverMemory(
    retriever=retriever,
    memory_key="relevant_history",
    input_key="input"
)

# Store conversations
vector_memory.save_context(
    {"input": "What is the deployment schedule for Project A?"},
    {"output": "Project A is scheduled for staging on March 15 and production on March 20."}
)
vector_memory.save_context(
    {"input": "When is the database migration?"},
    {"output": "The DB migration is scheduled for 2 AM on March 18."}
)

# Retrieve relevant conversations
relevant = vector_memory.load_memory_variables(
    {"input": "What should we check before Project A deployment?"}
)
print(relevant["relevant_history"])
```
Deep Dive into Sliding Window Strategies
Adaptive Sliding Window
A strategy that dynamically adjusts the window based on conversation importance rather than using a fixed size.
```python
from dataclasses import dataclass
from datetime import datetime
import hashlib

@dataclass
class ConversationTurn:
    role: str
    content: str
    timestamp: datetime
    importance: float = 0.5  # 0.0 ~ 1.0
    tokens: int = 0
    turn_id: str = ""

    def __post_init__(self):
        if not self.turn_id:
            self.turn_id = hashlib.md5(
                f"{self.timestamp}{self.content[:50]}".encode()
            ).hexdigest()[:8]

class AdaptiveSlidingWindow:
    """Importance-based adaptive sliding window"""

    def __init__(self, max_tokens: int = 8000, min_turns: int = 4):
        self.max_tokens = max_tokens
        self.min_turns = min_turns  # Minimum turns to retain
        self.turns: list[ConversationTurn] = []
        self.archived: list[ConversationTurn] = []

    def add_turn(self, turn: ConversationTurn):
        self.turns.append(turn)
        self._optimize()

    def _calculate_importance(self, turn: ConversationTurn, index: int) -> float:
        """Calculate turn importance across multiple dimensions"""
        score = turn.importance
        # Higher weight for more recent turns
        recency = index / max(len(self.turns) - 1, 1)
        score += recency * 0.3
        # Turns containing questions get higher importance
        if "?" in turn.content or "how" in turn.content.lower():
            score += 0.2
        # Error/incident related keywords
        critical_keywords = ["error", "failure", "outage", "critical", "urgent", "fail"]
        if any(kw in turn.content.lower() for kw in critical_keywords):
            score += 0.3
        return min(score, 1.0)

    def _optimize(self):
        """Prioritize retaining important turns within the token limit"""
        total_tokens = sum(t.tokens for t in self.turns)
        if total_tokens <= self.max_tokens:
            return
        # Calculate importance scores
        scored = [
            (i, self._calculate_importance(t, i), t)
            for i, t in enumerate(self.turns)
        ]
        # Always retain the most recent min_turns: only older turns are removal candidates
        candidates = scored[:-self.min_turns]
        # Sort by lowest importance and remove
        candidates.sort(key=lambda x: x[1])
        while total_tokens > self.max_tokens and candidates:
            _, _, turn = candidates.pop(0)
            self.archived.append(turn)
            self.turns.remove(turn)
            total_tokens -= turn.tokens

    def get_context(self) -> list[dict]:
        return [
            {"role": t.role, "content": t.content}
            for t in self.turns
        ]
```
Time-Based vs Token-Based Window Comparison
```python
from datetime import datetime, timedelta

class TimeBasedWindow:
    """Time-based sliding window - keeps only conversations within the last N minutes"""

    def __init__(self, window_minutes: int = 30):
        self.window_minutes = window_minutes
        self.messages: list[dict] = []

    def add_message(self, role: str, content: str):
        self.messages.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now()
        })
        self._cleanup()

    def _cleanup(self):
        cutoff = datetime.now() - timedelta(minutes=self.window_minutes)
        self.messages = [
            m for m in self.messages
            if m["timestamp"] > cutoff
        ]

    def get_messages(self) -> list[dict]:
        return [
            {"role": m["role"], "content": m["content"]}
            for m in self.messages
        ]

class HybridWindow:
    """Token + Time hybrid window"""

    def __init__(self, max_tokens: int = 4000, max_minutes: int = 60):
        self.max_tokens = max_tokens
        self.max_minutes = max_minutes
        self.token_window = TokenWindowMemory(max_tokens=max_tokens)
        self.time_window = TimeBasedWindow(window_minutes=max_minutes)

    def add_message(self, role: str, content: str):
        self.token_window.add_message(role, content)
        self.time_window.add_message(role, content)

    def get_messages(self) -> list[dict]:
        # Use the intersection of both windows (stricter filtering)
        token_msgs = set(
            m["content"] for m in self.token_window.get_messages()
        )
        time_msgs = self.time_window.get_messages()
        return [m for m in time_msgs if m["content"] in token_msgs]
```
Conversation Summarization Techniques
Summarization Strategy Comparison
| Strategy | Summarization Timing | Token Savings | Information Loss | Additional Cost |
|---|---|---|---|---|
| Per-turn summary | After every exchange | 80-90% | Medium | High |
| Threshold summary | Every N turns | 60-80% | Low | Medium |
| Hierarchical summary | Staged | 70-85% | Very low | Medium |
| Selective summary | Importance-based | 50-70% | Minimal | Low |
Hierarchical Summarization System Implementation
```python
from enum import Enum
from typing import Any
import json

class MemoryTier(Enum):
    SHORT_TERM = "short_term"  # Recent conversation verbatim
    MID_TERM = "mid_term"      # Session summaries
    LONG_TERM = "long_term"    # Core facts/preferences

class HierarchicalMemory:
    """3-tier memory architecture"""

    def __init__(self, llm, short_term_limit: int = 10,
                 mid_term_limit: int = 5):
        self.llm = llm
        self.short_term_limit = short_term_limit
        self.mid_term_limit = mid_term_limit
        self.short_term: list[dict] = []    # Recent verbatim messages
        self.mid_term: list[str] = []       # Session summaries
        self.long_term: dict[str, Any] = {  # Persistently stored info
            "user_name": None,
            "preferences": [],
            "key_facts": [],
            "unresolved_issues": []
        }

    async def add_exchange(self, user_msg: str, ai_msg: str):
        # 1. Add to short-term memory
        self.short_term.append({"role": "user", "content": user_msg})
        self.short_term.append({"role": "assistant", "content": ai_msg})
        # 2. Promote to mid-term when short-term exceeds its limit
        if len(self.short_term) > self.short_term_limit * 2:
            await self._promote_to_mid_term()
        # 3. Extract to long-term when mid-term exceeds its limit
        if len(self.mid_term) > self.mid_term_limit:
            await self._extract_to_long_term()

    async def _promote_to_mid_term(self):
        """Short -> Mid: summarize older messages and promote"""
        old_messages = self.short_term[:-6]  # Exclude last 3 turns
        self.short_term = self.short_term[-6:]
        text = "\n".join(f"{m['role']}: {m['content']}" for m in old_messages)
        prompt = f"Summarize the following conversation in 3-4 sentences:\n\n{text}"
        response = await self.llm.ainvoke(prompt)
        self.mid_term.append(response.content)

    async def _extract_to_long_term(self):
        """Mid -> Long: extract key facts for permanent storage"""
        summaries = "\n\n".join(self.mid_term[:-2])
        self.mid_term = self.mid_term[-2:]
        prompt = f"""Extract key information from the following conversation summaries as JSON:

{summaries}

Items to extract:
- user_preferences: user preferences
- key_facts: key facts
- unresolved_issues: unresolved issues"""
        response = await self.llm.ainvoke(prompt)
        # Parse JSON and merge into long_term (error handling needed in production)
        try:
            extracted = json.loads(response.content)
            self.long_term["preferences"].extend(
                extracted.get("user_preferences", [])
            )
            self.long_term["key_facts"].extend(
                extracted.get("key_facts", [])
            )
            self.long_term["unresolved_issues"] = extracted.get(
                "unresolved_issues", []
            )
        except json.JSONDecodeError:
            pass  # Ignore on parse failure

    def build_context(self) -> str:
        """Assemble and return the full context"""
        parts = []
        # Long-term memory (always included)
        if any(self.long_term.values()):
            lt = self.long_term
            facts = "\n".join(f"- {f}" for f in lt["key_facts"][-10:])
            prefs = ", ".join(lt["preferences"][-5:])
            issues = "\n".join(f"- {i}" for i in lt["unresolved_issues"])
            parts.append(
                f"[User Profile]\nName: {lt['user_name']}\n"
                f"Preferences: {prefs}\nKey Facts:\n{facts}\n"
                f"Unresolved Issues:\n{issues}"
            )
        # Mid-term memory (session summaries)
        if self.mid_term:
            parts.append(
                "[Previous Conversation Summary]\n" + "\n---\n".join(self.mid_term)
            )
        # Short-term memory (recent verbatim)
        if self.short_term:
            recent = "\n".join(
                f"{m['role']}: {m['content']}" for m in self.short_term
            )
            parts.append(f"[Recent Conversation]\n{recent}")
        return "\n\n".join(parts)
```
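The promotion mechanics can be exercised without an API key by swapping a canned-response stub in for the LLM, a common testing pattern. The `StubLLM` class and the deliberately small limit below are illustrative, not part of the original implementation:

```python
import asyncio

class StubLLM:
    """Canned-response stand-in for a chat model: anything with async
    ainvoke(prompt) returning an object with .content fits the interface."""
    async def ainvoke(self, prompt):
        class R:
            content = "condensed summary"
        return R()

async def demo():
    llm = StubLLM()
    short_term, mid_term = [], []
    short_term_limit = 3  # exchanges before promotion (tiny, for the demo)
    for i in range(5):
        short_term += [{"role": "user", "content": f"q{i}"},
                       {"role": "assistant", "content": f"a{i}"}]
        if len(short_term) > short_term_limit * 2:
            # Promote everything but the last exchange to mid-term
            old, short_term = short_term[:-2], short_term[-2:]
            resp = await llm.ainvoke("Summarize:\n" + str(old))
            mid_term.append(resp.content)
    return short_term, mid_term

short, mid = asyncio.run(demo())
print(len(short), len(mid))  # → 4 1
```

After five exchanges, one promotion has fired: six older messages were collapsed into a single mid-term summary while the newest exchanges stayed verbatim.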
LangChain / LlamaIndex Production Implementation
LangChain LCEL-Based Memory Implementation
```python
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import RedisChatMessageHistory
from langchain_openai import ChatOpenAI

# LCEL-based chain composition
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a friendly technical support expert. "
               "Refer to previous conversation history for consistent responses."),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}")
])
chain = prompt | ChatOpenAI(model="gpt-4o", temperature=0.7)

# Redis-based persistent session management
def get_session_history(session_id: str):
    return RedisChatMessageHistory(
        session_id=session_id,
        url="redis://localhost:6379"
    )

# Chain with integrated message history
chain_with_history = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history"
)

# Per-session conversation
config = {"configurable": {"session_id": "user-123-session-456"}}
response = chain_with_history.invoke(
    {"input": "My Kubernetes Pod is in CrashLoopBackOff state"},
    config=config
)
print(response.content)
```
LlamaIndex ChatMemoryBuffer Implementation
```python
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.chat_engine import SimpleChatEngine
from llama_index.llms.openai import OpenAI

# LlamaIndex memory buffer setup
memory = ChatMemoryBuffer.from_defaults(token_limit=4000)
llm = OpenAI(model="gpt-4o", temperature=0.7)
chat_engine = SimpleChatEngine.from_defaults(
    llm=llm,
    memory=memory,
    system_prompt="You are a DevOps engineer chatbot specialist."
)

# Conversation flow
response1 = chat_engine.chat("Our CI/CD pipeline has failed")
response2 = chat_engine.chat("Here is the error log: connection timeout")
response3 = chat_engine.chat("What was the solution for the issue I mentioned earlier?")

# Check memory state
print(f"Messages in memory: {len(memory.get_all())}")
```
Vector DB-Based Persistent Memory
Long-Term Memory Architecture with Pinecone
```python
from pinecone import Pinecone
from langchain_openai import OpenAIEmbeddings
from datetime import datetime
import uuid

class PersistentConversationMemory:
    """Pinecone-based persistent conversation memory"""

    def __init__(self, index_name: str = "conversation-memory"):
        self.pc = Pinecone()
        self.index = self.pc.Index(index_name)
        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

    def store_exchange(self, user_id: str, session_id: str,
                       user_msg: str, ai_msg: str,
                       metadata: dict = None):
        """Store a conversation exchange in the vector DB"""
        exchange_text = f"User: {user_msg}\nAssistant: {ai_msg}"
        embedding = self.embeddings.embed_query(exchange_text)
        record_metadata = {
            "user_id": user_id,
            "session_id": session_id,
            "user_message": user_msg[:500],
            "ai_message": ai_msg[:500],
            "timestamp": datetime.now().isoformat(),
            "type": "exchange"
        }
        if metadata:
            record_metadata.update(metadata)
        self.index.upsert(vectors=[{
            "id": str(uuid.uuid4()),
            "values": embedding,
            "metadata": record_metadata
        }])

    def recall(self, user_id: str, query: str,
               top_k: int = 5) -> list[dict]:
        """Retrieve past conversations relevant to the current query"""
        query_embedding = self.embeddings.embed_query(query)
        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            filter={"user_id": {"$eq": user_id}},
            include_metadata=True
        )
        return [
            {
                "user_message": match.metadata["user_message"],
                "ai_message": match.metadata["ai_message"],
                "timestamp": match.metadata["timestamp"],
                "relevance": match.score
            }
            for match in results.matches
        ]

    def build_memory_context(self, user_id: str, query: str) -> str:
        """Assemble retrieved past conversations into a context string"""
        memories = self.recall(user_id, query)
        if not memories:
            return ""
        lines = ["[Relevant Past Conversations]"]
        for m in memories:
            lines.append(f"({m['timestamp'][:10]}) "
                         f"User: {m['user_message']}")
            lines.append(f"    AI: {m['ai_message']}")
            lines.append("")
        return "\n".join(lines)
```
Context Drift and Hallucination Mitigation
Problem Patterns and Detection
Two major problems emerge as conversations grow longer:
- Context Drift: The conversation gradually diverges from the original intent
- Stale Context Hallucination: Hallucinations caused by distorted information from the summarization process
```python
class ContextDriftDetector:
    """Module for detecting context drift"""

    def __init__(self, embeddings, drift_threshold: float = 0.3):
        self.embeddings = embeddings
        self.drift_threshold = drift_threshold
        self.initial_topic_embedding = None
        self.recent_embeddings: list[list[float]] = []

    def set_initial_topic(self, first_message: str):
        """Set the initial topic of the conversation"""
        self.initial_topic_embedding = self.embeddings.embed_query(
            first_message
        )

    def check_drift(self, current_message: str) -> dict:
        """Measure how far the current message has drifted from the initial topic"""
        current_embedding = self.embeddings.embed_query(current_message)
        self.recent_embeddings.append(current_embedding)
        if self.initial_topic_embedding is None:
            self.set_initial_topic(current_message)
            return {"drifted": False, "similarity": 1.0}
        similarity = self._cosine_similarity(
            self.initial_topic_embedding, current_embedding
        )
        return {
            "drifted": similarity < self.drift_threshold,
            "similarity": similarity,
            "suggestion": (
                "The conversation topic has significantly changed. "
                "Consider starting a new session or resetting context."
                if similarity < self.drift_threshold else None
            )
        }

    @staticmethod
    def _cosine_similarity(a: list[float], b: list[float]) -> float:
        import numpy as np
        a, b = np.array(a), np.array(b)
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
```
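The threshold logic is easiest to see with toy vectors in place of real embeddings. The three-dimensional vectors below are purely illustrative (real embedding models return vectors with hundreds or thousands of dimensions), but the cosine arithmetic is the same:

```python
import numpy as np

def cosine_similarity(a, b):
    a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Toy embeddings standing in for real model output
initial   = [1.0, 0.0, 0.0]   # first message of the session
on_topic  = [0.9, 0.1, 0.0]   # follow-up on the same subject
off_topic = [0.0, 0.2, 0.9]   # unrelated new subject

drift_threshold = 0.3
for name, vec in [("on_topic", on_topic), ("off_topic", off_topic)]:
    sim = cosine_similarity(initial, vec)
    print(f"{name}: similarity={sim:.2f} drifted={sim < drift_threshold}")
```

The on-topic vector stays well above the 0.3 threshold while the off-topic one falls to roughly zero, which is exactly the signal `check_drift` turns into a "start a new session" suggestion.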
Summary Accuracy Validation
```python
import json

class SummaryValidator:
    """Validates the accuracy of conversation summaries"""

    def __init__(self, llm):
        self.llm = llm

    async def validate_summary(self, original_messages: list[dict],
                               summary: str) -> dict:
        """Verify summary fidelity against the original conversation"""
        original_text = "\n".join(
            f"{m['role']}: {m['content']}" for m in original_messages
        )
        prompt = f"""Compare the original conversation with its summary and evaluate:
1. Key information preservation rate (0-100)
2. Presence of distorted information
3. Missing important information

Original conversation:
{original_text}

Summary:
{summary}

Respond in JSON format."""
        response = await self.llm.ainvoke(prompt)
        try:
            return json.loads(response.content)
        except json.JSONDecodeError:
            return {"error": "Validation result parsing failed"}
```
Production Architecture Patterns
Full Architecture
```yaml
# docker-compose.yml - Production conversation memory stack
version: '3.8'
services:
  chat-api:
    image: chat-service:latest
    ports:
      - '8000:8000'
    environment:
      - REDIS_URL=redis://redis:6379
      - PINECONE_API_KEY=pk-xxx
      - OPENAI_API_KEY=sk-xxx
    depends_on:
      - redis
      - postgres

  redis:
    image: redis:7-alpine
    ports:
      - '6379:6379'
    volumes:
      - redis-data:/data
    command: redis-server --appendonly yes

  postgres:
    image: pgvector/pgvector:pg16
    environment:
      POSTGRES_DB: chatbot
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: secure-password
    volumes:
      - pg-data:/var/lib/postgresql/data
    ports:
      - '5432:5432'

volumes:
  redis-data:
  pg-data:
```
FastAPI-Based Conversation Server
```python
from fastapi import FastAPI
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
import redis.asyncio as redis
import json

app = FastAPI(title="Multi-Turn Chat API")

# Redis connection
redis_client = redis.from_url("redis://localhost:6379", decode_responses=True)

class ChatRequest(BaseModel):
    user_id: str
    session_id: str
    message: str

class ChatResponse(BaseModel):
    reply: str
    session_id: str
    turn_count: int
    tokens_used: int

@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
    """Multi-turn conversation endpoint"""
    session_key = f"session:{request.user_id}:{request.session_id}"
    # 1. Load session history
    history_raw = await redis_client.lrange(session_key, 0, -1)
    history = [json.loads(h) for h in history_raw]
    # 2. Memory management (Sliding Window + Summary)
    # SessionMemoryManager is an application-specific component that applies
    # the windowing/summarization strategies described earlier
    manager = SessionMemoryManager(max_turns=20, summary_threshold=15)
    context = await manager.prepare_context(history, request.message)
    # 3. LLM call
    llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
    messages = context + [{"role": "user", "content": request.message}]
    response = await llm.ainvoke(messages)
    # 4. Save history
    await redis_client.rpush(
        session_key,
        json.dumps({"role": "user", "content": request.message})
    )
    await redis_client.rpush(
        session_key,
        json.dumps({"role": "assistant", "content": response.content})
    )
    # 5. Set TTL (24 hours)
    await redis_client.expire(session_key, 86400)
    turn_count = len(history) // 2 + 1
    return ChatResponse(
        reply=response.content,
        session_id=request.session_id,
        turn_count=turn_count,
        tokens_used=response.response_metadata.get("token_usage", {}).get(
            "total_tokens", 0
        )
    )
```
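The `SessionMemoryManager` used by the endpoint is application-specific; a minimal sketch of what it might look like is shown below. This hypothetical version applies only the sliding-window half of the strategy and leaves the `summary_threshold` hook as a comment:

```python
import asyncio

class SessionMemoryManager:
    """Hypothetical sketch: plain sliding-window context preparation."""

    def __init__(self, max_turns: int = 20, summary_threshold: int = 15):
        self.max_turns = max_turns
        self.summary_threshold = summary_threshold

    async def prepare_context(self, history: list[dict],
                              new_message: str) -> list[dict]:
        # Keep only the most recent max_turns exchanges (2 messages per turn).
        window = history[-self.max_turns * 2:]
        # In a fuller version, turns beyond summary_threshold would be
        # summarized (e.g. via ProgressiveSummarizer) instead of dropped.
        return window

# Demo: 50 stored messages, 20-turn window -> 40 messages survive
mgr = SessionMemoryManager(max_turns=20)
history = [{"role": "user", "content": f"msg {i}"} for i in range(50)]
print(len(asyncio.run(mgr.prepare_context(history, "next question"))))  # → 40
```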
Operational Notes
Monitoring Checklist
- Token Usage Monitoring: Track average/max token consumption per session and set alerts for unusual spikes.
- Summary Quality Verification: Periodically sample summary results to check for information loss.
- Context Drift Tracking: Monitor topic deviation rates as sessions grow longer.
- Latency Analysis: Measure how memory retrieval/summarization stages impact overall response time.
- Cost Tracking: Separately track costs for memory management LLM calls (summarization, etc.).
Common Failure Cases and Recovery Procedures
```python
class MemoryRecoveryHandler:
    """Memory-related failure recovery handler"""

    async def handle_token_overflow(self, session_id: str):
        """Emergency handling when the token limit is exceeded"""
        # 1. Emergency summarization keeping only the last 5 turns
        # 2. If summarization fails, keep only the last 3 turns and discard the rest
        # 3. Notify the user about the context reduction
        pass

    async def handle_summary_failure(self, session_id: str):
        """When the summary LLM call fails"""
        # 1. Retry (max 3 times with backoff)
        # 2. Fallback: switch to a simple message-count-based window
        # 3. Proceed with only the recent conversation, without a summary
        pass

    async def handle_vector_db_failure(self, session_id: str):
        """When the vector DB connection fails"""
        # 1. Serve the recent conversation from a local cache
        # 2. Fall back to Redis short-term memory
        # 3. Continue basic conversation without vector search
        pass

    async def handle_context_drift(self, session_id: str, drift_score: float):
        """When context drift is detected"""
        # 1. Notify the user about the topic change
        # 2. Suggest starting a new session
        # 3. Reconstruct the context based on the current topic
        pass
```
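The retry-with-backoff step in `handle_summary_failure` can be sketched as a small async helper. This is a hedged illustration, not the handler's actual implementation: `retry_with_backoff` and `flaky_summarize` are hypothetical names, and returning `None` stands in for "fall back to a count-based window":

```python
import asyncio
import random

async def retry_with_backoff(op, max_attempts: int = 3, base_delay: float = 0.5):
    """Retry an async operation with exponential backoff; None signals fallback."""
    for attempt in range(max_attempts):
        try:
            return await op()
        except Exception:
            if attempt == max_attempts - 1:
                return None  # Caller switches to the count-based window
            # Exponential backoff with a little jitter
            await asyncio.sleep(base_delay * (2 ** attempt) + random.random() * 0.1)

# Demo: a summarization call that fails twice, then succeeds
attempts = {"n": 0}
async def flaky_summarize():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("LLM call failed")
    return "summary"

print(asyncio.run(retry_with_backoff(flaky_summarize, base_delay=0.01)))  # → summary
```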
Performance Optimization Tips
```bash
# Monitor Redis memory usage
redis-cli INFO memory | grep used_memory_human

# Check per-session memory size (MEMORY USAGE, Redis 4.0+)
redis-cli MEMORY USAGE "session:user-123:session-456"

# Find session keys that never expire (no TTL set)
redis-cli --scan --pattern "session:*" | while read key; do
  ttl=$(redis-cli TTL "$key")
  if [ "$ttl" -eq "-1" ]; then
    echo "No TTL set for $key"
  fi
done
```
Memory Pattern Selection Guide
Recommendations by Use Case
| Use Case | Recommended Pattern | Rationale |
|---|---|---|
| Simple FAQ bot | Buffer Window (k=5) | Short conversations, minimal cost |
| Customer support chatbot | Summary Buffer + Entity | Long conversations, customer info tracking |
| Technical support agent | Hierarchical + Vector | Need to search past issues |
| Personal assistant bot | Full Hierarchical | Long-term memory, personalization |
| Code review bot | Token Window | Maximize code context |
Decision Flowchart
```
Conversation length?
|
+-- 5 turns or less --> Buffer Memory
|
+-- 5~30 turns --> Personalization needed?
|                  |
|                  +-- No  --> Sliding Window
|                  +-- Yes --> Summary Buffer + Entity
|
+-- 30+ turns --> Need to search past conversations?
                  |
                  +-- No  --> Hierarchical Memory
                  +-- Yes --> Hierarchical + Vector Store
```
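The flowchart maps directly onto a small selection helper. The function name and boolean parameters are illustrative; the turn thresholds are the same ones used in the chart:

```python
def select_memory_pattern(expected_turns: int,
                          needs_personalization: bool = False,
                          needs_history_search: bool = False) -> str:
    """Pick a memory pattern following the decision flowchart above."""
    if expected_turns <= 5:
        return "Buffer Memory"
    if expected_turns <= 30:
        return ("Summary Buffer + Entity" if needs_personalization
                else "Sliding Window")
    return ("Hierarchical + Vector Store" if needs_history_search
            else "Hierarchical Memory")

print(select_memory_pattern(3))                               # → Buffer Memory
print(select_memory_pattern(20, needs_personalization=True))  # → Summary Buffer + Entity
print(select_memory_pattern(100, needs_history_search=True))  # → Hierarchical + Vector Store
```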
Conclusion
Multi-turn conversation management is the core factor that determines the quality of LLM chatbots. Simply stuffing all conversations into the context is not sustainable in terms of cost and performance. Understanding various memory patterns like Buffer, Summary, and Vector Store, and selecting the right strategy for your use case is essential.
Sliding Window is the most practical baseline strategy, and combining it with conversation summarization and vector search maintains high quality even in long conversations. Hierarchical memory architecture mimics human memory structure by separately managing short-term, mid-term, and long-term memory, optimizing the balance between token efficiency and information preservation.
In production environments, session management with Redis, persistent memory with vector DBs like Pinecone/Chroma, and failure recovery strategies are essential. Continuously monitor token usage and summary quality, and have mitigation plans ready for context drift.
References
- LangChain Conversational Memory - Pinecone
- Context Window Management - Redis Blog
- Context Window Management Strategies for Long-Context AI Agents - Maxim AI
- AI Agent Memory Architecture - IBM
- LLM Chat History Summarization Guide - Mem0
- Top Techniques to Manage Context Length in LLMs - Agenta
- LangChain Memory Tutorial - Aurelio AI