멀티턴 대화 관리와 컨텍스트 최적화: LLM 챗봇의 Memory 패턴·대화 요약·Sliding Window 전략
- 들어가며
- 컨텍스트 윈도우의 한계와 비용 분석
- 메모리 패턴 비교 분석
- Sliding Window 전략 심화
- 대화 요약 기법
- LangChain / LlamaIndex 실전 구현
- 벡터 DB 기반 영속 메모리
- 컨텍스트 드리프트와 Hallucination 대응
- 프로덕션 아키텍처 패턴
- 운영 시 주의사항
- 메모리 패턴 선택 가이드
- 마치며
- 참고자료

들어가며
LLM 기반 챗봇에서 가장 근본적인 도전 과제는 멀티턴 대화에서 컨텍스트를 효과적으로 관리하는 것이다. LLM은 본질적으로 무상태(Stateless)이므로, 매번 API를 호출할 때마다 전체 대화 이력을 함께 전송해야 한다. 그러나 컨텍스트 윈도우는 유한하고, 토큰 비용은 대화 길이에 비례하여 증가한다.
GPT-4o의 128K 토큰, Claude의 200K 토큰이라는 대형 컨텍스트 윈도우가 있지만, 실무에서는 수백 턴의 고객 상담 대화나 장시간의 기술 지원 세션에서 이 한계에 쉽게 도달한다. 더구나 "Lost in the Middle" 현상으로 인해 긴 컨텍스트의 중간 부분은 모델이 제대로 활용하지 못하는 문제도 있다.
이 글에서는 LLM 챗봇의 다양한 메모리 패턴(Buffer, Summary, Vector Store), Sliding Window 전략, 대화 요약 기법, 토큰 비용 최적화, 그리고 프로덕션 환경에서의 아키텍처 패턴을 실전 코드와 함께 다룬다.
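본론에 들어가기 전에, LLM의 무상태 특성이 갖는 의미를 최소 스케치로 확인해 보자. 실제 API 호출 없이 매 턴 전송되는 페이로드 크기만 세는 예시이며, `chat_turn` 함수와 응답 문자열은 설명용 가정이다.

```python
# 무상태 LLM API의 특성: 매 호출마다 전체 대화 이력을 다시 전송해야 한다.
history: list[dict] = []

def chat_turn(user_msg: str) -> int:
    """한 턴을 진행하고, 이번 호출에 전송된 메시지 수를 반환 (예시용 가정 함수)"""
    history.append({"role": "user", "content": user_msg})
    payload = list(history)  # 전체 이력이 매번 페이로드에 포함됨
    history.append({"role": "assistant", "content": f"({user_msg}에 대한 응답)"})
    return len(payload)

sizes = [chat_turn(f"질문 {i}") for i in range(1, 4)]
print(sizes)  # 턴이 진행될수록 전송량이 선형 증가: [1, 3, 5]
```

이 선형 증가가 바로 아래에서 다룰 비용·컨텍스트 관리 문제의 출발점이다.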
컨텍스트 윈도우의 한계와 비용 분석
주요 LLM 컨텍스트 윈도우 비교
| 모델 | 컨텍스트 윈도우 | 입력 비용 (1M 토큰) | 출력 비용 (1M 토큰) | 비고 |
|---|---|---|---|---|
| GPT-4o | 128K | 2.50 USD | 10.00 USD | 범용 |
| GPT-4o-mini | 128K | 0.15 USD | 0.60 USD | 경량 |
| Claude 3.5 Sonnet | 200K | 3.00 USD | 15.00 USD | 긴 컨텍스트 |
| Gemini 1.5 Pro | 2M | 1.25 USD | 5.00 USD | 최대 윈도우 |
| Llama 3.1 405B | 128K | 자체 호스팅 | 자체 호스팅 | 오픈소스 |
토큰 예산 설계
대화당 토큰 예산을 설계할 때는 시스템 프롬프트, 대화 이력, 응답 공간을 분리하여 관리해야 한다.
import tiktoken
class TokenBudgetManager:
"""토큰 예산을 관리하는 클래스"""
def __init__(self, model: str = "gpt-4o", max_context: int = 128000):
self.encoding = tiktoken.encoding_for_model(model)
self.max_context = max_context
# 예산 배분: 시스템 15%, 대화 이력 60%, 응답 25%
self.system_budget = int(max_context * 0.15)
self.history_budget = int(max_context * 0.60)
self.response_budget = int(max_context * 0.25)
def count_tokens(self, text: str) -> int:
"""텍스트의 토큰 수를 계산"""
return len(self.encoding.encode(text))
def count_message_tokens(self, messages: list[dict]) -> int:
    """메시지 리스트의 총 토큰 수를 계산 (OpenAI chat 포맷 근사치)"""
    total = 0
    for msg in messages:
        total += self.count_tokens(msg["content"])
        total += 4  # 역할 등 메시지 포맷 오버헤드
    total += 3  # 어시스턴트 응답 프라이밍 토큰 (요청당 1회)
    return total
def get_available_history_tokens(self, system_tokens: int) -> int:
"""대화 이력에 사용 가능한 토큰 수를 반환"""
used = system_tokens + self.response_budget
return self.max_context - used
def should_summarize(self, history_tokens: int) -> bool:
"""대화 이력이 예산의 80%를 초과하면 요약을 추천"""
return history_tokens > self.history_budget * 0.8
# 사용 예시
budget = TokenBudgetManager(model="gpt-4o")
system_prompt = "당신은 고객 상담 전문 AI입니다..."
system_tokens = budget.count_tokens(system_prompt)
print(f"시스템 프롬프트: {system_tokens} 토큰")
print(f"대화 이력 예산: {budget.history_budget} 토큰")
print(f"응답 예산: {budget.response_budget} 토큰")
비용 증가 시뮬레이션
import numpy as np

def cumulative_input_cost(turns: int, tokens_at_turn,
                          input_cost_per_1m: float = 2.50) -> float:
    """턴별 입력 토큰 수 함수(tokens_at_turn)로 누적 입력 비용 계산"""
    # 매 턴 t마다 tokens_at_turn(t)개의 입력 토큰을 전송한다고 가정
    return sum(
        (tokens_at_turn(t) / 1_000_000) * input_cost_per_1m
        for t in range(1, turns + 1)
    )

# 메모리 전략별 비용 비교 (턴당 평균 200 토큰 가정)
AVG = 200
turns = np.arange(1, 101)
# 메모리 없음: 매 턴 전체 이력(t * 200 토큰) 전송
cost_no_memory = [cumulative_input_cost(t, lambda n: n * AVG) for t in turns]
# Sliding Window: 최근 20턴만 전송 (윈도우 포화 후에도 매 턴 20턴 분량의 비용은 계속 발생)
cost_sliding = [cumulative_input_cost(t, lambda n: min(n, 20) * AVG) for t in turns]
# Summary Memory: 요약으로 1/5 압축
cost_summary = [cumulative_input_cost(t, lambda n: n * AVG // 5) for t in turns]
print(f"100턴 대화 비용 (메모리 없음): ${cost_no_memory[-1]:.4f}")
print(f"100턴 대화 비용 (Sliding Window): ${cost_sliding[-1]:.4f}")
print(f"100턴 대화 비용 (Summary): ${cost_summary[-1]:.4f}")
메모리 패턴 비교 분석
패턴별 특성 비교
| 메모리 패턴 | 토큰 사용량 | 정보 보존 | 지연 시간 | 구현 복잡도 | 적합한 케이스 |
|---|---|---|---|---|---|
| Buffer Memory | O(n) 선형 증가 | 100% | 낮음 | 낮음 | 짧은 대화 |
| Window Memory | O(k) 고정 | 최근 k턴 | 낮음 | 낮음 | 일반 챗봇 |
| Summary Memory | O(1) 고정 | 요약본 | 중간 | 중간 | 긴 대화 |
| Summary Buffer | O(k) + 요약 | 요약 + 최근 | 중간 | 중간 | 균형형 |
| Vector Store | O(k) 검색 | 의미 기반 | 높음 | 높음 | 지식 집약 |
| Entity Memory | O(e) 엔티티 수 | 엔티티별 | 중간 | 높음 | 개인화 |
1. Buffer Memory - 전체 이력 저장
가장 단순한 패턴으로, 모든 대화 이력을 그대로 유지한다.
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
# Buffer Memory: 모든 대화를 그대로 저장
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
memory = ConversationBufferMemory(return_messages=True)
chain = ConversationChain(llm=llm, memory=memory, verbose=True)
# 대화 진행
response1 = chain.predict(input="안녕하세요, 서버 장애가 발생했어요")
response2 = chain.predict(input="로그를 확인해보니 OOM 에러가 있어요")
response3 = chain.predict(input="현재 메모리 사용량은 어떻게 확인하나요?")
# 메모리에 저장된 전체 이력 확인
for msg in memory.chat_memory.messages:
role = "User" if msg.type == "human" else "AI"
print(f"[{role}] {msg.content[:80]}...")
한계: 대화가 길어질수록 토큰 사용량이 선형으로 증가하여 비용과 지연 시간이 급증한다.
2. Sliding Window Memory - 최근 N턴만 유지
고정 크기의 윈도우를 유지하면서 오래된 대화를 제거한다.
from langchain.memory import ConversationBufferWindowMemory
# 최근 10회 교환(사용자+AI 쌍)만 유지하는 Sliding Window
window_memory = ConversationBufferWindowMemory(
    k=10,  # 최근 10개 교환(interaction) 유지 = 메시지 20개
return_messages=True
)
chain = ConversationChain(llm=llm, memory=window_memory, verbose=True)
# 토큰 기반 Window 구현 (커스텀)
import tiktoken

class TokenWindowMemory:
"""토큰 수 기반으로 대화 이력을 관리하는 메모리"""
def __init__(self, max_tokens: int = 4000, model: str = "gpt-4o"):
self.max_tokens = max_tokens
self.encoding = tiktoken.encoding_for_model(model)
self.messages: list[dict] = []
def add_message(self, role: str, content: str):
self.messages.append({"role": role, "content": content})
self._trim()
def _trim(self):
"""토큰 한도를 초과하면 가장 오래된 메시지부터 제거"""
while self._total_tokens() > self.max_tokens and len(self.messages) > 2:
# 첫 번째 시스템 메시지는 보존
self.messages.pop(0 if self.messages[0]["role"] != "system" else 1)
def _total_tokens(self) -> int:
return sum(
len(self.encoding.encode(m["content"])) + 4
for m in self.messages
)
def get_messages(self) -> list[dict]:
return self.messages.copy()
# 사용 예시
token_window = TokenWindowMemory(max_tokens=4000)
token_window.add_message("system", "당신은 기술 지원 전문가입니다.")
token_window.add_message("user", "Docker 컨테이너가 계속 재시작됩니다.")
token_window.add_message("assistant", "OOMKilled 상태인지 확인해보겠습니다...")
print(f"현재 토큰 사용량: {token_window._total_tokens()}")
3. Summary Memory - 대화 요약을 통한 압축
LLM을 사용하여 이전 대화를 요약하고, 요약문을 컨텍스트로 활용한다.
from langchain.memory import ConversationSummaryMemory
# Summary Memory: LLM으로 대화를 자동 요약
summary_memory = ConversationSummaryMemory(
llm=ChatOpenAI(model="gpt-4o-mini", temperature=0), # 요약용 경량 모델
return_messages=True
)
# Summary Buffer Memory: 요약 + 최근 대화 결합
from langchain.memory import ConversationSummaryBufferMemory
summary_buffer = ConversationSummaryBufferMemory(
llm=ChatOpenAI(model="gpt-4o-mini", temperature=0),
max_token_limit=2000, # 이 한도 초과 시 오래된 메시지를 요약
return_messages=True
)
# 커스텀 Progressive Summarization 구현
class ProgressiveSummarizer:
"""점진적 요약: 대화가 쌓일수록 단계적으로 요약을 수행"""
def __init__(self, llm, summarize_threshold: int = 10):
self.llm = llm
self.summarize_threshold = summarize_threshold
self.summary = ""
self.recent_messages: list[dict] = []
self.turn_count = 0
async def add_exchange(self, user_msg: str, ai_msg: str):
self.recent_messages.append({"role": "user", "content": user_msg})
self.recent_messages.append({"role": "assistant", "content": ai_msg})
self.turn_count += 1
if self.turn_count % self.summarize_threshold == 0:
await self._summarize()
async def _summarize(self):
"""최근 대화를 기존 요약에 통합"""
messages_text = "\n".join(
f"{m['role']}: {m['content']}" for m in self.recent_messages
)
prompt = f"""이전 요약:
{self.summary if self.summary else '(없음)'}
최근 대화:
{messages_text}
위의 이전 요약과 최근 대화를 통합하여 핵심 정보를 보존하는
간결한 요약을 작성하세요. 사용자의 이름, 선호도, 미해결 이슈를
반드시 포함하세요."""
response = await self.llm.ainvoke(prompt)
self.summary = response.content
self.recent_messages = self.recent_messages[-4:] # 최근 2턴만 유지
def get_context(self) -> str:
parts = []
if self.summary:
parts.append(f"[대화 요약]\n{self.summary}")
if self.recent_messages:
recent = "\n".join(
f"{m['role']}: {m['content']}" for m in self.recent_messages
)
parts.append(f"[최근 대화]\n{recent}")
return "\n\n".join(parts)
4. Vector Store Memory - 의미 기반 검색
대화 이력을 벡터 임베딩으로 저장하고, 현재 질문과 의미적으로 유사한 과거 대화를 검색한다.
from langchain.memory import VectorStoreRetrieverMemory
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
# Vector Store 기반 메모리 설정
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(
collection_name="conversation_memory",
embedding_function=embeddings,
persist_directory="./chroma_memory"
)
retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 5} # 가장 관련성 높은 5개 대화 검색
)
vector_memory = VectorStoreRetrieverMemory(
retriever=retriever,
memory_key="relevant_history",
input_key="input"
)
# 대화 저장
vector_memory.save_context(
{"input": "프로젝트 A의 배포 일정이 어떻게 되나요?"},
{"output": "프로젝트 A는 3월 15일 스테이징, 3월 20일 프로덕션 배포 예정입니다."}
)
vector_memory.save_context(
{"input": "데이터베이스 마이그레이션은 언제 하나요?"},
{"output": "DB 마이그레이션은 3월 18일 새벽 2시에 진행됩니다."}
)
# 관련 대화 검색
relevant = vector_memory.load_memory_variables(
{"input": "프로젝트 A 배포 전에 확인할 사항은?"}
)
print(relevant["relevant_history"])
Sliding Window 전략 심화
적응형 Sliding Window
고정 크기가 아닌, 대화의 중요도에 따라 동적으로 윈도우를 조절하는 전략이다.
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
import hashlib
@dataclass
class ConversationTurn:
role: str
content: str
timestamp: datetime
importance: float = 0.5 # 0.0 ~ 1.0
tokens: int = 0
turn_id: str = ""
def __post_init__(self):
if not self.turn_id:
self.turn_id = hashlib.md5(
f"{self.timestamp}{self.content[:50]}".encode()
).hexdigest()[:8]
class AdaptiveSlidingWindow:
"""중요도 기반 적응형 슬라이딩 윈도우"""
def __init__(self, max_tokens: int = 8000, min_turns: int = 4):
self.max_tokens = max_tokens
self.min_turns = min_turns # 최소 유지 턴 수
self.turns: list[ConversationTurn] = []
self.archived: list[ConversationTurn] = []
def add_turn(self, turn: ConversationTurn):
self.turns.append(turn)
self._optimize()
def _calculate_importance(self, turn: ConversationTurn, index: int) -> float:
"""턴의 중요도를 다차원으로 계산"""
score = turn.importance
# 최근 턴일수록 높은 가중치
recency = index / max(len(self.turns) - 1, 1)
score += recency * 0.3
# 질문이 포함된 턴은 중요도 상승
if "?" in turn.content or "어떻게" in turn.content:
score += 0.2
# 에러/장애 관련 키워드
critical_keywords = ["에러", "장애", "오류", "실패", "긴급", "error", "fail"]
if any(kw in turn.content.lower() for kw in critical_keywords):
score += 0.3
return min(score, 1.0)
def _optimize(self):
"""토큰 한도 내에서 중요한 턴을 우선 유지"""
total_tokens = sum(t.tokens for t in self.turns)
if total_tokens <= self.max_tokens:
return
# 중요도 점수 계산
scored = [
(i, self._calculate_importance(t, i), t)
for i, t in enumerate(self.turns)
]
# 최근 min_turns개 턴은 제거 후보에서 제외하여 반드시 유지
candidates = scored[:-self.min_turns]
# 중요도 낮은 순으로 정렬하여 제거
candidates.sort(key=lambda x: x[1])
while total_tokens > self.max_tokens and candidates:
_, _, turn = candidates.pop(0)
self.archived.append(turn)
self.turns.remove(turn)
total_tokens -= turn.tokens
def get_context(self) -> list[dict]:
return [
{"role": t.role, "content": t.content}
for t in self.turns
]
시간 기반 윈도우와 토큰 기반 윈도우 비교
class TimeBasedWindow:
"""시간 기반 슬라이딩 윈도우 - 최근 N분 이내의 대화만 유지"""
def __init__(self, window_minutes: int = 30):
self.window_minutes = window_minutes
self.messages: list[dict] = []
def add_message(self, role: str, content: str):
self.messages.append({
"role": role,
"content": content,
"timestamp": datetime.now()
})
self._cleanup()
def _cleanup(self):
cutoff = datetime.now() - timedelta(minutes=self.window_minutes)
self.messages = [
m for m in self.messages
if m["timestamp"] > cutoff
]
def get_messages(self) -> list[dict]:
return [
{"role": m["role"], "content": m["content"]}
for m in self.messages
]
class HybridWindow:
"""토큰 + 시간 하이브리드 윈도우"""
def __init__(self, max_tokens: int = 4000, max_minutes: int = 60):
self.max_tokens = max_tokens
self.max_minutes = max_minutes
self.token_window = TokenWindowMemory(max_tokens=max_tokens)
self.time_window = TimeBasedWindow(window_minutes=max_minutes)
def add_message(self, role: str, content: str):
self.token_window.add_message(role, content)
self.time_window.add_message(role, content)
def get_messages(self) -> list[dict]:
# 두 윈도우의 교집합 사용 (더 엄격한 필터링)
token_msgs = set(
m["content"] for m in self.token_window.get_messages()
)
time_msgs = self.time_window.get_messages()
return [m for m in time_msgs if m["content"] in token_msgs]
대화 요약 기법
요약 전략 비교
| 전략 | 요약 시점 | 토큰 절감률 | 정보 손실 | 추가 비용 |
|---|---|---|---|---|
| 매 턴 요약 | 매 교환 후 | 80-90% | 중간 | 높음 |
| 임계치 요약 | N턴마다 | 60-80% | 낮음 | 중간 |
| 계층적 요약 | 단계별 | 70-85% | 매우 낮음 | 중간 |
| 선택적 요약 | 중요도 기반 | 50-70% | 최소 | 낮음 |
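위 표의 '선택적 요약'은 본문에 별도 구현이 없으므로, 중요도가 낮은 메시지만 압축하는 최소 스케치를 남긴다. `score_importance`의 키워드·가중치와 `compress`의 단순 절단은 예시용 가정이며, 실전에서는 `compress`를 LLM 요약 호출로 대체한다.

```python
# 선택적 요약 최소 스케치: 중요도가 높은 턴은 원문 유지, 낮은 턴만 압축
IMPORTANT_KEYWORDS = ("에러", "장애", "긴급", "error", "fail")

def score_importance(content: str) -> float:
    """질문·장애 키워드 포함 여부로 중요도를 근사 (예시용 가정 규칙)"""
    score = 0.5
    if "?" in content:
        score += 0.2
    if any(kw in content.lower() for kw in IMPORTANT_KEYWORDS):
        score += 0.3
    return min(score, 1.0)

def compress(content: str, max_chars: int = 30) -> str:
    # 실제로는 LLM 요약을 사용; 여기서는 단순 절단으로 대체한 자리표시자
    return content if len(content) <= max_chars else content[:max_chars] + "..."

def selective_summarize(messages: list[dict], threshold: float = 0.7) -> list[dict]:
    """중요도가 threshold 미만인 메시지만 압축하여 토큰 절감"""
    return [
        m if score_importance(m["content"]) >= threshold
        else {**m, "content": compress(m["content"])}
        for m in messages
    ]

msgs = [
    {"role": "user", "content": "서버에서 error 발생, 긴급 확인 부탁드립니다"},
    {"role": "assistant", "content": "네, 먼저 로그 파일의 타임스탬프와 프로세스 ID를 순서대로 확인해 보겠습니다"},
]
result = selective_summarize(msgs)
```

장애 관련 메시지는 원문이 유지되고, 일반 응답만 압축되는 것을 확인할 수 있다.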
계층적 요약 시스템 구현
from enum import Enum
from typing import Any
class MemoryTier(Enum):
SHORT_TERM = "short_term" # 최근 대화 원문
MID_TERM = "mid_term" # 세션 요약
LONG_TERM = "long_term" # 핵심 사실/선호도
class HierarchicalMemory:
"""3계층 메모리 아키텍처"""
def __init__(self, llm, short_term_limit: int = 10,
mid_term_limit: int = 5):
self.llm = llm
self.short_term_limit = short_term_limit
self.mid_term_limit = mid_term_limit
self.short_term: list[dict] = [] # 최근 원문 메시지
self.mid_term: list[str] = [] # 세션 요약들
self.long_term: dict[str, Any] = { # 영구 저장 정보
"user_name": None,
"preferences": [],
"key_facts": [],
"unresolved_issues": []
}
async def add_exchange(self, user_msg: str, ai_msg: str):
# 1. 단기 메모리에 추가
self.short_term.append({"role": "user", "content": user_msg})
self.short_term.append({"role": "assistant", "content": ai_msg})
# 2. 단기 메모리가 한도 초과 시 중기로 승격
if len(self.short_term) > self.short_term_limit * 2:
await self._promote_to_mid_term()
# 3. 중기 메모리가 한도 초과 시 장기로 추출
if len(self.mid_term) > self.mid_term_limit:
await self._extract_to_long_term()
async def _promote_to_mid_term(self):
"""단기 -> 중기: 오래된 메시지를 요약하여 승격"""
old_messages = self.short_term[:-6] # 최근 3턴 제외
self.short_term = self.short_term[-6:]
text = "\n".join(f"{m['role']}: {m['content']}" for m in old_messages)
prompt = f"다음 대화를 3-4문장으로 요약하세요:\n\n{text}"
response = await self.llm.ainvoke(prompt)
self.mid_term.append(response.content)
async def _extract_to_long_term(self):
"""중기 -> 장기: 핵심 사실을 추출하여 영구 저장"""
summaries = "\n\n".join(self.mid_term[:-2])
self.mid_term = self.mid_term[-2:]
prompt = f"""다음 대화 요약에서 핵심 정보를 JSON으로 추출하세요:
{summaries}
추출할 항목:
- user_preferences: 사용자 선호도
- key_facts: 핵심 사실
- unresolved_issues: 미해결 이슈"""
response = await self.llm.ainvoke(prompt)
# JSON 파싱 후 long_term에 병합 (파싱 실패는 아래 except에서 무시)
import json
try:
extracted = json.loads(response.content)
self.long_term["preferences"].extend(
extracted.get("user_preferences", [])
)
self.long_term["key_facts"].extend(
extracted.get("key_facts", [])
)
self.long_term["unresolved_issues"] = extracted.get(
"unresolved_issues", []
)
except json.JSONDecodeError:
pass # 파싱 실패 시 무시
def build_context(self) -> str:
"""전체 컨텍스트를 조합하여 반환"""
parts = []
# 장기 메모리 (항상 포함)
if any(self.long_term.values()):
lt = self.long_term
facts = "\n".join(f"- {f}" for f in lt["key_facts"][-10:])
prefs = ", ".join(lt["preferences"][-5:])
issues = "\n".join(f"- {i}" for i in lt["unresolved_issues"])
parts.append(
f"[사용자 프로필]\n이름: {lt['user_name']}\n"
f"선호도: {prefs}\n핵심 사실:\n{facts}\n"
f"미해결 이슈:\n{issues}"
)
# 중기 메모리 (세션 요약)
if self.mid_term:
parts.append(
"[이전 대화 요약]\n" + "\n---\n".join(self.mid_term)
)
# 단기 메모리 (최근 원문)
if self.short_term:
recent = "\n".join(
f"{m['role']}: {m['content']}" for m in self.short_term
)
parts.append(f"[최근 대화]\n{recent}")
return "\n\n".join(parts)
LangChain / LlamaIndex 실전 구현
LangChain LCEL 기반 메모리 구현
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import RedisChatMessageHistory
# LCEL 기반 체인 구성
prompt = ChatPromptTemplate.from_messages([
("system", "당신은 친절한 기술 지원 전문가입니다. "
"이전 대화 내용을 참고하여 일관된 응답을 제공하세요."),
MessagesPlaceholder(variable_name="history"),
("human", "{input}")
])
chain = prompt | ChatOpenAI(model="gpt-4o", temperature=0.7)
# Redis 기반 영속 세션 관리
def get_session_history(session_id: str):
return RedisChatMessageHistory(
session_id=session_id,
url="redis://localhost:6379"
)
# 메시지 히스토리가 통합된 체인
chain_with_history = RunnableWithMessageHistory(
chain,
get_session_history,
input_messages_key="input",
history_messages_key="history"
)
# 세션별 대화
config = {"configurable": {"session_id": "user-123-session-456"}}
response = chain_with_history.invoke(
{"input": "Kubernetes Pod가 CrashLoopBackOff 상태입니다"},
config=config
)
print(response.content)
LlamaIndex ChatMemoryBuffer 구현
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.chat_engine import SimpleChatEngine
from llama_index.llms.openai import OpenAI
# LlamaIndex 메모리 버퍼 설정
memory = ChatMemoryBuffer.from_defaults(token_limit=4000)
llm = OpenAI(model="gpt-4o", temperature=0.7)
chat_engine = SimpleChatEngine.from_defaults(
llm=llm,
memory=memory,
system_prompt="당신은 DevOps 엔지니어 전문 챗봇입니다."
)
# 대화 진행
response1 = chat_engine.chat("CI/CD 파이프라인이 실패했어요")
response2 = chat_engine.chat("에러 로그를 보여드릴게요: connection timeout")
response3 = chat_engine.chat("이전에 말씀드린 문제 해결 방법은?")
# 메모리 상태 확인
print(f"메모리 내 메시지 수: {len(memory.get_all())}")
벡터 DB 기반 영속 메모리
Pinecone을 활용한 장기 메모리 아키텍처
from pinecone import Pinecone
from langchain_openai import OpenAIEmbeddings
from datetime import datetime
import json
import uuid
class PersistentConversationMemory:
"""Pinecone 기반 영속 대화 메모리"""
def __init__(self, index_name: str = "conversation-memory"):
self.pc = Pinecone()
self.index = self.pc.Index(index_name)
self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
def store_exchange(self, user_id: str, session_id: str,
user_msg: str, ai_msg: str,
metadata: dict = None):
"""대화 교환을 벡터 DB에 저장"""
exchange_text = f"User: {user_msg}\nAssistant: {ai_msg}"
embedding = self.embeddings.embed_query(exchange_text)
record_metadata = {
"user_id": user_id,
"session_id": session_id,
"user_message": user_msg[:500],
"ai_message": ai_msg[:500],
"timestamp": datetime.now().isoformat(),
"type": "exchange"
}
if metadata:
record_metadata.update(metadata)
self.index.upsert(vectors=[{
"id": str(uuid.uuid4()),
"values": embedding,
"metadata": record_metadata
}])
def recall(self, user_id: str, query: str,
top_k: int = 5) -> list[dict]:
"""현재 질문과 관련된 과거 대화를 검색"""
query_embedding = self.embeddings.embed_query(query)
results = self.index.query(
vector=query_embedding,
top_k=top_k,
filter={"user_id": {"$eq": user_id}},
include_metadata=True
)
return [
{
"user_message": match.metadata["user_message"],
"ai_message": match.metadata["ai_message"],
"timestamp": match.metadata["timestamp"],
"relevance": match.score
}
for match in results.matches
]
def build_memory_context(self, user_id: str, query: str) -> str:
"""검색된 과거 대화를 컨텍스트 문자열로 조합"""
memories = self.recall(user_id, query)
if not memories:
return ""
lines = ["[관련 과거 대화]"]
for m in memories:
lines.append(f"({m['timestamp'][:10]}) "
f"User: {m['user_message']}")
lines.append(f" AI: {m['ai_message']}")
lines.append("")
return "\n".join(lines)
컨텍스트 드리프트와 Hallucination 대응
문제 패턴과 탐지
대화가 길어지면 두 가지 주요 문제가 발생한다.
- 컨텍스트 드리프트: 초기 대화의 의도와 점차 멀어지는 현상
- 오래된 컨텍스트 기반 Hallucination: 요약 과정에서 왜곡된 정보로 인한 환각
class ContextDriftDetector:
"""컨텍스트 드리프트를 탐지하는 모듈"""
def __init__(self, embeddings, drift_threshold: float = 0.3):
self.embeddings = embeddings
self.drift_threshold = drift_threshold
self.initial_topic_embedding = None
self.recent_embeddings: list[list[float]] = []
def set_initial_topic(self, first_message: str):
"""대화의 초기 주제를 설정"""
self.initial_topic_embedding = self.embeddings.embed_query(
first_message
)
def check_drift(self, current_message: str) -> dict:
"""현재 메시지가 초기 주제에서 얼마나 벗어났는지 측정"""
current_embedding = self.embeddings.embed_query(current_message)
self.recent_embeddings.append(current_embedding)
if self.initial_topic_embedding is None:
self.set_initial_topic(current_message)
return {"drifted": False, "similarity": 1.0}
similarity = self._cosine_similarity(
self.initial_topic_embedding, current_embedding
)
return {
"drifted": similarity < self.drift_threshold,
"similarity": similarity,
"suggestion": (
"대화 주제가 크게 변경되었습니다. "
"새 세션을 시작하거나 컨텍스트를 재설정하는 것을 권장합니다."
if similarity < self.drift_threshold else None
)
}
@staticmethod
def _cosine_similarity(a: list[float], b: list[float]) -> float:
import numpy as np
a, b = np.array(a), np.array(b)
return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
요약 정확도 검증
import json

class SummaryValidator:
"""대화 요약의 정확도를 검증"""
def __init__(self, llm):
self.llm = llm
async def validate_summary(self, original_messages: list[dict],
summary: str) -> dict:
"""원본 대화 대비 요약의 충실도를 검증"""
original_text = "\n".join(
f"{m['role']}: {m['content']}" for m in original_messages
)
prompt = f"""원본 대화와 요약을 비교하여 다음을 평가하세요:
1. 핵심 정보 보존율 (0-100)
2. 왜곡된 정보 유무
3. 누락된 중요 정보
원본 대화:
{original_text}
요약:
{summary}
JSON 형식으로 응답하세요."""
response = await self.llm.ainvoke(prompt)
try:
result = json.loads(response.content)
return result
except json.JSONDecodeError:
return {"error": "검증 결과 파싱 실패"}
프로덕션 아키텍처 패턴
전체 아키텍처
# docker-compose.yml - 프로덕션 대화 메모리 스택
version: '3.8'
services:
chat-api:
image: chat-service:latest
ports:
- '8000:8000'
environment:
- REDIS_URL=redis://redis:6379
- PINECONE_API_KEY=pk-xxx
- OPENAI_API_KEY=sk-xxx
depends_on:
- redis
- postgres
redis:
image: redis:7-alpine
ports:
- '6379:6379'
volumes:
- redis-data:/data
command: redis-server --appendonly yes
postgres:
image: pgvector/pgvector:pg16
environment:
POSTGRES_DB: chatbot
POSTGRES_USER: admin
POSTGRES_PASSWORD: secure-password
volumes:
- pg-data:/var/lib/postgresql/data
ports:
- '5432:5432'
volumes:
redis-data:
pg-data:
FastAPI 기반 대화 서버
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
import redis.asyncio as redis
import json
app = FastAPI(title="Multi-Turn Chat API")
# Redis 연결
redis_client = redis.from_url("redis://localhost:6379", decode_responses=True)
class ChatRequest(BaseModel):
user_id: str
session_id: str
message: str
class ChatResponse(BaseModel):
reply: str
session_id: str
turn_count: int
tokens_used: int
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
"""멀티턴 대화 엔드포인트"""
session_key = f"session:{request.user_id}:{request.session_id}"
# 1. 세션 이력 로드
history_raw = await redis_client.lrange(session_key, 0, -1)
history = [json.loads(h) for h in history_raw]
# 2. 메모리 관리 (Sliding Window + Summary 결합; SessionMemoryManager는 별도 구현이 필요한 관리 클래스)
manager = SessionMemoryManager(max_turns=20, summary_threshold=15)
context = await manager.prepare_context(history, request.message)
# 3. LLM 호출
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
messages = context + [{"role": "user", "content": request.message}]
response = await llm.ainvoke(messages)
# 4. 이력 저장
await redis_client.rpush(
session_key,
json.dumps({"role": "user", "content": request.message})
)
await redis_client.rpush(
session_key,
json.dumps({"role": "assistant", "content": response.content})
)
# 5. TTL 설정 (24시간)
await redis_client.expire(session_key, 86400)
turn_count = len(history) // 2 + 1
return ChatResponse(
reply=response.content,
session_id=request.session_id,
turn_count=turn_count,
tokens_used=response.response_metadata.get("token_usage", {}).get(
"total_tokens", 0
)
)
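위 엔드포인트가 참조하는 SessionMemoryManager는 본문에 정의되어 있지 않다. 아래는 해당 역할(Sliding Window + 요약 결합)을 가정한 최소 스케치로, 요약 문자열은 실제 LLM 호출을 대신하는 자리표시자다.

```python
import asyncio

class SessionMemoryManager:
    """Sliding Window + 요약을 결합한 세션 메모리 관리자의 최소 예시 구현"""
    def __init__(self, max_turns: int = 20, summary_threshold: int = 15):
        self.max_turns = max_turns
        self.summary_threshold = summary_threshold

    async def prepare_context(self, history: list[dict], message: str) -> list[dict]:
        max_messages = self.max_turns * 2  # 1턴 = 사용자 + 어시스턴트 메시지
        if len(history) <= max_messages:
            return history
        # 윈도우를 벗어난 오래된 메시지는 요약 한 줄로 대체
        old, recent = history[:-max_messages], history[-max_messages:]
        summary = f"[이전 {len(old)}개 메시지 요약 자리표시자]"  # 실전에서는 LLM 요약 호출
        return [{"role": "system", "content": summary}] + recent

# 사용 예시
history = [{"role": "user", "content": f"메시지 {i}"} for i in range(50)]
context = asyncio.run(SessionMemoryManager(max_turns=20).prepare_context(history, "질문"))
print(len(context))  # 41 (요약 1개 + 최근 40개)
```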
운영 시 주의사항
모니터링 체크리스트
- 토큰 사용량 모니터링: 세션당 평균/최대 토큰 소비를 추적하고, 이상 급증 시 알림을 설정한다.
- 요약 품질 검증: 주기적으로 요약 결과를 샘플링하여 정보 손실 여부를 확인한다.
- 컨텍스트 드리프트 추적: 세션이 길어질 때 주제 이탈 비율을 모니터링한다.
- 지연 시간 분석: 메모리 검색/요약 단계의 지연이 전체 응답 시간에 미치는 영향을 측정한다.
- 비용 추적: 메모리 관리용 LLM 호출(요약 등) 비용을 별도로 추적한다.
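체크리스트의 첫 항목(세션당 토큰 추적과 이상 급증 알림)은 다음과 같이 스케치할 수 있다. 급증 판정 배수(`spike_factor`)와 최소 표본 수는 예시용 가정값이다.

```python
from collections import defaultdict
from statistics import mean

class TokenUsageMonitor:
    """세션별 토큰 사용량을 추적하고 이상 급증을 감지하는 최소 스케치"""
    def __init__(self, spike_factor: float = 3.0):
        self.spike_factor = spike_factor  # 예시용 가정값
        self.usage: dict[str, list[int]] = defaultdict(list)

    def record(self, session_id: str, tokens: int) -> bool:
        """사용량을 기록하고, 기존 전체 평균 대비 급증이면 True(알림 필요) 반환"""
        all_values = [v for vals in self.usage.values() for v in vals]
        self.usage[session_id].append(tokens)
        if len(all_values) < 5:  # 표본이 적으면 판단 유보
            return False
        return tokens > mean(all_values) * self.spike_factor

    def session_stats(self, session_id: str) -> dict:
        vals = self.usage[session_id]
        return {"avg": mean(vals), "max": max(vals), "turns": len(vals)}

monitor = TokenUsageMonitor()
for t in [900, 1100, 1000, 950, 1050]:
    monitor.record("s1", t)
print(monitor.record("s2", 5000))  # True: 기존 평균(1000)의 3배 초과 → 알림
```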
흔한 장애 케이스와 복구 절차
class MemoryRecoveryHandler:
"""메모리 관련 장애 복구 핸들러"""
async def handle_token_overflow(self, session_id: str):
"""토큰 한도 초과 시 응급 처리"""
# 1. 최근 5턴만 남기고 긴급 요약
# 2. 요약 실패 시 최근 3턴만 유지하고 나머지 버림
# 3. 사용자에게 컨텍스트 축소 알림
pass
async def handle_summary_failure(self, session_id: str):
"""요약 LLM 호출 실패 시"""
# 1. 재시도 (최대 3회, 백오프)
# 2. 폴백: 단순 메시지 수 기반 윈도우로 전환
# 3. 요약 없이 최근 대화만으로 진행
pass
async def handle_vector_db_failure(self, session_id: str):
"""벡터 DB 연결 실패 시"""
# 1. 로컬 캐시에서 최근 대화 제공
# 2. Redis 단기 메모리로 폴백
# 3. 벡터 검색 없이 기본 대화 진행
pass
async def handle_context_drift(self, session_id: str, drift_score: float):
"""컨텍스트 드리프트 감지 시"""
# 1. 사용자에게 주제 변경 알림
# 2. 새 세션 시작 제안
# 3. 현재 주제 기준으로 컨텍스트 재구성
pass
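handle_summary_failure의 1단계(재시도 + 백오프)는 다음과 같이 구현할 수 있다. `retry_with_backoff`와 `flaky_summarize`는 실제 LLM 호출을 대신하는 예시용 가정이다.

```python
import asyncio

async def retry_with_backoff(fn, max_retries: int = 3, base_delay: float = 0.01):
    """요약 호출 실패 시 재시도 절차의 최소 스케치 (fn은 비동기 요약 함수라고 가정)"""
    for attempt in range(max_retries):
        try:
            return await fn()
        except Exception:
            if attempt == max_retries - 1:
                return None  # 폴백: 호출자가 메시지 수 기반 윈도우로 전환
            await asyncio.sleep(base_delay * (2 ** attempt))  # 지수 백오프

# 사용 예시: 두 번 실패 후 세 번째에 성공하는 가짜 요약 함수
calls = {"n": 0}
async def flaky_summarize():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("LLM 호출 실패")
    return "요약 결과"

result = asyncio.run(retry_with_backoff(flaky_summarize))
print(result)  # "요약 결과" (3번째 시도에 성공)
```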
성능 최적화 팁
# Redis 메모리 사용량 모니터링
redis-cli INFO memory | grep used_memory_human
# 세션별 메모리 크기 확인 (Redis 4.0+)
redis-cli MEMORY USAGE "session:user-123:session-456"
# TTL이 설정되지 않은 세션 키 탐지
redis-cli --scan --pattern "session:*" | while read key; do
ttl=$(redis-cli TTL "$key")
if [ "$ttl" -eq "-1" ]; then
echo "No TTL set for $key"
fi
done
메모리 패턴 선택 가이드
사용 케이스별 추천
| 사용 케이스 | 추천 패턴 | 이유 |
|---|---|---|
| 간단한 FAQ 봇 | Buffer Window (k=5) | 짧은 대화, 최소 비용 |
| 고객 상담 챗봇 | Summary Buffer + Entity | 긴 대화, 고객 정보 추적 |
| 기술 지원 에이전트 | Hierarchical + Vector | 과거 이슈 검색 필요 |
| 개인 비서 봇 | Full Hierarchical | 장기 기억, 개인화 |
| 코드 리뷰 봇 | Token Window | 코드 컨텍스트 최대화 |
의사결정 플로차트
대화 길이는?
|
+-- 5턴 이하 --> Buffer Memory
|
+-- 5~30턴 --> 개인화 필요?
| |
| +-- No --> Sliding Window
| +-- Yes --> Summary Buffer + Entity
|
+-- 30턴 이상 --> 과거 대화 검색 필요?
|
+-- No --> Hierarchical Memory
+-- Yes --> Hierarchical + Vector Store
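위 플로차트는 간단한 선택 함수로 그대로 옮길 수 있다. 경계값(5턴/30턴)은 본문 플로차트를 따른 것이며, 함수명은 예시다.

```python
def choose_memory_pattern(turn_count: int,
                          needs_personalization: bool = False,
                          needs_recall: bool = False) -> str:
    """위 의사결정 플로차트를 함수로 옮긴 선택 로직"""
    if turn_count <= 5:
        return "Buffer Memory"
    if turn_count <= 30:
        return "Summary Buffer + Entity" if needs_personalization else "Sliding Window"
    return "Hierarchical + Vector Store" if needs_recall else "Hierarchical Memory"

print(choose_memory_pattern(3))                               # Buffer Memory
print(choose_memory_pattern(20, needs_personalization=True))  # Summary Buffer + Entity
print(choose_memory_pattern(100, needs_recall=True))          # Hierarchical + Vector Store
```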
마치며
멀티턴 대화 관리는 LLM 챗봇의 품질을 결정짓는 핵심 요소다. 단순히 모든 대화를 컨텍스트에 넣는 방식은 비용과 성능 면에서 지속 가능하지 않다. Buffer, Summary, Vector Store 등 다양한 메모리 패턴을 이해하고, 사용 케이스에 맞는 전략을 선택해야 한다.
Sliding Window는 가장 실용적인 기본 전략이며, 여기에 대화 요약과 벡터 검색을 결합하면 긴 대화에서도 높은 품질을 유지할 수 있다. 계층적 메모리 아키텍처는 인간의 기억 구조를 모방하여 단기/중기/장기 기억을 분리 관리함으로써, 토큰 효율과 정보 보존 사이의 균형을 최적화한다.
프로덕션 환경에서는 Redis를 활용한 세션 관리, Pinecone/Chroma 등 벡터 DB를 활용한 영속 메모리, 그리고 장애 복구 전략이 필수적이다. 토큰 사용량과 요약 품질을 지속적으로 모니터링하고, 컨텍스트 드리프트에 대한 대응 방안을 마련해두자.
참고자료
- LangChain Conversational Memory - Pinecone
- Context Window Management - Redis Blog
- Context Window Management Strategies for Long-Context AI Agents - Maxim AI
- AI Agent Memory Architecture - IBM
- LLM Chat History Summarization Guide - Mem0
- Top Techniques to Manage Context Length in LLMs - Agenta
- LangChain Memory Tutorial - Aurelio AI
Multi-Turn Conversation Management and Context Optimization: LLM Chatbot Memory Patterns, Conversation Summarization, and Sliding Window Strategies
- Introduction
- Context Window Limitations and Cost Analysis
- Memory Pattern Comparative Analysis
- Deep Dive into Sliding Window Strategies
- Conversation Summarization Techniques
- LangChain / LlamaIndex Production Implementation
- Vector DB-Based Persistent Memory
- Context Drift and Hallucination Mitigation
- Production Architecture Patterns
- Operational Notes
- Memory Pattern Selection Guide
- Conclusion
- References

Introduction
The most fundamental challenge in LLM-based chatbots is effectively managing context in multi-turn conversations. Since LLMs are inherently stateless, the entire conversation history must be sent with every API call. However, context windows are finite, and token costs increase proportionally with conversation length.
Even with large context windows like GPT-4o's 128K tokens or Claude's 200K tokens, production environments can easily reach these limits during hundreds of turns of customer support conversations or extended technical support sessions. Moreover, the "Lost in the Middle" phenomenon means models cannot effectively utilize information buried in the middle of long contexts.
This article covers various LLM chatbot memory patterns (Buffer, Summary, Vector Store), Sliding Window strategies, conversation summarization techniques, token cost optimization, and production architecture patterns with practical code examples.
Context Window Limitations and Cost Analysis
Major LLM Context Window Comparison
| Model | Context Window | Input Cost (1M tokens) | Output Cost (1M tokens) | Notes |
|---|---|---|---|---|
| GPT-4o | 128K | 2.50 USD | 10.00 USD | General purpose |
| GPT-4o-mini | 128K | 0.15 USD | 0.60 USD | Lightweight |
| Claude 3.5 Sonnet | 200K | 3.00 USD | 15.00 USD | Long context |
| Gemini 1.5 Pro | 2M | 1.25 USD | 5.00 USD | Largest window |
| Llama 3.1 405B | 128K | Self-hosted | Self-hosted | Open source |
Token Budget Design
When designing per-conversation token budgets, you need to separately manage system prompts, conversation history, and response space.
import tiktoken
class TokenBudgetManager:
"""Token budget management class"""
def __init__(self, model: str = "gpt-4o", max_context: int = 128000):
self.encoding = tiktoken.encoding_for_model(model)
self.max_context = max_context
# Budget allocation: System 15%, History 60%, Response 25%
self.system_budget = int(max_context * 0.15)
self.history_budget = int(max_context * 0.60)
self.response_budget = int(max_context * 0.25)
def count_tokens(self, text: str) -> int:
"""Count tokens in text"""
return len(self.encoding.encode(text))
def count_message_tokens(self, messages: list[dict]) -> int:
"""Count total tokens in message list"""
total = 0
for msg in messages:
total += self.count_tokens(msg["content"])
total += 4 # Message metadata overhead
total += 2 # Start/end tokens
return total
def get_available_history_tokens(self, system_tokens: int) -> int:
"""Return available tokens for conversation history"""
used = system_tokens + self.response_budget
return self.max_context - used
def should_summarize(self, history_tokens: int) -> bool:
"""Recommend summarization when history exceeds 80% of budget"""
return history_tokens > self.history_budget * 0.8
# Usage example
budget = TokenBudgetManager(model="gpt-4o")
system_prompt = "You are a customer support AI specialist..."
system_tokens = budget.count_tokens(system_prompt)
print(f"System prompt: {system_tokens} tokens")
print(f"History budget: {budget.history_budget} tokens")
print(f"Response budget: {budget.response_budget} tokens")
Cost Escalation Simulation
import matplotlib.pyplot as plt
import numpy as np
def calculate_cost_per_turn(turns: int, avg_tokens_per_turn: int = 200,
input_cost_per_1m: float = 2.50) -> float:
"""Calculate cumulative input cost by turn count"""
# Assumes entire history is sent with each turn
total_tokens = 0
cumulative_cost = 0
for t in range(1, turns + 1):
total_tokens = t * avg_tokens_per_turn # Input tokens for current turn
turn_cost = (total_tokens / 1_000_000) * input_cost_per_1m
cumulative_cost += turn_cost
return cumulative_cost
# Cost comparison by memory strategy
turns = np.arange(1, 101)
cost_no_memory = [calculate_cost_per_turn(t) for t in turns]
# Sliding Window (keep only last 20 turns)
cost_sliding = [calculate_cost_per_turn(min(t, 20)) for t in turns]
# Summary Memory (1/5 compression via summarization)
cost_summary = [calculate_cost_per_turn(t, avg_tokens_per_turn=40) for t in turns]
print(f"100-turn cost (no memory): ${cost_no_memory[-1]:.4f}")
print(f"100-turn cost (Sliding Window): ${cost_sliding[-1]:.4f}")
print(f"100-turn cost (Summary): ${cost_summary[-1]:.4f}")
Memory Pattern Comparative Analysis
Pattern Characteristics Comparison
| Memory Pattern | Token Usage | Info Retention | Latency | Implementation Complexity | Best For |
|---|---|---|---|---|---|
| Buffer Memory | O(n) linear | 100% | Low | Low | Short conversations |
| Window Memory | O(k) fixed | Last k turns | Low | Low | General chatbots |
| Summary Memory | O(1) fixed | Summary only | Medium | Medium | Long conversations |
| Summary Buffer | O(k+1) | Summary + recent | Medium | Medium | Balanced |
| Vector Store | O(k) search | Semantic-based | High | High | Knowledge-intensive |
| Entity Memory | O(e) entities | Per-entity | Medium | High | Personalization |
1. Buffer Memory - Full History Storage
The simplest pattern that retains the entire conversation history as-is.
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
# Buffer Memory: stores all conversations as-is
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
memory = ConversationBufferMemory(return_messages=True)
chain = ConversationChain(llm=llm, memory=memory, verbose=True)
# Conversation flow
response1 = chain.predict(input="Hello, we have a server outage")
response2 = chain.predict(input="Checking the logs, I see OOM errors")
response3 = chain.predict(input="How can I check current memory usage?")
# Review entire history stored in memory
for msg in memory.chat_memory.messages:
role = "User" if msg.type == "human" else "AI"
print(f"[{role}] {msg.content[:80]}...")
Limitation: As conversations grow longer, token usage increases linearly, causing costs and latency to spike.
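This linear growth is easy to quantify with a back-of-envelope calculation. The sketch below compares cumulative input tokens for Buffer Memory versus a fixed window; the 100-tokens-per-turn figure is an assumption for illustration.

```python
# Rough cumulative input-token cost of Buffer vs. Window memory.
# Assumption: each turn adds ~100 tokens of history on average.
AVG_TOKENS_PER_TURN = 100

def buffer_input_tokens(turns: int) -> int:
    """Buffer Memory resends the whole history every call: O(n^2) total."""
    return sum(t * AVG_TOKENS_PER_TURN for t in range(1, turns + 1))

def window_input_tokens(turns: int, k: int = 20) -> int:
    """Window Memory caps history at the last k turns: O(n*k) total."""
    return sum(min(t, k) * AVG_TOKENS_PER_TURN for t in range(1, turns + 1))

buf = buffer_input_tokens(100)        # 505,000 tokens over 100 turns
win = window_input_tokens(100, k=20)  # 181,000 tokens over 100 turns
print(f"Buffer: {buf:,}  Window: {win:,}  saved: {1 - win / buf:.0%}")
```

At 100 turns the fixed window sends roughly a third of the tokens, and the gap keeps widening as the conversation continues.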
2. Sliding Window Memory - Keep Only Recent N Turns
Maintains a fixed-size window while removing older conversations.
from langchain.memory import ConversationBufferWindowMemory
# Sliding Window keeping only the last 10 messages (5 pairs)
window_memory = ConversationBufferWindowMemory(
k=10, # Keep last 10 messages
return_messages=True
)
chain = ConversationChain(llm=llm, memory=window_memory, verbose=True)
# Token-based Window implementation (custom)
class TokenWindowMemory:
"""Memory that manages conversation history based on token count"""
def __init__(self, max_tokens: int = 4000, model: str = "gpt-4o"):
self.max_tokens = max_tokens
self.encoding = tiktoken.encoding_for_model(model)
self.messages: list[dict] = []
def add_message(self, role: str, content: str):
self.messages.append({"role": role, "content": content})
self._trim()
def _trim(self):
"""Remove oldest messages when token limit is exceeded"""
while self._total_tokens() > self.max_tokens and len(self.messages) > 2:
# Preserve first system message
self.messages.pop(0 if self.messages[0]["role"] != "system" else 1)
def _total_tokens(self) -> int:
return sum(
len(self.encoding.encode(m["content"])) + 4
for m in self.messages
)
def get_messages(self) -> list[dict]:
return self.messages.copy()
# Usage example
token_window = TokenWindowMemory(max_tokens=4000)
token_window.add_message("system", "You are a technical support expert.")
token_window.add_message("user", "My Docker container keeps restarting.")
token_window.add_message("assistant", "Let me check if it is in OOMKilled state...")
print(f"Current token usage: {token_window._total_tokens()}")
3. Summary Memory - Compression Through Conversation Summarization
Uses an LLM to summarize previous conversations and leverages the summary as context.
from langchain.memory import ConversationSummaryMemory
# Summary Memory: automatically summarizes conversations with LLM
summary_memory = ConversationSummaryMemory(
llm=ChatOpenAI(model="gpt-4o-mini", temperature=0), # Lightweight model for summarization
return_messages=True
)
# Summary Buffer Memory: combines summary + recent conversations
from langchain.memory import ConversationSummaryBufferMemory
summary_buffer = ConversationSummaryBufferMemory(
llm=ChatOpenAI(model="gpt-4o-mini", temperature=0),
max_token_limit=2000, # Summarizes older messages when this limit is exceeded
return_messages=True
)
# Custom Progressive Summarization implementation
class ProgressiveSummarizer:
"""Progressive summarization: performs staged summarization as conversations accumulate"""
def __init__(self, llm, summarize_threshold: int = 10):
self.llm = llm
self.summarize_threshold = summarize_threshold
self.summary = ""
self.recent_messages: list[dict] = []
self.turn_count = 0
async def add_exchange(self, user_msg: str, ai_msg: str):
self.recent_messages.append({"role": "user", "content": user_msg})
self.recent_messages.append({"role": "assistant", "content": ai_msg})
self.turn_count += 1
if self.turn_count % self.summarize_threshold == 0:
await self._summarize()
async def _summarize(self):
"""Integrate recent conversation into existing summary"""
messages_text = "\n".join(
f"{m['role']}: {m['content']}" for m in self.recent_messages
)
prompt = f"""Previous summary:
{self.summary if self.summary else '(none)'}
Recent conversation:
{messages_text}
Create a concise summary that integrates the previous summary with
the recent conversation, preserving key information. Include user names,
preferences, and unresolved issues."""
response = await self.llm.ainvoke(prompt)
self.summary = response.content
self.recent_messages = self.recent_messages[-4:] # Keep only last 2 turns
def get_context(self) -> str:
parts = []
if self.summary:
parts.append(f"[Conversation Summary]\n{self.summary}")
if self.recent_messages:
recent = "\n".join(
f"{m['role']}: {m['content']}" for m in self.recent_messages
)
parts.append(f"[Recent Conversation]\n{recent}")
return "\n\n".join(parts)
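The core loop of the class above can be exercised standalone with a stub in place of the LLM. In this sketch, `summarize()` is a placeholder for the real LLM call; every `threshold` turns the recent messages are folded into a running summary and only the last two turns are kept verbatim (they are both summarized and retained, mirroring the class).

```python
# Standalone sketch of the progressive-summarization loop.
# summarize() is a stub standing in for the real LLM summarization call.
def summarize(prev_summary: str, messages: list[dict]) -> str:
    joined = "; ".join(m["content"] for m in messages)
    return (prev_summary + " | " if prev_summary else "") + joined

summary, recent, threshold = "", [], 3
for turn in range(1, 7):
    recent.append({"role": "user", "content": f"question {turn}"})
    recent.append({"role": "assistant", "content": f"answer {turn}"})
    if turn % threshold == 0:
        summary = summarize(summary, recent)
        recent = recent[-4:]  # keep only the last 2 turns verbatim

print(f"summary: {summary}")
print(f"verbatim messages kept: {len(recent)}")
```

After six turns the context is one compact summary string plus four verbatim messages, instead of twelve raw messages.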
4. Vector Store Memory - Semantic-Based Retrieval
Stores conversation history as vector embeddings and retrieves past conversations semantically similar to the current question.
from langchain.memory import VectorStoreRetrieverMemory
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
# Vector Store-based memory setup
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(
collection_name="conversation_memory",
embedding_function=embeddings,
persist_directory="./chroma_memory"
)
retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 5} # Retrieve top 5 most relevant conversations
)
vector_memory = VectorStoreRetrieverMemory(
retriever=retriever,
memory_key="relevant_history",
input_key="input"
)
# Store conversations
vector_memory.save_context(
{"input": "What is the deployment schedule for Project A?"},
{"output": "Project A is scheduled for staging on March 15 and production on March 20."}
)
vector_memory.save_context(
{"input": "When is the database migration?"},
{"output": "The DB migration is scheduled for 2 AM on March 18."}
)
# Retrieve relevant conversations
relevant = vector_memory.load_memory_variables(
{"input": "What should we check before Project A deployment?"}
)
print(relevant["relevant_history"])
Deep Dive into Sliding Window Strategies
Adaptive Sliding Window
A strategy that dynamically adjusts the window based on conversation importance rather than using a fixed size.
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import hashlib
@dataclass
class ConversationTurn:
role: str
content: str
timestamp: datetime
importance: float = 0.5 # 0.0 ~ 1.0
tokens: int = 0
turn_id: str = ""
def __post_init__(self):
if not self.turn_id:
self.turn_id = hashlib.md5(
f"{self.timestamp}{self.content[:50]}".encode()
).hexdigest()[:8]
class AdaptiveSlidingWindow:
"""Importance-based adaptive sliding window"""
def __init__(self, max_tokens: int = 8000, min_turns: int = 4):
self.max_tokens = max_tokens
self.min_turns = min_turns # Minimum turns to retain
self.turns: list[ConversationTurn] = []
self.archived: list[ConversationTurn] = []
def add_turn(self, turn: ConversationTurn):
self.turns.append(turn)
self._optimize()
def _calculate_importance(self, turn: ConversationTurn, index: int) -> float:
"""Calculate turn importance across multiple dimensions"""
score = turn.importance
# Higher weight for more recent turns
recency = index / max(len(self.turns) - 1, 1)
score += recency * 0.3
# Turns containing questions get higher importance
if "?" in turn.content or "how" in turn.content.lower():
score += 0.2
# Error/incident related keywords
critical_keywords = ["error", "failure", "outage", "critical", "urgent", "fail"]
if any(kw in turn.content.lower() for kw in critical_keywords):
score += 0.3
return min(score, 1.0)
def _optimize(self):
"""Prioritize retaining important turns within token limit"""
total_tokens = sum(t.tokens for t in self.turns)
if total_tokens <= self.max_tokens:
return
# Calculate importance scores
scored = [
(i, self._calculate_importance(t, i), t)
for i, t in enumerate(self.turns)
]
        # Always retain the most recent min_turns; only older turns are eviction candidates
        candidates = scored[:-self.min_turns]
# Sort by lowest importance and remove
candidates.sort(key=lambda x: x[1])
while total_tokens > self.max_tokens and candidates:
_, _, turn = candidates.pop(0)
self.archived.append(turn)
self.turns.remove(turn)
total_tokens -= turn.tokens
def get_context(self) -> list[dict]:
return [
{"role": t.role, "content": t.content}
for t in self.turns
]
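The eviction policy can be shown in isolation with plain dicts. This is a simplified sketch of the same idea: drop the lowest-scoring turns first, never touching the most recent `min_turns`; the scores and token counts are illustrative assumptions.

```python
# Minimal sketch of importance-based eviction: remove the lowest-scoring
# turns until the token budget is met, protecting the most recent turns.
def evict(turns: list[dict], max_tokens: int, min_turns: int = 2) -> list[dict]:
    total = sum(t["tokens"] for t in turns)
    protected = turns[-min_turns:]                    # never evicted
    candidates = sorted(turns[:-min_turns], key=lambda t: t["importance"])
    kept = turns[:-min_turns]
    for t in candidates:
        if total <= max_tokens:
            break
        kept.remove(t)
        total -= t["tokens"]
    return kept + protected

turns = [
    {"content": "greeting", "importance": 0.1, "tokens": 50},
    {"content": "error report", "importance": 0.9, "tokens": 120},
    {"content": "small talk", "importance": 0.2, "tokens": 80},
    {"content": "stack trace", "importance": 0.8, "tokens": 200},
    {"content": "current question", "importance": 0.5, "tokens": 60},
]
kept = evict(turns, max_tokens=400, min_turns=2)
print([t["content"] for t in kept])
# -> ['error report', 'stack trace', 'current question']
```

The greeting and small talk are evicted first; the high-importance error report survives even though it is older than the small talk.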
Time-Based vs Token-Based Window Comparison
from datetime import datetime, timedelta

class TimeBasedWindow:
"""Time-based sliding window - keeps only conversations within last N minutes"""
def __init__(self, window_minutes: int = 30):
self.window_minutes = window_minutes
self.messages: list[dict] = []
def add_message(self, role: str, content: str):
self.messages.append({
"role": role,
"content": content,
"timestamp": datetime.now()
})
self._cleanup()
def _cleanup(self):
cutoff = datetime.now() - timedelta(minutes=self.window_minutes)
self.messages = [
m for m in self.messages
if m["timestamp"] > cutoff
]
def get_messages(self) -> list[dict]:
return [
{"role": m["role"], "content": m["content"]}
for m in self.messages
]
class HybridWindow:
"""Token + Time hybrid window"""
def __init__(self, max_tokens: int = 4000, max_minutes: int = 60):
self.max_tokens = max_tokens
self.max_minutes = max_minutes
self.token_window = TokenWindowMemory(max_tokens=max_tokens)
self.time_window = TimeBasedWindow(window_minutes=max_minutes)
def add_message(self, role: str, content: str):
self.token_window.add_message(role, content)
self.time_window.add_message(role, content)
def get_messages(self) -> list[dict]:
# Use intersection of both windows (stricter filtering)
token_msgs = set(
m["content"] for m in self.token_window.get_messages()
)
time_msgs = self.time_window.get_messages()
return [m for m in time_msgs if m["content"] in token_msgs]
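The intersection rule above can be illustrated in isolation with toy message lists, matching by content as the class does:

```python
# Toy illustration of the hybrid intersection rule: a message survives
# only if BOTH the token window and the time window still contain it.
token_kept = [{"role": "user", "content": "A"}, {"role": "user", "content": "B"}]
time_kept = [{"role": "user", "content": "B"}, {"role": "user", "content": "C"}]

token_contents = {m["content"] for m in token_kept}
merged = [m for m in time_kept if m["content"] in token_contents]
print([m["content"] for m in merged])  # ['B']
```

Matching by content string is a simplification; in production, stable message IDs are more robust when duplicate messages are possible.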
Conversation Summarization Techniques
Summarization Strategy Comparison
| Strategy | Summarization Timing | Token Savings | Information Loss | Additional Cost |
|---|---|---|---|---|
| Per-turn summary | After every exchange | 80-90% | Medium | High |
| Threshold summary | Every N turns | 60-80% | Low | Medium |
| Hierarchical summary | Staged | 70-85% | Very low | Medium |
| Selective summary | Importance-based | 50-70% | Minimal | Low |
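The savings figures in the table follow from simple arithmetic. The sketch below compares history tokens sent per call with full verbatim history versus one rolling summary plus a short verbatim tail; the per-turn and summary sizes are assumptions.

```python
# Back-of-envelope comparison behind the table above.
# Assumptions: ~100 tokens per turn, rolling summary of ~150 tokens.
AVG_TOKENS_PER_TURN = 100
SUMMARY_TOKENS = 150

def verbatim_history(turns: int) -> int:
    return turns * AVG_TOKENS_PER_TURN

def summarized_history(turns: int, tail: int = 3) -> int:
    # one rolling summary covering everything except the last `tail` turns
    return SUMMARY_TOKENS + min(turns, tail) * AVG_TOKENS_PER_TURN

v, s = verbatim_history(50), summarized_history(50)
print(f"turn 50: verbatim={v} summarized={s} saved={1 - s / v:.0%}")
```

At turn 50 the summarized context is roughly a tenth of the verbatim one, at the cost of one extra LLM call per summarization cycle.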
Hierarchical Summarization System Implementation
from enum import Enum
from typing import Any
class MemoryTier(Enum):
SHORT_TERM = "short_term" # Recent conversation verbatim
MID_TERM = "mid_term" # Session summaries
LONG_TERM = "long_term" # Core facts/preferences
class HierarchicalMemory:
"""3-tier memory architecture"""
def __init__(self, llm, short_term_limit: int = 10,
mid_term_limit: int = 5):
self.llm = llm
self.short_term_limit = short_term_limit
self.mid_term_limit = mid_term_limit
self.short_term: list[dict] = [] # Recent verbatim messages
self.mid_term: list[str] = [] # Session summaries
self.long_term: dict[str, Any] = { # Persistent stored info
"user_name": None,
"preferences": [],
"key_facts": [],
"unresolved_issues": []
}
async def add_exchange(self, user_msg: str, ai_msg: str):
# 1. Add to short-term memory
self.short_term.append({"role": "user", "content": user_msg})
self.short_term.append({"role": "assistant", "content": ai_msg})
# 2. Promote to mid-term when short-term exceeds limit
if len(self.short_term) > self.short_term_limit * 2:
await self._promote_to_mid_term()
# 3. Extract to long-term when mid-term exceeds limit
if len(self.mid_term) > self.mid_term_limit:
await self._extract_to_long_term()
async def _promote_to_mid_term(self):
"""Short -> Mid: summarize older messages and promote"""
old_messages = self.short_term[:-6] # Exclude last 3 turns
self.short_term = self.short_term[-6:]
text = "\n".join(f"{m['role']}: {m['content']}" for m in old_messages)
prompt = f"Summarize the following conversation in 3-4 sentences:\n\n{text}"
response = await self.llm.ainvoke(prompt)
self.mid_term.append(response.content)
async def _extract_to_long_term(self):
"""Mid -> Long: extract key facts for permanent storage"""
summaries = "\n\n".join(self.mid_term[:-2])
self.mid_term = self.mid_term[-2:]
prompt = f"""Extract key information from the following conversation summaries as JSON:
{summaries}
Items to extract:
- user_preferences: user preferences
- key_facts: key facts
- unresolved_issues: unresolved issues"""
response = await self.llm.ainvoke(prompt)
# Parse JSON and merge into long_term (error handling needed in production)
import json
try:
extracted = json.loads(response.content)
self.long_term["preferences"].extend(
extracted.get("user_preferences", [])
)
self.long_term["key_facts"].extend(
extracted.get("key_facts", [])
)
self.long_term["unresolved_issues"] = extracted.get(
"unresolved_issues", []
)
except json.JSONDecodeError:
pass # Ignore on parse failure
def build_context(self) -> str:
"""Assemble and return full context"""
parts = []
# Long-term memory (always included)
if any(self.long_term.values()):
lt = self.long_term
facts = "\n".join(f"- {f}" for f in lt["key_facts"][-10:])
prefs = ", ".join(lt["preferences"][-5:])
issues = "\n".join(f"- {i}" for i in lt["unresolved_issues"])
parts.append(
f"[User Profile]\nName: {lt['user_name']}\n"
f"Preferences: {prefs}\nKey Facts:\n{facts}\n"
f"Unresolved Issues:\n{issues}"
)
# Mid-term memory (session summaries)
if self.mid_term:
parts.append(
"[Previous Conversation Summary]\n" + "\n---\n".join(self.mid_term)
)
# Short-term memory (recent verbatim)
if self.short_term:
recent = "\n".join(
f"{m['role']}: {m['content']}" for m in self.short_term
)
parts.append(f"[Recent Conversation]\n{recent}")
return "\n\n".join(parts)
LangChain / LlamaIndex Production Implementation
LangChain LCEL-Based Memory Implementation
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import RedisChatMessageHistory
# LCEL-based chain composition
prompt = ChatPromptTemplate.from_messages([
("system", "You are a friendly technical support expert. "
"Refer to previous conversation history for consistent responses."),
MessagesPlaceholder(variable_name="history"),
("human", "{input}")
])
chain = prompt | ChatOpenAI(model="gpt-4o", temperature=0.7)
# Redis-based persistent session management
def get_session_history(session_id: str):
return RedisChatMessageHistory(
session_id=session_id,
url="redis://localhost:6379"
)
# Chain with integrated message history
chain_with_history = RunnableWithMessageHistory(
chain,
get_session_history,
input_messages_key="input",
history_messages_key="history"
)
# Per-session conversation
config = {"configurable": {"session_id": "user-123-session-456"}}
response = chain_with_history.invoke(
{"input": "My Kubernetes Pod is in CrashLoopBackOff state"},
config=config
)
print(response.content)
LlamaIndex ChatMemoryBuffer Implementation
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.chat_engine import SimpleChatEngine
from llama_index.llms.openai import OpenAI
# LlamaIndex memory buffer setup
memory = ChatMemoryBuffer.from_defaults(token_limit=4000)
llm = OpenAI(model="gpt-4o", temperature=0.7)
chat_engine = SimpleChatEngine.from_defaults(
llm=llm,
memory=memory,
system_prompt="You are a DevOps engineer chatbot specialist."
)
# Conversation flow
response1 = chat_engine.chat("Our CI/CD pipeline has failed")
response2 = chat_engine.chat("Here is the error log: connection timeout")
response3 = chat_engine.chat("What was the solution for the issue I mentioned earlier?")
# Check memory state
print(f"Messages in memory: {len(memory.get_all())}")
Vector DB-Based Persistent Memory
Long-Term Memory Architecture with Pinecone
from pinecone import Pinecone
from langchain_openai import OpenAIEmbeddings
from datetime import datetime
import json
import uuid
class PersistentConversationMemory:
"""Pinecone-based persistent conversation memory"""
def __init__(self, index_name: str = "conversation-memory"):
self.pc = Pinecone()
self.index = self.pc.Index(index_name)
self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
def store_exchange(self, user_id: str, session_id: str,
user_msg: str, ai_msg: str,
metadata: dict = None):
"""Store conversation exchange in vector DB"""
exchange_text = f"User: {user_msg}\nAssistant: {ai_msg}"
embedding = self.embeddings.embed_query(exchange_text)
record_metadata = {
"user_id": user_id,
"session_id": session_id,
"user_message": user_msg[:500],
"ai_message": ai_msg[:500],
"timestamp": datetime.now().isoformat(),
"type": "exchange"
}
if metadata:
record_metadata.update(metadata)
self.index.upsert(vectors=[{
"id": str(uuid.uuid4()),
"values": embedding,
"metadata": record_metadata
}])
def recall(self, user_id: str, query: str,
top_k: int = 5) -> list[dict]:
"""Retrieve past conversations relevant to current query"""
query_embedding = self.embeddings.embed_query(query)
results = self.index.query(
vector=query_embedding,
top_k=top_k,
filter={"user_id": {"$eq": user_id}},
include_metadata=True
)
return [
{
"user_message": match.metadata["user_message"],
"ai_message": match.metadata["ai_message"],
"timestamp": match.metadata["timestamp"],
"relevance": match.score
}
for match in results.matches
]
def build_memory_context(self, user_id: str, query: str) -> str:
"""Assemble retrieved past conversations into context string"""
memories = self.recall(user_id, query)
if not memories:
return ""
lines = ["[Relevant Past Conversations]"]
for m in memories:
lines.append(f"({m['timestamp'][:10]}) "
f"User: {m['user_message']}")
lines.append(f" AI: {m['ai_message']}")
lines.append("")
return "\n".join(lines)
Context Drift and Hallucination Mitigation
Problem Patterns and Detection
Two major problems emerge as conversations grow longer:
- Context Drift: The conversation gradually diverges from the original intent
- Stale Context Hallucination: Hallucinations caused by distorted information from the summarization process
class ContextDriftDetector:
"""Module for detecting context drift"""
def __init__(self, embeddings, drift_threshold: float = 0.3):
self.embeddings = embeddings
self.drift_threshold = drift_threshold
self.initial_topic_embedding = None
self.recent_embeddings: list[list[float]] = []
def set_initial_topic(self, first_message: str):
"""Set the initial topic of the conversation"""
self.initial_topic_embedding = self.embeddings.embed_query(
first_message
)
def check_drift(self, current_message: str) -> dict:
"""Measure how far current message has drifted from initial topic"""
current_embedding = self.embeddings.embed_query(current_message)
self.recent_embeddings.append(current_embedding)
if self.initial_topic_embedding is None:
self.set_initial_topic(current_message)
return {"drifted": False, "similarity": 1.0}
similarity = self._cosine_similarity(
self.initial_topic_embedding, current_embedding
)
return {
"drifted": similarity < self.drift_threshold,
"similarity": similarity,
"suggestion": (
"The conversation topic has significantly changed. "
"Consider starting a new session or resetting context."
if similarity < self.drift_threshold else None
)
}
@staticmethod
def _cosine_similarity(a: list[float], b: list[float]) -> float:
import numpy as np
a, b = np.array(a), np.array(b)
return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
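The threshold logic is easy to walk through with toy embeddings. In this sketch the 2-D vectors are hand-picked stand-ins for real embedding output, just to make the cosine comparison concrete.

```python
import math

# Toy walkthrough of the drift rule: cosine similarity between the first
# message's embedding and each new message, flagged below the threshold.
def cosine(a: tuple, b: tuple) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.hypot(*a) * math.hypot(*b))

initial = (1.0, 0.0)    # opening topic, e.g. "server outage"
on_topic = (0.9, 0.1)   # follow-up about the same outage
off_topic = (0.1, 1.0)  # unrelated question
threshold = 0.3

for name, vec in [("on_topic", on_topic), ("off_topic", off_topic)]:
    sim = cosine(initial, vec)
    print(f"{name}: similarity={sim:.2f} drifted={sim < threshold}")
```

With real embeddings the geometry is hundreds of dimensions, but the decision rule is exactly this comparison.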
Summary Accuracy Validation
import json

class SummaryValidator:
"""Validates accuracy of conversation summaries"""
def __init__(self, llm):
self.llm = llm
async def validate_summary(self, original_messages: list[dict],
summary: str) -> dict:
"""Verify summary fidelity against original conversation"""
original_text = "\n".join(
f"{m['role']}: {m['content']}" for m in original_messages
)
prompt = f"""Compare the original conversation with its summary and evaluate:
1. Key information preservation rate (0-100)
2. Presence of distorted information
3. Missing important information
Original conversation:
{original_text}
Summary:
{summary}
Respond in JSON format."""
response = await self.llm.ainvoke(prompt)
try:
result = json.loads(response.content)
return result
except json.JSONDecodeError:
return {"error": "Validation result parsing failed"}
Production Architecture Patterns
Full Architecture
# docker-compose.yml - Production conversation memory stack
version: '3.8'
services:
chat-api:
image: chat-service:latest
ports:
- '8000:8000'
environment:
- REDIS_URL=redis://redis:6379
- PINECONE_API_KEY=pk-xxx
- OPENAI_API_KEY=sk-xxx
depends_on:
- redis
- postgres
redis:
image: redis:7-alpine
ports:
- '6379:6379'
volumes:
- redis-data:/data
command: redis-server --appendonly yes
postgres:
image: pgvector/pgvector:pg16
environment:
POSTGRES_DB: chatbot
POSTGRES_USER: admin
POSTGRES_PASSWORD: secure-password
volumes:
- pg-data:/var/lib/postgresql/data
ports:
- '5432:5432'
volumes:
redis-data:
pg-data:
FastAPI-Based Conversation Server
from fastapi import FastAPI, HTTPException
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
import redis.asyncio as redis
import json
app = FastAPI(title="Multi-Turn Chat API")
# Redis connection
redis_client = redis.from_url("redis://localhost:6379", decode_responses=True)
class ChatRequest(BaseModel):
user_id: str
session_id: str
message: str
class ChatResponse(BaseModel):
reply: str
session_id: str
turn_count: int
tokens_used: int
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
"""Multi-turn conversation endpoint"""
session_key = f"session:{request.user_id}:{request.session_id}"
# 1. Load session history
history_raw = await redis_client.lrange(session_key, 0, -1)
history = [json.loads(h) for h in history_raw]
# 2. Memory management (Sliding Window + Summary)
    manager = SessionMemoryManager(max_turns=20, summary_threshold=15)  # assumed defined elsewhere in the service
context = await manager.prepare_context(history, request.message)
# 3. LLM call
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
messages = context + [{"role": "user", "content": request.message}]
response = await llm.ainvoke(messages)
# 4. Save history
await redis_client.rpush(
session_key,
json.dumps({"role": "user", "content": request.message})
)
await redis_client.rpush(
session_key,
json.dumps({"role": "assistant", "content": response.content})
)
# 5. Set TTL (24 hours)
await redis_client.expire(session_key, 86400)
turn_count = len(history) // 2 + 1
return ChatResponse(
reply=response.content,
session_id=request.session_id,
turn_count=turn_count,
tokens_used=response.response_metadata.get("token_usage", {}).get(
"total_tokens", 0
)
)
Operational Notes
Monitoring Checklist
- Token Usage Monitoring: Track average/max token consumption per session and set alerts for unusual spikes.
- Summary Quality Verification: Periodically sample summary results to check for information loss.
- Context Drift Tracking: Monitor topic deviation rates as sessions grow longer.
- Latency Analysis: Measure how memory retrieval/summarization stages impact overall response time.
- Cost Tracking: Separately track costs for memory management LLM calls (summarization, etc.).
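The first checklist item can start as something very small. Below is a minimal sketch of per-session token tracking with an alert threshold; in production this would feed a metrics system (Prometheus, CloudWatch, etc.) rather than an in-process dict.

```python
# Minimal sketch of per-session token usage monitoring with alerting.
from collections import defaultdict

class TokenUsageMonitor:
    def __init__(self, alert_threshold: int = 50_000):
        self.alert_threshold = alert_threshold
        self.usage: dict[str, int] = defaultdict(int)

    def record(self, session_id: str, tokens: int) -> bool:
        """Record usage; return True if the session crossed the threshold."""
        self.usage[session_id] += tokens
        return self.usage[session_id] > self.alert_threshold

monitor = TokenUsageMonitor(alert_threshold=1000)
monitor.record("s1", 400)
alerted = monitor.record("s1", 700)  # cumulative 1100 > 1000
print(f"s1 total={monitor.usage['s1']} alert={alerted}")
```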
Common Failure Cases and Recovery Procedures
class MemoryRecoveryHandler:
"""Memory-related failure recovery handler"""
async def handle_token_overflow(self, session_id: str):
"""Emergency handling when token limit is exceeded"""
# 1. Emergency summarization keeping only last 5 turns
# 2. If summarization fails, keep only last 3 turns and discard rest
# 3. Notify user about context reduction
pass
async def handle_summary_failure(self, session_id: str):
"""When summary LLM call fails"""
# 1. Retry (max 3 times with backoff)
# 2. Fallback: switch to simple message-count-based window
# 3. Proceed with only recent conversation without summary
pass
async def handle_vector_db_failure(self, session_id: str):
"""When vector DB connection fails"""
# 1. Serve recent conversation from local cache
# 2. Fallback to Redis short-term memory
# 3. Continue basic conversation without vector search
pass
async def handle_context_drift(self, session_id: str, drift_score: float):
"""When context drift is detected"""
# 1. Notify user about topic change
# 2. Suggest starting new session
# 3. Reconstruct context based on current topic
pass
Performance Optimization Tips
# Monitor Redis memory usage
redis-cli INFO memory | grep used_memory_human
# Check per-session memory size
redis-cli DEBUG OBJECT "session:user-123:session-456"
# Find session keys missing a TTL (cleanup candidates)
redis-cli --scan --pattern "session:*" | while read key; do
ttl=$(redis-cli TTL "$key")
if [ "$ttl" -eq "-1" ]; then
echo "No TTL set for $key"
fi
done
Memory Pattern Selection Guide
Recommendations by Use Case
| Use Case | Recommended Pattern | Rationale |
|---|---|---|
| Simple FAQ bot | Buffer Window (k=5) | Short conversations, minimal cost |
| Customer support chatbot | Summary Buffer + Entity | Long conversations, customer info tracking |
| Technical support agent | Hierarchical + Vector | Need to search past issues |
| Personal assistant bot | Full Hierarchical | Long-term memory, personalization |
| Code review bot | Token Window | Maximize code context |
Decision Flowchart
Conversation length?
|
+-- 5 turns or less --> Buffer Memory
|
+-- 5~30 turns --> Personalization needed?
| |
| +-- No --> Sliding Window
| +-- Yes --> Summary Buffer + Entity
|
+-- 30+ turns --> Need to search past conversations?
|
+-- No --> Hierarchical Memory
+-- Yes --> Hierarchical + Vector Store
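The flowchart translates directly into code. This is a sketch; the turn-count boundaries follow the flowchart above rather than any hard rule.

```python
# The decision flowchart, encoded as a selector function.
def select_memory_pattern(expected_turns: int,
                          needs_personalization: bool = False,
                          needs_history_search: bool = False) -> str:
    if expected_turns <= 5:
        return "Buffer Memory"
    if expected_turns <= 30:
        return ("Summary Buffer + Entity" if needs_personalization
                else "Sliding Window")
    return ("Hierarchical + Vector Store" if needs_history_search
            else "Hierarchical Memory")

print(select_memory_pattern(3))                               # Buffer Memory
print(select_memory_pattern(20, needs_personalization=True))  # Summary Buffer + Entity
print(select_memory_pattern(100, needs_history_search=True))  # Hierarchical + Vector Store
```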
Conclusion
Multi-turn conversation management is the core factor that determines the quality of LLM chatbots. Simply stuffing all conversations into the context is not sustainable in terms of cost and performance. Understanding various memory patterns like Buffer, Summary, and Vector Store, and selecting the right strategy for your use case is essential.
Sliding Window is the most practical baseline strategy, and combining it with conversation summarization and vector search maintains high quality even in long conversations. Hierarchical memory architecture mimics human memory structure by separately managing short-term, mid-term, and long-term memory, optimizing the balance between token efficiency and information preservation.
In production environments, session management with Redis, persistent memory with vector DBs like Pinecone/Chroma, and failure recovery strategies are essential. Continuously monitor token usage and summary quality, and have mitigation plans ready for context drift.
References
- LangChain Conversational Memory - Pinecone
- Context Window Management - Redis Blog
- Context Window Management Strategies for Long-Context AI Agents - Maxim AI
- AI Agent Memory Architecture - IBM
- LLM Chat History Summarization Guide - Mem0
- Top Techniques to Manage Context Length in LLMs - Agenta
- LangChain Memory Tutorial - Aurelio AI