Chatbot Multi-Turn Memory Management Guide: Context Retention Strategies with LangChain and LangGraph
- Introduction
- Core Challenges of Multi-Turn Conversations
- LangChain Memory Types
- Memory Type Comparison
- LangGraph Stateful Agents
- Persistent Memory Implementation
- Context Compression Techniques
- RAG-Enhanced Memory
- Session Management Patterns
- Troubleshooting
- Operational Notes
- Production Checklist
- References

Introduction
When building chatbots, the most fundamental yet challenging problem is maintaining context across multi-turn conversations. Simple question-answering (single-turn) can handle each request independently, but real conversations build on previous content. To answer "How much is that?", the bot needs to understand what "that" refers to from the prior conversation.
LLM context windows are finite. Even GPT-4o's 128K tokens may not accommodate hundreds of conversation turns, and token costs increase rapidly. Therefore, a memory management strategy addressing what information to retain and how much is essential.
This guide compares LangChain's memory types and shows how to build stateful agents with LangGraph, implement persistent memory with databases, and integrate RAG, all aimed at production-grade multi-turn conversation systems.
Core Challenges of Multi-Turn Conversations
Context Window Limitations
LLMs have a finite number of tokens they can process in a single API call. As conversations grow longer, early conversation content gets truncated or costs skyrocket.
```python
# Problem scenario: early context is lost as conversations grow
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage

llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Simulating a 100-turn conversation
messages = []
for i in range(100):
    messages.append(HumanMessage(content=f"Turn {i}: This is question number {i}."))
    messages.append(AIMessage(content=f"Turn {i}: This is answer number {i}."))

# Sending all messages risks exceeding the token limit
# Solution: apply memory management strategies
print(f"Total messages: {len(messages)}")
```
Relevance Decay
As conversations progress, the relevance of early messages decreases. Sending all conversation history with equal weight is inefficient.
| Problem | Description | Impact |
|---|---|---|
| Token Limit Exceeded | Long conversations exceed context window | API errors or early conversation loss |
| Cost Increase | Unnecessary past conversations sent every time | Token costs skyrocket |
| Relevance Dilution | Key information buried in irrelevant conversation | Response quality degrades |
| Latency Increase | Long prompts take time to process | User experience suffers |
| Hallucination Increase | Incorrect reasoning from excessive context | Reliability drops |
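To make the cost row concrete: if every turn resends the full history, total tokens sent grow quadratically with turn count, while a fixed window keeps growth linear. The per-message token figure below is an assumed average for illustration, not a tokenizer measurement.

```python
from typing import Optional

TOKENS_PER_MESSAGE = 20  # assumed average (~80 chars per message); real tokenizers differ

def tokens_sent(num_turns: int, window: Optional[int] = None) -> int:
    """Total tokens sent across all API calls for a conversation.

    Each turn resends the history so far (2 messages per prior turn, plus
    the new user message). A window caps how many messages are resent.
    """
    total = 0
    for turn in range(1, num_turns + 1):
        history_msgs = 2 * (turn - 1) + 1  # prior exchanges + new question
        if window is not None:
            history_msgs = min(history_msgs, window)
        total += history_msgs * TOKENS_PER_MESSAGE
    return total

print(tokens_sent(100))             # full history → 200000 tokens (quadratic growth)
print(tokens_sent(100, window=10))  # 10-message window → 19500 tokens (linear growth)
```

The roughly 10x gap at 100 turns is why a windowing or summarization strategy pays for itself quickly.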
LangChain Memory Types
ConversationBufferMemory
The simplest memory type; it stores the entire conversation verbatim.
```python
from langchain_openai import ChatOpenAI
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory

llm = ChatOpenAI(model="gpt-4o", temperature=0.7)

# Buffer Memory: retains all conversation content as-is
memory = ConversationBufferMemory(return_messages=True)
conversation = ConversationChain(
    llm=llm,
    memory=memory,
    verbose=True,
)

# Conduct conversation
response1 = conversation.predict(input="Hi, my name is John. I'm a Python developer.")
print(f"AI: {response1}")
response2 = conversation.predict(input="My favorite framework is FastAPI.")
print(f"AI: {response2}")
response3 = conversation.predict(input="What's my name again?")
print(f"AI: {response3}")
# AI remembers the previous conversation and answers "John"

# Check memory contents
print("\n=== Memory Contents ===")
for msg in memory.chat_memory.messages:
    print(f"  {type(msg).__name__}: {msg.content[:80]}...")
```
ConversationBufferWindowMemory
A sliding window that retains only the most recent k exchanges.
```python
from langchain.memory import ConversationBufferWindowMemory

# Keep only the last 5 exchanges
memory = ConversationBufferWindowMemory(
    k=5,
    return_messages=True,
)
conversation = ConversationChain(
    llm=llm,
    memory=memory,
    verbose=True,
)

# Conduct 10 turns of conversation
for i in range(10):
    response = conversation.predict(input=f"This is message number {i+1}.")
    print(f"Turn {i+1}: {response[:50]}...")

# With k=5, message 1 is dropped from the window when message 6 arrives
print(f"\nMessages stored in memory: {len(memory.chat_memory.messages)}")
```
ConversationSummaryMemory
Uses an LLM to summarize and store conversation content. Retains key information even across long conversations.
```python
from langchain.memory import ConversationSummaryMemory

# Summary Memory: the LLM summarizes and stores the conversation
memory = ConversationSummaryMemory(
    llm=llm,
    return_messages=True,
)
conversation = ConversationChain(
    llm=llm,
    memory=memory,
    verbose=True,
)

# Multiple turns of conversation
conversation.predict(input="Hello, I'm a backend developer based in Seoul.")
conversation.predict(input="I mainly use Python and Go, working in Kubernetes environments.")
conversation.predict(input="Recently I've been developing chatbots with LangChain.")
conversation.predict(input="Building RAG pipelines is my main task.")

# Check summary content
print("\n=== Summary ===")
print(memory.buffer)
# A summary is stored instead of the full conversation
```
ConversationSummaryBufferMemory
A hybrid approach combining summary and buffer. Recent conversations are kept as originals while older ones are summarized.
```python
from langchain.memory import ConversationSummaryBufferMemory

# Summary + Buffer hybrid: recent turns verbatim, older turns summarized
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=300,  # older conversations are summarized when this limit is exceeded
    return_messages=True,
)
conversation = ConversationChain(
    llm=llm,
    memory=memory,
    verbose=True,
)

# Conduct conversation
conversation.predict(input="Project A is an e-commerce platform.")
conversation.predict(input="The tech stack is Next.js, FastAPI, PostgreSQL.")
conversation.predict(input="Currently implementing the payment module.")
conversation.predict(input="Webhook handling with the payment gateway is tricky.")
conversation.predict(input="We also need to set up a test environment.")

# Check memory state
print("\n=== Moving Summary ===")
print(memory.moving_summary_buffer)
print(f"\nCurrent buffer message count: {len(memory.chat_memory.messages)}")
```
EntityMemory
Extracts and manages entities (people, places, concepts) from conversations.
```python
from langchain.memory import ConversationEntityMemory
from langchain.memory.prompt import ENTITY_MEMORY_CONVERSATION_TEMPLATE

# Entity Memory: extracts and updates entities from the conversation
memory = ConversationEntityMemory(
    llm=llm,
    return_messages=True,
)
conversation = ConversationChain(
    llm=llm,
    memory=memory,
    prompt=ENTITY_MEMORY_CONVERSATION_TEMPLATE,
    verbose=True,
)

# Conversations containing entities
conversation.predict(
    input="Alice is our team's senior developer. She's a Python expert with 5 years of experience."
)
conversation.predict(
    input="Bob is a product manager working with Alice on a recommendation system project."
)
conversation.predict(
    input="Alice recently introduced MLflow for experiment management."
)

# Check entity information
print("\n=== Entity Store ===")
for entity, info in memory.entity_store.store.items():
    print(f"  {entity}: {info}")
```
Memory Type Comparison
| Memory Type | Pros | Cons | Best For |
|---|---|---|---|
| Buffer | Full conversation preserved, simple implementation | Token costs increase, window overflow risk | Short conversations, prototypes |
| BufferWindow | Predictable costs, retains latest info | Older information lost | Customer support, FAQ bots |
| Summary | Retains key info across long conversations | Information loss during summarization, extra LLM call cost | Long conversations, support history |
| SummaryBuffer | Combines recent originals with past summaries | Complex setup, depends on summary quality | Tech support, project conversations |
| Entity | Tracks key entities | Entity extraction errors possible, extra cost | CRM bots, contact management systems |
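The table's trade-offs can be folded into a small selection helper. The turn-count thresholds below are illustrative assumptions for the sketch, not official LangChain guidance; tune them for your workload.

```python
# Illustrative memory-type selector based on the comparison table above.
# Thresholds and labels are assumptions, not measured cut-offs.

def choose_memory_type(expected_turns: int, needs_entity_tracking: bool = False) -> str:
    if needs_entity_tracking:
        return "Entity"        # CRM-style bots that must track people and things
    if expected_turns <= 10:
        return "Buffer"        # short chats: keep everything verbatim
    if expected_turns <= 30:
        return "BufferWindow"  # medium chats: bounded cost, recent context only
    return "SummaryBuffer"     # long chats: summarize the past, keep recent turns

print(choose_memory_type(5))         # → Buffer
print(choose_memory_type(50))        # → SummaryBuffer
print(choose_memory_type(5, True))   # → Entity
```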
LangGraph Stateful Agents
LangGraph Basic Structure
LangGraph models conversations as state graphs. Each node represents a processing step, and edges represent state transitions.
```python
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage

llm = ChatOpenAI(model="gpt-4o", temperature=0.7)

# Define a state-based conversation graph
def chatbot_node(state: MessagesState):
    """Main chatbot node"""
    system_message = SystemMessage(
        content="You are a friendly AI assistant. Respond considering the previous conversation context."
    )
    messages = [system_message] + state["messages"]
    response = llm.invoke(messages)
    return {"messages": [response]}

# Build graph
graph_builder = StateGraph(MessagesState)
graph_builder.add_node("chatbot", chatbot_node)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)

# Add memory checkpointer (persists conversation state)
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)

# Per-session conversation (sessions distinguished by thread_id)
config = {"configurable": {"thread_id": "user-session-001"}}

# First message
response1 = graph.invoke(
    {"messages": [HumanMessage(content="Hello, I'm a data engineer.")]},
    config=config,
)
print(f"AI: {response1['messages'][-1].content}")

# Second message (previous conversation automatically maintained)
response2 = graph.invoke(
    {"messages": [HumanMessage(content="What's my profession again?")]},
    config=config,
)
print(f"AI: {response2['messages'][-1].content}")
```
Conditional Routing and Tool Usage
```python
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI

@tool
def search_knowledge_base(query: str) -> str:
    """Search the knowledge base for information."""
    # In practice, this would perform a vector DB search
    knowledge = {
        "refund policy": "Full refund within 14 days of purchase.",
        "shipping time": "Delivery within 2-3 business days after ordering.",
        "membership tiers": "Bronze, Silver, Gold, and Platinum - 4 tiers.",
    }
    for key, value in knowledge.items():
        if key in query.lower():
            return value
    return "No relevant information found."

@tool
def get_order_status(order_id: str) -> str:
    """Look up order status."""
    # In practice, this would query a database
    return f"Order {order_id}: In transit (expected arrival: 2026-03-13)"

# Bind tools
tools = [search_knowledge_base, get_order_status]
llm = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools(tools)

def assistant_node(state: MessagesState):
    """Assistant node: LLM invocation and tool-usage decisions"""
    system_msg = SystemMessage(
        content="You are an e-commerce customer support chatbot. Use tools when needed."
    )
    messages = [system_msg] + state["messages"]
    response = llm.invoke(messages)
    return {"messages": [response]}

# Build graph
graph_builder = StateGraph(MessagesState)
graph_builder.add_node("assistant", assistant_node)
graph_builder.add_node("tools", ToolNode(tools))
graph_builder.add_edge(START, "assistant")
graph_builder.add_conditional_edges("assistant", tools_condition)
graph_builder.add_edge("tools", "assistant")

memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)

# Run conversation
config = {"configurable": {"thread_id": "customer-123"}}
response = graph.invoke(
    {"messages": [HumanMessage(content="What's the shipping status for order ORD-2026-0311?")]},
    config=config,
)
print(f"AI: {response['messages'][-1].content}")
```
Persistent Memory Implementation
Redis-Based Session Management
```python
import redis
import json
from datetime import datetime, timedelta
from langchain_core.messages import (
    HumanMessage, AIMessage, messages_from_dict, messages_to_dict,
)

class RedisSessionMemory:
    """Redis-based conversation session memory"""

    def __init__(self, redis_url="redis://localhost:6379", ttl_hours=24):
        self.redis = redis.from_url(redis_url)
        self.ttl = timedelta(hours=ttl_hours)

    def _key(self, session_id: str) -> str:
        return f"chat:session:{session_id}"

    def save_messages(self, session_id: str, messages: list):
        """Save the message list to Redis"""
        key = self._key(session_id)
        data = {
            "messages": messages_to_dict(messages),
            "updated_at": datetime.now().isoformat(),
        }
        self.redis.setex(key, self.ttl, json.dumps(data, ensure_ascii=False))

    def load_messages(self, session_id: str) -> list:
        """Load the message list from Redis"""
        key = self._key(session_id)
        data = self.redis.get(key)
        if data is None:
            return []
        parsed = json.loads(data)
        return messages_from_dict(parsed["messages"])

    def add_message(self, session_id: str, message):
        """Add a single message"""
        messages = self.load_messages(session_id)
        messages.append(message)
        self.save_messages(session_id, messages)

    def clear_session(self, session_id: str):
        """Delete a session"""
        self.redis.delete(self._key(session_id))

    def get_session_info(self, session_id: str) -> dict:
        """Query session metadata"""
        key = self._key(session_id)
        data = self.redis.get(key)
        if data is None:
            return {"exists": False}
        parsed = json.loads(data)
        return {
            "exists": True,
            "message_count": len(parsed["messages"]),
            "updated_at": parsed["updated_at"],
            "ttl_seconds": self.redis.ttl(key),
        }

# Usage example
session_memory = RedisSessionMemory(redis_url="redis://localhost:6379")
session_id = "user-abc-123"
session_memory.add_message(session_id, HumanMessage(content="Hello"))
session_memory.add_message(session_id, AIMessage(content="Hello! How can I help you?"))
messages = session_memory.load_messages(session_id)
print(f"Stored messages: {len(messages)}")
```
PostgreSQL-Based Long-Term Memory
```python
from sqlalchemy import create_engine, Column, String, Text, DateTime, Integer
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime
import json

Base = declarative_base()

class ConversationHistory(Base):
    """Conversation history table"""
    __tablename__ = "conversation_history"

    id = Column(Integer, primary_key=True, autoincrement=True)
    session_id = Column(String(255), index=True, nullable=False)
    user_id = Column(String(255), index=True, nullable=False)
    role = Column(String(50), nullable=False)  # human, ai, system
    content = Column(Text, nullable=False)
    metadata_json = Column(Text, default="{}")
    created_at = Column(DateTime, default=datetime.utcnow)

class ConversationSummaryStore(Base):
    """Conversation summary table"""
    __tablename__ = "conversation_summaries"

    id = Column(Integer, primary_key=True, autoincrement=True)
    session_id = Column(String(255), unique=True, nullable=False)
    user_id = Column(String(255), index=True, nullable=False)
    summary = Column(Text, nullable=False)
    entity_data = Column(Text, default="{}")
    message_count = Column(Integer, default=0)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class PostgresMemoryManager:
    """PostgreSQL-based conversation memory manager"""

    def __init__(self, database_url: str):
        self.engine = create_engine(database_url)
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def save_message(self, session_id: str, user_id: str, role: str, content: str):
        """Save a message"""
        session = self.Session()
        try:
            msg = ConversationHistory(
                session_id=session_id,
                user_id=user_id,
                role=role,
                content=content,
            )
            session.add(msg)
            session.commit()
        finally:
            session.close()

    def get_recent_messages(self, session_id: str, limit: int = 20):
        """Query recent messages, returned oldest first"""
        session = self.Session()
        try:
            messages = (
                session.query(ConversationHistory)
                .filter(ConversationHistory.session_id == session_id)
                .order_by(ConversationHistory.created_at.desc())
                .limit(limit)
                .all()
            )
            return list(reversed(messages))
        finally:
            session.close()

    def save_summary(self, session_id: str, user_id: str, summary: str,
                     entity_data: dict, message_count: int):
        """Save or update a conversation summary"""
        session = self.Session()
        try:
            existing = (
                session.query(ConversationSummaryStore)
                .filter(ConversationSummaryStore.session_id == session_id)
                .first()
            )
            if existing:
                existing.summary = summary
                existing.entity_data = json.dumps(entity_data, ensure_ascii=False)
                existing.message_count = message_count
            else:
                new_summary = ConversationSummaryStore(
                    session_id=session_id,
                    user_id=user_id,
                    summary=summary,
                    entity_data=json.dumps(entity_data, ensure_ascii=False),
                    message_count=message_count,
                )
                session.add(new_summary)
            session.commit()
        finally:
            session.close()

# Usage example
db_url = "postgresql://chatbot:password@localhost:5432/chatbot_db"
memory_manager = PostgresMemoryManager(db_url)
```
Context Compression Techniques
Summary + Recent Messages Combination
```python
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

class HybridMemoryManager:
    """Hybrid memory combining a running summary with a recent-message buffer"""

    def __init__(self, llm, max_buffer_messages=10):
        self.llm = llm
        self.max_buffer_messages = max_buffer_messages
        self.summary = ""
        self.buffer = []

    def add_exchange(self, human_msg: str, ai_msg: str):
        """Add a conversation exchange"""
        self.buffer.append(HumanMessage(content=human_msg))
        self.buffer.append(AIMessage(content=ai_msg))
        # Compress old messages into the summary when the buffer exceeds its limit
        if len(self.buffer) > self.max_buffer_messages * 2:
            self._compress()

    def _compress(self):
        """Fold old messages into the summary"""
        # Select the oldest messages for summarization
        to_summarize = self.buffer[: self.max_buffer_messages]
        self.buffer = self.buffer[self.max_buffer_messages:]
        # Generate summary
        conversation_text = "\n".join(
            f"{'Human' if isinstance(m, HumanMessage) else 'AI'}: {m.content}"
            for m in to_summarize
        )
        summary_prompt = f"""Below is the previous summary and new conversation. Write an integrated summary.

Previous summary: {self.summary if self.summary else 'None'}

New conversation:
{conversation_text}

Summarize concisely while maintaining key information and context:"""
        response = self.llm.invoke([HumanMessage(content=summary_prompt)])
        self.summary = response.content

    def get_context_messages(self) -> list:
        """Return the current context messages"""
        messages = []
        if self.summary:
            messages.append(SystemMessage(
                content=f"Previous conversation summary: {self.summary}"
            ))
        messages.extend(self.buffer)
        return messages

    def get_stats(self) -> dict:
        """Return memory statistics"""
        return {
            "summary_length": len(self.summary),
            "buffer_messages": len(self.buffer),
            "has_summary": bool(self.summary),
        }

# Usage example
llm = ChatOpenAI(model="gpt-4o", temperature=0)
hybrid_memory = HybridMemoryManager(llm=llm, max_buffer_messages=6)

# Simulate conversation
exchanges = [
    ("What's the project timeline?", "We're targeting end of March for deployment."),
    ("Who handles backend development?", "Senior developer Kim is responsible."),
    ("And frontend?", "Developer Lee is building it with React."),
    ("Is there a test plan?", "QA team will run integration tests in the first week of April."),
    ("What's the deployment environment?", "AWS EKS-based Kubernetes environment."),
    ("What about CI/CD pipeline?", "We use GitHub Actions and ArgoCD."),
    ("How do you handle monitoring?", "We collect metrics with Grafana and Prometheus."),
]
for human, ai in exchanges:
    hybrid_memory.add_exchange(human, ai)

stats = hybrid_memory.get_stats()
print(f"Memory state: {stats}")
```
RAG-Enhanced Memory
Conversation-Based RAG Pipeline
```python
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

class RAGEnhancedMemory:
    """RAG-enhanced conversation memory"""

    def __init__(self, llm, embeddings, collection_name="chat_memory"):
        self.llm = llm
        self.embeddings = embeddings
        self.vectorstore = Chroma(
            collection_name=collection_name,
            embedding_function=embeddings,
        )
        self.recent_messages = []
        self.max_recent = 10

    def add_exchange(self, session_id: str, human_msg: str, ai_msg: str):
        """Store a conversation exchange in the vector store"""
        # Add to the recent-messages buffer
        self.recent_messages.append(("human", human_msg))
        self.recent_messages.append(("ai", ai_msg))
        # Store the exchange's embedding in the vector store
        exchange_text = f"User: {human_msg}\nAI: {ai_msg}"
        self.vectorstore.add_texts(
            texts=[exchange_text],
            metadatas=[{"session_id": session_id, "type": "exchange"}],
        )
        # Enforce the recent-message limit
        if len(self.recent_messages) > self.max_recent * 2:
            self.recent_messages = self.recent_messages[-self.max_recent * 2:]

    def retrieve_relevant_context(self, query: str, k: int = 3) -> list:
        """Search for past conversations relevant to the query"""
        results = self.vectorstore.similarity_search(query, k=k)
        return [doc.page_content for doc in results]

    def generate_response(self, session_id: str, user_input: str) -> str:
        """Generate a RAG-based response"""
        # Retrieve relevant past conversations
        relevant_context = self.retrieve_relevant_context(user_input)
        # Build context
        context_parts = []
        if relevant_context:
            context_parts.append("Relevant previous conversations:")
            for ctx in relevant_context:
                context_parts.append(f"  - {ctx}")
        system_content = "You are an AI assistant that leverages previous conversation context in your responses."
        if context_parts:
            system_content += "\n\n" + "\n".join(context_parts)
        messages = [SystemMessage(content=system_content)]
        # Add recent messages
        for role, content in self.recent_messages[-6:]:
            if role == "human":
                messages.append(HumanMessage(content=content))
            else:
                messages.append(AIMessage(content=content))
        messages.append(HumanMessage(content=user_input))
        response = self.llm.invoke(messages)
        # Save the exchange
        self.add_exchange(session_id, user_input, response.content)
        return response.content

# Usage example
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
embeddings = OpenAIEmbeddings()
rag_memory = RAGEnhancedMemory(llm=llm, embeddings=embeddings)
```
Session Management Patterns
Multi-Tenant Session Management
```python
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime
import uuid

@dataclass
class ChatSession:
    """Chat session"""
    session_id: str
    user_id: str
    created_at: datetime = field(default_factory=datetime.now)
    last_active: datetime = field(default_factory=datetime.now)
    metadata: dict = field(default_factory=dict)
    is_active: bool = True

class SessionManager:
    """Multi-tenant session manager"""

    def __init__(self, max_sessions_per_user: int = 5):
        self.sessions: dict = {}       # session_id -> ChatSession
        self.user_sessions: dict = {}  # user_id -> list of session_ids
        self.max_sessions_per_user = max_sessions_per_user

    def create_session(self, user_id: str, metadata: Optional[dict] = None) -> str:
        """Create a new session"""
        # Check the per-user session limit
        user_session_ids = self.user_sessions.get(user_id, [])
        active_sessions = [
            sid for sid in user_session_ids
            if sid in self.sessions and self.sessions[sid].is_active
        ]
        if len(active_sessions) >= self.max_sessions_per_user:
            # Deactivate the oldest session
            oldest = min(
                active_sessions,
                key=lambda sid: self.sessions[sid].last_active,
            )
            self.sessions[oldest].is_active = False

        session_id = str(uuid.uuid4())
        session = ChatSession(
            session_id=session_id,
            user_id=user_id,
            metadata=metadata or {},
        )
        self.sessions[session_id] = session
        if user_id not in self.user_sessions:
            self.user_sessions[user_id] = []
        self.user_sessions[user_id].append(session_id)
        return session_id

    def get_session(self, session_id: str) -> Optional[ChatSession]:
        """Retrieve a session"""
        session = self.sessions.get(session_id)
        if session and session.is_active:
            session.last_active = datetime.now()
            return session
        return None

    def list_user_sessions(self, user_id: str) -> list:
        """List a user's active sessions"""
        session_ids = self.user_sessions.get(user_id, [])
        return [
            self.sessions[sid]
            for sid in session_ids
            if sid in self.sessions and self.sessions[sid].is_active
        ]

    def close_session(self, session_id: str):
        """Close a session"""
        if session_id in self.sessions:
            self.sessions[session_id].is_active = False

# Usage example
session_mgr = SessionManager(max_sessions_per_user=3)
session_id = session_mgr.create_session("user-001", {"channel": "web"})
print(f"Created session: {session_id}")
```
Troubleshooting
Resolving Memory Bloat
When conversations grow long, memory usage can increase rapidly and degrade performance.
```python
from langchain_core.messages import SystemMessage

class MemoryBloatGuard:
    """Memory bloat prevention guard"""

    def __init__(self, max_messages=100, max_token_estimate=50000):
        self.max_messages = max_messages
        self.max_token_estimate = max_token_estimate

    def check_and_trim(self, messages: list) -> tuple:
        """Check the memory state and trim if needed"""
        total_chars = sum(len(m.content) for m in messages)
        estimated_tokens = total_chars // 4  # rough heuristic: ~4 chars per token
        warnings = []
        trimmed = messages

        if len(messages) > self.max_messages:
            warnings.append(
                f"Message count exceeded: {len(messages)} > {self.max_messages}"
            )
            # Preserve system messages, drop the oldest of the rest
            system_msgs = [m for m in messages if isinstance(m, SystemMessage)]
            non_system = [m for m in messages if not isinstance(m, SystemMessage)]
            trimmed = system_msgs + non_system[-(self.max_messages - len(system_msgs)):]

        if estimated_tokens > self.max_token_estimate:
            warnings.append(
                f"Token estimate exceeded: {estimated_tokens} > {self.max_token_estimate}"
            )
        return trimmed, warnings

# Usage
guard = MemoryBloatGuard(max_messages=50, max_token_estimate=30000)
```
Preventing Context Confusion
In long conversations the model can mix up earlier content and answer incorrectly; resolving ambiguous references before they reach the LLM helps prevent this.
```python
from langchain_core.messages import HumanMessage

class ContextClarityChecker:
    """Context clarity checker"""

    def __init__(self, llm):
        self.llm = llm

    def check_ambiguity(self, user_input: str, recent_messages: list) -> dict:
        """Check the user input for ambiguous references"""
        # Check for pronouns or demonstratives
        ambiguous_patterns = ["that", "this", "it", "those", "them", "there", "here"]
        has_ambiguity = any(p in user_input.lower().split() for p in ambiguous_patterns)
        if not has_ambiguity:
            return {"is_ambiguous": False, "resolved_input": user_input}

        # Resolve the ambiguity with the LLM
        context_text = "\n".join(
            f"{type(m).__name__}: {m.content}" for m in recent_messages[-6:]
        )
        clarification_prompt = f"""Previous conversation context:
{context_text}

User's new input: {user_input}

Clarify what the pronouns or demonstratives in this input refer to.
Return the sentence with pronouns replaced by their actual referents."""
        response = self.llm.invoke([HumanMessage(content=clarification_prompt)])
        return {
            "is_ambiguous": True,
            "original_input": user_input,
            "resolved_input": response.content,
        }
```
Operational Notes
Performance Optimization Tips
- Memory type selection: Choose the appropriate memory type based on conversation length. Buffer is recommended for 10 turns or fewer, SummaryBuffer for longer conversations
- Vector store indexing: When using RAG memory, configure appropriate indexes (HNSW, IVF) to optimize search performance
- Redis TTL management: Set appropriate session expiration times to prevent memory leaks
- Asynchronous summarization: Perform conversation summarization asynchronously after response delivery to reduce latency
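The asynchronous-summarization tip can be sketched with asyncio: deliver the reply to the user first, then run summarization as a background task. `summarize_fn` and the `state` dict below are placeholders assumed for the sketch, not a LangChain API.

```python
import asyncio

# Sketch: reply immediately, summarize in the background.
# `summarize_fn` stands in for an LLM summarization call (an assumption).

async def respond_then_summarize(reply: str, history: list, summarize_fn, state: dict) -> str:
    # Kick off summarization without blocking the response path
    task = asyncio.create_task(_summarize(history, summarize_fn, state))
    state.setdefault("pending", []).append(task)
    return reply  # the user sees the reply immediately

async def _summarize(history: list, summarize_fn, state: dict):
    # In production this would call the LLM; here it's whatever callable is injected
    state["summary"] = summarize_fn(history)

async def main():
    state: dict = {}
    fake_summarize = lambda h: f"{len(h)} messages summarized"
    reply = await respond_then_summarize("Sure, done!", ["hi", "hello"], fake_summarize, state)
    print(reply)
    # Let background work finish before shutdown (e.g. in a FastAPI lifespan hook)
    await asyncio.gather(*state["pending"])
    print(state["summary"])

asyncio.run(main())
```

The key design point is that the summarization task is awaited at shutdown (or tracked by a task group), never on the user's request path.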
Security Considerations
- Use UUID v4 for session IDs to ensure unpredictability
- Encrypt conversation content at rest (AES-256)
- Ensure strict session isolation between users
- Apply PII (Personally Identifiable Information) masking
- Establish conversation history retention policies
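A minimal sketch of the PII-masking item, using regular expressions for emails and phone numbers. The patterns are simplified illustrative assumptions; production systems typically use a dedicated PII detection library rather than hand-rolled regexes.

```python
import re

# Illustrative regex-based PII masking; patterns are simplified assumptions.
EMAIL_RE = re.compile(r"[\w.+-]+@[\w.-]+\.\w+")
PHONE_RE = re.compile(r"\b\d{2,3}[-.\s]\d{3,4}[-.\s]\d{4}\b")

def mask_pii(text: str) -> str:
    """Replace emails and phone numbers before the text is stored or logged."""
    text = EMAIL_RE.sub("[EMAIL]", text)
    text = PHONE_RE.sub("[PHONE]", text)
    return text

print(mask_pii("Contact john.doe@example.com or 010-1234-5678."))
# → Contact [EMAIL] or [PHONE].
```

Masking at write time (before the message hits Redis or PostgreSQL) means the retention and backup policies above never handle raw PII.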
Production Checklist
- [ ] Select appropriate memory type for conversation length
- [ ] Implement session management system (create, query, expire, delete)
- [ ] Integrate persistent storage (Redis/PostgreSQL) with failure handling
- [ ] Implement memory bloat prevention logic (max messages, token limits)
- [ ] Build context compression (summarization) pipeline
- [ ] Pass multi-tenant session isolation tests
- [ ] Apply PII masking and conversation encryption
- [ ] Configure session expiration policies and TTL
- [ ] Establish conversation history backup and retention policies
- [ ] Build monitoring dashboards (session count, memory usage, response time)
- [ ] Perform load testing (concurrent sessions, performance by conversation length)
- [ ] Test failure recovery scenarios (Redis down, DB connection failure)