
Multi-Turn Conversation Management and Context Optimization: LLM Chatbot Memory Patterns, Conversation Summarization, and Sliding Window Strategies


Introduction

The most fundamental challenge in LLM-based chatbots is **effectively managing context in multi-turn conversations**. Since LLMs are inherently stateless, the entire conversation history must be sent with every API call. However, context windows are finite, and because every call resends the history, the input cost of each call grows linearly with conversation length while the cumulative cost of a session grows roughly quadratically.

Even with large context windows like GPT-4o's 128K tokens or Claude's 200K tokens, production environments can easily reach these limits during hundreds of turns of customer support conversations or extended technical support sessions. Moreover, the "Lost in the Middle" phenomenon means models cannot effectively utilize information buried in the middle of long contexts.

This article covers various LLM chatbot memory patterns (Buffer, Summary, Vector Store), Sliding Window strategies, conversation summarization techniques, token cost optimization, and production architecture patterns with practical code examples.

Context Window Limitations and Cost Analysis

Major LLM Context Window Comparison

| Model | Context Window | Input Cost (1M tokens) | Output Cost (1M tokens) | Notes |
| ----------------- | -------------- | ---------------------- | ----------------------- | --------------- |
| GPT-4o | 128K | 2.50 USD | 10.00 USD | General purpose |
| GPT-4o-mini | 128K | 0.15 USD | 0.60 USD | Lightweight |
| Claude 3.5 Sonnet | 200K | 3.00 USD | 15.00 USD | Long context |
| Gemini 1.5 Pro | 2M | 1.25 USD | 5.00 USD | Largest window |
| Llama 3.1 405B | 128K | Self-hosted | Self-hosted | Open source |

Token Budget Design

When designing per-conversation token budgets, you need to separately manage system prompts, conversation history, and response space.

import tiktoken


class TokenBudgetManager:
    """Token budget management class"""

    def __init__(self, model: str = "gpt-4o", max_context: int = 128000):
        self.encoding = tiktoken.encoding_for_model(model)
        self.max_context = max_context
        # Budget allocation: System 15%, History 60%, Response 25%
        self.system_budget = int(max_context * 0.15)
        self.history_budget = int(max_context * 0.60)
        self.response_budget = int(max_context * 0.25)

    def count_tokens(self, text: str) -> int:
        """Count tokens in text"""
        return len(self.encoding.encode(text))

    def count_message_tokens(self, messages: list[dict]) -> int:
        """Count total tokens in message list (approximate)"""
        total = 0
        for msg in messages:
            total += self.count_tokens(msg["content"])
            total += 4  # Approximate per-message metadata overhead
        total += 2  # Start/end tokens
        return total

    def get_available_history_tokens(self, system_tokens: int) -> int:
        """Return available tokens for conversation history"""
        used = system_tokens + self.response_budget
        return self.max_context - used

    def should_summarize(self, history_tokens: int) -> bool:
        """Recommend summarization when history exceeds 80% of budget"""
        return history_tokens > self.history_budget * 0.8


# Usage example
budget = TokenBudgetManager(model="gpt-4o")
system_prompt = "You are a customer support AI specialist..."
system_tokens = budget.count_tokens(system_prompt)

print(f"System prompt: {system_tokens} tokens")
print(f"History budget: {budget.history_budget} tokens")
print(f"Response budget: {budget.response_budget} tokens")

Cost Escalation Simulation

def calculate_cost_per_turn(turns: int, avg_tokens_per_turn: int = 200,
                            input_cost_per_1m: float = 2.50,
                            max_history_turns: int | None = None) -> float:
    """Calculate cumulative input cost by turn count"""
    # Assumes the retained history is resent with each turn.
    # max_history_turns caps how many turns are resent (sliding window).
    cumulative_cost = 0.0
    for t in range(1, turns + 1):
        history_turns = t if max_history_turns is None else min(t, max_history_turns)
        input_tokens = history_turns * avg_tokens_per_turn  # Input tokens for current turn
        cumulative_cost += (input_tokens / 1_000_000) * input_cost_per_1m
    return cumulative_cost


# Cost comparison by memory strategy
turns = range(1, 101)

# Full history (no memory management)
cost_full_history = [calculate_cost_per_turn(t) for t in turns]

# Sliding Window (resend only the last 20 turns on every call)
cost_sliding = [calculate_cost_per_turn(t, max_history_turns=20) for t in turns]

# Summary Memory (1/5 compression via summarization)
cost_summary = [calculate_cost_per_turn(t, avg_tokens_per_turn=40) for t in turns]

print(f"100-turn cost (full history): ${cost_full_history[-1]:.4f}")
print(f"100-turn cost (Sliding Window): ${cost_sliding[-1]:.4f}")
print(f"100-turn cost (Summary): ${cost_summary[-1]:.4f}")
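The growth rates behind this simulation can also be written in closed form. The derivation below is a sketch under simplifying assumptions that are introduced here for illustration: every turn adds a constant $a$ tokens, $p$ is the input price per token, and a sliding window resends at most $w$ turns per call.

```latex
\begin{aligned}
C_{\text{full}}(n)   &= p \sum_{t=1}^{n} a\,t \;=\; p\,a\,\frac{n(n+1)}{2} \\
C_{\text{window}}(n) &= p\,a\left(\frac{w(w+1)}{2} + (n-w)\,w\right), \qquad n \ge w \\[4pt]
\text{With } a = 200,\; p = \tfrac{2.50}{10^{6}},\; n = 100,\; w = 20: \quad
C_{\text{full}}   &= p \cdot 200 \cdot 5050 = p \cdot 1{,}010{,}000 \approx \$2.53 \\
C_{\text{window}} &= p \cdot 200 \cdot (210 + 1600) = p \cdot 362{,}000 \approx \$0.91
\end{aligned}
```

Full-history cost is quadratic in the number of turns, while a window makes it linear once the conversation is longer than the window.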

Memory Pattern Comparative Analysis

Pattern Characteristics Comparison

| Memory Pattern | Token Usage | Info Retention | Latency | Implementation Complexity | Best For |
| -------------- | ------------- | ---------------- | ------- | ------------------------- | ------------------- |
| Buffer Memory | O(n) linear | 100% | Low | Low | Short conversations |
| Window Memory | O(k) fixed | Last k turns | Low | Low | General chatbots |
| Summary Memory | O(1) fixed | Summary only | Medium | Medium | Long conversations |
| Summary Buffer | O(k+1) | Summary + recent | Medium | Medium | Balanced |
| Vector Store | O(k) search | Semantic-based | High | High | Knowledge-intensive |
| Entity Memory | O(e) entities | Per-entity | Medium | High | Personalization |

1. Buffer Memory - Full History Storage

The simplest pattern that retains the entire conversation history as-is.

from langchain_openai import ChatOpenAI

from langchain.memory import ConversationBufferMemory

from langchain.chains import ConversationChain

Buffer Memory: stores all conversations as-is

llm = ChatOpenAI(model="gpt-4o", temperature=0.7)

memory = ConversationBufferMemory(return_messages=True)

chain = ConversationChain(llm=llm, memory=memory, verbose=True)

Conversation flow

response1 = chain.predict(input="Hello, we have a server outage")

response2 = chain.predict(input="Checking the logs, I see OOM errors")

response3 = chain.predict(input="How can I check current memory usage?")

Review entire history stored in memory

for msg in memory.chat_memory.messages:

role = "User" if msg.type == "human" else "AI"

print(f"[{role}] {msg.content[:80]}...")

**Limitation**: As conversations grow longer, token usage increases linearly, causing costs and latency to spike.

2. Sliding Window Memory - Keep Only Recent N Turns

Maintains a fixed-size window while removing older conversations.

from langchain.memory import ConversationBufferWindowMemory

# Sliding window keeping only the last 10 exchanges (20 messages);
# in LangChain, k counts human/AI pairs rather than individual messages
window_memory = ConversationBufferWindowMemory(
    k=10,  # Keep last 10 exchanges
    return_messages=True
)

chain = ConversationChain(llm=llm, memory=window_memory, verbose=True)

# Token-based window implementation (custom)
import tiktoken


class TokenWindowMemory:
    """Memory that manages conversation history based on token count"""

    def __init__(self, max_tokens: int = 4000, model: str = "gpt-4o"):
        self.max_tokens = max_tokens
        self.encoding = tiktoken.encoding_for_model(model)
        self.messages: list[dict] = []

    def add_message(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})
        self._trim()

    def _trim(self):
        """Remove oldest messages when token limit is exceeded"""
        while self._total_tokens() > self.max_tokens and len(self.messages) > 2:
            # Preserve the first system message
            self.messages.pop(0 if self.messages[0]["role"] != "system" else 1)

    def _total_tokens(self) -> int:
        return sum(
            len(self.encoding.encode(m["content"])) + 4
            for m in self.messages
        )

    def get_messages(self) -> list[dict]:
        return self.messages.copy()


# Usage example
token_window = TokenWindowMemory(max_tokens=4000)
token_window.add_message("system", "You are a technical support expert.")
token_window.add_message("user", "My Docker container keeps restarting.")
token_window.add_message("assistant", "Let me check if it is in OOMKilled state...")

print(f"Current token usage: {token_window._total_tokens()}")

3. Summary Memory - Compression Through Conversation Summarization

Uses an LLM to summarize previous conversations and leverages the summary as context.

from langchain.memory import ConversationSummaryMemory

Summary Memory: automatically summarizes conversations with LLM

summary_memory = ConversationSummaryMemory(

llm=ChatOpenAI(model="gpt-4o-mini", temperature=0), # Lightweight model for summarization

return_messages=True

)

Summary Buffer Memory: combines summary + recent conversations

from langchain.memory import ConversationSummaryBufferMemory

summary_buffer = ConversationSummaryBufferMemory(

llm=ChatOpenAI(model="gpt-4o-mini", temperature=0),

max_token_limit=2000, # Summarizes older messages when this limit is exceeded

return_messages=True

)

Custom Progressive Summarization implementation

class ProgressiveSummarizer:

"""Progressive summarization: performs staged summarization as conversations accumulate"""

def __init__(self, llm, summarize_threshold: int = 10):

self.llm = llm

self.summarize_threshold = summarize_threshold

self.summary = ""

self.recent_messages: list[dict] = []

self.turn_count = 0

async def add_exchange(self, user_msg: str, ai_msg: str):

self.recent_messages.append({"role": "user", "content": user_msg})

self.recent_messages.append({"role": "assistant", "content": ai_msg})

self.turn_count += 1

if self.turn_count % self.summarize_threshold == 0:

await self._summarize()

async def _summarize(self):

"""Integrate recent conversation into existing summary"""

messages_text = "\n".join(

f"{m['role']}: {m['content']}" for m in self.recent_messages

)

prompt = f"""Previous summary:

{self.summary if self.summary else '(none)'}

Recent conversation:

{messages_text}

Create a concise summary that integrates the previous summary with

the recent conversation, preserving key information. Include user names,

preferences, and unresolved issues."""

response = await self.llm.ainvoke(prompt)

self.summary = response.content

self.recent_messages = self.recent_messages[-4:] # Keep only last 2 turns

def get_context(self) -> str:

parts = []

if self.summary:

parts.append(f"[Conversation Summary]\n{self.summary}")

if self.recent_messages:

recent = "\n".join(

f"{m['role']}: {m['content']}" for m in self.recent_messages

)

parts.append(f"[Recent Conversation]\n{recent}")

return "\n\n".join(parts)

4. Vector Store Memory - Semantic-Based Retrieval

Stores conversation history as vector embeddings and retrieves past conversations semantically similar to the current question.

from langchain.memory import VectorStoreRetrieverMemory

from langchain_openai import OpenAIEmbeddings

from langchain_community.vectorstores import Chroma

Vector Store-based memory setup

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

vectorstore = Chroma(

collection_name="conversation_memory",

embedding_function=embeddings,

persist_directory="./chroma_memory"

)

retriever = vectorstore.as_retriever(

search_type="similarity",

search_kwargs={"k": 5} # Retrieve top 5 most relevant conversations

)

vector_memory = VectorStoreRetrieverMemory(

retriever=retriever,

memory_key="relevant_history",

input_key="input"

)

Store conversations

vector_memory.save_context(

{"input": "What is the deployment schedule for Project A?"},

{"output": "Project A is scheduled for staging on March 15 and production on March 20."}

)

vector_memory.save_context(

{"input": "When is the database migration?"},

{"output": "The DB migration is scheduled for 2 AM on March 18."}

)

Retrieve relevant conversations

relevant = vector_memory.load_memory_variables(

{"input": "What should we check before Project A deployment?"}

)

print(relevant["relevant_history"])
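The retrieval step can be tried without an embedding API or a vector database. The following is a minimal sketch, not the LangChain implementation: word-count vectors stand in for real embeddings, and `embed`, `cosine`, and `ToyVectorMemory` are hypothetical names introduced only for this illustration.

```python
import math
import re
from collections import Counter


def embed(text: str) -> Counter:
    """Toy embedding: lowercase word counts (stand-in for a real embedding model)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[w] * b[w] for w in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


class ToyVectorMemory:
    """Stores exchanges with their vectors and returns the most similar ones."""

    def __init__(self):
        self.records: list[tuple[Counter, str]] = []

    def save(self, user_msg: str, ai_msg: str) -> None:
        text = f"User: {user_msg}\nAssistant: {ai_msg}"
        self.records.append((embed(text), text))

    def search(self, query: str, k: int = 1) -> list[str]:
        q = embed(query)
        ranked = sorted(self.records, key=lambda r: cosine(q, r[0]), reverse=True)
        return [text for _, text in ranked[:k]]


memory = ToyVectorMemory()
memory.save(
    "What is the deployment schedule for Project A?",
    "Project A is scheduled for staging on March 15 and production on March 20.",
)
memory.save(
    "When is the database migration?",
    "The DB migration is scheduled for 2 AM on March 18.",
)
top = memory.search("What should we check before Project A deployment?", k=1)
print(top[0])
```

Word overlap is a crude proxy for meaning. A real embedding model would also match paraphrases that share no words with the stored exchange, which is the main reason to pay the extra latency of this pattern.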

Deep Dive into Sliding Window Strategies

Adaptive Sliding Window

A strategy that dynamically adjusts the window based on conversation importance rather than using a fixed size.

import hashlib
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass

class ConversationTurn:

role: str

content: str

timestamp: datetime

importance: float = 0.5 # 0.0 ~ 1.0

tokens: int = 0

turn_id: str = ""

def __post_init__(self):

if not self.turn_id:

self.turn_id = hashlib.md5(

f"{self.timestamp}{self.content[:50]}".encode()

).hexdigest()[:8]

class AdaptiveSlidingWindow:

"""Importance-based adaptive sliding window"""

def __init__(self, max_tokens: int = 8000, min_turns: int = 4):

self.max_tokens = max_tokens

self.min_turns = min_turns # Minimum turns to retain

self.turns: list[ConversationTurn] = []

self.archived: list[ConversationTurn] = []

def add_turn(self, turn: ConversationTurn):

self.turns.append(turn)

self._optimize()

def _calculate_importance(self, turn: ConversationTurn, index: int) -> float:

"""Calculate turn importance across multiple dimensions"""

score = turn.importance

Higher weight for more recent turns

recency = index / max(len(self.turns) - 1, 1)

score += recency * 0.3

Turns containing questions get higher importance

if "?" in turn.content or "how" in turn.content.lower():

score += 0.2

Error/incident related keywords

critical_keywords = ["error", "failure", "outage", "critical", "urgent", "fail"]

if any(kw in turn.content.lower() for kw in critical_keywords):

score += 0.3

return min(score, 1.0)

def _optimize(self):

"""Prioritize retaining important turns within token limit"""

total_tokens = sum(t.tokens for t in self.turns)

if total_tokens <= self.max_tokens:

return

Calculate importance scores

scored = [

(i, self._calculate_importance(t, i), t)

for i, t in enumerate(self.turns)

]

Always retain recent min_turns

protected = self.turns[-self.min_turns:]

candidates = scored[:-self.min_turns]

Sort by lowest importance and remove

candidates.sort(key=lambda x: x[1])

while total_tokens > self.max_tokens and candidates:

_, _, turn = candidates.pop(0)

self.archived.append(turn)

self.turns.remove(turn)

total_tokens -= turn.tokens

def get_context(self) -> list[dict]:

return [

{"role": t.role, "content": t.content}

for t in self.turns

]

Time-Based vs Token-Based Window Comparison

from datetime import datetime, timedelta

class TimeBasedWindow:

"""Time-based sliding window - keeps only conversations within last N minutes"""

def __init__(self, window_minutes: int = 30):

self.window_minutes = window_minutes

self.messages: list[dict] = []

def add_message(self, role: str, content: str):

self.messages.append({

"role": role,

"content": content,

"timestamp": datetime.now()

})

self._cleanup()

def _cleanup(self):

cutoff = datetime.now() - timedelta(minutes=self.window_minutes)

self.messages = [

m for m in self.messages

if m["timestamp"] > cutoff

]

def get_messages(self) -> list[dict]:

return [

{"role": m["role"], "content": m["content"]}

for m in self.messages

]

class HybridWindow:

"""Token + Time hybrid window"""

def __init__(self, max_tokens: int = 4000, max_minutes: int = 60):

self.max_tokens = max_tokens

self.max_minutes = max_minutes

self.token_window = TokenWindowMemory(max_tokens=max_tokens)

self.time_window = TimeBasedWindow(window_minutes=max_minutes)

def add_message(self, role: str, content: str):

self.token_window.add_message(role, content)

self.time_window.add_message(role, content)

def get_messages(self) -> list[dict]:

Use intersection of both windows (stricter filtering)

token_msgs = set(

m["content"] for m in self.token_window.get_messages()

)

time_msgs = self.time_window.get_messages()

return [m for m in time_msgs if m["content"] in token_msgs]

Conversation Summarization Techniques

Summarization Strategy Comparison

| Strategy | Summarization Timing | Token Savings | Information Loss | Additional Cost |
| -------------------- | -------------------- | ------------- | ---------------- | --------------- |
| Per-turn summary | After every exchange | 80-90% | Medium | High |
| Threshold summary | Every N turns | 60-80% | Low | Medium |
| Hierarchical summary | Staged | 70-85% | Very low | Medium |
| Selective summary | Importance-based | 50-70% | Minimal | Low |
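Selective summarization can be sketched as follows. This is an illustrative outline rather than a reference implementation: the keyword list, the scoring weights, the `threshold` value, and the `summarize` callable (which would wrap an LLM call in practice) are all assumptions.

```python
from typing import Callable

CRITICAL_KEYWORDS = ("error", "failure", "outage", "urgent")  # illustrative list


def importance(message: dict) -> float:
    """Heuristic score in [0, 1]; the weights are assumptions, not tuned values."""
    text = message["content"].lower()
    score = 0.3
    if "?" in text:
        score += 0.2
    if any(kw in text for kw in CRITICAL_KEYWORDS):
        score += 0.5
    return min(score, 1.0)


def selective_compress(messages: list[dict],
                       summarize: Callable[[list[dict]], str],
                       threshold: float = 0.6) -> list[dict]:
    """Keep important messages verbatim; collapse each run of others into a summary."""
    result: list[dict] = []
    low_run: list[dict] = []

    def flush() -> None:
        # Replace the pending run of low-importance messages with one summary entry
        if low_run:
            summary = summarize(list(low_run))
            result.append({"role": "system", "content": "[Summary] " + summary})
            low_run.clear()

    for msg in messages:
        if importance(msg) >= threshold:
            flush()
            result.append(msg)
        else:
            low_run.append(msg)
    flush()
    return result


def stub_summarize(msgs: list[dict]) -> str:
    """Placeholder for an LLM summarization call."""
    return f"{len(msgs)} low-priority messages omitted"


history = [
    {"role": "user", "content": "Hi there"},
    {"role": "assistant", "content": "Hello, how can I help"},
    {"role": "user", "content": "We have a production outage, is the DB down?"},
    {"role": "assistant", "content": "Checking now"},
]
compressed = selective_compress(history, stub_summarize)
```

Summarizing only low-importance runs costs fewer LLM calls than per-turn summarization, which matches the low additional cost listed for this strategy, but it saves fewer tokens because important turns stay verbatim.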

Hierarchical Summarization System Implementation

import json
from enum import Enum
from typing import Any

class MemoryTier(Enum):

SHORT_TERM = "short_term" # Recent conversation verbatim

MID_TERM = "mid_term" # Session summaries

LONG_TERM = "long_term" # Core facts/preferences

class HierarchicalMemory:

"""3-tier memory architecture"""

def __init__(self, llm, short_term_limit: int = 10,

mid_term_limit: int = 5):

self.llm = llm

self.short_term_limit = short_term_limit

self.mid_term_limit = mid_term_limit

self.short_term: list[dict] = [] # Recent verbatim messages

self.mid_term: list[str] = [] # Session summaries

self.long_term: dict[str, Any] = { # Persistent stored info

"user_name": None,

"preferences": [],

"key_facts": [],

"unresolved_issues": []

}

async def add_exchange(self, user_msg: str, ai_msg: str):

1. Add to short-term memory

self.short_term.append({"role": "user", "content": user_msg})

self.short_term.append({"role": "assistant", "content": ai_msg})

2. Promote to mid-term when short-term exceeds limit

if len(self.short_term) > self.short_term_limit * 2:

await self._promote_to_mid_term()

3. Extract to long-term when mid-term exceeds limit

if len(self.mid_term) > self.mid_term_limit:

await self._extract_to_long_term()

async def _promote_to_mid_term(self):

"""Short -> Mid: summarize older messages and promote"""

old_messages = self.short_term[:-6] # Exclude last 3 turns

self.short_term = self.short_term[-6:]

text = "\n".join(f"{m['role']}: {m['content']}" for m in old_messages)

prompt = f"Summarize the following conversation in 3-4 sentences:\n\n{text}"

response = await self.llm.ainvoke(prompt)

self.mid_term.append(response.content)

async def _extract_to_long_term(self):

"""Mid -> Long: extract key facts for permanent storage"""

summaries = "\n\n".join(self.mid_term[:-2])

self.mid_term = self.mid_term[-2:]

prompt = f"""Extract key information from the following conversation summaries as JSON:

{summaries}

Items to extract:

- user_preferences: user preferences

- key_facts: key facts

- unresolved_issues: unresolved issues"""

response = await self.llm.ainvoke(prompt)

Parse JSON and merge into long_term (error handling needed in production)

try:

extracted = json.loads(response.content)

self.long_term["preferences"].extend(

extracted.get("user_preferences", [])

)

self.long_term["key_facts"].extend(

extracted.get("key_facts", [])

)

self.long_term["unresolved_issues"] = extracted.get(

"unresolved_issues", []

)

except json.JSONDecodeError:

pass # Ignore on parse failure

def build_context(self) -> str:

"""Assemble and return full context"""

parts = []

Long-term memory (always included)

if any(self.long_term.values()):

lt = self.long_term

facts = "\n".join(f"- {f}" for f in lt["key_facts"][-10:])

prefs = ", ".join(lt["preferences"][-5:])

issues = "\n".join(f"- {i}" for i in lt["unresolved_issues"])

parts.append(

f"[User Profile]\nName: {lt['user_name']}\n"

f"Preferences: {prefs}\nKey Facts:\n{facts}\n"

f"Unresolved Issues:\n{issues}"

)

Mid-term memory (session summaries)

if self.mid_term:

parts.append(

"[Previous Conversation Summary]\n" + "\n---\n".join(self.mid_term)

)

Short-term memory (recent verbatim)

if self.short_term:

recent = "\n".join(

f"{m['role']}: {m['content']}" for m in self.short_term

)

parts.append(f"[Recent Conversation]\n{recent}")

return "\n\n".join(parts)
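The long-term extraction step depends on the model returning clean JSON, which does not always happen: replies may be wrapped in Markdown fences or surrounded by prose. The helper below is one possible way to harden that parse. `parse_llm_json` and `as_str_list` are hypothetical helpers introduced for this sketch and are not part of any library.

```python
import json
import re
from typing import Any, Optional

FENCE = "`" * 3  # Markdown code fence marker


def parse_llm_json(raw: str) -> Optional[dict]:
    """Best-effort extraction of a JSON object from an LLM reply."""
    # Prefer the contents of a fenced block if the model added one
    fenced = re.search(FENCE + r"(?:json)?\s*(.*?)" + FENCE, raw, re.DOTALL)
    candidate = fenced.group(1) if fenced else raw
    # Fall back to the outermost {...} span when prose surrounds the object
    start, end = candidate.find("{"), candidate.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        parsed = json.loads(candidate[start:end + 1])
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None


def as_str_list(value: Any) -> list[str]:
    """Coerce a field to a list of strings, dropping anything else."""
    if isinstance(value, str):
        return [value]
    if isinstance(value, list):
        return [v for v in value if isinstance(v, str)]
    return []


reply = "Here you go:\n" + FENCE + 'json\n{"key_facts": ["uses k8s", 3]}\n' + FENCE
extracted = parse_llm_json(reply) or {}
key_facts = as_str_list(extracted.get("key_facts"))
```

Coercing each field before calling `extend` avoids a subtle failure: if the model returns a bare string instead of a list, `list.extend` would add it one character at a time.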

LangChain / LlamaIndex Production Implementation

LangChain LCEL-Based Memory Implementation

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

from langchain_core.runnables import RunnablePassthrough

from langchain_core.runnables.history import RunnableWithMessageHistory

from langchain_community.chat_message_histories import RedisChatMessageHistory

LCEL-based chain composition

prompt = ChatPromptTemplate.from_messages([

("system", "You are a friendly technical support expert. "

"Refer to previous conversation history for consistent responses."),

MessagesPlaceholder(variable_name="history"),

("human", "{input}")

])

chain = prompt | ChatOpenAI(model="gpt-4o", temperature=0.7)

Redis-based persistent session management

def get_session_history(session_id: str):

return RedisChatMessageHistory(

session_id=session_id,

url="redis://localhost:6379"

)

Chain with integrated message history

chain_with_history = RunnableWithMessageHistory(

chain,

get_session_history,

input_messages_key="input",

history_messages_key="history"

)

Per-session conversation

config = {"configurable": {"session_id": "user-123-session-456"}}

response = chain_with_history.invoke(

{"input": "My Kubernetes Pod is in CrashLoopBackOff state"},

config=config

)

print(response.content)

LlamaIndex ChatMemoryBuffer Implementation

from llama_index.core.memory import ChatMemoryBuffer

from llama_index.core.chat_engine import SimpleChatEngine

from llama_index.llms.openai import OpenAI

LlamaIndex memory buffer setup

memory = ChatMemoryBuffer.from_defaults(token_limit=4000)

llm = OpenAI(model="gpt-4o", temperature=0.7)

chat_engine = SimpleChatEngine.from_defaults(

llm=llm,

memory=memory,

system_prompt="You are a DevOps engineer chatbot specialist."

)

Conversation flow

response1 = chat_engine.chat("Our CI/CD pipeline has failed")

response2 = chat_engine.chat("Here is the error log: connection timeout")

response3 = chat_engine.chat("What was the solution for the issue I mentioned earlier?")

Check memory state

print(f"Messages in memory: {len(memory.get_all())}")

Vector DB-Based Persistent Memory

Long-Term Memory Architecture with Pinecone

import uuid
from datetime import datetime

from pinecone import Pinecone
from langchain_openai import OpenAIEmbeddings

class PersistentConversationMemory:

"""Pinecone-based persistent conversation memory"""

def __init__(self, index_name: str = "conversation-memory"):

self.pc = Pinecone()

self.index = self.pc.Index(index_name)

self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

def store_exchange(self, user_id: str, session_id: str,

user_msg: str, ai_msg: str,

metadata: dict = None):

"""Store conversation exchange in vector DB"""

exchange_text = f"User: {user_msg}\nAssistant: {ai_msg}"

embedding = self.embeddings.embed_query(exchange_text)

record_metadata = {

"user_id": user_id,

"session_id": session_id,

"user_message": user_msg[:500],

"ai_message": ai_msg[:500],

"timestamp": datetime.now().isoformat(),

"type": "exchange"

}

if metadata:

record_metadata.update(metadata)

self.index.upsert(vectors=[{

"id": str(uuid.uuid4()),

"values": embedding,

"metadata": record_metadata

}])

def recall(self, user_id: str, query: str,

top_k: int = 5) -> list[dict]:

"""Retrieve past conversations relevant to current query"""

query_embedding = self.embeddings.embed_query(query)

results = self.index.query(

vector=query_embedding,

top_k=top_k,

filter={"user_id": {"$eq": user_id}},

include_metadata=True

)

return [

{

"user_message": match.metadata["user_message"],

"ai_message": match.metadata["ai_message"],

"timestamp": match.metadata["timestamp"],

"relevance": match.score

}

for match in results.matches

]

def build_memory_context(self, user_id: str, query: str) -> str:

"""Assemble retrieved past conversations into context string"""

memories = self.recall(user_id, query)

if not memories:

return ""

lines = ["[Relevant Past Conversations]"]

for m in memories:

lines.append(f"({m['timestamp'][:10]}) "

f"User: {m['user_message']}")

lines.append(f" AI: {m['ai_message']}")

lines.append("")

return "\n".join(lines)

Context Drift and Hallucination Mitigation

Problem Patterns and Detection

Two major problems emerge as conversations grow longer:

1. **Context Drift**: The conversation gradually diverges from the original intent

2. **Stale Context Hallucination**: Hallucinations caused by distorted information from the summarization process

import numpy as np

class ContextDriftDetector:

"""Module for detecting context drift"""

def __init__(self, embeddings, drift_threshold: float = 0.3):

self.embeddings = embeddings

self.drift_threshold = drift_threshold

self.initial_topic_embedding = None

self.recent_embeddings: list[list[float]] = []

def set_initial_topic(self, first_message: str):

"""Set the initial topic of the conversation"""

self.initial_topic_embedding = self.embeddings.embed_query(

first_message

)

def check_drift(self, current_message: str) -> dict:

"""Measure how far current message has drifted from initial topic"""

current_embedding = self.embeddings.embed_query(current_message)

self.recent_embeddings.append(current_embedding)

if self.initial_topic_embedding is None:

self.set_initial_topic(current_message)

return {"drifted": False, "similarity": 1.0}

similarity = self._cosine_similarity(

self.initial_topic_embedding, current_embedding

)

return {

"drifted": similarity < self.drift_threshold,

"similarity": similarity,

"suggestion": (

"The conversation topic has significantly changed. "

"Consider starting a new session or resetting context."

if similarity < self.drift_threshold else None

)

}

@staticmethod

def _cosine_similarity(a: list[float], b: list[float]) -> float:

a, b = np.array(a), np.array(b)

return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

Summary Accuracy Validation

import json

class SummaryValidator:

"""Validates accuracy of conversation summaries"""

def __init__(self, llm):

self.llm = llm

async def validate_summary(self, original_messages: list[dict],

summary: str) -> dict:

"""Verify summary fidelity against original conversation"""

original_text = "\n".join(

f"{m['role']}: {m['content']}" for m in original_messages

)

prompt = f"""Compare the original conversation with its summary and evaluate:

1. Key information preservation rate (0-100)

2. Presence of distorted information

3. Missing important information

Original conversation:

{original_text}

Summary:

{summary}

Respond in JSON format."""

response = await self.llm.ainvoke(prompt)

try:

result = json.loads(response.content)

return result

except json.JSONDecodeError:

return {"error": "Validation result parsing failed"}
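An LLM-based validator adds cost and can itself be wrong, so a cheap deterministic pre-check may be worth running first. The sketch below flags numbers, times, and version-like tokens that appear in a summary but nowhere in the original conversation. The function names and the regular expression are assumptions made for illustration, and the check only catches one narrow class of distortion.

```python
import re


def extract_specifics(text: str) -> set[str]:
    """Collect numeric tokens, including times and versions such as 18:30 or 2.5."""
    return set(re.findall(r"\d+(?:[.:]\d+)*", text))


def unsupported_specifics(original_messages: list[dict], summary: str) -> list[str]:
    """Return numeric details in the summary that never occur in the original."""
    source_text = " ".join(m["content"] for m in original_messages)
    missing = extract_specifics(summary) - extract_specifics(source_text)
    return sorted(missing)


original = [
    {"role": "user", "content": "When is the database migration?"},
    {"role": "assistant", "content": "The DB migration is scheduled for 2 AM on March 18."},
]
faithful = unsupported_specifics(original, "DB migration: March 18, 2 AM.")
distorted = unsupported_specifics(original, "DB migration: March 19 at 2 AM.")
```

An empty result does not prove the summary is faithful. It only means no numeric detail was invented, so this check complements rather than replaces sampling or LLM-based review.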

Production Architecture Patterns

Full Architecture

# docker-compose.yml - Production conversation memory stack
version: '3.8'

services:
  chat-api:
    image: chat-service:latest
    ports:
      - '8000:8000'
    environment:
      - REDIS_URL=redis://redis:6379
      - PINECONE_API_KEY=pk-xxx
      - OPENAI_API_KEY=sk-xxx
    depends_on:
      - redis
      - postgres

  redis:
    image: redis:7-alpine
    ports:
      - '6379:6379'
    volumes:
      - redis-data:/data
    command: redis-server --appendonly yes

  postgres:
    image: pgvector/pgvector:pg16
    environment:
      POSTGRES_DB: chatbot
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: secure-password
    volumes:
      - pg-data:/var/lib/postgresql/data
    ports:
      - '5432:5432'

volumes:
  redis-data:
  pg-data:

FastAPI-Based Conversation Server

import json

import redis.asyncio as redis
from fastapi import FastAPI, HTTPException
from langchain_openai import ChatOpenAI
from pydantic import BaseModel

app = FastAPI(title="Multi-Turn Chat API")

# Redis connection (async client, because the endpoint awaits Redis calls)
redis_client = redis.from_url("redis://localhost:6379", decode_responses=True)


class ChatRequest(BaseModel):
    user_id: str
    session_id: str
    message: str


class ChatResponse(BaseModel):
    reply: str
    session_id: str
    turn_count: int
    tokens_used: int


@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
    """Multi-turn conversation endpoint"""
    session_key = f"session:{request.user_id}:{request.session_id}"

    # 1. Load session history
    history_raw = await redis_client.lrange(session_key, 0, -1)
    history = [json.loads(h) for h in history_raw]

    # 2. Memory management (Sliding Window + Summary)
    # SessionMemoryManager is an application-level helper, not a library class
    manager = SessionMemoryManager(max_turns=20, summary_threshold=15)
    context = await manager.prepare_context(history, request.message)

    # 3. LLM call
    llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
    messages = context + [{"role": "user", "content": request.message}]
    response = await llm.ainvoke(messages)

    # 4. Save history
    await redis_client.rpush(
        session_key,
        json.dumps({"role": "user", "content": request.message})
    )
    await redis_client.rpush(
        session_key,
        json.dumps({"role": "assistant", "content": response.content})
    )

    # 5. Set TTL (24 hours)
    await redis_client.expire(session_key, 86400)

    turn_count = len(history) // 2 + 1

    return ChatResponse(
        reply=response.content,
        session_id=request.session_id,
        turn_count=turn_count,
        tokens_used=response.response_metadata.get("token_usage", {}).get(
            "total_tokens", 0
        )
    )
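The endpoint relies on a `SessionMemoryManager` whose implementation is not shown. One way it could work is sketched below. Everything beyond the constructor arguments and the `prepare_context` call seen in the endpoint is an assumption, including the optional `summarizer` parameter and the rule that the last `summary_threshold` turns stay verbatim.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Summarizer = Callable[[list[dict]], Awaitable[str]]


class SessionMemoryManager:
    """Hypothetical sketch: sliding window plus optional summary of older turns."""

    def __init__(self, max_turns: int = 20, summary_threshold: int = 15,
                 summarizer: Optional[Summarizer] = None):
        self.max_turns = max_turns
        self.summary_threshold = summary_threshold
        self.summarizer = summarizer  # Assumed async callable; None disables summaries

    async def prepare_context(self, history: list[dict],
                              new_message: str) -> list[dict]:
        # new_message is accepted to match the call site; this sketch ignores it
        turns = len(history) // 2  # One turn = user message + assistant reply
        if turns <= self.summary_threshold:
            return list(history)
        if self.summarizer is None:
            # No summarizer available: fall back to a plain sliding window
            return list(history[-self.max_turns * 2:])
        # Summarize everything older than the last summary_threshold turns
        split = len(history) - self.summary_threshold * 2
        older, recent = history[:split], history[split:]
        summary = await self.summarizer(older)
        header = {"role": "system", "content": f"[Conversation Summary]\n{summary}"}
        return [header] + list(recent)


async def fake_summarizer(messages: list[dict]) -> str:
    """Placeholder for an LLM call that condenses older messages."""
    return f"{len(messages)} older messages"


demo_history = [
    {"role": "user" if i % 2 == 0 else "assistant", "content": f"message {i}"}
    for i in range(40)  # 20 turns
]
manager = SessionMemoryManager(summarizer=fake_summarizer)
demo_context = asyncio.run(manager.prepare_context(demo_history, "next question"))
```

In a real service the summary would usually be cached per session, for example in Redis next to the message list, so that older turns are not summarized again on every request.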

Operational Notes

Monitoring Checklist

1. **Token Usage Monitoring**: Track average/max token consumption per session and set alerts for unusual spikes.

2. **Summary Quality Verification**: Periodically sample summary results to check for information loss.

3. **Context Drift Tracking**: Monitor topic deviation rates as sessions grow longer.

4. **Latency Analysis**: Measure how memory retrieval/summarization stages impact overall response time.

5. **Cost Tracking**: Separately track costs for memory management LLM calls (summarization, etc.).
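The first checklist item can be prototyped in memory before wiring it to a metrics system. The sketch below tracks per-session token usage and flags a turn as a spike when it exceeds a multiple of the session's running average. `TokenUsageMonitor`, `spike_factor`, and `min_samples` are illustrative names and defaults, not recommended values.

```python
from collections import defaultdict
from statistics import mean


class TokenUsageMonitor:
    """Tracks token usage per session and flags unusual spikes."""

    def __init__(self, spike_factor: float = 3.0, min_samples: int = 5):
        self.spike_factor = spike_factor  # Multiple of the average that counts as a spike
        self.min_samples = min_samples    # Turns required before spikes are evaluated
        self.usage: dict[str, list[int]] = defaultdict(list)

    def record(self, session_id: str, tokens: int) -> bool:
        """Store one turn's token count; return True if it looks like a spike."""
        history = self.usage[session_id]
        is_spike = (
            len(history) >= self.min_samples
            and tokens > self.spike_factor * mean(history)
        )
        history.append(tokens)
        return is_spike

    def stats(self, session_id: str) -> dict:
        """Return turn count, average, and maximum tokens for a session."""
        history = self.usage.get(session_id, [])
        if not history:
            return {"turns": 0, "avg": 0.0, "max": 0}
        return {"turns": len(history), "avg": mean(history), "max": max(history)}


monitor = TokenUsageMonitor()
normal_flags = [monitor.record("session-1", 1000) for _ in range(5)]
spike_flag = monitor.record("session-1", 5000)
session_stats = monitor.stats("session-1")
```

In production the same counters would typically be exported to a monitoring system and the alert rule defined there, so that thresholds can change without redeploying the chat service.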

Common Failure Cases and Recovery Procedures

class MemoryRecoveryHandler:
    """Memory-related failure recovery handler"""

    async def handle_token_overflow(self, session_id: str):
        """Emergency handling when token limit is exceeded"""
        # 1. Emergency summarization keeping only last 5 turns
        # 2. If summarization fails, keep only last 3 turns and discard rest
        # 3. Notify user about context reduction
        pass

    async def handle_summary_failure(self, session_id: str):
        """When summary LLM call fails"""
        # 1. Retry (max 3 times with backoff)
        # 2. Fallback: switch to simple message-count-based window
        # 3. Proceed with only recent conversation without summary
        pass

    async def handle_vector_db_failure(self, session_id: str):
        """When vector DB connection fails"""
        # 1. Serve recent conversation from local cache
        # 2. Fallback to Redis short-term memory
        # 3. Continue basic conversation without vector search
        pass

    async def handle_context_drift(self, session_id: str, drift_score: float):
        """When context drift is detected"""
        # 1. Notify user about topic change
        # 2. Suggest starting new session
        # 3. Reconstruct context based on current topic
        pass
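The summary-failure procedure (retry with backoff, then fall back to a plain window) could be filled in along these lines. This is a sketch under assumptions: `summarize` stands for whatever async function wraps the summarization LLM call, and the delay values and result shape are illustrative.

```python
import asyncio
from typing import Awaitable, Callable


async def summarize_with_fallback(
    messages: list[dict],
    summarize: Callable[[list[dict]], Awaitable[str]],
    max_retries: int = 3,
    base_delay: float = 0.5,
    fallback_turns: int = 3,
) -> dict:
    """Try to summarize with exponential backoff; degrade to recent turns only."""
    recent = messages[-fallback_turns * 2:]  # Two messages per turn
    for attempt in range(max_retries):
        try:
            summary = await summarize(messages)
            return {"summary": summary, "messages": recent, "degraded": False}
        except Exception:
            # Broad catch keeps the sketch short; narrow it to the client's
            # transient error types in real code
            if attempt < max_retries - 1:
                await asyncio.sleep(base_delay * 2 ** attempt)
    # All retries failed: continue with the recent conversation and no summary
    return {"summary": None, "messages": recent, "degraded": True}


async def always_failing(messages: list[dict]) -> str:
    """Stand-in for a summarization call that is currently unavailable."""
    raise RuntimeError("summarizer unavailable")


sample = [{"role": "user", "content": str(i)} for i in range(10)]
outcome = asyncio.run(summarize_with_fallback(sample, always_failing, base_delay=0))
```

Returning a `degraded` flag lets the caller decide whether to tell the user that earlier context was dropped, which corresponds to the notification step in the overflow handler.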

Performance Optimization Tips

# Monitor Redis memory usage
redis-cli INFO memory | grep used_memory_human

# Check per-session memory size (in bytes)
redis-cli MEMORY USAGE "session:user-123:session-456"

# Find sessions without a TTL (these never expire and should be reviewed)
redis-cli --scan --pattern "session:*" | while read -r key; do
  ttl=$(redis-cli TTL "$key")
  if [ "$ttl" -eq -1 ]; then
    echo "No TTL set for $key"
  fi
done

Memory Pattern Selection Guide

Recommendations by Use Case

| Use Case | Recommended Pattern | Rationale |
| ------------------------ | ----------------------- | ------------------------------------------ |
| Simple FAQ bot | Buffer Window (k=5) | Short conversations, minimal cost |
| Customer support chatbot | Summary Buffer + Entity | Long conversations, customer info tracking |
| Technical support agent | Hierarchical + Vector | Need to search past issues |
| Personal assistant bot | Full Hierarchical | Long-term memory, personalization |
| Code review bot | Token Window | Maximize code context |

Decision Flowchart

Conversation length?
|
+-- 5 turns or less --> Buffer Memory
|
+-- 6-30 turns --> Personalization needed?
|       |
|       +-- No  --> Sliding Window
|       +-- Yes --> Summary Buffer + Entity
|
+-- More than 30 turns --> Need to search past conversations?
        |
        +-- No  --> Hierarchical Memory
        +-- Yes --> Hierarchical + Vector Store
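The flowchart's branches can be encoded as a small function, which is handy for configuration or tests. The function name and parameters are hypothetical, and treating exactly 5 turns as short and exactly 30 turns as medium is an assumption about how the boundaries should be resolved.

```python
def recommend_memory_pattern(expected_turns: int,
                             needs_personalization: bool = False,
                             needs_history_search: bool = False) -> str:
    """Map expected conversation length and requirements to a memory pattern."""
    if expected_turns <= 5:
        return "Buffer Memory"
    if expected_turns <= 30:
        # Medium-length conversations: the deciding factor is personalization
        return "Summary Buffer + Entity" if needs_personalization else "Sliding Window"
    # Long conversations: the deciding factor is searching past conversations
    return "Hierarchical + Vector Store" if needs_history_search else "Hierarchical Memory"


faq_bot = recommend_memory_pattern(3)
support_bot = recommend_memory_pattern(20, needs_personalization=True)
tech_agent = recommend_memory_pattern(100, needs_history_search=True)
```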

Conclusion

Multi-turn conversation management is the core factor that determines the quality of LLM chatbots. Simply stuffing all conversations into the context is not sustainable in terms of cost and performance. Understanding various memory patterns like Buffer, Summary, and Vector Store, and selecting the right strategy for your use case is essential.

Sliding Window is the most practical baseline strategy, and combining it with conversation summarization and vector search maintains high quality even in long conversations. Hierarchical memory architecture mimics human memory structure by separately managing short-term, mid-term, and long-term memory, optimizing the balance between token efficiency and information preservation.

In production environments, session management with Redis, persistent memory with vector DBs like Pinecone/Chroma, and failure recovery strategies are essential. Continuously monitor token usage and summary quality, and have mitigation plans ready for context drift.

