Chatbot Multi-Turn Memory Management Guide: Context Retention Strategies with LangChain and LangGraph


Introduction

When building chatbots, the most fundamental yet challenging problem is maintaining context across multi-turn conversations. Simple question-answering (single-turn) can handle each request independently, but real conversations build on previous content. To answer "How much is that?", the bot needs to understand what "that" refers to from the prior conversation.

LLM context windows are finite. Even GPT-4o's 128K tokens may not accommodate hundreds of conversation turns, and token costs increase rapidly. Therefore, a memory management strategy addressing what information to retain and how much is essential.
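To get a feel for the budget, you can estimate token usage without calling any API. A minimal sketch using the common rough heuristic of ~4 characters per token for English text (exact counts depend on the model's tokenizer; the per-turn size below is an illustrative assumption):

```python
# Rough token budgeting for a conversation.
# Heuristic: ~4 characters per token for English text (tokenizer-dependent).
def estimate_tokens(text: str) -> int:
    return max(1, len(text) // 4)

def turns_until_limit(avg_chars_per_turn: int, context_window: int) -> int:
    """How many turns fit before the context window is exhausted."""
    tokens_per_turn = estimate_tokens("x" * avg_chars_per_turn)
    return context_window // tokens_per_turn

# With ~2,000 characters (user + assistant) per turn and a 128K window:
print(turns_until_limit(2000, 128_000))  # 256 turns -- before system prompts and tool output
```

In practice the ceiling arrives much sooner, since system prompts, retrieved documents, and tool outputs all share the same window.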

This guide compares LangChain's various memory types and walks through building stateful agents with LangGraph, implementing persistent memory with databases, and integrating RAG -- all aimed at production-grade multi-turn conversation systems.

Core Challenges of Multi-Turn Conversations

Context Window Limitations

LLMs have a finite number of tokens they can process in a single API call. As conversations grow longer, early conversation content gets truncated or costs skyrocket.

# Problem scenario: early context is lost as conversations grow
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage

llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Simulating a 100-turn conversation
messages = []
for i in range(100):
    messages.append(HumanMessage(content=f"Turn {i}: This is question number {i}."))
    messages.append(AIMessage(content=f"Turn {i}: This is answer number {i}."))

# Sending all messages risks exceeding the token limit
# Solution: Apply memory management strategies
print(f"Total messages: {len(messages)}")

Relevance Decay

As conversations progress, the relevance of early messages decreases. Sending all conversation history with equal weight is inefficient.

| Problem | Description | Impact |
| --- | --- | --- |
| Token Limit Exceeded | Long conversations exceed the context window | API errors or early conversation loss |
| Cost Increase | Unnecessary past conversation sent on every call | Token costs skyrocket |
| Relevance Dilution | Key information buried in irrelevant conversation | Response quality degrades |
| Latency Increase | Long prompts take longer to process | User experience suffers |
| Hallucination Increase | Incorrect reasoning from excessive context | Reliability drops |
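The first two problems can be mitigated with a recency-based trim applied before each call: always keep the system message, then fill the remaining budget with the newest messages. A minimal sketch under a character budget (the budget value and the char-per-token heuristic are illustrative assumptions, not LangChain API):

```python
# Keep the newest messages that fit a character budget, always preserving
# any system messages at the front (a sketch; numbers are illustrative).
def trim_to_budget(messages: list[tuple[str, str]], budget_chars: int) -> list[tuple[str, str]]:
    system = [m for m in messages if m[0] == "system"]
    rest = [m for m in messages if m[0] != "system"]
    kept: list[tuple[str, str]] = []
    used = sum(len(content) for _, content in system)
    for role, content in reversed(rest):  # walk newest-first
        if used + len(content) > budget_chars:
            break
        kept.append((role, content))
        used += len(content)
    return system + list(reversed(kept))

history = [("system", "You are helpful.")] + [
    ("human", f"question {i} " * 20) for i in range(50)
]
trimmed = trim_to_budget(history, budget_chars=2000)
print(len(trimmed))  # the system message plus the most recent turns that fit
```

Recent versions of `langchain_core.messages` also ship a `trim_messages` utility that performs this kind of trimming against real token counters and message objects.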

LangChain Memory Types

ConversationBufferMemory

The simplest memory type that stores all conversation content as-is.

from langchain_openai import ChatOpenAI
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory

llm = ChatOpenAI(model="gpt-4o", temperature=0.7)

# Buffer Memory: retains all conversation content as-is
memory = ConversationBufferMemory(return_messages=True)

conversation = ConversationChain(
    llm=llm,
    memory=memory,
    verbose=True,
)

# Conduct conversation
response1 = conversation.predict(input="Hi, my name is John. I'm a Python developer.")
print(f"AI: {response1}")

response2 = conversation.predict(input="My favorite framework is FastAPI.")
print(f"AI: {response2}")

response3 = conversation.predict(input="What's my name again?")
print(f"AI: {response3}")
# AI remembers previous conversation and answers "John"

# Check memory contents
print("\n=== Memory Contents ===")
for msg in memory.chat_memory.messages:
    print(f"  {type(msg).__name__}: {msg.content[:80]}...")

ConversationBufferWindowMemory

A sliding window approach that retains only the most recent N conversations.

from langchain.memory import ConversationBufferWindowMemory

# Keep only the last 5 turns
memory = ConversationBufferWindowMemory(
    k=5,
    return_messages=True,
)

conversation = ConversationChain(
    llm=llm,
    memory=memory,
    verbose=True,
)

# Conduct 10 turns of conversation
for i in range(10):
    response = conversation.predict(input=f"This is message number {i+1}.")
    print(f"Turn {i+1}: {response[:50]}...")

# With k=5, only the last 5 exchanges (10 messages) are kept,
# so turn 1 is dropped once turn 6 completes
print(f"\nMessages stored in memory: {len(memory.chat_memory.messages)}")

ConversationSummaryMemory

Uses an LLM to summarize and store conversation content. Retains key information even across long conversations.

from langchain.memory import ConversationSummaryMemory

# Summary Memory: LLM summarizes and stores conversations
memory = ConversationSummaryMemory(
    llm=llm,
    return_messages=True,
)

conversation = ConversationChain(
    llm=llm,
    memory=memory,
    verbose=True,
)

# Multiple turns of conversation
conversation.predict(input="Hello, I'm a backend developer based in Seoul.")
conversation.predict(input="I mainly use Python and Go, working in Kubernetes environments.")
conversation.predict(input="Recently I've been developing chatbots with LangChain.")
conversation.predict(input="Building RAG pipelines is my main task.")

# Check summary content
print("\n=== Summary ===")
print(memory.buffer)
# A summary is stored instead of the full conversation

ConversationSummaryBufferMemory

A hybrid approach combining summary and buffer. Recent conversations are kept as originals while older ones are summarized.

from langchain.memory import ConversationSummaryBufferMemory

# Summary + Buffer hybrid: recent as original, past as summary
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=300,  # Older conversations are summarized when this limit is exceeded
    return_messages=True,
)

conversation = ConversationChain(
    llm=llm,
    memory=memory,
    verbose=True,
)

# Conduct conversation
conversation.predict(input="Project A is an e-commerce platform.")
conversation.predict(input="The tech stack is Next.js, FastAPI, PostgreSQL.")
conversation.predict(input="Currently implementing the payment module.")
conversation.predict(input="Webhook handling with the payment gateway is tricky.")
conversation.predict(input="We also need to set up a test environment.")

# Check memory state
print("\n=== Moving Summary ===")
print(memory.moving_summary_buffer)
print(f"\nCurrent buffer message count: {len(memory.chat_memory.messages)}")

EntityMemory

Extracts and manages entities (people, places, concepts) from conversations.

from langchain.memory import ConversationEntityMemory
from langchain.memory.prompt import ENTITY_MEMORY_CONVERSATION_TEMPLATE

# Entity Memory: extracts and updates entities from conversations
memory = ConversationEntityMemory(
    llm=llm,
    return_messages=True,
)

conversation = ConversationChain(
    llm=llm,
    memory=memory,
    prompt=ENTITY_MEMORY_CONVERSATION_TEMPLATE,
    verbose=True,
)

# Conversations containing entities
conversation.predict(
    input="Alice is our team's senior developer. She's a Python expert with 5 years of experience."
)
conversation.predict(
    input="Bob is a product manager working with Alice on a recommendation system project."
)
conversation.predict(
    input="Alice recently introduced MLflow for experiment management."
)

# Check entity information
print("\n=== Entity Store ===")
for entity, info in memory.entity_store.store.items():
    print(f"  {entity}: {info}")

Memory Type Comparison

| Memory Type | Pros | Cons | Best For |
| --- | --- | --- | --- |
| Buffer | Full conversation preserved; simple implementation | Token costs increase; window overflow risk | Short conversations, prototypes |
| BufferWindow | Predictable costs; retains latest info | Older information lost | Customer support, FAQ bots |
| Summary | Retains key info across long conversations | Information loss during summarization; extra LLM call cost | Long conversations, support history |
| SummaryBuffer | Combines recent originals with past summaries | More complex setup; depends on summary quality | Tech support, project conversations |
| Entity | Tracks key entities | Entity extraction errors possible; extra cost | CRM bots, contact management systems |
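These trade-offs can be collapsed into a simple selection helper. This is an illustrative heuristic, not a LangChain utility; the turn threshold and flags are assumptions to tune for your workload:

```python
def pick_memory_type(expected_turns: int, tracks_entities: bool, cost_sensitive: bool) -> str:
    """Map the comparison table's guidance to a memory type (illustrative thresholds)."""
    if tracks_entities:
        return "Entity"          # CRM-style bots that must track people and facts
    if expected_turns <= 10:
        return "Buffer"          # short conversations: keep everything
    if cost_sensitive:
        return "BufferWindow"    # predictable cost, accepts losing old turns
    return "SummaryBuffer"       # recent originals + summarized past

print(pick_memory_type(expected_turns=5, tracks_entities=False, cost_sensitive=False))   # Buffer
print(pick_memory_type(expected_turns=40, tracks_entities=False, cost_sensitive=True))   # BufferWindow
```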

LangGraph Stateful Agents

LangGraph Basic Structure

LangGraph models conversations as state graphs. Each node represents a processing step, and edges represent state transitions.

from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage

llm = ChatOpenAI(model="gpt-4o", temperature=0.7)

# Define a state-based conversation graph
def chatbot_node(state: MessagesState):
    """Main chatbot node"""
    system_message = SystemMessage(
        content="You are a friendly AI assistant. Respond considering the previous conversation context."
    )
    messages = [system_message] + state["messages"]
    response = llm.invoke(messages)
    return {"messages": [response]}

# Build graph
graph_builder = StateGraph(MessagesState)
graph_builder.add_node("chatbot", chatbot_node)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)

# Add memory checkpointer (persist conversation state)
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)

# Per-session conversation (sessions distinguished by thread_id)
config = {"configurable": {"thread_id": "user-session-001"}}

# First message
response1 = graph.invoke(
    {"messages": [HumanMessage(content="Hello, I'm a data engineer.")]},
    config=config,
)
print(f"AI: {response1['messages'][-1].content}")

# Second message (previous conversation automatically maintained)
response2 = graph.invoke(
    {"messages": [HumanMessage(content="What's my profession again?")]},
    config=config,
)
print(f"AI: {response2['messages'][-1].content}")

Conditional Routing and Tool Usage

from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI

@tool
def search_knowledge_base(query: str) -> str:
    """Search the knowledge base for information."""
    # In practice, this would perform vector DB searches
    knowledge = {
        "refund policy": "Full refund within 14 days of purchase.",
        "shipping time": "Delivery within 2-3 business days after ordering.",
        "membership tiers": "Bronze, Silver, Gold, and Platinum - 4 tiers.",
    }
    for key, value in knowledge.items():
        if key in query.lower():
            return value
    return "No relevant information found."

@tool
def get_order_status(order_id: str) -> str:
    """Look up order status."""
    # In practice, this would query a database
    return f"Order {order_id}: In transit (expected arrival: 2026-03-13)"

# Bind tools
tools = [search_knowledge_base, get_order_status]
llm = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools(tools)

def assistant_node(state: MessagesState):
    """Assistant node: LLM invocation and tool usage decisions"""
    system_msg = SystemMessage(
        content="You are an e-commerce customer support chatbot. Use tools when needed."
    )
    messages = [system_msg] + state["messages"]
    response = llm.invoke(messages)
    return {"messages": [response]}

# Build graph
graph_builder = StateGraph(MessagesState)
graph_builder.add_node("assistant", assistant_node)
graph_builder.add_node("tools", ToolNode(tools))

graph_builder.add_edge(START, "assistant")
graph_builder.add_conditional_edges("assistant", tools_condition)
graph_builder.add_edge("tools", "assistant")

memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)

# Run conversation
config = {"configurable": {"thread_id": "customer-123"}}

response = graph.invoke(
    {"messages": [HumanMessage(content="What's the shipping status for order ORD-2026-0311?")]},
    config=config,
)
print(f"AI: {response['messages'][-1].content}")

Persistent Memory Implementation

Redis-Based Session Management

import redis
import json
from datetime import datetime, timedelta
from langchain_core.messages import HumanMessage, AIMessage, messages_from_dict, messages_to_dict

class RedisSessionMemory:
    """Redis-based conversation session memory"""

    def __init__(self, redis_url="redis://localhost:6379", ttl_hours=24):
        self.redis = redis.from_url(redis_url)
        self.ttl = timedelta(hours=ttl_hours)

    def _key(self, session_id: str) -> str:
        return f"chat:session:{session_id}"

    def save_messages(self, session_id: str, messages: list):
        """Save message list to Redis"""
        key = self._key(session_id)
        data = {
            "messages": messages_to_dict(messages),
            "updated_at": datetime.now().isoformat(),
        }
        self.redis.setex(key, self.ttl, json.dumps(data, ensure_ascii=False))

    def load_messages(self, session_id: str) -> list:
        """Load message list from Redis"""
        key = self._key(session_id)
        data = self.redis.get(key)
        if data is None:
            return []
        parsed = json.loads(data)
        return messages_from_dict(parsed["messages"])

    def add_message(self, session_id: str, message):
        """Add a single message"""
        messages = self.load_messages(session_id)
        messages.append(message)
        self.save_messages(session_id, messages)

    def clear_session(self, session_id: str):
        """Delete a session"""
        self.redis.delete(self._key(session_id))

    def get_session_info(self, session_id: str) -> dict:
        """Query session metadata"""
        key = self._key(session_id)
        data = self.redis.get(key)
        if data is None:
            return {"exists": False}
        parsed = json.loads(data)
        return {
            "exists": True,
            "message_count": len(parsed["messages"]),
            "updated_at": parsed["updated_at"],
            "ttl_seconds": self.redis.ttl(key),
        }

# Usage example
session_memory = RedisSessionMemory(redis_url="redis://localhost:6379")

session_id = "user-abc-123"
session_memory.add_message(session_id, HumanMessage(content="Hello"))
session_memory.add_message(session_id, AIMessage(content="Hello! How can I help you?"))

messages = session_memory.load_messages(session_id)
print(f"Stored messages: {len(messages)}")

PostgreSQL-Based Long-Term Memory

from sqlalchemy import create_engine, Column, String, Text, DateTime, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import json

Base = declarative_base()

class ConversationHistory(Base):
    """Conversation history table"""
    __tablename__ = "conversation_history"

    id = Column(Integer, primary_key=True, autoincrement=True)
    session_id = Column(String(255), index=True, nullable=False)
    user_id = Column(String(255), index=True, nullable=False)
    role = Column(String(50), nullable=False)  # human, ai, system
    content = Column(Text, nullable=False)
    metadata_json = Column(Text, default="{}")
    created_at = Column(DateTime, default=datetime.utcnow)

class ConversationSummaryStore(Base):
    """Conversation summary table"""
    __tablename__ = "conversation_summaries"

    id = Column(Integer, primary_key=True, autoincrement=True)
    session_id = Column(String(255), unique=True, nullable=False)
    user_id = Column(String(255), index=True, nullable=False)
    summary = Column(Text, nullable=False)
    entity_data = Column(Text, default="{}")
    message_count = Column(Integer, default=0)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class PostgresMemoryManager:
    """PostgreSQL-based conversation memory manager"""

    def __init__(self, database_url: str):
        self.engine = create_engine(database_url)
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def save_message(self, session_id: str, user_id: str, role: str, content: str):
        """Save a message"""
        session = self.Session()
        try:
            msg = ConversationHistory(
                session_id=session_id,
                user_id=user_id,
                role=role,
                content=content,
            )
            session.add(msg)
            session.commit()
        finally:
            session.close()

    def get_recent_messages(self, session_id: str, limit: int = 20):
        """Query recent messages"""
        session = self.Session()
        try:
            messages = (
                session.query(ConversationHistory)
                .filter(ConversationHistory.session_id == session_id)
                .order_by(ConversationHistory.created_at.desc())
                .limit(limit)
                .all()
            )
            return list(reversed(messages))
        finally:
            session.close()

    def save_summary(self, session_id: str, user_id: str, summary: str,
                     entity_data: dict, message_count: int):
        """Save/update conversation summary"""
        session = self.Session()
        try:
            existing = (
                session.query(ConversationSummaryStore)
                .filter(ConversationSummaryStore.session_id == session_id)
                .first()
            )
            if existing:
                existing.summary = summary
                existing.entity_data = json.dumps(entity_data, ensure_ascii=False)
                existing.message_count = message_count
            else:
                new_summary = ConversationSummaryStore(
                    session_id=session_id,
                    user_id=user_id,
                    summary=summary,
                    entity_data=json.dumps(entity_data, ensure_ascii=False),
                    message_count=message_count,
                )
                session.add(new_summary)
            session.commit()
        finally:
            session.close()

# Usage example
db_url = "postgresql://chatbot:password@localhost:5432/chatbot_db"
memory_manager = PostgresMemoryManager(db_url)

Context Compression Techniques

Summary + Recent Messages Combination

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

class HybridMemoryManager:
    """Hybrid memory combining summary and recent buffer"""

    def __init__(self, llm, max_buffer_messages=10):
        self.llm = llm
        self.max_buffer_messages = max_buffer_messages
        self.summary = ""
        self.buffer = []

    def add_exchange(self, human_msg: str, ai_msg: str):
        """Add a conversation exchange"""
        self.buffer.append(HumanMessage(content=human_msg))
        self.buffer.append(AIMessage(content=ai_msg))

        # Compress old messages into summary when buffer exceeds limit
        if len(self.buffer) > self.max_buffer_messages * 2:
            self._compress()

    def _compress(self):
        """Integrate old messages into summary"""
        # Select first half for summarization
        to_summarize = self.buffer[: self.max_buffer_messages]
        self.buffer = self.buffer[self.max_buffer_messages:]

        # Generate summary
        conversation_text = "\n".join(
            f"{'Human' if isinstance(m, HumanMessage) else 'AI'}: {m.content}"
            for m in to_summarize
        )
        summary_prompt = f"""Below is the previous summary and new conversation. Write an integrated summary.

Previous summary: {self.summary if self.summary else 'None'}

New conversation:
{conversation_text}

Summarize concisely while maintaining key information and context:"""

        response = self.llm.invoke([HumanMessage(content=summary_prompt)])
        self.summary = response.content

    def get_context_messages(self) -> list:
        """Return current context messages"""
        messages = []
        if self.summary:
            messages.append(SystemMessage(
                content=f"Previous conversation summary: {self.summary}"
            ))
        messages.extend(self.buffer)
        return messages

    def get_stats(self) -> dict:
        """Return memory statistics"""
        return {
            "summary_length": len(self.summary),
            "buffer_messages": len(self.buffer),
            "has_summary": bool(self.summary),
        }

# Usage example
llm = ChatOpenAI(model="gpt-4o", temperature=0)
hybrid_memory = HybridMemoryManager(llm=llm, max_buffer_messages=6)

# Simulate conversation
exchanges = [
    ("What's the project timeline?", "We're targeting end of March for deployment."),
    ("Who handles backend development?", "Senior developer Kim is responsible."),
    ("And frontend?", "Developer Lee is building it with React."),
    ("Is there a test plan?", "QA team will run integration tests in the first week of April."),
    ("What's the deployment environment?", "AWS EKS-based Kubernetes environment."),
    ("What about CI/CD pipeline?", "We use GitHub Actions and ArgoCD."),
    ("How do you handle monitoring?", "We collect metrics with Grafana and Prometheus."),
]

for human, ai in exchanges:
    hybrid_memory.add_exchange(human, ai)

stats = hybrid_memory.get_stats()
print(f"Memory state: {stats}")

RAG-Enhanced Memory

Conversation-Based RAG Pipeline

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

class RAGEnhancedMemory:
    """RAG-enhanced conversation memory"""

    def __init__(self, llm, embeddings, collection_name="chat_memory"):
        self.llm = llm
        self.embeddings = embeddings
        self.vectorstore = Chroma(
            collection_name=collection_name,
            embedding_function=embeddings,
        )
        self.recent_messages = []
        self.max_recent = 10

    def add_exchange(self, session_id: str, human_msg: str, ai_msg: str):
        """Store conversation in vector store"""
        # Add to recent messages buffer
        self.recent_messages.append(("human", human_msg))
        self.recent_messages.append(("ai", ai_msg))

        # Store conversation embedding in vector store
        exchange_text = f"User: {human_msg}\nAI: {ai_msg}"
        self.vectorstore.add_texts(
            texts=[exchange_text],
            metadatas=[{"session_id": session_id, "type": "exchange"}],
        )

        # Maintain recent message limit
        if len(self.recent_messages) > self.max_recent * 2:
            self.recent_messages = self.recent_messages[-self.max_recent * 2:]

    def retrieve_relevant_context(self, query: str, k: int = 3) -> list:
        """Search for past conversations relevant to the query"""
        results = self.vectorstore.similarity_search(query, k=k)
        return [doc.page_content for doc in results]

    def generate_response(self, session_id: str, user_input: str) -> str:
        """Generate RAG-based response"""
        # Retrieve relevant past conversations
        relevant_context = self.retrieve_relevant_context(user_input)

        # Build context
        context_parts = []
        if relevant_context:
            context_parts.append("Relevant previous conversations:")
            for ctx in relevant_context:
                context_parts.append(f"  - {ctx}")

        system_content = "You are an AI assistant that leverages previous conversation context in your responses."
        if context_parts:
            system_content += "\n\n" + "\n".join(context_parts)

        messages = [SystemMessage(content=system_content)]

        # Add recent messages
        for role, content in self.recent_messages[-6:]:
            if role == "human":
                messages.append(HumanMessage(content=content))
            else:
                from langchain_core.messages import AIMessage
                messages.append(AIMessage(content=content))

        messages.append(HumanMessage(content=user_input))
        response = self.llm.invoke(messages)

        # Save conversation
        self.add_exchange(session_id, user_input, response.content)

        return response.content

# Usage example
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
embeddings = OpenAIEmbeddings()
rag_memory = RAGEnhancedMemory(llm=llm, embeddings=embeddings)

Session Management Patterns

Multi-Tenant Session Management

from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime
import uuid

@dataclass
class ChatSession:
    """Chat session"""
    session_id: str
    user_id: str
    created_at: datetime = field(default_factory=datetime.now)
    last_active: datetime = field(default_factory=datetime.now)
    metadata: dict = field(default_factory=dict)
    is_active: bool = True

class SessionManager:
    """Multi-tenant session manager"""

    def __init__(self, max_sessions_per_user: int = 5):
        self.sessions: dict = {}  # session_id -> ChatSession
        self.user_sessions: dict = {}  # user_id -> list of session_ids
        self.max_sessions_per_user = max_sessions_per_user

    def create_session(self, user_id: str, metadata: Optional[dict] = None) -> str:
        """Create a new session"""
        # Check per-user session limit
        user_session_ids = self.user_sessions.get(user_id, [])
        active_sessions = [
            sid for sid in user_session_ids
            if sid in self.sessions and self.sessions[sid].is_active
        ]

        if len(active_sessions) >= self.max_sessions_per_user:
            # Deactivate oldest session
            oldest = min(
                active_sessions,
                key=lambda sid: self.sessions[sid].last_active,
            )
            self.sessions[oldest].is_active = False

        session_id = str(uuid.uuid4())
        session = ChatSession(
            session_id=session_id,
            user_id=user_id,
            metadata=metadata or {},
        )
        self.sessions[session_id] = session

        if user_id not in self.user_sessions:
            self.user_sessions[user_id] = []
        self.user_sessions[user_id].append(session_id)

        return session_id

    def get_session(self, session_id: str) -> Optional[ChatSession]:
        """Retrieve a session"""
        session = self.sessions.get(session_id)
        if session and session.is_active:
            session.last_active = datetime.now()
            return session
        return None

    def list_user_sessions(self, user_id: str) -> list:
        """List user sessions"""
        session_ids = self.user_sessions.get(user_id, [])
        return [
            self.sessions[sid]
            for sid in session_ids
            if sid in self.sessions and self.sessions[sid].is_active
        ]

    def close_session(self, session_id: str):
        """Close a session"""
        if session_id in self.sessions:
            self.sessions[session_id].is_active = False

# Usage example
session_mgr = SessionManager(max_sessions_per_user=3)
session_id = session_mgr.create_session("user-001", {"channel": "web"})
print(f"Created session: {session_id}")

Troubleshooting

Resolving Memory Bloat

When conversations grow long, memory usage can increase rapidly and degrade performance.

from langchain_core.messages import SystemMessage

class MemoryBloatGuard:
    """Memory bloat prevention guard"""

    def __init__(self, max_messages=100, max_token_estimate=50000):
        self.max_messages = max_messages
        self.max_token_estimate = max_token_estimate

    def check_and_trim(self, messages: list) -> tuple:
        """Check memory state and trim"""
        total_chars = sum(len(m.content) for m in messages)
        estimated_tokens = total_chars // 4  # Rough token estimation

        warnings = []
        trimmed = messages

        if len(messages) > self.max_messages:
            warnings.append(
                f"Message count exceeded: {len(messages)} > {self.max_messages}"
            )
            # Preserve system messages, remove oldest from the rest
            system_msgs = [m for m in messages if isinstance(m, SystemMessage)]
            non_system = [m for m in messages if not isinstance(m, SystemMessage)]
            trimmed = system_msgs + non_system[-(self.max_messages - len(system_msgs)):]

        if estimated_tokens > self.max_token_estimate:
            warnings.append(
                f"Token estimate exceeded: {estimated_tokens} > {self.max_token_estimate}"
            )

        return trimmed, warnings

# Usage
guard = MemoryBloatGuard(max_messages=50, max_token_estimate=30000)

Preventing Context Confusion

Solving the problem where AI confuses previous conversation content and provides incorrect information in long conversations.

from langchain_core.messages import HumanMessage

class ContextClarityChecker:
    """Context clarity checker"""

    def __init__(self, llm):
        self.llm = llm

    def check_ambiguity(self, user_input: str, recent_messages: list) -> dict:
        """Check user input for ambiguity"""
        # Check for pronouns or demonstratives
        ambiguous_patterns = ["that", "this", "it", "those", "them", "there", "here"]
        has_ambiguity = any(p in user_input.lower().split() for p in ambiguous_patterns)

        if not has_ambiguity:
            return {"is_ambiguous": False, "resolved_input": user_input}

        # Resolve ambiguity with LLM
        context_text = "\n".join(
            f"{type(m).__name__}: {m.content}" for m in recent_messages[-6:]
        )

        clarification_prompt = f"""Previous conversation context:
{context_text}

User's new input: {user_input}

Clarify what the pronouns or demonstratives in this input refer to.
Return the sentence with pronouns replaced by their actual referents."""

        response = self.llm.invoke([HumanMessage(content=clarification_prompt)])

        return {
            "is_ambiguous": True,
            "original_input": user_input,
            "resolved_input": response.content,
        }

Operational Notes

Performance Optimization Tips

  1. Memory type selection: Choose the appropriate memory type based on conversation length. Buffer is recommended for 10 turns or fewer, SummaryBuffer for longer conversations
  2. Vector store indexing: When using RAG memory, configure appropriate indexes (HNSW, IVF) to optimize search performance
  3. Redis TTL management: Set appropriate session expiration times to prevent memory leaks
  4. Asynchronous summarization: Perform conversation summarization asynchronously after response delivery to reduce latency
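Tip 4 (asynchronous summarization) can be sketched with `asyncio`: return the reply to the user immediately and let summarization run as a background task. The `summarize` stand-in, delays, and state shape below are illustrative assumptions, not real LLM calls:

```python
import asyncio

async def summarize(history: list[str]) -> str:
    """Stand-in for an LLM summarization call (simulated latency)."""
    await asyncio.sleep(0.05)
    return f"summary of {len(history)} messages"

_background: set[asyncio.Task] = set()  # strong refs so pending tasks aren't garbage-collected

async def handle_turn(history: list[str], user_input: str, state: dict) -> str:
    history.append(user_input)
    reply = f"echo: {user_input}"  # stand-in for the real LLM response
    history.append(reply)

    # Fire-and-forget: summarization starts now but does not block the reply
    task = asyncio.create_task(summarize(list(history)))
    _background.add(task)

    def _store(t: asyncio.Task) -> None:
        _background.discard(t)
        state["summary"] = t.result()

    task.add_done_callback(_store)
    return reply

async def main() -> dict:
    state: dict = {}
    history: list[str] = []
    await handle_turn(history, "hello", state)
    await asyncio.sleep(0.1)  # give the background task time to complete
    return state

print(asyncio.run(main()))
```

Holding a strong reference to the task (here via `_background`) matters: the event loop only keeps weak references, so an unreferenced pending task can be garbage-collected mid-flight.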

Security Considerations

  • Use UUID v4 for session IDs to ensure unpredictability
  • Encrypt conversation content at rest (AES-256)
  • Ensure strict session isolation between users
  • Apply PII (Personally Identifiable Information) masking
  • Establish conversation history retention policies
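PII masking before persistence can be sketched with regular expressions. The patterns below are illustrative (an email and one phone format); production systems typically use a dedicated NER/PII-detection service with locale-aware patterns:

```python
import re

# Illustrative PII patterns -- real deployments need locale-aware detection
PII_PATTERNS = [
    (re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"), "[EMAIL]"),
    (re.compile(r"\b\d{3}[-.]\d{3,4}[-.]\d{4}\b"), "[PHONE]"),
]

def mask_pii(text: str) -> str:
    """Replace detected PII spans with placeholder tokens before storage."""
    for pattern, replacement in PII_PATTERNS:
        text = pattern.sub(replacement, text)
    return text

print(mask_pii("Reach me at jane.doe@example.com or 010-1234-5678."))
# Reach me at [EMAIL] or [PHONE].
```

Masking before the message reaches Redis or PostgreSQL keeps raw identifiers out of persistent storage entirely, which is simpler to audit than masking on read.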

Production Checklist

  • [ ] Select appropriate memory type for conversation length
  • [ ] Implement session management system (create, query, expire, delete)
  • [ ] Integrate persistent storage (Redis/PostgreSQL) with failure handling
  • [ ] Implement memory bloat prevention logic (max messages, token limits)
  • [ ] Build context compression (summarization) pipeline
  • [ ] Pass multi-tenant session isolation tests
  • [ ] Apply PII masking and conversation encryption
  • [ ] Configure session expiration policies and TTL
  • [ ] Establish conversation history backup and retention policies
  • [ ] Build monitoring dashboards (session count, memory usage, response time)
  • [ ] Perform load testing (concurrent sessions, performance by conversation length)
  • [ ] Test failure recovery scenarios (Redis down, DB connection failure)
