- Authors

- Name
- Youngju Kim
- @fjvbn20031

- 블루프린트 개요: Feature Store와 RAG 파이프라인의 합류
- 아키텍처 청사진
- 통합 조회 API 설계
- 문서 임베딩 파이프라인: 청킹 전략과 인덱스 관리
- Feature Store 피처와 RAG 결과의 결합 패턴
- RAGOps: 검색 품질 모니터링과 인덱스 갱신 자동화
- 비용과 성능의 트레이드오프
- 장애 시나리오별 대응 런북
- 블루프린트 도입 로드맵
- 퀴즈
- References
블루프린트 개요: Feature Store와 RAG 파이프라인의 합류
"블루프린트"라는 제목답게, 이 글은 Feature Store와 RAG(Retrieval-Augmented Generation) 파이프라인을 하나의 AI 플랫폼 안에서 통합 운영하기 위한 전체 설계도를 제시한다. 개별 도구 사용법이 아니라, 두 시스템이 만나는 접합부에서 발생하는 설계 결정과 운영 규칙을 다룬다.
Feature Store는 정형 데이터 피처를 관리하고, RAG 파이프라인은 비정형 텍스트를 벡터로 변환하여 LLM에 컨텍스트를 제공한다. 2026년 현재 이 두 시스템은 별도로 운영되는 경우가 많지만, 실제 AI 제품에서는 "유저의 최근 구매 이력(Feature Store) + 관련 상품 리뷰 요약(RAG)"처럼 합쳐서 사용하는 케이스가 늘고 있다. 이 글은 그 통합 아키텍처의 설계 원칙, 구현 패턴, 운영 런북을 청사진으로 정리한다.
아키텍처 청사진
전체 데이터 흐름을 네 개의 레이어로 구분한다.
┌──────────────────────────────────────────────────────┐
│ Layer 4: Serving (KServe / API Gateway) │
│ - Feature 조회 + Vector 검색 결과를 LLM에 전달 │
│ - Guardrail, Citation 검증, 응답 필터링 │
├──────────────────────────────────────────────────────┤
│ Layer 3: Feature Store + Vector Store │
│ - Feast Online Store (Redis) : 정형 피처 │
│ - Vector DB (Qdrant/Weaviate) : 비정형 임베딩 │
│ - 통합 조회 API : 두 store를 하나의 인터페이스로 │
├──────────────────────────────────────────────────────┤
│ Layer 2: Ingestion & Transformation │
│ - CDC 파이프라인 (Debezium -> Kafka) │
│ - 문서 청킹 + 임베딩 파이프라인 (Kubeflow) │
│ - Feature View 정의 + Vector Index 관리 │
├──────────────────────────────────────────────────────┤
│ Layer 1: Source Data │
│ - OLTP DB, Data Warehouse, 문서 저장소, API 로그 │
└──────────────────────────────────────────────────────┘
통합 조회 API 설계
Feature Store와 Vector Store를 하나의 요청으로 조회하는 통합 API가 이 블루프린트의 핵심이다.
"""
Feature Store(Feast)와 Vector Store(Qdrant)를 통합 조회하는 서비스.
서빙 레이어에서 모델이나 LLM에 컨텍스트를 제공할 때 사용한다.
"""
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
import asyncio
from feast import FeatureStore
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue
@dataclass
class UnifiedContext:
"""Feature Store 피처 + Vector 검색 결과를 합친 통합 컨텍스트"""
entity_id: str
structured_features: Dict[str, Any] # Feature Store에서 조회
retrieved_documents: List[Dict[str, Any]] # Vector Store에서 검색
feature_freshness_ms: float = 0.0
retrieval_latency_ms: float = 0.0
class UnifiedContextService:
def __init__(
self,
feast_repo_path: str,
qdrant_url: str,
qdrant_collection: str,
):
self.feast_store = FeatureStore(repo_path=feast_repo_path)
self.qdrant = QdrantClient(url=qdrant_url, timeout=5.0)
self.collection = qdrant_collection
async def get_context(
self,
entity_id: str,
query_embedding: List[float],
feature_list: List[str],
top_k: int = 5,
metadata_filter: Optional[Dict[str, str]] = None,
) -> UnifiedContext:
"""
Feature Store 조회와 Vector 검색을 병렬로 수행하여
통합 컨텍스트를 반환한다.
"""
# 병렬 실행: Feature Store 조회 + Vector 검색
feature_task = asyncio.to_thread(
self._fetch_features, entity_id, feature_list
)
vector_task = asyncio.to_thread(
self._search_vectors, query_embedding, top_k, metadata_filter, entity_id
)
features_result, vectors_result = await asyncio.gather(
feature_task, vector_task
)
return UnifiedContext(
entity_id=entity_id,
structured_features=features_result["features"],
retrieved_documents=vectors_result["documents"],
feature_freshness_ms=features_result["latency_ms"],
retrieval_latency_ms=vectors_result["latency_ms"],
)
def _fetch_features(self, entity_id: str, feature_list: List[str]) -> dict:
import time
start = time.monotonic()
result = self.feast_store.get_online_features(
features=feature_list,
entity_rows=[{"user_id": entity_id}],
).to_dict()
elapsed = (time.monotonic() - start) * 1000
# dict에서 리스트 값을 스칼라로 변환
features = {k: v[0] if isinstance(v, list) and v else v for k, v in result.items()}
return {"features": features, "latency_ms": elapsed}
def _search_vectors(
self,
query_embedding: List[float],
top_k: int,
metadata_filter: Optional[Dict[str, str]],
entity_id: str,
) -> dict:
import time
start = time.monotonic()
search_filter = None
if metadata_filter:
conditions = [
FieldCondition(key=k, match=MatchValue(value=v))
for k, v in metadata_filter.items()
]
search_filter = Filter(must=conditions)
results = self.qdrant.search(
collection_name=self.collection,
query_vector=query_embedding,
query_filter=search_filter,
limit=top_k,
with_payload=True,
)
elapsed = (time.monotonic() - start) * 1000
documents = [
{
"text": hit.payload.get("text", ""),
"source": hit.payload.get("source", ""),
"score": hit.score,
"chunk_id": hit.id,
}
for hit in results
]
return {"documents": documents, "latency_ms": elapsed}
문서 임베딩 파이프라인: 청킹 전략과 인덱스 관리
RAG 파이프라인에서 문서를 벡터로 변환하는 과정은 Feature Store의 Feature View 정의와 대칭되는 개념이다. Feature View가 "어떤 변환을 거쳐 피처를 만들 것인가"를 정의하듯, 임베딩 파이프라인은 "어떤 청킹과 임베딩을 거쳐 벡터를 만들 것인가"를 정의한다.
"""
문서 청킹 + 임베딩 파이프라인.
Kubeflow Pipeline component로 실행하거나 독립 스크립트로 사용한다.
"""
from dataclasses import dataclass
from typing import List, Generator
import hashlib
import json
@dataclass
class ChunkConfig:
"""청킹 설정. 문서 유형별로 다른 전략을 적용한다."""
chunk_size: int = 512 # 토큰 기준
chunk_overlap: int = 64 # 청크 간 겹침
separators: tuple = ("\n\n", "\n", ". ", " ")
min_chunk_size: int = 50 # 이 미만은 이전 청크에 병합
metadata_fields: tuple = ("source", "doc_type", "updated_at")
# 문서 유형별 청킹 전략
CHUNK_CONFIGS = {
"product_review": ChunkConfig(chunk_size=256, chunk_overlap=32),
"technical_doc": ChunkConfig(chunk_size=512, chunk_overlap=64),
"faq": ChunkConfig(chunk_size=128, chunk_overlap=0), # FAQ는 겹침 불필요
"legal": ChunkConfig(chunk_size=1024, chunk_overlap=128),
}
def recursive_split(text: str, config: ChunkConfig) -> List[str]:
"""재귀적 텍스트 분할. separators를 순서대로 시도한다."""
if len(text.split()) <= config.chunk_size:
return [text] if len(text.split()) >= config.min_chunk_size else []
for sep in config.separators:
parts = text.split(sep)
if len(parts) > 1:
chunks = []
current = ""
for part in parts:
candidate = current + sep + part if current else part
if len(candidate.split()) > config.chunk_size:
if current:
chunks.append(current.strip())
current = part
else:
current = candidate
if current:
chunks.append(current.strip())
return [c for c in chunks if len(c.split()) >= config.min_chunk_size]
# 모든 separator로 분할 불가 시 강제 분할
words = text.split()
return [
" ".join(words[i:i + config.chunk_size])
for i in range(0, len(words), config.chunk_size - config.chunk_overlap)
]
def create_chunk_id(source: str, chunk_index: int, text: str) -> str:
"""결정론적 chunk ID 생성. 동일 문서의 동일 청크는 같은 ID를 갖는다."""
content_hash = hashlib.sha256(text.encode()).hexdigest()[:12]
return f"{source}::{chunk_index}::{content_hash}"
def process_document(
text: str,
source: str,
doc_type: str,
metadata: dict,
embedding_fn,
) -> List[dict]:
"""
하나의 문서를 청킹하고 임베딩하여 Vector Store에 upsert할
레코드 리스트를 반환한다.
"""
config = CHUNK_CONFIGS.get(doc_type, ChunkConfig())
chunks = recursive_split(text, config)
records = []
for i, chunk_text in enumerate(chunks):
embedding = embedding_fn(chunk_text)
chunk_id = create_chunk_id(source, i, chunk_text)
records.append({
"id": chunk_id,
"vector": embedding,
"payload": {
"text": chunk_text,
"source": source,
"doc_type": doc_type,
"chunk_index": i,
"total_chunks": len(chunks),
"chunk_size_tokens": len(chunk_text.split()),
**{k: metadata.get(k, "") for k in config.metadata_fields},
},
})
return records
Feature Store 피처와 RAG 결과의 결합 패턴
LLM에 전달하는 프롬프트를 구성할 때, Feature Store의 정형 피처와 RAG의 검색 결과를 어떻게 결합하는지가 응답 품질을 결정한다.
"""
Feature Store 피처 + RAG 검색 결과를 결합하여
LLM 프롬프트를 구성하는 패턴.
"""
from typing import Dict, List, Any
def build_augmented_prompt(
user_query: str,
structured_features: Dict[str, Any],
retrieved_documents: List[Dict[str, Any]],
system_instruction: str = "",
) -> str:
"""
정형 피처와 비정형 검색 결과를 결합한 프롬프트 생성.
원칙:
1. 정형 피처는 "사용자 컨텍스트" 섹션에 표 형태로 제공
2. RAG 검색 결과는 "참고 문서" 섹션에 출처와 함께 제공
3. 피처와 문서 간 충돌 시 피처(실시간 데이터)를 우선하라고 명시
"""
# 정형 피처 섹션
feature_lines = []
feature_display_names = {
"total_purchases_7d": "최근 7일 구매 횟수",
"avg_order_value_30d": "최근 30일 평균 주문 금액",
"preferred_category": "선호 카테고리",
"membership_tier": "멤버십 등급",
}
for key, value in structured_features.items():
display = feature_display_names.get(key, key)
feature_lines.append(f"- {display}: {value}")
features_section = "\n".join(feature_lines) if feature_lines else "없음"
# RAG 검색 결과 섹션
doc_sections = []
for i, doc in enumerate(retrieved_documents, 1):
score = doc.get("score", 0)
# relevance score가 낮은 문서는 제외
if score < 0.7:
continue
doc_sections.append(
f"[문서 {i}] (출처: {doc['source']}, 관련도: {score:.2f})\n{doc['text']}"
)
docs_section = "\n\n".join(doc_sections) if doc_sections else "관련 문서 없음"
prompt = f"""{system_instruction}
## 사용자 컨텍스트 (Feature Store 실시간 데이터)
{features_section}
## 참고 문서 (RAG 검색 결과)
{docs_section}
## 지침
- 사용자 컨텍스트와 참고 문서를 모두 활용하여 답변하세요.
- 정형 데이터(사용자 컨텍스트)와 문서 내용이 충돌하면 정형 데이터를 우선합니다.
- 답변에 사용한 참고 문서 번호를 인용하세요.
- 참고 문서에 없는 내용은 추측하지 마세요.
## 사용자 질문
{user_query}"""
return prompt
RAGOps: 검색 품질 모니터링과 인덱스 갱신 자동화
Feature Store에 Freshness SLA가 있듯이, Vector Store에도 인덱스 최신성 관리가 필요하다. 이것을 RAGOps라고 부른다.
인덱스 Freshness 추적
"""
Vector Store 인덱스의 최신성을 추적하는 모니터링 모듈.
문서 원천의 최신 업데이트 시각과 인덱스의 마지막 갱신 시각을
비교하여 stale 상태를 감지한다.
"""
from datetime import datetime, timedelta
from typing import Dict, List
from dataclasses import dataclass
import json
@dataclass
class IndexFreshnessReport:
collection_name: str
total_documents: int
stale_documents: int
freshness_ratio: float
oldest_document_age_hours: float
avg_document_age_hours: float
recommendations: List[str]
def check_index_freshness(
qdrant_client,
collection_name: str,
freshness_threshold_hours: int = 24,
sample_size: int = 1000,
) -> IndexFreshnessReport:
"""
Vector Store 인덱스의 문서 최신성을 점검한다.
"""
# 샘플 문서의 updated_at 필드를 조회
points, _ = qdrant_client.scroll(
collection_name=collection_name,
limit=sample_size,
with_payload=["updated_at", "source"],
)
now = datetime.utcnow()
ages_hours = []
stale_count = 0
stale_sources = set()
for point in points:
updated_at_str = point.payload.get("updated_at", "")
if not updated_at_str:
stale_count += 1
continue
try:
updated_at = datetime.fromisoformat(updated_at_str)
age_hours = (now - updated_at).total_seconds() / 3600
ages_hours.append(age_hours)
if age_hours > freshness_threshold_hours:
stale_count += 1
stale_sources.add(point.payload.get("source", "unknown"))
except ValueError:
stale_count += 1
total = len(points)
freshness_ratio = 1.0 - (stale_count / max(total, 1))
recommendations = []
if freshness_ratio < 0.95:
recommendations.append(
f"Stale 문서 비율이 {(1-freshness_ratio)*100:.1f}%입니다. "
f"임베딩 파이프라인 실행이 필요합니다."
)
if stale_sources:
recommendations.append(
f"Stale 문서 출처: {', '.join(list(stale_sources)[:5])}"
)
if ages_hours and max(ages_hours) > freshness_threshold_hours * 3:
recommendations.append(
"매우 오래된 문서가 존재합니다. 전체 재인덱싱을 고려하세요."
)
return IndexFreshnessReport(
collection_name=collection_name,
total_documents=total,
stale_documents=stale_count,
freshness_ratio=freshness_ratio,
oldest_document_age_hours=max(ages_hours) if ages_hours else 0,
avg_document_age_hours=sum(ages_hours) / len(ages_hours) if ages_hours else 0,
recommendations=recommendations,
)
인덱스 갱신 Kubeflow Pipeline
# rag-index-update-pipeline.yaml
# 주기적으로 변경된 문서를 감지하여 벡터 인덱스를 갱신한다.
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: rag-index-updater
namespace: ml-pipelines
spec:
schedule: '0 */4 * * *' # 4시간마다 실행
concurrencyPolicy: Replace
workflowSpec:
entrypoint: update-index
templates:
- name: update-index
steps:
- - name: detect-changes
template: detect-document-changes
- - name: chunk-and-embed
template: run-embedding-pipeline
arguments:
parameters:
- name: changed-docs
value: '{{steps.detect-changes.outputs.parameters.changed-docs}}'
- - name: verify-freshness
template: check-freshness
- name: detect-document-changes
container:
image: ml-platform/doc-change-detector:v1.2.0
command: [python, detect_changes.py]
args:
- --since=4h
- --output=/tmp/changed_docs.json
env:
- name: DOC_STORE_URL
valueFrom:
secretKeyRef:
name: doc-store-credentials
key: url
- name: run-embedding-pipeline
container:
image: ml-platform/embedding-worker:v2.1.0
command: [python, embed_and_upsert.py]
args:
- --docs-file={{inputs.parameters.changed-docs}}
- --collection=product_knowledge
- --model=text-embedding-3-large
resources:
requests:
nvidia.com/gpu: 1
memory: 8Gi
- name: check-freshness
container:
image: ml-platform/rag-monitor:v1.0.0
command: [python, check_freshness.py]
args:
- --collection=product_knowledge
- --threshold-hours=24
- --alert-on-failure
비용과 성능의 트레이드오프
Feature Store와 Vector Store를 함께 운영할 때 비용 구조를 이해해야 한다.
| 구성 요소 | 비용 동인 | 절감 전략 | 주의사항 |
|---|---|---|---|
| Feast Online Store (Redis) | 메모리 용량 * 시간 | TTL 설정으로 미사용 엔티티 제거 | TTL이 너무 짧으면 cache miss 급증 |
| Vector Store (Qdrant) | 벡터 차원 수 _ 문서 수 _ 복제본 | 차원 축소 (1536 -> 512), 양자화 적용 | 양자화 시 recall 1-3% 하락 |
| 임베딩 API 비용 | 토큰 수 * 호출 횟수 | 변경분만 재임베딩, 배치 처리 | 모델 변경 시 전체 재임베딩 필요 |
| CDC 파이프라인 (Kafka) | 파티션 수 * 보관 기간 | 압축, 보관 기간 단축 | 보관 기간 < backfill 윈도우이면 재복구 불가 |
| 서빙 GPU (LLM) | GPU 시간 * 모델 크기 | 배치 추론, 모델 양자화 | 양자화 레벨별 품질 벤치마크 필수 |
장애 시나리오별 대응 런북
시나리오 1: Vector Store 검색 결과가 갑자기 부정확해짐
증상: RAG 챗봇 답변 품질이 급락. 사용자 피드백 "엉뚱한 답변" 증가.
Retrieval recall@5가 0.85에서 0.52로 하락.
점검 순서:
1. 최근 임베딩 파이프라인 실행 로그 확인
-> 임베딩 모델 버전이 바뀌었는지 체크
2. Qdrant collection info 확인
-> vector dimension이 변경되었는지 체크
3. 원인: 임베딩 모델을 text-embedding-3-small에서 text-embedding-3-large로
업그레이드했으나 기존 벡터는 재임베딩하지 않아 차원 불일치
해결:
1. 임베딩 모델 변경 시 반드시 전체 재인덱싱 실행
2. 새 collection 생성 -> 전체 임베딩 -> alias 전환 (blue-green 패턴)
3. 재인덱싱 완료 전까지 이전 모델로 롤백
시나리오 2: Feature Store 조회와 Vector 검색 중 하나만 실패
증상: 통합 조회 API에서 간헐적 500 에러.
에러 로그: "TimeoutError: Feast online store read timed out after 50ms"
원인: Feature Store(Redis)는 타임아웃이지만 Vector Store(Qdrant)는 정상.
하나의 실패가 전체 요청을 실패시킴.
해결:
1. 통합 조회 API에 partial failure 허용 로직 추가
2. Feature Store 실패 시: 기본값(인구통계 평균) 사용, 로그 기록
3. Vector Store 실패 시: 사전 캐시된 인기 문서로 fallback
4. 양쪽 모두 실패 시: LLM에 컨텍스트 없이 일반 응답 생성 (품질 저하 경고 포함)
블루프린트 도입 로드맵
이 블루프린트를 한 번에 구현하려 하지 말고, 4단계로 나눠서 도입한다.
Phase 1 (4주): 기반 구축
- Feast + Redis Online Store 구성
- Qdrant 단일 노드 배포
- 통합 조회 API 프로토타입
- 검증: 단일 use case에서 Feature Store 조회 + Vector 검색 응답 시간 < 100ms
Phase 2 (4주): 파이프라인 자동화
- CDC 파이프라인 구축 (Feature Store freshness SLA 달성)
- 임베딩 파이프라인 자동화 (변경 감지 -> 재임베딩)
- 검증: Feature freshness p95 < 5분, 인덱스 freshness < 4시간
Phase 3 (4주): 품질 모니터링
- Retrieval 품질 메트릭 (recall@k, MRR) 대시보드
- Feature Store parity 테스트 CI 통합
- Alerting 체계 구축
- 검증: 품질 회귀 시 30분 이내 알림 수신
Phase 4 (4주): 최적화와 확장
- 벡터 양자화, TTL 최적화, 비용 대시보드
- 멀티 collection 지원 (문서 유형별 분리)
- Blue-green 인덱스 전환 자동화
- 검증: 월 인프라 비용 Phase 2 대비 30% 절감
퀴즈
퀴즈
Q1. Feature Store의 Feature View와 RAG 파이프라인의 청킹 설정이 대칭 관계인 이유는?
||둘 다 "원천 데이터를 어떤 변환을 거쳐 서빙 가능한 형태로 만들 것인가"를 정의하는 선언적 명세이며, 변환 로직의 일관성이 학습-서빙/인덱싱-검색 품질을 결정하기 때문이다.||
Q2. 통합 조회 API에서 Feature Store 조회와 Vector 검색을 병렬로 실행하는 이유와 리스크는?
||전체 응답 시간을 max(Feature Store latency, Vector Store latency)로 줄이기 위해서다. 리스크는 하나가 실패하면 전체가 실패할 수 있으므로 partial failure 허용 로직이 필요하다.||
Q3. 임베딩 모델을 업그레이드할 때 기존 벡터를 재임베딩하지 않으면 어떤 문제가 발생하는가?
||새 모델로 생성한 query 임베딩과 구 모델로 생성한 document 임베딩이 같은 벡터 공간에 있지 않으므로 similarity 계산이 무의미해지고 retrieval recall이 급락한다.||
Q4. Blue-green 인덱스 전환 패턴의 핵심 단계는?
||새 collection에 전체 재임베딩 완료 -> alias를 새 collection으로 전환 -> 이전 collection을 일정 기간 유지 후 삭제. 전환 시점에 다운타임이 없다.||
Q5. RAGOps에서 인덱스 freshness SLA를 설정할 때 고려해야 할 주요 요소는?
||문서 변경 빈도, 임베딩 파이프라인 실행 비용, 비즈니스 요구사항(실시간성 필요 여부), 임베딩 API 호출 비용을 종합적으로 고려해야 한다. FAQ처럼 변경이 드문 문서는 24시간, 상품 리뷰는 4시간 등 문서 유형별로 차별화한다.||
Q6. Feature Store 정형 피처와 RAG 문서 내용이 충돌할 때 정형 데이터를 우선하라고 LLM에 지시하는 이유는?
||Feature Store의 데이터는 실시간 CDC로 갱신되어 최신성이 보장되지만, RAG 문서는 인덱싱 주기에 따라 지연될 수 있다. 실시간 데이터가 더 신뢰할 수 있는 출처이기 때문이다.||
Q7. 블루프린트를 4단계로 나눠 도입하는 가장 큰 이유는?
||각 단계에서 정량적 검증 기준을 만족하는지 확인한 후 다음 단계로 진행해야 하며, 한 번에 구현하면 장애 원인을 분리하기 어렵고 팀의 학습 곡선을 소화할 수 없기 때문이다.||