AI Platform: Feature Store and RAGOps Blueprint 2026

- Blueprint Overview: The Convergence of Feature Store and RAG Pipelines
- Architecture Blueprint
- Unified Query API Design
- Document Embedding Pipeline: Chunking Strategy and Index Management
- Combining Feature Store Features and RAG Results
- RAGOps: Retrieval Quality Monitoring and Index Update Automation
- Cost and Performance Trade-offs
- Failure Scenario Runbooks
- Blueprint Adoption Roadmap
- Quiz
Blueprint Overview: The Convergence of Feature Store and RAG Pipelines
True to the "blueprint" in its title, this article presents a complete design for operating a Feature Store and a RAG (Retrieval-Augmented Generation) pipeline together within a single AI platform. Rather than covering individual tools, it focuses on the design decisions and operating rules that arise at the junction where the two systems meet.
A Feature Store manages structured data features, while a RAG pipeline converts unstructured text into vectors to provide context to LLMs. As of 2026 the two systems are often operated separately, but real AI products increasingly combine them -- for example, "the user's recent purchase history (Feature Store) + summaries of related product reviews (RAG)." This article lays out the design principles, implementation patterns, and operational runbooks of that integrated architecture as a blueprint.
Architecture Blueprint
The overall data flow is divided into four layers.
┌──────────────────────────────────────────────────────┐
│ Layer 4: Serving (KServe / API Gateway)              │
│ - Feature lookup + vector search results to LLM      │
│ - Guardrails, citation checks, response filtering    │
├──────────────────────────────────────────────────────┤
│ Layer 3: Feature Store + Vector Store                │
│ - Feast Online Store (Redis): structured features    │
│ - Vector DB (Qdrant/Weaviate): unstructured embeds   │
│ - Unified query API: one interface for both stores   │
├──────────────────────────────────────────────────────┤
│ Layer 2: Ingestion & Transformation                  │
│ - CDC pipeline (Debezium -> Kafka)                   │
│ - Document chunking + embedding pipeline (Kubeflow)  │
│ - Feature View definitions + vector index management │
├──────────────────────────────────────────────────────┤
│ Layer 1: Source Data                                 │
│ - OLTP DB, data warehouse, document store, API logs  │
└──────────────────────────────────────────────────────┘
Unified Query API Design
The unified API that queries the Feature Store and the Vector Store in a single request is the core of this blueprint.
"""
Feature Store(Feast)와 Vector Store(Qdrant)를 통합 조회하는 서비스.
서빙 레이어에서 모델이나 LLM에 컨텍스트를 제공할 때 사용한다.
"""
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
import asyncio
from feast import FeatureStore
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue
@dataclass
class UnifiedContext:
"""Feature Store 피처 + Vector 검색 결과를 합친 통합 컨텍스트"""
entity_id: str
structured_features: Dict[str, Any] # Feature Store에서 조회
retrieved_documents: List[Dict[str, Any]] # Vector Store에서 검색
feature_freshness_ms: float = 0.0
retrieval_latency_ms: float = 0.0
class UnifiedContextService:
def __init__(
self,
feast_repo_path: str,
qdrant_url: str,
qdrant_collection: str,
):
self.feast_store = FeatureStore(repo_path=feast_repo_path)
self.qdrant = QdrantClient(url=qdrant_url, timeout=5.0)
self.collection = qdrant_collection
async def get_context(
self,
entity_id: str,
query_embedding: List[float],
feature_list: List[str],
top_k: int = 5,
metadata_filter: Optional[Dict[str, str]] = None,
) -> UnifiedContext:
"""
Feature Store 조회와 Vector 검색을 병렬로 수행하여
통합 컨텍스트를 반환한다.
"""
# 병렬 실행: Feature Store 조회 + Vector 검색
feature_task = asyncio.to_thread(
self._fetch_features, entity_id, feature_list
)
vector_task = asyncio.to_thread(
self._search_vectors, query_embedding, top_k, metadata_filter, entity_id
)
features_result, vectors_result = await asyncio.gather(
feature_task, vector_task
)
return UnifiedContext(
entity_id=entity_id,
structured_features=features_result["features"],
retrieved_documents=vectors_result["documents"],
feature_freshness_ms=features_result["latency_ms"],
retrieval_latency_ms=vectors_result["latency_ms"],
)
def _fetch_features(self, entity_id: str, feature_list: List[str]) -> dict:
import time
start = time.monotonic()
result = self.feast_store.get_online_features(
features=feature_list,
entity_rows=[{"user_id": entity_id}],
).to_dict()
elapsed = (time.monotonic() - start) * 1000
# dict에서 리스트 값을 스칼라로 변환
features = {k: v[0] if isinstance(v, list) and v else v for k, v in result.items()}
return {"features": features, "latency_ms": elapsed}
def _search_vectors(
self,
query_embedding: List[float],
top_k: int,
metadata_filter: Optional[Dict[str, str]],
entity_id: str,
) -> dict:
import time
start = time.monotonic()
search_filter = None
if metadata_filter:
conditions = [
FieldCondition(key=k, match=MatchValue(value=v))
for k, v in metadata_filter.items()
]
search_filter = Filter(must=conditions)
results = self.qdrant.search(
collection_name=self.collection,
query_vector=query_embedding,
query_filter=search_filter,
limit=top_k,
with_payload=True,
)
elapsed = (time.monotonic() - start) * 1000
documents = [
{
"text": hit.payload.get("text", ""),
"source": hit.payload.get("source", ""),
"score": hit.score,
"chunk_id": hit.id,
}
for hit in results
]
return {"documents": documents, "latency_ms": elapsed}
Document Embedding Pipeline: Chunking Strategy and Index Management
In a RAG pipeline, converting documents into vectors is the symmetric counterpart of defining a Feature View in the Feature Store. Just as a Feature View defines "which transformations produce a feature," the embedding pipeline defines "which chunking and embedding steps produce a vector."
"""
문서 청킹 + 임베딩 파이프라인.
Kubeflow Pipeline component로 실행하거나 독립 스크립트로 사용한다.
"""
from dataclasses import dataclass
from typing import List, Generator
import hashlib
import json
@dataclass
class ChunkConfig:
"""청킹 설정. 문서 유형별로 다른 전략을 적용한다."""
chunk_size: int = 512 # 토큰 기준
chunk_overlap: int = 64 # 청크 간 겹침
separators: tuple = ("\n\n", "\n", ". ", " ")
min_chunk_size: int = 50 # 이 미만은 이전 청크에 병합
metadata_fields: tuple = ("source", "doc_type", "updated_at")
# 문서 유형별 청킹 전략
CHUNK_CONFIGS = {
"product_review": ChunkConfig(chunk_size=256, chunk_overlap=32),
"technical_doc": ChunkConfig(chunk_size=512, chunk_overlap=64),
"faq": ChunkConfig(chunk_size=128, chunk_overlap=0), # FAQ는 겹침 불필요
"legal": ChunkConfig(chunk_size=1024, chunk_overlap=128),
}
def recursive_split(text: str, config: ChunkConfig) -> List[str]:
"""재귀적 텍스트 분할. separators를 순서대로 시도한다."""
if len(text.split()) <= config.chunk_size:
return [text] if len(text.split()) >= config.min_chunk_size else []
for sep in config.separators:
parts = text.split(sep)
if len(parts) > 1:
chunks = []
current = ""
for part in parts:
candidate = current + sep + part if current else part
if len(candidate.split()) > config.chunk_size:
if current:
chunks.append(current.strip())
current = part
else:
current = candidate
if current:
chunks.append(current.strip())
return [c for c in chunks if len(c.split()) >= config.min_chunk_size]
# 모든 separator로 분할 불가 시 강제 분할
words = text.split()
return [
" ".join(words[i:i + config.chunk_size])
for i in range(0, len(words), config.chunk_size - config.chunk_overlap)
]
def create_chunk_id(source: str, chunk_index: int, text: str) -> str:
"""결정론적 chunk ID 생성. 동일 문서의 동일 청크는 같은 ID를 갖는다."""
content_hash = hashlib.sha256(text.encode()).hexdigest()[:12]
return f"{source}::{chunk_index}::{content_hash}"
def process_document(
text: str,
source: str,
doc_type: str,
metadata: dict,
embedding_fn,
) -> List[dict]:
"""
하나의 문서를 청킹하고 임베딩하여 Vector Store에 upsert할
레코드 리스트를 반환한다.
"""
config = CHUNK_CONFIGS.get(doc_type, ChunkConfig())
chunks = recursive_split(text, config)
records = []
for i, chunk_text in enumerate(chunks):
embedding = embedding_fn(chunk_text)
chunk_id = create_chunk_id(source, i, chunk_text)
records.append({
"id": chunk_id,
"vector": embedding,
"payload": {
"text": chunk_text,
"source": source,
"doc_type": doc_type,
"chunk_index": i,
"total_chunks": len(chunks),
"chunk_size_tokens": len(chunk_text.split()),
**{k: metadata.get(k, "") for k in config.metadata_fields},
},
})
return records
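The deterministic chunk IDs above are what make pipeline re-runs idempotent: upserting an unchanged document overwrites its chunks in place instead of duplicating them, while an edited chunk gets a new ID. A standalone sketch (re-declaring `create_chunk_id` so it runs on its own):

```python
import hashlib

def create_chunk_id(source: str, chunk_index: int, text: str) -> str:
    """Deterministic chunk ID: same document + same chunk content = same ID."""
    content_hash = hashlib.sha256(text.encode()).hexdigest()[:12]
    return f"{source}::{chunk_index}::{content_hash}"

# Re-running the pipeline on an unchanged document yields identical IDs,
# so a Vector Store upsert overwrites in place instead of duplicating.
a = create_chunk_id("docs/faq.md", 0, "How do I reset my password?")
b = create_chunk_id("docs/faq.md", 0, "How do I reset my password?")
c = create_chunk_id("docs/faq.md", 0, "How do I reset my password? (updated)")
print(a == b)  # True: identical content, identical ID
print(a == c)  # False: edited content gets a new ID; the old chunk can be pruned
```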
Combining Feature Store Features and RAG Results
When composing the prompt sent to the LLM, how you combine the Feature Store's structured features with the RAG retrieval results determines response quality.
"""
Feature Store 피처 + RAG 검색 결과를 결합하여
LLM 프롬프트를 구성하는 패턴.
"""
from typing import Dict, List, Any
def build_augmented_prompt(
user_query: str,
structured_features: Dict[str, Any],
retrieved_documents: List[Dict[str, Any]],
system_instruction: str = "",
) -> str:
"""
정형 피처와 비정형 검색 결과를 결합한 프롬프트 생성.
원칙:
1. 정형 피처는 "사용자 컨텍스트" 섹션에 표 형태로 제공
2. RAG 검색 결과는 "참고 문서" 섹션에 출처와 함께 제공
3. 피처와 문서 간 충돌 시 피처(실시간 데이터)를 우선하라고 명시
"""
# 정형 피처 섹션
feature_lines = []
feature_display_names = {
"total_purchases_7d": "최근 7일 구매 횟수",
"avg_order_value_30d": "최근 30일 평균 주문 금액",
"preferred_category": "선호 카테고리",
"membership_tier": "멤버십 등급",
}
for key, value in structured_features.items():
display = feature_display_names.get(key, key)
feature_lines.append(f"- {display}: {value}")
features_section = "\n".join(feature_lines) if feature_lines else "없음"
# RAG 검색 결과 섹션
doc_sections = []
for i, doc in enumerate(retrieved_documents, 1):
score = doc.get("score", 0)
# relevance score가 낮은 문서는 제외
if score < 0.7:
continue
doc_sections.append(
f"[문서 {i}] (출처: {doc['source']}, 관련도: {score:.2f})\n{doc['text']}"
)
docs_section = "\n\n".join(doc_sections) if doc_sections else "관련 문서 없음"
prompt = f"""{system_instruction}
## 사용자 컨텍스트 (Feature Store 실시간 데이터)
{features_section}
## 참고 문서 (RAG 검색 결과)
{docs_section}
## 지침
- 사용자 컨텍스트와 참고 문서를 모두 활용하여 답변하세요.
- 정형 데이터(사용자 컨텍스트)와 문서 내용이 충돌하면 정형 데이터를 우선합니다.
- 답변에 사용한 참고 문서 번호를 인용하세요.
- 참고 문서에 없는 내용은 추측하지 마세요.
## 사용자 질문
{user_query}"""
return prompt
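The 0.7 relevance cutoff inside `build_augmented_prompt` is easy to verify in isolation. A standalone sketch with hypothetical retrieval results (the threshold mirrors the function above; the sample documents are invented):

```python
# Standalone demo of the relevance-score cutoff: documents scoring below 0.7
# never reach the prompt. The retrieval results below are hypothetical.
RELEVANCE_THRESHOLD = 0.7

retrieved = [
    {"source": "reviews/item-1.md", "score": 0.91, "text": "Battery lasts two days."},
    {"source": "reviews/item-2.md", "score": 0.68, "text": "Shipping was slow."},
    {"source": "faq/returns.md",    "score": 0.74, "text": "Returns accepted within 30 days."},
]

kept = [d for d in retrieved if d["score"] >= RELEVANCE_THRESHOLD]
docs_section = "\n\n".join(
    f"[Document {i}] (Source: {d['source']}, Relevance: {d['score']:.2f})\n{d['text']}"
    for i, d in enumerate(kept, 1)
)
print(docs_section)  # the 0.68-scored review is filtered out
```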
RAGOps: Retrieval Quality Monitoring and Index Update Automation
Just as a Feature Store has a freshness SLA, a Vector Store needs index freshness management. This practice is what we call RAGOps.
Index Freshness Tracking
"""
Vector Store 인덱스의 최신성을 추적하는 모니터링 모듈.
문서 원천의 최신 업데이트 시각과 인덱스의 마지막 갱신 시각을
비교하여 stale 상태를 감지한다.
"""
from datetime import datetime, timedelta
from typing import Dict, List
from dataclasses import dataclass
import json
@dataclass
class IndexFreshnessReport:
collection_name: str
total_documents: int
stale_documents: int
freshness_ratio: float
oldest_document_age_hours: float
avg_document_age_hours: float
recommendations: List[str]
def check_index_freshness(
qdrant_client,
collection_name: str,
freshness_threshold_hours: int = 24,
sample_size: int = 1000,
) -> IndexFreshnessReport:
"""
Vector Store 인덱스의 문서 최신성을 점검한다.
"""
# 샘플 문서의 updated_at 필드를 조회
points, _ = qdrant_client.scroll(
collection_name=collection_name,
limit=sample_size,
with_payload=["updated_at", "source"],
)
now = datetime.utcnow()
ages_hours = []
stale_count = 0
stale_sources = set()
for point in points:
updated_at_str = point.payload.get("updated_at", "")
if not updated_at_str:
stale_count += 1
continue
try:
updated_at = datetime.fromisoformat(updated_at_str)
age_hours = (now - updated_at).total_seconds() / 3600
ages_hours.append(age_hours)
if age_hours > freshness_threshold_hours:
stale_count += 1
stale_sources.add(point.payload.get("source", "unknown"))
except ValueError:
stale_count += 1
total = len(points)
freshness_ratio = 1.0 - (stale_count / max(total, 1))
recommendations = []
if freshness_ratio < 0.95:
recommendations.append(
f"Stale 문서 비율이 {(1-freshness_ratio)*100:.1f}%입니다. "
f"임베딩 파이프라인 실행이 필요합니다."
)
if stale_sources:
recommendations.append(
f"Stale 문서 출처: {', '.join(list(stale_sources)[:5])}"
)
if ages_hours and max(ages_hours) > freshness_threshold_hours * 3:
recommendations.append(
"매우 오래된 문서가 존재합니다. 전체 재인덱싱을 고려하세요."
)
return IndexFreshnessReport(
collection_name=collection_name,
total_documents=total,
stale_documents=stale_count,
freshness_ratio=freshness_ratio,
oldest_document_age_hours=max(ages_hours) if ages_hours else 0,
avg_document_age_hours=sum(ages_hours) / len(ages_hours) if ages_hours else 0,
recommendations=recommendations,
)
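The freshness math above can be exercised without a live Qdrant by feeding it synthetic payloads. A minimal standalone sketch of the same counting logic (the timestamps and sources are hypothetical):

```python
# Self-contained illustration of the freshness math in check_index_freshness,
# using synthetic payloads instead of a live Qdrant scroll.
from datetime import datetime, timedelta, timezone

THRESHOLD_HOURS = 24
now = datetime.now(timezone.utc)

payloads = [
    {"updated_at": (now - timedelta(hours=2)).isoformat(), "source": "faq"},
    {"updated_at": (now - timedelta(hours=30)).isoformat(), "source": "reviews"},  # stale
    {"updated_at": "", "source": "legal"},                                         # missing -> stale
    {"updated_at": (now - timedelta(hours=80)).isoformat(), "source": "reviews"},  # very old
]

stale = 0
ages = []
for p in payloads:
    ts = p["updated_at"]
    if not ts:
        stale += 1
        continue
    age_h = (now - datetime.fromisoformat(ts)).total_seconds() / 3600
    ages.append(age_h)
    if age_h > THRESHOLD_HOURS:
        stale += 1

freshness_ratio = 1.0 - stale / len(payloads)
print(f"stale={stale}, freshness_ratio={freshness_ratio:.2f}")  # stale=3, freshness_ratio=0.25
```

Three of four sampled documents are stale, which would trip both the ratio alert (below 0.95) and the "very old documents" recommendation (80h > 3 × 24h).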
Index Update Pipeline (Argo CronWorkflow)
# rag-index-update-pipeline.yaml
# Periodically detect changed documents and refresh the vector index.
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: rag-index-updater
  namespace: ml-pipelines
spec:
  schedule: '0 */4 * * *'  # run every 4 hours
  concurrencyPolicy: Replace
  workflowSpec:
    entrypoint: update-index
    templates:
      - name: update-index
        steps:
          - - name: detect-changes
              template: detect-document-changes
          - - name: chunk-and-embed
              template: run-embedding-pipeline
              arguments:
                parameters:
                  - name: changed-docs
                    value: '{{steps.detect-changes.outputs.parameters.changed-docs}}'
          - - name: verify-freshness
              template: check-freshness
      - name: detect-document-changes
        container:
          image: ml-platform/doc-change-detector:v1.2.0
          command: [python, detect_changes.py]
          args:
            - --since=4h
            - --output=/tmp/changed_docs.json
          env:
            - name: DOC_STORE_URL
              valueFrom:
                secretKeyRef:
                  name: doc-store-credentials
                  key: url
      - name: run-embedding-pipeline
        inputs:
          parameters:
            - name: changed-docs
        container:
          image: ml-platform/embedding-worker:v2.1.0
          command: [python, embed_and_upsert.py]
          args:
            - --docs-file={{inputs.parameters.changed-docs}}
            - --collection=product_knowledge
            - --model=text-embedding-3-large
          resources:
            requests:
              nvidia.com/gpu: 1
              memory: 8Gi
      - name: check-freshness
        container:
          image: ml-platform/rag-monitor:v1.0.0
          command: [python, check_freshness.py]
          args:
            - --collection=product_knowledge
            - --threshold-hours=24
            - --alert-on-failure
Cost and Performance Trade-offs
When operating a Feature Store and a Vector Store together, you need to understand the cost structure.
| Component | Cost driver | Savings strategy | Caveat |
|---|---|---|---|
| Feast Online Store (Redis) | memory capacity * time | evict unused entities via TTL | too-short TTL causes cache-miss spikes |
| Vector Store (Qdrant) | vector dimensions * document count * replicas | dimension reduction (1536 -> 512), quantization | quantization costs ~1-3% recall |
| Embedding API | token count * call count | re-embed only changed docs, batch requests | model change forces full re-embedding |
| CDC pipeline (Kafka) | partition count * retention period | compression, shorter retention | retention shorter than the backfill window makes recovery impossible |
| Serving GPU (LLM) | GPU hours * model size | batched inference, model quantization | benchmark quality per quantization level |
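The Vector Store row lends itself to a quick back-of-envelope check. A sketch with an assumed 10M-document corpus, float32 vectors, and two replicas (illustrative numbers, not benchmarks):

```python
# Back-of-envelope for the Vector Store row: raw vector memory scales with
# dims * docs * bytes_per_dim * replicas. The corpus size is an assumed example.
def vector_memory_gb(dims: int, docs: int, bytes_per_dim: float, replicas: int) -> float:
    return dims * docs * bytes_per_dim * replicas / (1024 ** 3)

docs, replicas = 10_000_000, 2
full = vector_memory_gb(1536, docs, 4.0, replicas)      # float32, full dimension
reduced = vector_memory_gb(512, docs, 4.0, replicas)    # dimension reduction 1536 -> 512
quantized = vector_memory_gb(512, docs, 1.0, replicas)  # + int8 scalar quantization

print(f"full: {full:.1f} GiB, reduced: {reduced:.1f} GiB, quantized: {quantized:.1f} GiB")
```

Reducing dimensions cuts raw vector memory by 3x and int8 quantization by another 4x, which is why the table pairs the two strategies despite the small recall cost.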
Failure Scenario Runbooks
Scenario 1: Vector Store search results suddenly become inaccurate
Symptom: RAG chatbot answer quality drops sharply; user feedback about "irrelevant answers" increases. Retrieval recall@5 falls from 0.85 to 0.52.
Investigation order:
1. Check recent embedding-pipeline run logs -> did the embedding model version change?
2. Check the Qdrant collection info -> did the vector dimension change?
3. Root cause: the embedding model was upgraded from text-embedding-3-small to text-embedding-3-large, but existing vectors were never re-embedded, so query and document embeddings no longer match.
Resolution:
1. Always run a full re-index when changing the embedding model.
2. Create a new collection -> embed everything -> switch the alias (blue-green pattern).
3. Roll back to the previous model until re-indexing completes.
Scenario 2: Only one of Feature Store lookup or vector search fails
Symptom: Intermittent 500 errors from the unified query API.
Error log: "TimeoutError: Feast online store read timed out after 50ms"
Root cause: the Feature Store (Redis) is timing out while the Vector Store (Qdrant) is healthy; one failure fails the entire request.
Resolution:
1. Add partial-failure tolerance to the unified query API.
2. On Feature Store failure: use default values (demographic averages) and log the event.
3. On Vector Store failure: fall back to pre-cached popular documents.
4. If both fail: let the LLM answer without context (with a quality-degradation warning).
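The resolution steps for scenario 2 can be sketched with `asyncio.gather(return_exceptions=True)`, which turns per-backend failures into inspectable values instead of failing the whole request. The fetchers and fallback values below are stand-ins, not the real Feast/Qdrant calls:

```python
import asyncio

# Fallbacks per the runbook: demographic-average features, cached popular docs.
FALLBACK_FEATURES = {"membership_tier": "standard"}
FALLBACK_DOCUMENTS = [{"text": "(cached popular doc)", "source": "cache"}]

async def fetch_features(entity_id: str) -> dict:
    # Stand-in for the Feast call; simulate the incident from the error log.
    raise TimeoutError("Feast online store read timed out after 50ms")

async def search_vectors(query: str) -> list:
    # Stand-in for the Qdrant call; healthy in this scenario.
    return [{"text": "Relevant chunk", "source": "kb"}]

async def get_context(entity_id: str, query: str) -> dict:
    # return_exceptions=True hands back the exception as a value instead of
    # cancelling the sibling task and raising.
    features, documents = await asyncio.gather(
        fetch_features(entity_id), search_vectors(query), return_exceptions=True
    )
    degraded = False
    if isinstance(features, Exception):
        features, degraded = FALLBACK_FEATURES, True    # runbook step 2
    if isinstance(documents, Exception):
        documents, degraded = FALLBACK_DOCUMENTS, True  # runbook step 3
    return {"features": features, "documents": documents, "degraded": degraded}

ctx = asyncio.run(get_context("user-42", "battery life"))
print(ctx["degraded"])  # True: features fell back, vector search succeeded
```

Surfacing a `degraded` flag lets the serving layer attach the quality-degradation warning the runbook calls for, rather than hiding the fallback.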
Blueprint Adoption Roadmap
Do not try to implement this blueprint in one shot; adopt it in four phases.
Phase 1 (4 weeks): Foundation
- Set up Feast + a Redis Online Store
- Deploy a single-node Qdrant
- Prototype the unified query API
- Validation: Feature Store lookup + vector search response time < 100ms for a single use case
Phase 2 (4 weeks): Pipeline Automation
- Build the CDC pipeline (meet the Feature Store freshness SLA)
- Automate the embedding pipeline (change detection -> re-embedding)
- Validation: feature freshness p95 < 5 minutes, index freshness < 4 hours
Phase 3 (4 weeks): Quality Monitoring
- Retrieval quality metrics (recall@k, MRR) dashboard
- Integrate Feature Store parity tests into CI
- Set up alerting
- Validation: alert received within 30 minutes of a quality regression
Phase 4 (4 weeks): Optimization and Scaling
- Vector quantization, TTL tuning, cost dashboard
- Multi-collection support (separate collections per document type)
- Automate blue-green index switching
- Validation: monthly infra cost down 30% versus Phase 2
Quiz
Q1. Why are a Feature Store's Feature View and a RAG pipeline's chunking configuration symmetric concepts?
||Both are declarative specifications of "which transformations turn source data into a servable form," and the consistency of that transformation logic determines train/serving parity on one side and indexing/retrieval quality on the other.||
Q2. Why does the unified query API run the Feature Store lookup and the vector search in parallel, and what is the risk?
||Parallel execution reduces total response time to max(Feature Store latency, Vector Store latency). The risk is that one failure can fail the whole request, so partial-failure tolerance is required.||
Q3. What goes wrong if you upgrade the embedding model without re-embedding existing vectors?
||Query embeddings from the new model and document embeddings from the old model no longer share a vector space, so similarity scores become meaningless and retrieval recall collapses.||
Q4. What are the key steps of the blue-green index switch pattern?
||Fully re-embed into a new collection -> switch the alias to the new collection -> keep the old collection for a grace period, then delete it. The switch itself incurs no downtime.||
Q5. What factors should drive the index freshness SLA in RAGOps?
||Document change frequency, embedding-pipeline run cost, business requirements (how real-time the product must be), and embedding API call cost, considered together. Differentiate by document type: e.g., 24 hours for rarely changing FAQs, 4 hours for product reviews.||
Q6. Why instruct the LLM to prefer structured features when they conflict with RAG document content?
||Feature Store data is refreshed by real-time CDC and is therefore fresher, while RAG documents can lag behind the indexing cycle; the real-time data is the more trustworthy source.||
Q7. What is the biggest reason to adopt the blueprint in four phases?
||Each phase has a quantitative validation gate that must pass before moving on; a big-bang rollout makes failure causes hard to isolate and exceeds what the team's learning curve can absorb.||
References
AI Platform: Feature Store and RAGOps Blueprint 2026

- Blueprint Overview: The Convergence of Feature Store and RAG Pipeline
- Architecture Blueprint
- Unified Query API Design
- Document Embedding Pipeline: Chunking Strategy and Index Management
- Combining Feature Store Features and RAG Results
- RAGOps: Search Quality Monitoring and Index Update Automation
- Cost and Performance Trade-offs
- Failure Scenario Runbooks
- Blueprint Adoption Roadmap
- Quiz
- References
Blueprint Overview: The Convergence of Feature Store and RAG Pipeline
True to the "Blueprint" title, this article presents a complete design plan for integrating Feature Store and RAG (Retrieval-Augmented Generation) pipeline operations within a single AI platform. Rather than individual tool usage, it addresses the design decisions and operational rules that arise at the junction where these two systems meet.
Feature Store manages structured data features, while the RAG pipeline converts unstructured text into vectors to provide context to LLMs. As of 2026, these two systems are often operated separately, but in actual AI products, use cases that combine them are growing -- such as "the user's recent purchase history (Feature Store) + related product review summaries (RAG)." This article organizes the design principles, implementation patterns, and operational runbooks of this integrated architecture as a blueprint.
Architecture Blueprint
The overall data flow is divided into four layers.
┌──────────────────────────────────────────────────────┐
│ Layer 4: Serving (KServe / API Gateway) │
│ - Deliver Feature lookup + Vector search to LLM │
│ - Guardrail, Citation verification, Response filter │
├──────────────────────────────────────────────────────┤
│ Layer 3: Feature Store + Vector Store │
│ - Feast Online Store (Redis): Structured features │
│ - Vector DB (Qdrant/Weaviate): Unstructured embeds │
│ - Unified Query API: Single interface for both │
├──────────────────────────────────────────────────────┤
│ Layer 2: Ingestion & Transformation │
│ - CDC Pipeline (Debezium -> Kafka) │
│ - Document chunking + embedding pipeline (Kubeflow) │
│ - Feature View definition + Vector Index management │
├──────────────────────────────────────────────────────┤
│ Layer 1: Source Data │
│ - OLTP DB, Data Warehouse, Document Store, API Logs │
└──────────────────────────────────────────────────────┘
Unified Query API Design
The unified API that queries both Feature Store and Vector Store in a single request is the core of this blueprint.
"""
Service that unifies Feature Store (Feast) and Vector Store (Qdrant) queries.
Used in the serving layer to provide context to models or LLMs.
"""
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
import asyncio
from feast import FeatureStore
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue
@dataclass
class UnifiedContext:
"""Unified context combining Feature Store features + Vector search results"""
entity_id: str
structured_features: Dict[str, Any] # Retrieved from Feature Store
retrieved_documents: List[Dict[str, Any]] # Searched from Vector Store
feature_freshness_ms: float = 0.0
retrieval_latency_ms: float = 0.0
class UnifiedContextService:
def __init__(
self,
feast_repo_path: str,
qdrant_url: str,
qdrant_collection: str,
):
self.feast_store = FeatureStore(repo_path=feast_repo_path)
self.qdrant = QdrantClient(url=qdrant_url, timeout=5.0)
self.collection = qdrant_collection
async def get_context(
self,
entity_id: str,
query_embedding: List[float],
feature_list: List[str],
top_k: int = 5,
metadata_filter: Optional[Dict[str, str]] = None,
) -> UnifiedContext:
"""
Performs Feature Store lookup and Vector search in parallel
to return a unified context.
"""
# Parallel execution: Feature Store lookup + Vector search
feature_task = asyncio.to_thread(
self._fetch_features, entity_id, feature_list
)
vector_task = asyncio.to_thread(
self._search_vectors, query_embedding, top_k, metadata_filter, entity_id
)
features_result, vectors_result = await asyncio.gather(
feature_task, vector_task
)
return UnifiedContext(
entity_id=entity_id,
structured_features=features_result["features"],
retrieved_documents=vectors_result["documents"],
feature_freshness_ms=features_result["latency_ms"],
retrieval_latency_ms=vectors_result["latency_ms"],
)
def _fetch_features(self, entity_id: str, feature_list: List[str]) -> dict:
import time
start = time.monotonic()
result = self.feast_store.get_online_features(
features=feature_list,
entity_rows=[{"user_id": entity_id}],
).to_dict()
elapsed = (time.monotonic() - start) * 1000
# Convert list values to scalars in the dict
features = {k: v[0] if isinstance(v, list) and v else v for k, v in result.items()}
return {"features": features, "latency_ms": elapsed}
def _search_vectors(
self,
query_embedding: List[float],
top_k: int,
metadata_filter: Optional[Dict[str, str]],
entity_id: str,
) -> dict:
import time
start = time.monotonic()
search_filter = None
if metadata_filter:
conditions = [
FieldCondition(key=k, match=MatchValue(value=v))
for k, v in metadata_filter.items()
]
search_filter = Filter(must=conditions)
results = self.qdrant.search(
collection_name=self.collection,
query_vector=query_embedding,
query_filter=search_filter,
limit=top_k,
with_payload=True,
)
elapsed = (time.monotonic() - start) * 1000
documents = [
{
"text": hit.payload.get("text", ""),
"source": hit.payload.get("source", ""),
"score": hit.score,
"chunk_id": hit.id,
}
for hit in results
]
return {"documents": documents, "latency_ms": elapsed}
Document Embedding Pipeline: Chunking Strategy and Index Management
In the RAG pipeline, the process of converting documents to vectors is a concept symmetric to the Feature View definition in Feature Store. Just as a Feature View defines "what transformations to apply to create features," the embedding pipeline defines "what chunking and embedding to apply to create vectors."
"""
Document chunking + embedding pipeline.
Can be run as a Kubeflow Pipeline component or as a standalone script.
"""
from dataclasses import dataclass
from typing import List, Generator
import hashlib
import json
@dataclass
class ChunkConfig:
"""Chunking configuration. Apply different strategies per document type."""
chunk_size: int = 512 # Token-based
chunk_overlap: int = 64 # Overlap between chunks
separators: tuple = ("\n\n", "\n", ". ", " ")
min_chunk_size: int = 50 # Merge with previous chunk if under this
metadata_fields: tuple = ("source", "doc_type", "updated_at")
# Per document type chunking strategies
CHUNK_CONFIGS = {
"product_review": ChunkConfig(chunk_size=256, chunk_overlap=32),
"technical_doc": ChunkConfig(chunk_size=512, chunk_overlap=64),
"faq": ChunkConfig(chunk_size=128, chunk_overlap=0), # No overlap needed for FAQ
"legal": ChunkConfig(chunk_size=1024, chunk_overlap=128),
}
def recursive_split(text: str, config: ChunkConfig) -> List[str]:
"""Recursive text splitting. Tries separators in order."""
if len(text.split()) <= config.chunk_size:
return [text] if len(text.split()) >= config.min_chunk_size else []
for sep in config.separators:
parts = text.split(sep)
if len(parts) > 1:
chunks = []
current = ""
for part in parts:
candidate = current + sep + part if current else part
if len(candidate.split()) > config.chunk_size:
if current:
chunks.append(current.strip())
current = part
else:
current = candidate
if current:
chunks.append(current.strip())
return [c for c in chunks if len(c.split()) >= config.min_chunk_size]
# Force split if no separator works
words = text.split()
return [
" ".join(words[i:i + config.chunk_size])
for i in range(0, len(words), config.chunk_size - config.chunk_overlap)
]
def create_chunk_id(source: str, chunk_index: int, text: str) -> str:
"""Deterministic chunk ID generation. Same document + same chunk = same ID."""
content_hash = hashlib.sha256(text.encode()).hexdigest()[:12]
return f"{source}::{chunk_index}::{content_hash}"
def process_document(
text: str,
source: str,
doc_type: str,
metadata: dict,
embedding_fn,
) -> List[dict]:
"""
Chunks and embeds a single document, returning a list of records
ready to upsert into the Vector Store.
"""
config = CHUNK_CONFIGS.get(doc_type, ChunkConfig())
chunks = recursive_split(text, config)
records = []
for i, chunk_text in enumerate(chunks):
embedding = embedding_fn(chunk_text)
chunk_id = create_chunk_id(source, i, chunk_text)
records.append({
"id": chunk_id,
"vector": embedding,
"payload": {
"text": chunk_text,
"source": source,
"doc_type": doc_type,
"chunk_index": i,
"total_chunks": len(chunks),
"chunk_size_tokens": len(chunk_text.split()),
**{k: metadata.get(k, "") for k in config.metadata_fields},
},
})
return records
Combining Feature Store Features and RAG Results
How you combine Feature Store structured features with RAG search results when composing the prompt sent to the LLM determines the response quality.
"""
Pattern for combining Feature Store features + RAG search results
to compose an LLM prompt.
"""
from typing import Dict, List, Any
def build_augmented_prompt(
user_query: str,
structured_features: Dict[str, Any],
retrieved_documents: List[Dict[str, Any]],
system_instruction: str = "",
) -> str:
"""
Generate a prompt combining structured features and unstructured search results.
Principles:
1. Structured features are provided in table format in a "User Context" section
2. RAG search results are provided in a "Reference Documents" section with sources
3. Explicitly state that structured data (real-time) takes priority when there is a conflict
"""
# Structured features section
feature_lines = []
feature_display_names = {
"total_purchases_7d": "Purchases in last 7 days",
"avg_order_value_30d": "Avg order value (30 days)",
"preferred_category": "Preferred category",
"membership_tier": "Membership tier",
}
for key, value in structured_features.items():
display = feature_display_names.get(key, key)
feature_lines.append(f"- {display}: {value}")
features_section = "\n".join(feature_lines) if feature_lines else "None"
# RAG search results section
doc_sections = []
for i, doc in enumerate(retrieved_documents, 1):
score = doc.get("score", 0)
# Exclude documents with low relevance scores
if score < 0.7:
continue
doc_sections.append(
f"[Document {i}] (Source: {doc['source']}, Relevance: {score:.2f})\n{doc['text']}"
)
docs_section = "\n\n".join(doc_sections) if doc_sections else "No relevant documents found"
prompt = f"""{system_instruction}
## User Context (Feature Store Real-Time Data)
{features_section}
## Reference Documents (RAG Search Results)
{docs_section}
## Instructions
- Use both user context and reference documents to answer.
- When structured data (user context) conflicts with document content, prioritize the structured data.
- Cite the reference document numbers used in your answer.
- Do not speculate about content not found in the reference documents.
## User Question
{user_query}"""
return prompt
RAGOps: Search Quality Monitoring and Index Update Automation
Just as Feature Store has Freshness SLAs, Vector Store also requires index freshness management. This is called RAGOps.
Index Freshness Tracking
"""
Monitoring module that tracks Vector Store index freshness.
Compares the latest update time of the document source with the
last update time of the index to detect stale states.
"""
from datetime import datetime, timedelta
from typing import Dict, List
from dataclasses import dataclass
import json
@dataclass
class IndexFreshnessReport:
collection_name: str
total_documents: int
stale_documents: int
freshness_ratio: float
oldest_document_age_hours: float
avg_document_age_hours: float
recommendations: List[str]
def check_index_freshness(
qdrant_client,
collection_name: str,
freshness_threshold_hours: int = 24,
sample_size: int = 1000,
) -> IndexFreshnessReport:
"""
Checks the document freshness of a Vector Store index.
"""
# Query updated_at fields from sample documents
points, _ = qdrant_client.scroll(
collection_name=collection_name,
limit=sample_size,
with_payload=["updated_at", "source"],
)
now = datetime.utcnow()
ages_hours = []
stale_count = 0
stale_sources = set()
for point in points:
updated_at_str = point.payload.get("updated_at", "")
if not updated_at_str:
stale_count += 1
continue
try:
updated_at = datetime.fromisoformat(updated_at_str)
age_hours = (now - updated_at).total_seconds() / 3600
ages_hours.append(age_hours)
if age_hours > freshness_threshold_hours:
stale_count += 1
stale_sources.add(point.payload.get("source", "unknown"))
except ValueError:
stale_count += 1
total = len(points)
freshness_ratio = 1.0 - (stale_count / max(total, 1))
recommendations = []
if freshness_ratio < 0.95:
recommendations.append(
f"Stale document ratio is {(1-freshness_ratio)*100:.1f}%. "
f"Embedding pipeline execution required."
)
if stale_sources:
recommendations.append(
f"Stale document sources: {', '.join(list(stale_sources)[:5])}"
)
if ages_hours and max(ages_hours) > freshness_threshold_hours * 3:
recommendations.append(
"Very old documents exist. Consider a full re-indexing."
)
return IndexFreshnessReport(
collection_name=collection_name,
total_documents=total,
stale_documents=stale_count,
freshness_ratio=freshness_ratio,
oldest_document_age_hours=max(ages_hours) if ages_hours else 0,
avg_document_age_hours=sum(ages_hours) / len(ages_hours) if ages_hours else 0,
recommendations=recommendations,
)
Index Update Kubeflow Pipeline
# rag-index-update-pipeline.yaml
# Periodically detects changed documents and updates the vector index.
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: rag-index-updater
namespace: ml-pipelines
spec:
schedule: '0 */4 * * *' # Run every 4 hours
concurrencyPolicy: Replace
workflowSpec:
entrypoint: update-index
templates:
- name: update-index
steps:
- - name: detect-changes
template: detect-document-changes
- - name: chunk-and-embed
template: run-embedding-pipeline
arguments:
parameters:
- name: changed-docs
value: '{{steps.detect-changes.outputs.parameters.changed-docs}}'
- - name: verify-freshness
template: check-freshness
- name: detect-document-changes
container:
image: ml-platform/doc-change-detector:v1.2.0
command: [python, detect_changes.py]
args:
- --since=4h
- --output=/tmp/changed_docs.json
env:
- name: DOC_STORE_URL
valueFrom:
secretKeyRef:
name: doc-store-credentials
key: url
- name: run-embedding-pipeline
container:
image: ml-platform/embedding-worker:v2.1.0
command: [python, embed_and_upsert.py]
args:
- --docs-file={{inputs.parameters.changed-docs}}
- --collection=product_knowledge
- --model=text-embedding-3-large
resources:
requests:
nvidia.com/gpu: 1
memory: 8Gi
- name: check-freshness
container:
image: ml-platform/rag-monitor:v1.0.0
command: [python, check_freshness.py]
args:
- --collection=product_knowledge
- --threshold-hours=24
- --alert-on-failure
Cost and Performance Trade-offs
When operating Feature Store and Vector Store together, you need to understand the cost structure.
| Component | Cost Driver | Savings Strategy | Caveats |
|---|---|---|---|
| Feast Online Store (Redis) | Memory capacity * time | Remove unused entities with TTL | Too-short TTL causes cache miss spikes |
| Vector Store (Qdrant) | Vector dims _ doc count _ replicas | Dimension reduction (1536 -> 512), quantize | 1-3% recall drop with quantization |
| Embedding API cost | Token count * call count | Re-embed only changes, batch processing | Full re-embedding needed on model change |
| CDC Pipeline (Kafka) | Partition count * retention period | Compression, shorten retention | Recovery impossible if retention is under backfill window |
| Serving GPU (LLM) | GPU hours * model size | Batch inference, model quantization | Quality benchmark required per quantization level |
Failure Scenario Runbooks
Scenario 1: Vector Store Search Results Suddenly Become Inaccurate
Symptom: RAG chatbot answer quality drops sharply. User feedback of "irrelevant answers" increases.
Retrieval recall@5 drops from 0.85 to 0.52.
Investigation order:
1. Check recent embedding pipeline execution logs
-> Check if embedding model version changed
2. Check Qdrant collection info
-> Check if vector dimension changed
3. Root cause: Upgraded embedding model from text-embedding-3-small to text-embedding-3-large
but did not re-embed existing vectors, causing dimension mismatch
Resolution:
1. Always run full re-indexing when changing embedding models
2. Create new collection -> full embedding -> alias switch (blue-green pattern)
3. Roll back to previous model until re-indexing is complete
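The blue-green switch in step 2 can be sketched as follows. A real deployment would use the vector store's atomic alias operations (Qdrant exposes collection aliases for exactly this); `FakeVectorStore` below is an in-memory stand-in so the sequencing is visible end to end.

```python
"""Blue-green re-indexing sketch. FakeVectorStore mimics the alias-based
interface a vector DB like Qdrant provides; names here are illustrative."""


class FakeVectorStore:
    def __init__(self):
        self.collections: dict[str, dict] = {}
        self.aliases: dict[str, str] = {}

    def create_collection(self, name: str, dim: int) -> None:
        self.collections[name] = {"dim": dim, "points": []}

    def upsert(self, name: str, vectors: list) -> None:
        self.collections[name]["points"].extend(vectors)

    def set_alias(self, alias: str, collection: str) -> None:
        self.aliases[alias] = collection  # queries resolve through the alias

    def search_target(self, alias: str) -> str:
        return self.aliases[alias]


def blue_green_reindex(store, alias, new_name, dim, embed_all):
    """Build the green collection offline, then switch the alias atomically."""
    store.create_collection(new_name, dim)     # 1) new collection, new dims
    store.upsert(new_name, embed_all())        # 2) full re-embedding, offline
    old = store.aliases.get(alias)
    store.set_alias(alias, new_name)           # 3) instant cutover via alias
    return old                                 # keep blue around for rollback
```

Because clients query the alias rather than the collection name, the cutover is a single metadata update with no downtime, and rollback is just pointing the alias back at the old collection.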
Scenario 2: Only One of Feature Store Lookup or Vector Search Fails
Symptom: Intermittent 500 errors from the unified query API.
Error log: "TimeoutError: Feast online store read timed out after 50ms"
Root cause: Feature Store (Redis) is timing out but Vector Store (Qdrant) is fine.
One failure causes the entire request to fail.
Resolution:
1. Add partial failure tolerance logic to the unified query API
2. On Feature Store failure: use default values (demographic averages), log the event
3. On Vector Store failure: fall back to pre-cached popular documents
4. On both failures: generate general response without context via LLM (include quality degradation warning)
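The partial-failure tolerance described above can be sketched with `asyncio.gather(..., return_exceptions=True)`, which runs both lookups in parallel but hands back exceptions as values instead of failing the whole request. The fetcher names and fallback values here are stand-ins, not the article's actual API.

```python
"""Unified query with per-store fallbacks. fetch_features / fetch_chunks
are hypothetical stand-ins; one is wired to fail to show the degraded path."""
import asyncio

DEFAULT_FEATURES = {"purchase_count_30d": 0.0}  # demographic-average stand-in
POPULAR_DOCS = ["cached popular document"]      # pre-cached fallback docs


async def fetch_features(user_id: str) -> dict:
    raise TimeoutError("Feast online store read timed out after 50ms")


async def fetch_chunks(query: str) -> list[str]:
    return ["relevant review chunk"]


async def unified_query(user_id: str, query: str) -> dict:
    # Parallel lookups; exceptions are returned as results, not raised.
    feats, chunks = await asyncio.gather(
        fetch_features(user_id), fetch_chunks(query), return_exceptions=True,
    )
    degraded: list[str] = []
    if isinstance(feats, BaseException):   # Feature Store down -> defaults
        degraded.append("features")
        feats = DEFAULT_FEATURES
    if isinstance(chunks, BaseException):  # Vector Store down -> cached docs
        degraded.append("retrieval")
        chunks = POPULAR_DOCS
    return {"features": feats, "chunks": chunks, "degraded": degraded}


result = asyncio.run(unified_query("u1", "battery life"))
```

The `degraded` field lets the serving layer log the event and, when both stores fail, attach the quality-degradation warning to the context-free LLM response.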
Blueprint Adoption Roadmap
Do not try to implement this blueprint all at once. Adopt it in 4 phases.
Phase 1 (4 weeks): Foundation Setup
- Set up Feast + Redis Online Store
- Deploy Qdrant single node
- Unified query API prototype
- Validation: Feature Store lookup + Vector search response time under 100ms for a single use case
Phase 2 (4 weeks): Pipeline Automation
- Build CDC pipeline (achieve Feature Store freshness SLA)
- Automate embedding pipeline (change detection -> re-embedding)
- Validation: Feature freshness p95 under 5 minutes, index freshness under 4 hours
Phase 3 (4 weeks): Quality Monitoring
- Retrieval quality metrics (recall@k, MRR) dashboard
- Feature Store parity test CI integration
- Alerting system setup
- Validation: Receive alerts within 30 minutes of quality regression
Phase 4 (4 weeks): Optimization and Scaling
- Vector quantization, TTL optimization, cost dashboard
- Multi-collection support (separated by document type)
- Blue-green index switching automation
- Validation: Monthly infra cost reduced 30% compared to Phase 2
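Several of the phase gates above are percentile latency or freshness numbers (p95 under 100ms, p95 under 5 minutes). A minimal nearest-rank percentile helper, one of several common percentile conventions, is enough for such a gate check:

```python
"""Nearest-rank 95th percentile for phase-gate latency checks.
Assumes a non-empty sample list; interpolating variants differ slightly."""
import math


def p95_ms(samples_ms: list[float]) -> float:
    """95th percentile latency by the nearest-rank method."""
    s = sorted(samples_ms)
    rank = max(1, math.ceil(0.95 * len(s)))  # 1-based nearest rank
    return s[rank - 1]
```

Applied to the Phase 1 gate, the check is simply `p95_ms(latencies) < 100` over a representative sample of unified-query requests.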
Quiz
Q1. Why are Feature Store's Feature View and RAG pipeline's chunking configuration in a symmetric relationship?
Both are declarative specifications that define "how to transform source data into a servable format," and the consistency of the transformation logic determines training-serving / indexing-retrieval quality.
Q2. What is the reason and risk of executing Feature Store lookup and Vector search in parallel in the unified query API?
To reduce overall response time to max(Feature Store latency, Vector Store latency). The risk is that if one fails, the entire request can fail, so partial failure tolerance logic is needed.
Q3. What problem occurs when upgrading the embedding model without re-embedding existing vectors?
The query embedding generated by the new model and the document embeddings generated by the old model are not in the same vector space, making similarity calculations meaningless and causing retrieval recall to plummet.
Q4. What are the key steps of the blue-green index switching pattern?
Complete full re-embedding in the new collection -> switch alias to the new collection -> keep the old collection for a period then delete. There is no downtime at the switching point.
Q5. What are the key factors to consider when setting an index freshness SLA in RAGOps?
Document change frequency, embedding pipeline execution cost, business requirements (whether real-time nature is needed), and embedding API call cost must be considered comprehensively. Documents with rare changes like FAQs can use 24 hours, while product reviews can use 4 hours -- differentiate by document type.
Q6. Why instruct the LLM to prioritize structured data when Feature Store structured features and RAG document content conflict?
Feature Store data is refreshed via real-time CDC, guaranteeing freshness, while RAG documents may be delayed by the indexing cycle. Real-time data is the more trustworthy source.
Q7. What is the biggest reason for adopting the blueprint in 4 phases?
Each phase requires verifying that quantitative validation criteria are met before proceeding to the next, and implementing everything at once makes it difficult to isolate failure causes and the team cannot absorb the learning curve.