Split View: 챗봇 멀티턴 대화 메모리 관리 가이드: LangChain·LangGraph로 구현하는 컨텍스트 유지 전략
챗봇 멀티턴 대화 메모리 관리 가이드: LangChain·LangGraph로 구현하는 컨텍스트 유지 전략
- 들어가며
- 멀티턴 대화의 핵심 과제
- LangChain 메모리 타입
- 메모리 타입 비교
- LangGraph 상태 기반 에이전트
- 영속 메모리 구현
- 컨텍스트 압축 기법
- RAG 연동 메모리
- 세션 관리 패턴
- 트러블슈팅
- 운영 노트
- 프로덕션 체크리스트
- 참고자료

들어가며
챗봇을 만들 때 가장 기본적이면서도 어려운 문제가 멀티턴 대화에서 컨텍스트를 유지하는 것이다. 단순한 질의응답(Single-turn)은 각 요청을 독립적으로 처리하면 되지만, 실제 대화는 이전 내용을 기반으로 이어진다. "그거 얼마야?"라는 질문에 답하려면 "그거"가 무엇인지 이전 대화에서 파악해야 한다.
LLM의 컨텍스트 윈도우는 유한하다. GPT-4o의 128K 토큰이라 해도 수백 턴의 대화를 모두 담기 어렵고, 토큰 비용도 급격히 증가한다. 따라서 어떤 정보를 얼마나 유지할 것인가에 대한 메모리 관리 전략이 필수적이다.
이 글에서는 LangChain의 다양한 메모리 타입을 비교하고, LangGraph를 활용한 상태 기반 에이전트 구축, 데이터베이스를 활용한 영속 메모리, RAG 연동까지 프로덕션 수준의 멀티턴 대화 시스템 구현 방법을 다룬다.
멀티턴 대화의 핵심 과제
컨텍스트 윈도우 한계
LLM은 한 번의 API 호출에서 처리할 수 있는 토큰 수가 제한되어 있다. 대화가 길어질수록 초기 대화 내용이 잘리거나 비용이 급증한다.
# 문제 시나리오: 대화가 길어지면 초기 컨텍스트가 유실
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# 100턴 대화 시뮬레이션
messages = []
for i in range(100):
messages.append(HumanMessage(content=f"Turn {i}: 이것은 {i}번째 질문입니다."))
messages.append(AIMessage(content=f"Turn {i}: 네, {i}번째 답변입니다."))
# 모든 메시지를 전송하면 토큰 한도 초과 위험
# 해결책: 메모리 관리 전략 적용
print(f"총 메시지 수: {len(messages)}")
관련성 감소 문제
대화가 진행될수록 초기 대화 내용의 관련성은 감소한다. 모든 대화를 동일한 가중치로 보내는 것은 비효율적이다.
| 문제 | 설명 | 영향 |
|---|---|---|
| 토큰 한도 초과 | 긴 대화가 컨텍스트 윈도우를 넘음 | API 오류 또는 초기 대화 유실 |
| 비용 증가 | 불필요한 과거 대화도 매번 전송 | 토큰 비용 급증 |
| 관련성 희석 | 핵심 정보가 불필요한 대화에 묻힘 | 응답 품질 저하 |
| 지연 시간 증가 | 긴 프롬프트 처리에 시간 소요 | 사용자 경험 저하 |
| 할루시네이션 증가 | 과다한 컨텍스트에서 잘못된 추론 | 신뢰도 하락 |
LangChain 메모리 타입
ConversationBufferMemory
가장 단순한 메모리 타입으로, 모든 대화 내용을 그대로 저장한다.
from langchain_openai import ChatOpenAI
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
# Buffer Memory: 모든 대화 내용을 그대로 유지
memory = ConversationBufferMemory(return_messages=True)
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True,
)
# 대화 진행
response1 = conversation.predict(input="안녕하세요, 저는 김영주입니다. 파이썬 개발자예요.")
print(f"AI: {response1}")
response2 = conversation.predict(input="제가 가장 좋아하는 프레임워크는 FastAPI입니다.")
print(f"AI: {response2}")
response3 = conversation.predict(input="제 이름이 뭐라고 했죠?")
print(f"AI: {response3}")
# AI는 이전 대화를 기억하고 "김영주"라고 답변
# 메모리 내용 확인
print("\n=== Memory Contents ===")
for msg in memory.chat_memory.messages:
role = "Human" if hasattr(msg, "content") and isinstance(msg, HumanMessage) else "AI"
print(f" {type(msg).__name__}: {msg.content[:80]}...")
ConversationBufferWindowMemory
최근 N개의 대화만 유지하는 슬라이딩 윈도우 방식이다.
from langchain.memory import ConversationBufferWindowMemory
# 최근 5턴만 유지
memory = ConversationBufferWindowMemory(
k=5,
return_messages=True,
)
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True,
)
# 10턴 대화 진행
for i in range(10):
response = conversation.predict(input=f"이것은 {i+1}번째 메시지입니다.")
print(f"Turn {i+1}: {response[:50]}...")
# k=5이므로 6번째 메시지부터는 1번째 메시지가 삭제됨
print(f"\n메모리에 저장된 메시지 수: {len(memory.chat_memory.messages)}")
ConversationSummaryMemory
대화 내용을 LLM으로 요약하여 저장한다. 긴 대화에서도 핵심 정보를 유지할 수 있다.
from langchain.memory import ConversationSummaryMemory
# Summary Memory: LLM이 대화를 요약하여 저장
memory = ConversationSummaryMemory(
llm=llm,
return_messages=True,
)
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True,
)
# 여러 턴의 대화
conversation.predict(input="안녕하세요, 저는 서울에 사는 백엔드 개발자입니다.")
conversation.predict(input="주로 Python과 Go를 사용하고, Kubernetes 환경에서 일합니다.")
conversation.predict(input="최근에 LangChain으로 챗봇을 개발하고 있어요.")
conversation.predict(input="RAG 파이프라인 구축이 주요 업무입니다.")
# 요약 내용 확인
print("\n=== Summary ===")
print(memory.buffer)
# 전체 대화가 아닌 요약문이 저장됨
ConversationSummaryBufferMemory
요약과 버퍼를 결합한 하이브리드 방식이다. 최근 대화는 원본으로 유지하고, 오래된 대화는 요약한다.
from langchain.memory import ConversationSummaryBufferMemory
# Summary + Buffer 하이브리드: 최근 대화는 원본, 과거 대화는 요약
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=300, # 이 한도를 초과하면 오래된 대화를 요약
return_messages=True,
)
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True,
)
# 대화 진행
conversation.predict(input="프로젝트 A는 e-commerce 플랫폼입니다.")
conversation.predict(input="기술 스택은 Next.js, FastAPI, PostgreSQL입니다.")
conversation.predict(input="현재 결제 모듈을 구현 중이에요.")
conversation.predict(input="PG사 연동에서 webhook 처리가 까다롭네요.")
conversation.predict(input="테스트 환경 구축도 필요합니다.")
# 메모리 상태 확인
print("\n=== Moving Summary ===")
print(memory.moving_summary_buffer)
print(f"\n현재 버퍼 메시지 수: {len(memory.chat_memory.messages)}")
EntityMemory
대화에서 엔티티(인물, 장소, 개념 등)를 추출하여 관리한다.
from langchain.memory import ConversationEntityMemory
from langchain.memory.prompt import ENTITY_MEMORY_CONVERSATION_TEMPLATE
# Entity Memory: 대화에서 엔티티를 추출하고 업데이트
memory = ConversationEntityMemory(
llm=llm,
return_messages=True,
)
conversation = ConversationChain(
llm=llm,
memory=memory,
prompt=ENTITY_MEMORY_CONVERSATION_TEMPLATE,
verbose=True,
)
# 엔티티가 포함된 대화
conversation.predict(
input="김철수는 우리 팀의 시니어 개발자입니다. Python 전문가이고 5년 경력이에요."
)
conversation.predict(
input="이영희는 프로덕트 매니저인데, 김철수와 함께 추천 시스템 프로젝트를 진행 중입니다."
)
conversation.predict(
input="김철수가 최근에 MLflow를 도입해서 실험 관리를 시작했어요."
)
# 엔티티 정보 확인
print("\n=== Entity Store ===")
for entity, info in memory.entity_store.store.items():
print(f" {entity}: {info}")
메모리 타입 비교
| 메모리 타입 | 장점 | 단점 | 적합한 상황 |
|---|---|---|---|
| Buffer | 전체 대화 보존, 구현 간단 | 토큰 비용 증가, 윈도우 초과 위험 | 짧은 대화, 프로토타입 |
| BufferWindow | 비용 예측 가능, 최신 정보 유지 | 오래된 정보 유실 | 고객 상담, FAQ 봇 |
| Summary | 긴 대화에서도 핵심 유지 | 요약 시 정보 손실, 추가 LLM 호출 비용 | 장기 대화, 상담 이력 |
| SummaryBuffer | 최근 원본 + 과거 요약 결합 | 복잡한 설정, 요약 품질 의존 | 기술 지원, 프로젝트 대화 |
| Entity | 핵심 엔티티 추적 | 엔티티 추출 오류 가능, 추가 비용 | CRM 봇, 인물 관리 시스템 |
LangGraph 상태 기반 에이전트
LangGraph 기본 구조
LangGraph는 대화를 상태 그래프(State Graph)로 모델링한다. 각 노드는 처리 단계, 엣지는 상태 전이를 나타낸다.
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
# 상태 기반 대화 그래프 정의
def chatbot_node(state: MessagesState):
"""메인 챗봇 노드"""
system_message = SystemMessage(
content="당신은 친절한 AI 어시스턴트입니다. 이전 대화 맥락을 고려하여 답변하세요."
)
messages = [system_message] + state["messages"]
response = llm.invoke(messages)
return {"messages": [response]}
# 그래프 구성
graph_builder = StateGraph(MessagesState)
graph_builder.add_node("chatbot", chatbot_node)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)
# 메모리 체크포인터 추가 (대화 상태 영속화)
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)
# 세션별 대화 (thread_id로 세션 구분)
config = {"configurable": {"thread_id": "user-session-001"}}
# 첫 번째 메시지
response1 = graph.invoke(
{"messages": [HumanMessage(content="안녕하세요, 저는 데이터 엔지니어입니다.")]},
config=config,
)
print(f"AI: {response1['messages'][-1].content}")
# 두 번째 메시지 (이전 대화 자동 유지)
response2 = graph.invoke(
{"messages": [HumanMessage(content="제 직업이 뭐라고 했죠?")]},
config=config,
)
print(f"AI: {response2['messages'][-1].content}")
조건부 라우팅과 도구 사용
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
@tool
def search_knowledge_base(query: str) -> str:
"""지식 베이스에서 정보를 검색합니다."""
# 실제로는 벡터 DB 검색 등을 수행
knowledge = {
"환불 정책": "구매 후 14일 이내 전액 환불 가능합니다.",
"배송 기간": "주문 후 2-3 영업일 이내 배송됩니다.",
"회원 등급": "브론즈, 실버, 골드, 플래티넘 4단계입니다.",
}
for key, value in knowledge.items():
if key in query:
return value
return "관련 정보를 찾을 수 없습니다."
@tool
def get_order_status(order_id: str) -> str:
"""주문 상태를 조회합니다."""
# 실제로는 DB 조회
return f"주문 {order_id}: 배송 중 (예상 도착일: 2026-03-13)"
# 도구 바인딩
tools = [search_knowledge_base, get_order_status]
llm = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools(tools)
def assistant_node(state: MessagesState):
"""어시스턴트 노드: LLM 호출 및 도구 사용 결정"""
system_msg = SystemMessage(
content="당신은 e-commerce 고객 지원 챗봇입니다. 필요시 도구를 사용하세요."
)
messages = [system_msg] + state["messages"]
response = llm.invoke(messages)
return {"messages": [response]}
# 그래프 구성
graph_builder = StateGraph(MessagesState)
graph_builder.add_node("assistant", assistant_node)
graph_builder.add_node("tools", ToolNode(tools))
graph_builder.add_edge(START, "assistant")
graph_builder.add_conditional_edges("assistant", tools_condition)
graph_builder.add_edge("tools", "assistant")
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)
# 대화 실행
config = {"configurable": {"thread_id": "customer-123"}}
response = graph.invoke(
{"messages": [HumanMessage(content="주문번호 ORD-2026-0311의 배송 상태를 알려주세요.")]},
config=config,
)
print(f"AI: {response['messages'][-1].content}")
영속 메모리 구현
Redis 기반 세션 관리
import redis
import json
from datetime import datetime, timedelta
from langchain_core.messages import HumanMessage, AIMessage, messages_from_dict, messages_to_dict
class RedisSessionMemory:
"""Redis 기반 대화 세션 메모리"""
def __init__(self, redis_url="redis://localhost:6379", ttl_hours=24):
self.redis = redis.from_url(redis_url)
self.ttl = timedelta(hours=ttl_hours)
def _key(self, session_id: str) -> str:
return f"chat:session:{session_id}"
def save_messages(self, session_id: str, messages: list):
"""메시지 목록을 Redis에 저장"""
key = self._key(session_id)
data = {
"messages": messages_to_dict(messages),
"updated_at": datetime.now().isoformat(),
}
self.redis.setex(key, self.ttl, json.dumps(data, ensure_ascii=False))
def load_messages(self, session_id: str) -> list:
"""Redis에서 메시지 목록 로드"""
key = self._key(session_id)
data = self.redis.get(key)
if data is None:
return []
parsed = json.loads(data)
return messages_from_dict(parsed["messages"])
def add_message(self, session_id: str, message):
"""단일 메시지 추가"""
messages = self.load_messages(session_id)
messages.append(message)
self.save_messages(session_id, messages)
def clear_session(self, session_id: str):
"""세션 삭제"""
self.redis.delete(self._key(session_id))
def get_session_info(self, session_id: str) -> dict:
"""세션 메타정보 조회"""
key = self._key(session_id)
data = self.redis.get(key)
if data is None:
return {"exists": False}
parsed = json.loads(data)
return {
"exists": True,
"message_count": len(parsed["messages"]),
"updated_at": parsed["updated_at"],
"ttl_seconds": self.redis.ttl(key),
}
# 사용 예시
session_memory = RedisSessionMemory(redis_url="redis://localhost:6379")
session_id = "user-abc-123"
session_memory.add_message(session_id, HumanMessage(content="안녕하세요"))
session_memory.add_message(session_id, AIMessage(content="안녕하세요! 무엇을 도와드릴까요?"))
messages = session_memory.load_messages(session_id)
print(f"저장된 메시지 수: {len(messages)}")
PostgreSQL 기반 장기 메모리
from sqlalchemy import create_engine, Column, String, Text, DateTime, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import json
Base = declarative_base()
class ConversationHistory(Base):
"""대화 이력 테이블"""
__tablename__ = "conversation_history"
id = Column(Integer, primary_key=True, autoincrement=True)
session_id = Column(String(255), index=True, nullable=False)
user_id = Column(String(255), index=True, nullable=False)
role = Column(String(50), nullable=False) # human, ai, system
content = Column(Text, nullable=False)
metadata_json = Column(Text, default="{}")
created_at = Column(DateTime, default=datetime.utcnow)
class ConversationSummaryStore(Base):
"""대화 요약 테이블"""
__tablename__ = "conversation_summaries"
id = Column(Integer, primary_key=True, autoincrement=True)
session_id = Column(String(255), unique=True, nullable=False)
user_id = Column(String(255), index=True, nullable=False)
summary = Column(Text, nullable=False)
entity_data = Column(Text, default="{}")
message_count = Column(Integer, default=0)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class PostgresMemoryManager:
"""PostgreSQL 기반 대화 메모리 관리자"""
def __init__(self, database_url: str):
self.engine = create_engine(database_url)
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
def save_message(self, session_id: str, user_id: str, role: str, content: str):
"""메시지 저장"""
session = self.Session()
try:
msg = ConversationHistory(
session_id=session_id,
user_id=user_id,
role=role,
content=content,
)
session.add(msg)
session.commit()
finally:
session.close()
def get_recent_messages(self, session_id: str, limit: int = 20):
"""최근 메시지 조회"""
session = self.Session()
try:
messages = (
session.query(ConversationHistory)
.filter(ConversationHistory.session_id == session_id)
.order_by(ConversationHistory.created_at.desc())
.limit(limit)
.all()
)
return list(reversed(messages))
finally:
session.close()
def save_summary(self, session_id: str, user_id: str, summary: str,
entity_data: dict, message_count: int):
"""대화 요약 저장/업데이트"""
session = self.Session()
try:
existing = (
session.query(ConversationSummaryStore)
.filter(ConversationSummaryStore.session_id == session_id)
.first()
)
if existing:
existing.summary = summary
existing.entity_data = json.dumps(entity_data, ensure_ascii=False)
existing.message_count = message_count
else:
new_summary = ConversationSummaryStore(
session_id=session_id,
user_id=user_id,
summary=summary,
entity_data=json.dumps(entity_data, ensure_ascii=False),
message_count=message_count,
)
session.add(new_summary)
session.commit()
finally:
session.close()
# 사용 예시
db_url = "postgresql://chatbot:password@localhost:5432/chatbot_db"
memory_manager = PostgresMemoryManager(db_url)
컨텍스트 압축 기법
대화 요약 + 최근 메시지 결합
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
class HybridMemoryManager:
"""요약 + 최근 버퍼를 결합한 하이브리드 메모리"""
def __init__(self, llm, max_buffer_messages=10):
self.llm = llm
self.max_buffer_messages = max_buffer_messages
self.summary = ""
self.buffer = []
def add_exchange(self, human_msg: str, ai_msg: str):
"""대화 교환 추가"""
self.buffer.append(HumanMessage(content=human_msg))
self.buffer.append(AIMessage(content=ai_msg))
# 버퍼가 한도를 초과하면 오래된 메시지를 요약에 통합
if len(self.buffer) > self.max_buffer_messages * 2:
self._compress()
def _compress(self):
"""오래된 메시지를 요약으로 통합"""
# 앞쪽 절반을 요약 대상으로 선정
to_summarize = self.buffer[: self.max_buffer_messages]
self.buffer = self.buffer[self.max_buffer_messages:]
# 요약 생성
conversation_text = "\n".join(
f"{'Human' if isinstance(m, HumanMessage) else 'AI'}: {m.content}"
for m in to_summarize
)
summary_prompt = f"""다음은 이전 대화의 요약과 새로운 대화입니다. 통합된 요약을 작성하세요.
이전 요약: {self.summary if self.summary else '없음'}
새로운 대화:
{conversation_text}
핵심 정보와 맥락을 유지하면서 간결하게 요약하세요:"""
response = self.llm.invoke([HumanMessage(content=summary_prompt)])
self.summary = response.content
def get_context_messages(self) -> list:
"""현재 컨텍스트 메시지 반환"""
messages = []
if self.summary:
messages.append(SystemMessage(
content=f"이전 대화 요약: {self.summary}"
))
messages.extend(self.buffer)
return messages
def get_stats(self) -> dict:
"""메모리 통계 반환"""
return {
"summary_length": len(self.summary),
"buffer_messages": len(self.buffer),
"has_summary": bool(self.summary),
}
# 사용 예시
llm = ChatOpenAI(model="gpt-4o", temperature=0)
hybrid_memory = HybridMemoryManager(llm=llm, max_buffer_messages=6)
# 대화 시뮬레이션
exchanges = [
("프로젝트 일정이 어떻게 되나요?", "현재 3월 말 배포를 목표로 하고 있습니다."),
("백엔드 개발은 누가 담당하나요?", "김철수 시니어 개발자가 담당합니다."),
("프론트엔드는요?", "이영희 개발자가 React로 개발 중입니다."),
("테스트 계획은 있나요?", "QA팀에서 4월 첫째 주에 통합 테스트를 진행합니다."),
("배포 환경은 무엇인가요?", "AWS EKS 기반의 Kubernetes 환경입니다."),
("CI/CD 파이프라인은요?", "GitHub Actions와 ArgoCD를 사용합니다."),
("모니터링은 어떻게 하나요?", "Grafana와 Prometheus로 메트릭을 수집합니다."),
]
for human, ai in exchanges:
hybrid_memory.add_exchange(human, ai)
stats = hybrid_memory.get_stats()
print(f"메모리 상태: {stats}")
RAG 연동 메모리
대화 기반 RAG 파이프라인
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.messages import HumanMessage, SystemMessage
from langchain.text_splitter import RecursiveCharacterTextSplitter
class RAGEnhancedMemory:
"""RAG로 강화된 대화 메모리"""
def __init__(self, llm, embeddings, collection_name="chat_memory"):
self.llm = llm
self.embeddings = embeddings
self.vectorstore = Chroma(
collection_name=collection_name,
embedding_function=embeddings,
)
self.recent_messages = []
self.max_recent = 10
def add_exchange(self, session_id: str, human_msg: str, ai_msg: str):
"""대화를 벡터 스토어에 저장"""
# 최근 메시지 버퍼에 추가
self.recent_messages.append(("human", human_msg))
self.recent_messages.append(("ai", ai_msg))
# 벡터 스토어에 대화 임베딩 저장
exchange_text = f"사용자: {human_msg}\nAI: {ai_msg}"
self.vectorstore.add_texts(
texts=[exchange_text],
metadatas=[{"session_id": session_id, "type": "exchange"}],
)
# 최근 메시지 한도 유지
if len(self.recent_messages) > self.max_recent * 2:
self.recent_messages = self.recent_messages[-self.max_recent * 2:]
def retrieve_relevant_context(self, query: str, k: int = 3) -> list:
"""질의와 관련된 과거 대화 검색"""
results = self.vectorstore.similarity_search(query, k=k)
return [doc.page_content for doc in results]
def generate_response(self, session_id: str, user_input: str) -> str:
"""RAG 기반 응답 생성"""
# 관련 과거 대화 검색
relevant_context = self.retrieve_relevant_context(user_input)
# 컨텍스트 구성
context_parts = []
if relevant_context:
context_parts.append("관련된 이전 대화:")
for ctx in relevant_context:
context_parts.append(f" - {ctx}")
system_content = "당신은 이전 대화 맥락을 활용하여 답변하는 AI 어시스턴트입니다."
if context_parts:
system_content += "\n\n" + "\n".join(context_parts)
messages = [SystemMessage(content=system_content)]
# 최근 메시지 추가
for role, content in self.recent_messages[-6:]:
if role == "human":
messages.append(HumanMessage(content=content))
else:
from langchain_core.messages import AIMessage
messages.append(AIMessage(content=content))
messages.append(HumanMessage(content=user_input))
response = self.llm.invoke(messages)
# 대화 저장
self.add_exchange(session_id, user_input, response.content)
return response.content
# 사용 예시
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
embeddings = OpenAIEmbeddings()
rag_memory = RAGEnhancedMemory(llm=llm, embeddings=embeddings)
세션 관리 패턴
멀티테넌트 세션 관리
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime
import uuid
@dataclass
class ChatSession:
"""채팅 세션"""
session_id: str
user_id: str
created_at: datetime = field(default_factory=datetime.now)
last_active: datetime = field(default_factory=datetime.now)
metadata: dict = field(default_factory=dict)
is_active: bool = True
class SessionManager:
"""멀티테넌트 세션 관리자"""
def __init__(self, max_sessions_per_user: int = 5):
self.sessions: dict = {} # session_id -> ChatSession
self.user_sessions: dict = {} # user_id -> list of session_ids
self.max_sessions_per_user = max_sessions_per_user
def create_session(self, user_id: str, metadata: Optional[dict] = None) -> str:
"""새 세션 생성"""
# 사용자별 세션 수 제한 확인
user_session_ids = self.user_sessions.get(user_id, [])
active_sessions = [
sid for sid in user_session_ids
if sid in self.sessions and self.sessions[sid].is_active
]
if len(active_sessions) >= self.max_sessions_per_user:
# 가장 오래된 세션 비활성화
oldest = min(
active_sessions,
key=lambda sid: self.sessions[sid].last_active,
)
self.sessions[oldest].is_active = False
session_id = str(uuid.uuid4())
session = ChatSession(
session_id=session_id,
user_id=user_id,
metadata=metadata or {},
)
self.sessions[session_id] = session
if user_id not in self.user_sessions:
self.user_sessions[user_id] = []
self.user_sessions[user_id].append(session_id)
return session_id
def get_session(self, session_id: str) -> Optional[ChatSession]:
"""세션 조회"""
session = self.sessions.get(session_id)
if session and session.is_active:
session.last_active = datetime.now()
return session
return None
def list_user_sessions(self, user_id: str) -> list:
"""사용자 세션 목록 조회"""
session_ids = self.user_sessions.get(user_id, [])
return [
self.sessions[sid]
for sid in session_ids
if sid in self.sessions and self.sessions[sid].is_active
]
def close_session(self, session_id: str):
"""세션 종료"""
if session_id in self.sessions:
self.sessions[session_id].is_active = False
# 사용 예시
session_mgr = SessionManager(max_sessions_per_user=3)
session_id = session_mgr.create_session("user-001", {"channel": "web"})
print(f"Created session: {session_id}")
트러블슈팅
메모리 비대화(Memory Bloat) 해결
대화가 길어지면 메모리 사용량이 급격히 증가하여 성능이 저하될 수 있다.
class MemoryBloatGuard:
"""메모리 비대화 방지 가드"""
def __init__(self, max_messages=100, max_token_estimate=50000):
self.max_messages = max_messages
self.max_token_estimate = max_token_estimate
def check_and_trim(self, messages: list) -> tuple:
"""메모리 상태 확인 및 트리밍"""
total_chars = sum(len(m.content) for m in messages)
estimated_tokens = total_chars // 4 # 대략적인 토큰 추정
warnings = []
trimmed = messages
if len(messages) > self.max_messages:
warnings.append(
f"메시지 수 초과: {len(messages)} > {self.max_messages}"
)
# 시스템 메시지 보존, 나머지에서 앞부분 제거
system_msgs = [m for m in messages if isinstance(m, SystemMessage)]
non_system = [m for m in messages if not isinstance(m, SystemMessage)]
trimmed = system_msgs + non_system[-(self.max_messages - len(system_msgs)):]
if estimated_tokens > self.max_token_estimate:
warnings.append(
f"토큰 추정 초과: {estimated_tokens} > {self.max_token_estimate}"
)
return trimmed, warnings
# 사용
guard = MemoryBloatGuard(max_messages=50, max_token_estimate=30000)
컨텍스트 혼란(Context Confusion) 방지
긴 대화에서 AI가 이전 대화 내용을 혼동하여 잘못된 정보를 제공하는 문제를 해결한다.
class ContextClarityChecker:
"""컨텍스트 명확성 검사기"""
def __init__(self, llm):
self.llm = llm
def check_ambiguity(self, user_input: str, recent_messages: list) -> dict:
"""사용자 입력의 모호성 검사"""
# 대명사나 지시어가 포함된 경우 확인
ambiguous_patterns = ["그것", "이것", "저것", "그거", "이거", "거기", "여기"]
has_ambiguity = any(p in user_input for p in ambiguous_patterns)
if not has_ambiguity:
return {"is_ambiguous": False, "resolved_input": user_input}
# LLM으로 모호성 해소
context_text = "\n".join(
f"{type(m).__name__}: {m.content}" for m in recent_messages[-6:]
)
clarification_prompt = f"""이전 대화 맥락:
{context_text}
사용자의 새 입력: {user_input}
이 입력에서 대명사나 지시어가 무엇을 가리키는지 명확히 해주세요.
대명사를 실제 대상으로 치환한 문장을 반환하세요."""
response = self.llm.invoke([HumanMessage(content=clarification_prompt)])
return {
"is_ambiguous": True,
"original_input": user_input,
"resolved_input": response.content,
}
운영 노트
성능 최적화 팁
- 메모리 타입 선택: 대화 길이에 따라 적절한 메모리 타입을 선택한다. 10턴 이하면 Buffer, 그 이상이면 SummaryBuffer를 권장한다
- 벡터 스토어 인덱싱: RAG 메모리 사용 시 적절한 인덱스(HNSW, IVF)를 설정하여 검색 성능을 최적화한다
- Redis TTL 관리: 세션 만료 시간을 적절히 설정하여 메모리 누수를 방지한다
- 비동기 요약: 대화 요약은 응답 반환 후 비동기로 수행하여 지연 시간을 줄인다
보안 고려사항
- 세션 ID에 UUID v4를 사용하여 예측 불가능성을 보장한다
- 대화 내용을 암호화하여 저장한다 (AES-256)
- 사용자 간 세션 격리를 철저히 한다
- PII(개인 식별 정보) 마스킹 처리를 적용한다
- 대화 이력 보존 기간 정책을 수립한다
프로덕션 체크리스트
- [ ] 대화 길이에 적합한 메모리 타입 선택 완료
- [ ] 세션 관리 시스템 구현 (생성, 조회, 만료, 삭제)
- [ ] 영속 저장소(Redis/PostgreSQL) 연동 및 장애 대응
- [ ] 메모리 비대화 방지 로직 구현 (최대 메시지 수, 토큰 제한)
- [ ] 컨텍스트 압축(요약) 파이프라인 구축
- [ ] 멀티테넌트 세션 격리 테스트 통과
- [ ] PII 마스킹 및 대화 암호화 적용
- [ ] 세션 만료 정책 및 TTL 설정
- [ ] 대화 이력 백업 및 보존 정책 수립
- [ ] 모니터링 대시보드 구축 (세션 수, 메모리 사용량, 응답 시간)
- [ ] 부하 테스트 수행 (동시 세션 수, 대화 길이별 성능)
- [ ] 장애 복구 시나리오 테스트 (Redis 다운, DB 연결 실패)
참고자료
Chatbot Multi-Turn Memory Management Guide: Context Retention Strategies with LangChain and LangGraph
- Introduction
- Core Challenges of Multi-Turn Conversations
- LangChain Memory Types
- Memory Type Comparison
- LangGraph Stateful Agents
- Persistent Memory Implementation
- Context Compression Techniques
- RAG-Enhanced Memory
- Session Management Patterns
- Troubleshooting
- Operational Notes
- Production Checklist
- References

Introduction
When building chatbots, the most fundamental yet challenging problem is maintaining context across multi-turn conversations. Simple question-answering (single-turn) can handle each request independently, but real conversations build on previous content. To answer "How much is that?", the bot needs to understand what "that" refers to from the prior conversation.
LLM context windows are finite. Even GPT-4o's 128K tokens may not accommodate hundreds of conversation turns, and token costs increase rapidly. Therefore, a memory management strategy addressing what information to retain and how much is essential.
This guide compares LangChain's various memory types, demonstrates building stateful agents with LangGraph, implementing persistent memory with databases, and integrating RAG -- all aimed at production-grade multi-turn conversation systems.
Core Challenges of Multi-Turn Conversations
Context Window Limitations
LLMs have a finite number of tokens they can process in a single API call. As conversations grow longer, early conversation content gets truncated or costs skyrocket.
# Problem scenario: early context is lost as conversations grow
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# Simulating a 100-turn conversation
messages = []
for i in range(100):
messages.append(HumanMessage(content=f"Turn {i}: This is question number {i}."))
messages.append(AIMessage(content=f"Turn {i}: This is answer number {i}."))
# Sending all messages risks exceeding the token limit
# Solution: Apply memory management strategies
print(f"Total messages: {len(messages)}")
Relevance Decay
As conversations progress, the relevance of early messages decreases. Sending all conversation history with equal weight is inefficient.
| Problem | Description | Impact |
|---|---|---|
| Token Limit Exceeded | Long conversations exceed context window | API errors or early conversation loss |
| Cost Increase | Unnecessary past conversations sent every time | Token costs skyrocket |
| Relevance Dilution | Key information buried in irrelevant conversation | Response quality degrades |
| Latency Increase | Long prompts take time to process | User experience suffers |
| Hallucination Increase | Incorrect reasoning from excessive context | Reliability drops |
LangChain Memory Types
ConversationBufferMemory
The simplest memory type that stores all conversation content as-is.
from langchain_openai import ChatOpenAI
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
# Buffer Memory: retains all conversation content as-is
memory = ConversationBufferMemory(return_messages=True)
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True,
)
# Conduct conversation
response1 = conversation.predict(input="Hi, my name is John. I'm a Python developer.")
print(f"AI: {response1}")
response2 = conversation.predict(input="My favorite framework is FastAPI.")
print(f"AI: {response2}")
response3 = conversation.predict(input="What's my name again?")
print(f"AI: {response3}")
# AI remembers previous conversation and answers "John"
# Check memory contents
print("\n=== Memory Contents ===")
for msg in memory.chat_memory.messages:
print(f" {type(msg).__name__}: {msg.content[:80]}...")
ConversationBufferWindowMemory
A sliding window approach that retains only the most recent N conversations.
from langchain.memory import ConversationBufferWindowMemory
# Keep only the last 5 turns
memory = ConversationBufferWindowMemory(
k=5,
return_messages=True,
)
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True,
)
# Conduct 10 turns of conversation
for i in range(10):
response = conversation.predict(input=f"This is message number {i+1}.")
print(f"Turn {i+1}: {response[:50]}...")
# With k=5, message 1 is deleted when message 6 arrives
print(f"\nMessages stored in memory: {len(memory.chat_memory.messages)}")
ConversationSummaryMemory
Uses an LLM to summarize and store conversation content. Retains key information even across long conversations.
from langchain.memory import ConversationSummaryMemory
# Summary Memory: LLM summarizes and stores conversations
memory = ConversationSummaryMemory(
llm=llm,
return_messages=True,
)
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True,
)
# Multiple turns of conversation
conversation.predict(input="Hello, I'm a backend developer based in Seoul.")
conversation.predict(input="I mainly use Python and Go, working in Kubernetes environments.")
conversation.predict(input="Recently I've been developing chatbots with LangChain.")
conversation.predict(input="Building RAG pipelines is my main task.")
# Check summary content
print("\n=== Summary ===")
print(memory.buffer)
# A summary is stored instead of the full conversation
ConversationSummaryBufferMemory
A hybrid approach combining summary and buffer. Recent conversations are kept as originals while older ones are summarized.
from langchain.memory import ConversationSummaryBufferMemory
# Summary + Buffer hybrid: recent as original, past as summary
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=300, # Older conversations are summarized when this limit is exceeded
return_messages=True,
)
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True,
)
# Conduct conversation
conversation.predict(input="Project A is an e-commerce platform.")
conversation.predict(input="The tech stack is Next.js, FastAPI, PostgreSQL.")
conversation.predict(input="Currently implementing the payment module.")
conversation.predict(input="Webhook handling with the payment gateway is tricky.")
conversation.predict(input="We also need to set up a test environment.")
# Check memory state
print("\n=== Moving Summary ===")
print(memory.moving_summary_buffer)
print(f"\nCurrent buffer message count: {len(memory.chat_memory.messages)}")
EntityMemory
Extracts and manages entities (people, places, concepts) from conversations.
from langchain.memory import ConversationEntityMemory
from langchain.memory.prompt import ENTITY_MEMORY_CONVERSATION_TEMPLATE
# Entity Memory: extracts and updates entities from conversations
memory = ConversationEntityMemory(
llm=llm,
return_messages=True,
)
conversation = ConversationChain(
llm=llm,
memory=memory,
prompt=ENTITY_MEMORY_CONVERSATION_TEMPLATE,
verbose=True,
)
# Conversations containing entities
conversation.predict(
input="Alice is our team's senior developer. She's a Python expert with 5 years of experience."
)
conversation.predict(
input="Bob is a product manager working with Alice on a recommendation system project."
)
conversation.predict(
input="Alice recently introduced MLflow for experiment management."
)
# Check entity information
print("\n=== Entity Store ===")
for entity, info in memory.entity_store.store.items():
print(f" {entity}: {info}")
Memory Type Comparison
| Memory Type | Pros | Cons | Best For |
|---|---|---|---|
| Buffer | Full conversation preserved, simple implementation | Token costs increase, window overflow risk | Short conversations, prototypes |
| BufferWindow | Predictable costs, retains latest info | Older information lost | Customer support, FAQ bots |
| Summary | Retains key info across long conversations | Information loss during summarization, extra LLM call cost | Long conversations, support history |
| SummaryBuffer | Combines recent originals with past summaries | Complex setup, depends on summary quality | Tech support, project conversations |
| Entity | Tracks key entities | Entity extraction errors possible, extra cost | CRM bots, contact management systems |
LangGraph Stateful Agents
LangGraph Basic Structure
LangGraph models conversations as state graphs. Each node represents a processing step, and edges represent state transitions.
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
# Define a state-based conversation graph
def chatbot_node(state: MessagesState):
"""Main chatbot node"""
system_message = SystemMessage(
content="You are a friendly AI assistant. Respond considering the previous conversation context."
)
messages = [system_message] + state["messages"]
response = llm.invoke(messages)
return {"messages": [response]}
# Build graph
graph_builder = StateGraph(MessagesState)
graph_builder.add_node("chatbot", chatbot_node)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)
# Add memory checkpointer (persist conversation state)
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)
# Per-session conversation (sessions distinguished by thread_id)
config = {"configurable": {"thread_id": "user-session-001"}}
# First message
response1 = graph.invoke(
{"messages": [HumanMessage(content="Hello, I'm a data engineer.")]},
config=config,
)
print(f"AI: {response1['messages'][-1].content}")
# Second message (previous conversation automatically maintained)
response2 = graph.invoke(
{"messages": [HumanMessage(content="What's my profession again?")]},
config=config,
)
print(f"AI: {response2['messages'][-1].content}")
Conditional Routing and Tool Usage
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
@tool
def search_knowledge_base(query: str) -> str:
"""Search the knowledge base for information."""
# In practice, this would perform vector DB searches
knowledge = {
"refund policy": "Full refund within 14 days of purchase.",
"shipping time": "Delivery within 2-3 business days after ordering.",
"membership tiers": "Bronze, Silver, Gold, and Platinum - 4 tiers.",
}
for key, value in knowledge.items():
if key in query.lower():
return value
return "No relevant information found."
@tool
def get_order_status(order_id: str) -> str:
"""Look up order status."""
# In practice, this would query a database
return f"Order {order_id}: In transit (expected arrival: 2026-03-13)"
# Bind tools
tools = [search_knowledge_base, get_order_status]
llm = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools(tools)
def assistant_node(state: MessagesState):
"""Assistant node: LLM invocation and tool usage decisions"""
system_msg = SystemMessage(
content="You are an e-commerce customer support chatbot. Use tools when needed."
)
messages = [system_msg] + state["messages"]
response = llm.invoke(messages)
return {"messages": [response]}
# Build graph
graph_builder = StateGraph(MessagesState)
graph_builder.add_node("assistant", assistant_node)
graph_builder.add_node("tools", ToolNode(tools))
graph_builder.add_edge(START, "assistant")
graph_builder.add_conditional_edges("assistant", tools_condition)
graph_builder.add_edge("tools", "assistant")
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)
# Run conversation
config = {"configurable": {"thread_id": "customer-123"}}
response = graph.invoke(
{"messages": [HumanMessage(content="What's the shipping status for order ORD-2026-0311?")]},
config=config,
)
print(f"AI: {response['messages'][-1].content}")
Persistent Memory Implementation
Redis-Based Session Management
import redis
import json
from datetime import datetime, timedelta
from langchain_core.messages import HumanMessage, AIMessage, messages_from_dict, messages_to_dict
class RedisSessionMemory:
"""Redis-based conversation session memory"""
def __init__(self, redis_url="redis://localhost:6379", ttl_hours=24):
self.redis = redis.from_url(redis_url)
self.ttl = timedelta(hours=ttl_hours)
def _key(self, session_id: str) -> str:
return f"chat:session:{session_id}"
def save_messages(self, session_id: str, messages: list):
"""Save message list to Redis"""
key = self._key(session_id)
data = {
"messages": messages_to_dict(messages),
"updated_at": datetime.now().isoformat(),
}
self.redis.setex(key, self.ttl, json.dumps(data, ensure_ascii=False))
def load_messages(self, session_id: str) -> list:
"""Load message list from Redis"""
key = self._key(session_id)
data = self.redis.get(key)
if data is None:
return []
parsed = json.loads(data)
return messages_from_dict(parsed["messages"])
def add_message(self, session_id: str, message):
"""Add a single message"""
messages = self.load_messages(session_id)
messages.append(message)
self.save_messages(session_id, messages)
def clear_session(self, session_id: str):
"""Delete a session"""
self.redis.delete(self._key(session_id))
def get_session_info(self, session_id: str) -> dict:
"""Query session metadata"""
key = self._key(session_id)
data = self.redis.get(key)
if data is None:
return {"exists": False}
parsed = json.loads(data)
return {
"exists": True,
"message_count": len(parsed["messages"]),
"updated_at": parsed["updated_at"],
"ttl_seconds": self.redis.ttl(key),
}
# Usage example
session_memory = RedisSessionMemory(redis_url="redis://localhost:6379")
session_id = "user-abc-123"
session_memory.add_message(session_id, HumanMessage(content="Hello"))
session_memory.add_message(session_id, AIMessage(content="Hello! How can I help you?"))
messages = session_memory.load_messages(session_id)
print(f"Stored messages: {len(messages)}")
PostgreSQL-Based Long-Term Memory
from sqlalchemy import create_engine, Column, String, Text, DateTime, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import json
Base = declarative_base()
class ConversationHistory(Base):
"""Conversation history table"""
__tablename__ = "conversation_history"
id = Column(Integer, primary_key=True, autoincrement=True)
session_id = Column(String(255), index=True, nullable=False)
user_id = Column(String(255), index=True, nullable=False)
role = Column(String(50), nullable=False) # human, ai, system
content = Column(Text, nullable=False)
metadata_json = Column(Text, default="{}")
created_at = Column(DateTime, default=datetime.utcnow)
class ConversationSummaryStore(Base):
"""Conversation summary table"""
__tablename__ = "conversation_summaries"
id = Column(Integer, primary_key=True, autoincrement=True)
session_id = Column(String(255), unique=True, nullable=False)
user_id = Column(String(255), index=True, nullable=False)
summary = Column(Text, nullable=False)
entity_data = Column(Text, default="{}")
message_count = Column(Integer, default=0)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class PostgresMemoryManager:
"""PostgreSQL-based conversation memory manager"""
def __init__(self, database_url: str):
self.engine = create_engine(database_url)
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
def save_message(self, session_id: str, user_id: str, role: str, content: str):
"""Save a message"""
session = self.Session()
try:
msg = ConversationHistory(
session_id=session_id,
user_id=user_id,
role=role,
content=content,
)
session.add(msg)
session.commit()
finally:
session.close()
def get_recent_messages(self, session_id: str, limit: int = 20):
"""Query recent messages"""
session = self.Session()
try:
messages = (
session.query(ConversationHistory)
.filter(ConversationHistory.session_id == session_id)
.order_by(ConversationHistory.created_at.desc())
.limit(limit)
.all()
)
return list(reversed(messages))
finally:
session.close()
def save_summary(self, session_id: str, user_id: str, summary: str,
entity_data: dict, message_count: int):
"""Save/update conversation summary"""
session = self.Session()
try:
existing = (
session.query(ConversationSummaryStore)
.filter(ConversationSummaryStore.session_id == session_id)
.first()
)
if existing:
existing.summary = summary
existing.entity_data = json.dumps(entity_data, ensure_ascii=False)
existing.message_count = message_count
else:
new_summary = ConversationSummaryStore(
session_id=session_id,
user_id=user_id,
summary=summary,
entity_data=json.dumps(entity_data, ensure_ascii=False),
message_count=message_count,
)
session.add(new_summary)
session.commit()
finally:
session.close()
# Usage example
db_url = "postgresql://chatbot:password@localhost:5432/chatbot_db"
memory_manager = PostgresMemoryManager(db_url)
Context Compression Techniques
Summary + Recent Messages Combination
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
class HybridMemoryManager:
"""Hybrid memory combining summary and recent buffer"""
def __init__(self, llm, max_buffer_messages=10):
self.llm = llm
self.max_buffer_messages = max_buffer_messages
self.summary = ""
self.buffer = []
def add_exchange(self, human_msg: str, ai_msg: str):
"""Add a conversation exchange"""
self.buffer.append(HumanMessage(content=human_msg))
self.buffer.append(AIMessage(content=ai_msg))
# Compress old messages into summary when buffer exceeds limit
if len(self.buffer) > self.max_buffer_messages * 2:
self._compress()
def _compress(self):
"""Integrate old messages into summary"""
# Select first half for summarization
to_summarize = self.buffer[: self.max_buffer_messages]
self.buffer = self.buffer[self.max_buffer_messages:]
# Generate summary
conversation_text = "\n".join(
f"{'Human' if isinstance(m, HumanMessage) else 'AI'}: {m.content}"
for m in to_summarize
)
summary_prompt = f"""Below is the previous summary and new conversation. Write an integrated summary.
Previous summary: {self.summary if self.summary else 'None'}
New conversation:
{conversation_text}
Summarize concisely while maintaining key information and context:"""
response = self.llm.invoke([HumanMessage(content=summary_prompt)])
self.summary = response.content
def get_context_messages(self) -> list:
"""Return current context messages"""
messages = []
if self.summary:
messages.append(SystemMessage(
content=f"Previous conversation summary: {self.summary}"
))
messages.extend(self.buffer)
return messages
def get_stats(self) -> dict:
"""Return memory statistics"""
return {
"summary_length": len(self.summary),
"buffer_messages": len(self.buffer),
"has_summary": bool(self.summary),
}
# Usage example
llm = ChatOpenAI(model="gpt-4o", temperature=0)
hybrid_memory = HybridMemoryManager(llm=llm, max_buffer_messages=6)
# Simulate conversation
exchanges = [
("What's the project timeline?", "We're targeting end of March for deployment."),
("Who handles backend development?", "Senior developer Kim is responsible."),
("And frontend?", "Developer Lee is building it with React."),
("Is there a test plan?", "QA team will run integration tests in the first week of April."),
("What's the deployment environment?", "AWS EKS-based Kubernetes environment."),
("What about CI/CD pipeline?", "We use GitHub Actions and ArgoCD."),
("How do you handle monitoring?", "We collect metrics with Grafana and Prometheus."),
]
for human, ai in exchanges:
hybrid_memory.add_exchange(human, ai)
stats = hybrid_memory.get_stats()
print(f"Memory state: {stats}")
RAG-Enhanced Memory
Conversation-Based RAG Pipeline
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.messages import HumanMessage, SystemMessage
from langchain.text_splitter import RecursiveCharacterTextSplitter
class RAGEnhancedMemory:
"""RAG-enhanced conversation memory"""
def __init__(self, llm, embeddings, collection_name="chat_memory"):
self.llm = llm
self.embeddings = embeddings
self.vectorstore = Chroma(
collection_name=collection_name,
embedding_function=embeddings,
)
self.recent_messages = []
self.max_recent = 10
def add_exchange(self, session_id: str, human_msg: str, ai_msg: str):
"""Store conversation in vector store"""
# Add to recent messages buffer
self.recent_messages.append(("human", human_msg))
self.recent_messages.append(("ai", ai_msg))
# Store conversation embedding in vector store
exchange_text = f"User: {human_msg}\nAI: {ai_msg}"
self.vectorstore.add_texts(
texts=[exchange_text],
metadatas=[{"session_id": session_id, "type": "exchange"}],
)
# Maintain recent message limit
if len(self.recent_messages) > self.max_recent * 2:
self.recent_messages = self.recent_messages[-self.max_recent * 2:]
def retrieve_relevant_context(self, query: str, k: int = 3) -> list:
"""Search for past conversations relevant to the query"""
results = self.vectorstore.similarity_search(query, k=k)
return [doc.page_content for doc in results]
def generate_response(self, session_id: str, user_input: str) -> str:
"""Generate RAG-based response"""
# Retrieve relevant past conversations
relevant_context = self.retrieve_relevant_context(user_input)
# Build context
context_parts = []
if relevant_context:
context_parts.append("Relevant previous conversations:")
for ctx in relevant_context:
context_parts.append(f" - {ctx}")
system_content = "You are an AI assistant that leverages previous conversation context in your responses."
if context_parts:
system_content += "\n\n" + "\n".join(context_parts)
messages = [SystemMessage(content=system_content)]
# Add recent messages
for role, content in self.recent_messages[-6:]:
if role == "human":
messages.append(HumanMessage(content=content))
else:
from langchain_core.messages import AIMessage
messages.append(AIMessage(content=content))
messages.append(HumanMessage(content=user_input))
response = self.llm.invoke(messages)
# Save conversation
self.add_exchange(session_id, user_input, response.content)
return response.content
# Usage example
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
embeddings = OpenAIEmbeddings()
rag_memory = RAGEnhancedMemory(llm=llm, embeddings=embeddings)
Session Management Patterns
Multi-Tenant Session Management
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime
import uuid
@dataclass
class ChatSession:
"""Chat session"""
session_id: str
user_id: str
created_at: datetime = field(default_factory=datetime.now)
last_active: datetime = field(default_factory=datetime.now)
metadata: dict = field(default_factory=dict)
is_active: bool = True
class SessionManager:
"""Multi-tenant session manager"""
def __init__(self, max_sessions_per_user: int = 5):
self.sessions: dict = {} # session_id -> ChatSession
self.user_sessions: dict = {} # user_id -> list of session_ids
self.max_sessions_per_user = max_sessions_per_user
def create_session(self, user_id: str, metadata: Optional[dict] = None) -> str:
"""Create a new session"""
# Check per-user session limit
user_session_ids = self.user_sessions.get(user_id, [])
active_sessions = [
sid for sid in user_session_ids
if sid in self.sessions and self.sessions[sid].is_active
]
if len(active_sessions) >= self.max_sessions_per_user:
# Deactivate oldest session
oldest = min(
active_sessions,
key=lambda sid: self.sessions[sid].last_active,
)
self.sessions[oldest].is_active = False
session_id = str(uuid.uuid4())
session = ChatSession(
session_id=session_id,
user_id=user_id,
metadata=metadata or {},
)
self.sessions[session_id] = session
if user_id not in self.user_sessions:
self.user_sessions[user_id] = []
self.user_sessions[user_id].append(session_id)
return session_id
def get_session(self, session_id: str) -> Optional[ChatSession]:
"""Retrieve a session"""
session = self.sessions.get(session_id)
if session and session.is_active:
session.last_active = datetime.now()
return session
return None
def list_user_sessions(self, user_id: str) -> list:
"""List user sessions"""
session_ids = self.user_sessions.get(user_id, [])
return [
self.sessions[sid]
for sid in session_ids
if sid in self.sessions and self.sessions[sid].is_active
]
def close_session(self, session_id: str):
"""Close a session"""
if session_id in self.sessions:
self.sessions[session_id].is_active = False
# Usage example
session_mgr = SessionManager(max_sessions_per_user=3)
session_id = session_mgr.create_session("user-001", {"channel": "web"})
print(f"Created session: {session_id}")
Troubleshooting
Resolving Memory Bloat
When conversations grow long, memory usage can increase rapidly and degrade performance.
class MemoryBloatGuard:
"""Memory bloat prevention guard"""
def __init__(self, max_messages=100, max_token_estimate=50000):
self.max_messages = max_messages
self.max_token_estimate = max_token_estimate
def check_and_trim(self, messages: list) -> tuple:
"""Check memory state and trim"""
total_chars = sum(len(m.content) for m in messages)
estimated_tokens = total_chars // 4 # Rough token estimation
warnings = []
trimmed = messages
if len(messages) > self.max_messages:
warnings.append(
f"Message count exceeded: {len(messages)} > {self.max_messages}"
)
# Preserve system messages, remove oldest from the rest
system_msgs = [m for m in messages if isinstance(m, SystemMessage)]
non_system = [m for m in messages if not isinstance(m, SystemMessage)]
trimmed = system_msgs + non_system[-(self.max_messages - len(system_msgs)):]
if estimated_tokens > self.max_token_estimate:
warnings.append(
f"Token estimate exceeded: {estimated_tokens} > {self.max_token_estimate}"
)
return trimmed, warnings
# Usage
guard = MemoryBloatGuard(max_messages=50, max_token_estimate=30000)
Preventing Context Confusion
Solving the problem where AI confuses previous conversation content and provides incorrect information in long conversations.
class ContextClarityChecker:
"""Context clarity checker"""
def __init__(self, llm):
self.llm = llm
def check_ambiguity(self, user_input: str, recent_messages: list) -> dict:
"""Check user input for ambiguity"""
# Check for pronouns or demonstratives
ambiguous_patterns = ["that", "this", "it", "those", "them", "there", "here"]
has_ambiguity = any(p in user_input.lower().split() for p in ambiguous_patterns)
if not has_ambiguity:
return {"is_ambiguous": False, "resolved_input": user_input}
# Resolve ambiguity with LLM
context_text = "\n".join(
f"{type(m).__name__}: {m.content}" for m in recent_messages[-6:]
)
clarification_prompt = f"""Previous conversation context:
{context_text}
User's new input: {user_input}
Clarify what the pronouns or demonstratives in this input refer to.
Return the sentence with pronouns replaced by their actual referents."""
response = self.llm.invoke([HumanMessage(content=clarification_prompt)])
return {
"is_ambiguous": True,
"original_input": user_input,
"resolved_input": response.content,
}
Operational Notes
Performance Optimization Tips
- Memory type selection: Choose the appropriate memory type based on conversation length. Buffer is recommended for 10 turns or fewer, SummaryBuffer for longer conversations
- Vector store indexing: When using RAG memory, configure appropriate indexes (HNSW, IVF) to optimize search performance
- Redis TTL management: Set appropriate session expiration times to prevent memory leaks
- Asynchronous summarization: Perform conversation summarization asynchronously after response delivery to reduce latency
Security Considerations
- Use UUID v4 for session IDs to ensure unpredictability
- Encrypt conversation content at rest (AES-256)
- Ensure strict session isolation between users
- Apply PII (Personally Identifiable Information) masking
- Establish conversation history retention policies
Production Checklist
- [ ] Select appropriate memory type for conversation length
- [ ] Implement session management system (create, query, expire, delete)
- [ ] Integrate persistent storage (Redis/PostgreSQL) with failure handling
- [ ] Implement memory bloat prevention logic (max messages, token limits)
- [ ] Build context compression (summarization) pipeline
- [ ] Pass multi-tenant session isolation tests
- [ ] Apply PII masking and conversation encryption
- [ ] Configure session expiration policies and TTL
- [ ] Establish conversation history backup and retention policies
- [ ] Build monitoring dashboards (session count, memory usage, response time)
- [ ] Perform load testing (concurrent sessions, performance by conversation length)
- [ ] Test failure recovery scenarios (Redis down, DB connection failure)