AI Platform: Feature Store and RAGOps Blueprint 2026

Blueprint Overview: The Convergence of Feature Store and RAG Pipeline

True to its "Blueprint" title, this article presents a complete design plan for operating a Feature Store and a RAG (Retrieval-Augmented Generation) pipeline together within a single AI platform. Rather than covering individual tool usage, it addresses the design decisions and operational rules that arise at the junction where the two systems meet.

A Feature Store manages structured data features, while a RAG pipeline converts unstructured text into vectors to provide context to LLMs. As of 2026 these two systems are often operated separately, but real AI products increasingly combine them -- for example, "the user's recent purchase history (Feature Store) + related product review summaries (RAG)." This article organizes the design principles, implementation patterns, and operational runbooks of that integrated architecture into a blueprint.

Architecture Blueprint

The overall data flow is divided into four layers.

┌──────────────────────────────────────────────────────┐
│ Layer 4: Serving (KServe / API Gateway)              │
│ - Deliver feature lookups + vector search to the LLM │
│ - Guardrails, citation checks, response filtering    │
├──────────────────────────────────────────────────────┤
│ Layer 3: Feature Store + Vector Store                │
│ - Feast Online Store (Redis): structured features    │
│ - Vector DB (Qdrant/Weaviate): unstructured embeds   │
│ - Unified query API: one interface for both stores   │
├──────────────────────────────────────────────────────┤
│ Layer 2: Ingestion & Transformation                  │
│ - CDC pipeline (Debezium -> Kafka)                   │
│ - Document chunking + embedding pipeline (Kubeflow)  │
│ - Feature View definitions + vector index management │
├──────────────────────────────────────────────────────┤
│ Layer 1: Source Data                                 │
│ - OLTP DB, data warehouse, document store, API logs  │
└──────────────────────────────────────────────────────┘

Unified Query API Design

The unified API that queries the Feature Store and the Vector Store in a single request is the core of this blueprint.

"""
Service that queries the Feature Store (Feast) and the Vector Store (Qdrant) together.
Used in the serving layer to provide context to models or LLMs.
"""
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
import asyncio
from feast import FeatureStore
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue

@dataclass
class UnifiedContext:
    """Unified context combining Feature Store features and Vector search results."""
    entity_id: str
    structured_features: Dict[str, Any]          # fetched from the Feature Store
    retrieved_documents: List[Dict[str, Any]]    # retrieved from the Vector Store
    feature_freshness_ms: float = 0.0
    retrieval_latency_ms: float = 0.0

class UnifiedContextService:
    def __init__(
        self,
        feast_repo_path: str,
        qdrant_url: str,
        qdrant_collection: str,
    ):
        self.feast_store = FeatureStore(repo_path=feast_repo_path)
        self.qdrant = QdrantClient(url=qdrant_url, timeout=5.0)
        self.collection = qdrant_collection

    async def get_context(
        self,
        entity_id: str,
        query_embedding: List[float],
        feature_list: List[str],
        top_k: int = 5,
        metadata_filter: Optional[Dict[str, str]] = None,
    ) -> UnifiedContext:
        """
        Runs the Feature Store lookup and the Vector search in parallel
        and returns a unified context.
        """
        # Parallel execution: Feature Store lookup + Vector search
        feature_task = asyncio.to_thread(
            self._fetch_features, entity_id, feature_list
        )
        vector_task = asyncio.to_thread(
            self._search_vectors, query_embedding, top_k, metadata_filter, entity_id
        )

        features_result, vectors_result = await asyncio.gather(
            feature_task, vector_task
        )

        return UnifiedContext(
            entity_id=entity_id,
            structured_features=features_result["features"],
            retrieved_documents=vectors_result["documents"],
            feature_freshness_ms=features_result["latency_ms"],
            retrieval_latency_ms=vectors_result["latency_ms"],
        )

    def _fetch_features(self, entity_id: str, feature_list: List[str]) -> dict:
        import time
        start = time.monotonic()
        result = self.feast_store.get_online_features(
            features=feature_list,
            entity_rows=[{"user_id": entity_id}],
        ).to_dict()
        elapsed = (time.monotonic() - start) * 1000

        # Flatten list values in the dict to scalars
        features = {k: v[0] if isinstance(v, list) and v else v for k, v in result.items()}
        return {"features": features, "latency_ms": elapsed}

    def _search_vectors(
        self,
        query_embedding: List[float],
        top_k: int,
        metadata_filter: Optional[Dict[str, str]],
        entity_id: str,
    ) -> dict:
        import time
        start = time.monotonic()

        search_filter = None
        if metadata_filter:
            conditions = [
                FieldCondition(key=k, match=MatchValue(value=v))
                for k, v in metadata_filter.items()
            ]
            search_filter = Filter(must=conditions)

        results = self.qdrant.search(
            collection_name=self.collection,
            query_vector=query_embedding,
            query_filter=search_filter,
            limit=top_k,
            with_payload=True,
        )
        elapsed = (time.monotonic() - start) * 1000

        documents = [
            {
                "text": hit.payload.get("text", ""),
                "source": hit.payload.get("source", ""),
                "score": hit.score,
                "chunk_id": hit.id,
            }
            for hit in results
        ]
        return {"documents": documents, "latency_ms": elapsed}

Document Embedding Pipeline: Chunking Strategy and Index Management

In a RAG pipeline, converting documents into vectors is symmetric to defining a Feature View in a Feature Store. Just as a Feature View defines "which transformations produce the features," the embedding pipeline defines "which chunking and embedding steps produce the vectors."

"""
Document chunking + embedding pipeline.
Run as a Kubeflow Pipeline component or as a standalone script.
"""
from dataclasses import dataclass
from typing import List
import hashlib

@dataclass
class ChunkConfig:
    """Chunking configuration. Apply a different strategy per document type."""
    chunk_size: int = 512          # measured in tokens
    chunk_overlap: int = 64        # overlap between adjacent chunks
    separators: tuple = ("\n\n", "\n", ". ", " ")
    min_chunk_size: int = 50       # chunks below this size are discarded
    metadata_fields: tuple = ("source", "doc_type", "updated_at")

# Chunking strategy per document type
CHUNK_CONFIGS = {
    "product_review": ChunkConfig(chunk_size=256, chunk_overlap=32),
    "technical_doc": ChunkConfig(chunk_size=512, chunk_overlap=64),
    "faq": ChunkConfig(chunk_size=128, chunk_overlap=0),  # FAQs need no overlap
    "legal": ChunkConfig(chunk_size=1024, chunk_overlap=128),
}

def recursive_split(text: str, config: ChunkConfig) -> List[str]:
    """Recursive text splitting: tries each separator in order."""
    if len(text.split()) <= config.chunk_size:
        return [text] if len(text.split()) >= config.min_chunk_size else []

    for sep in config.separators:
        parts = text.split(sep)
        if len(parts) > 1:
            chunks = []
            current = ""
            for part in parts:
                candidate = current + sep + part if current else part
                if len(candidate.split()) > config.chunk_size:
                    if current:
                        chunks.append(current.strip())
                    current = part
                else:
                    current = candidate
            if current:
                chunks.append(current.strip())
            return [c for c in chunks if len(c.split()) >= config.min_chunk_size]

    # If no separator produces a split, force-split by word count
    words = text.split()
    return [
        " ".join(words[i:i + config.chunk_size])
        for i in range(0, len(words), config.chunk_size - config.chunk_overlap)
    ]

def create_chunk_id(source: str, chunk_index: int, text: str) -> str:
    """Deterministic chunk ID: the same chunk of the same document always gets the same ID."""
    content_hash = hashlib.sha256(text.encode()).hexdigest()[:12]
    return f"{source}::{chunk_index}::{content_hash}"

def process_document(
    text: str,
    source: str,
    doc_type: str,
    metadata: dict,
    embedding_fn,
) -> List[dict]:
    """
    Chunks and embeds a single document, returning a list of records
    ready to upsert into the Vector Store.
    """
    config = CHUNK_CONFIGS.get(doc_type, ChunkConfig())
    chunks = recursive_split(text, config)

    records = []
    for i, chunk_text in enumerate(chunks):
        embedding = embedding_fn(chunk_text)
        chunk_id = create_chunk_id(source, i, chunk_text)
        records.append({
            "id": chunk_id,
            "vector": embedding,
            "payload": {
                "text": chunk_text,
                "source": source,
                "doc_type": doc_type,
                "chunk_index": i,
                "total_chunks": len(chunks),
                "chunk_size_tokens": len(chunk_text.split()),
                **{k: metadata.get(k, "") for k in config.metadata_fields},
            },
        })
    return records
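The deterministic ID scheme above is what makes re-ingestion idempotent: re-running the pipeline over an unchanged document upserts the same points instead of duplicating them. A quick standalone check of that property (the function is repeated here so the snippet runs on its own):

```python
import hashlib

def create_chunk_id(source: str, chunk_index: int, text: str) -> str:
    # Same (source, index, content) always yields the same ID, so a pipeline
    # re-run upserts in place; changed content yields a new ID instead.
    content_hash = hashlib.sha256(text.encode()).hexdigest()[:12]
    return f"{source}::{chunk_index}::{content_hash}"

a = create_chunk_id("docs/faq.md", 0, "How do I reset my password?")
b = create_chunk_id("docs/faq.md", 0, "How do I reset my password?")
c = create_chunk_id("docs/faq.md", 0, "How do I change my email?")

assert a == b  # identical content -> identical ID
assert a != c  # edited content -> new ID; the stale point can be cleaned up later
```

One consequence of embedding a content hash in the ID: an edited chunk gets a new ID rather than overwriting the old one, so a periodic cleanup of orphaned IDs is still needed.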

Combining Feature Store Features and RAG Results

How you combine the Feature Store's structured features with RAG search results when composing the LLM prompt determines response quality.

"""
Pattern for combining Feature Store features with RAG search results
to compose an LLM prompt.
"""
from typing import Dict, List, Any

def build_augmented_prompt(
    user_query: str,
    structured_features: Dict[str, Any],
    retrieved_documents: List[Dict[str, Any]],
    system_instruction: str = "",
) -> str:
    """
    Build a prompt combining structured features and unstructured search results.

    Principles:
    1. Structured features go in a "User Context" section, in list form
    2. RAG results go in a "Reference Documents" section, each with its source
    3. State explicitly that features (real-time data) win when they conflict with documents
    """
    # Structured feature section
    feature_lines = []
    feature_display_names = {
        "total_purchases_7d": "Purchases in the last 7 days",
        "avg_order_value_30d": "Average order value (30 days)",
        "preferred_category": "Preferred category",
        "membership_tier": "Membership tier",
    }
    for key, value in structured_features.items():
        display = feature_display_names.get(key, key)
        feature_lines.append(f"- {display}: {value}")

    features_section = "\n".join(feature_lines) if feature_lines else "None"

    # RAG search result section
    doc_sections = []
    for i, doc in enumerate(retrieved_documents, 1):
        score = doc.get("score", 0)
        # Drop documents with a low relevance score
        if score < 0.7:
            continue
        doc_sections.append(
            f"[Document {i}] (source: {doc['source']}, relevance: {score:.2f})\n{doc['text']}"
        )

    docs_section = "\n\n".join(doc_sections) if doc_sections else "No relevant documents"

    prompt = f"""{system_instruction}

## User Context (Feature Store real-time data)
{features_section}

## Reference Documents (RAG search results)
{docs_section}

## Instructions
- Use both the user context and the reference documents in your answer.
- If structured data (user context) conflicts with document content, prefer the structured data.
- Cite the numbers of the reference documents you used.
- Do not speculate about anything not found in the reference documents.

## User Question
{user_query}"""

    return prompt
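The 0.7 cutoff above is the only quality gate between retrieval and the prompt. Isolated from the prompt builder, the filtering step looks like this (the helper name is ours; the threshold and `score` field follow the code above):

```python
def filter_by_relevance(docs: list[dict], threshold: float = 0.7) -> list[dict]:
    # Keep only documents whose similarity score clears the threshold,
    # mirroring the `if score < 0.7: continue` branch in build_augmented_prompt.
    return [d for d in docs if d.get("score", 0) >= threshold]

docs = [
    {"text": "relevant", "score": 0.91},
    {"text": "borderline", "score": 0.70},
    {"text": "noise", "score": 0.42},
]

kept = [d["text"] for d in filter_by_relevance(docs)]
assert kept == ["relevant", "borderline"]
```

Note that the right threshold depends on the embedding model and distance metric; 0.7 under cosine similarity for one model can be meaningless for another, so calibrate it against labeled query-document pairs rather than reusing it blindly.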

RAGOps: Retrieval Quality Monitoring and Index Update Automation

Just as a Feature Store has freshness SLAs, a Vector Store needs index freshness management. We call this RAGOps.

Index Freshness Tracking

"""
Monitoring module that tracks Vector Store index freshness.
Compares each document's source update time against the freshness
threshold to detect stale entries.
"""
from datetime import datetime
from typing import List
from dataclasses import dataclass

@dataclass
class IndexFreshnessReport:
    collection_name: str
    total_documents: int
    stale_documents: int
    freshness_ratio: float
    oldest_document_age_hours: float
    avg_document_age_hours: float
    recommendations: List[str]

def check_index_freshness(
    qdrant_client,
    collection_name: str,
    freshness_threshold_hours: int = 24,
    sample_size: int = 1000,
) -> IndexFreshnessReport:
    """
    Checks document freshness for a Vector Store index.
    """
    # Read the updated_at field from a sample of documents
    points, _ = qdrant_client.scroll(
        collection_name=collection_name,
        limit=sample_size,
        with_payload=["updated_at", "source"],
    )

    now = datetime.utcnow()
    ages_hours = []
    stale_count = 0
    stale_sources = set()

    for point in points:
        updated_at_str = point.payload.get("updated_at", "")
        if not updated_at_str:
            stale_count += 1
            continue
        try:
            updated_at = datetime.fromisoformat(updated_at_str)
            age_hours = (now - updated_at).total_seconds() / 3600
            ages_hours.append(age_hours)
            if age_hours > freshness_threshold_hours:
                stale_count += 1
                stale_sources.add(point.payload.get("source", "unknown"))
        except ValueError:
            stale_count += 1

    total = len(points)
    freshness_ratio = 1.0 - (stale_count / max(total, 1))

    recommendations = []
    if freshness_ratio < 0.95:
        recommendations.append(
            f"Stale document ratio is {(1 - freshness_ratio) * 100:.1f}%. "
            f"Run the embedding pipeline."
        )
    if stale_sources:
        recommendations.append(
            f"Stale document sources: {', '.join(list(stale_sources)[:5])}"
        )
    if ages_hours and max(ages_hours) > freshness_threshold_hours * 3:
        recommendations.append(
            "Very old documents exist. Consider a full re-index."
        )

    return IndexFreshnessReport(
        collection_name=collection_name,
        total_documents=total,
        stale_documents=stale_count,
        freshness_ratio=freshness_ratio,
        oldest_document_age_hours=max(ages_hours) if ages_hours else 0,
        avg_document_age_hours=sum(ages_hours) / len(ages_hours) if ages_hours else 0,
        recommendations=recommendations,
    )
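Distilled from `check_index_freshness`, the freshness ratio is simply the fraction of sampled documents updated within the threshold window. A minimal, dependency-free sketch of that calculation:

```python
from datetime import datetime, timedelta, timezone

def freshness_ratio(update_times, now, threshold_hours: float = 24) -> float:
    # Fraction of sampled documents whose age is within the freshness window.
    stale = sum(
        1 for t in update_times
        if (now - t).total_seconds() / 3600 > threshold_hours
    )
    return 1.0 - stale / max(len(update_times), 1)

now = datetime(2026, 1, 1, tzinfo=timezone.utc)
sample = [now - timedelta(hours=h) for h in (1, 3, 30, 50)]  # two fresh, two stale

assert freshness_ratio(sample, now) == 0.5
```

Because the check samples only `sample_size` points, the ratio is an estimate; for collections with skewed update patterns, sample per `source` rather than globally.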

Index Update Pipeline (Argo CronWorkflow)

# rag-index-update-pipeline.yaml
# Periodically detects changed documents and refreshes the vector index.
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: rag-index-updater
  namespace: ml-pipelines
spec:
  schedule: '0 */4 * * *' # every 4 hours
  concurrencyPolicy: Replace
  workflowSpec:
    entrypoint: update-index
    templates:
      - name: update-index
        steps:
          - - name: detect-changes
              template: detect-document-changes
          - - name: chunk-and-embed
              template: run-embedding-pipeline
              arguments:
                parameters:
                  - name: changed-docs
                    value: '{{steps.detect-changes.outputs.parameters.changed-docs}}'
          - - name: verify-freshness
              template: check-freshness

      - name: detect-document-changes
        container:
          image: ml-platform/doc-change-detector:v1.2.0
          command: [python, detect_changes.py]
          args:
            - --since=4h
            - --output=/tmp/changed_docs.json
          env:
            - name: DOC_STORE_URL
              valueFrom:
                secretKeyRef:
                  name: doc-store-credentials
                  key: url

      - name: run-embedding-pipeline
        container:
          image: ml-platform/embedding-worker:v2.1.0
          command: [python, embed_and_upsert.py]
          args:
            - --docs-file={{inputs.parameters.changed-docs}}
            - --collection=product_knowledge
            - --model=text-embedding-3-large
          resources:
            requests:
              nvidia.com/gpu: 1
              memory: 8Gi

      - name: check-freshness
        container:
          image: ml-platform/rag-monitor:v1.0.0
          command: [python, check_freshness.py]
          args:
            - --collection=product_knowledge
            - --threshold-hours=24
            - --alert-on-failure

Cost vs. Performance Trade-offs

When operating a Feature Store and a Vector Store together, you need to understand the cost structure.

| Component | Cost driver | Savings strategy | Caution |
| --- | --- | --- | --- |
| Feast Online Store (Redis) | memory capacity x time | TTL to evict unused entities | too-short TTLs cause a cache-miss spike |
| Vector Store (Qdrant) | vector dims x doc count x replicas | dimensionality reduction (1536 -> 512), quantization | quantization costs 1-3% recall |
| Embedding API | token count x call count | re-embed only changed docs, batch requests | a model change forces a full re-embed |
| CDC pipeline (Kafka) | partition count x retention | compression, shorter retention | retention shorter than the backfill window makes recovery impossible |
| Serving GPU (LLM) | GPU hours x model size | batch inference, model quantization | benchmark quality at each quantization level |
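The Vector Store row usually dominates at scale. A back-of-the-envelope memory estimate (raw float32 vectors only; real deployments add HNSW graph and payload overhead, so treat these numbers as lower bounds):

```python
def vector_memory_gb(num_docs: int, dims: int, bytes_per_dim: float = 4.0,
                     replicas: int = 1) -> float:
    # Raw vector footprint: docs x dims x bytes-per-dimension x replicas.
    return num_docs * dims * bytes_per_dim * replicas / 1024**3

full      = vector_memory_gb(10_000_000, 1536)                    # float32, full dims
reduced   = vector_memory_gb(10_000_000, 512)                     # dimensionality reduction
quantized = vector_memory_gb(10_000_000, 512, bytes_per_dim=1.0)  # int8 scalar quantization

assert round(full) == 57       # ~57 GB raw
assert round(reduced) == 19    # ~19 GB after 1536 -> 512
assert round(quantized) == 5   # ~5 GB after int8, at the cost of some recall
```

At 10M documents, cutting dimensions from 1536 to 512 and applying int8 quantization shrinks the raw vector footprint roughly 12x, which is why the table pairs those two levers.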

Failure Scenario Runbook

Scenario 1: Vector Store search results suddenly become inaccurate

Symptoms: RAG chatbot answer quality drops sharply; user feedback about
     "irrelevant answers" increases. Retrieval recall@5 falls from 0.85 to 0.52.

Checklist:
  1. Check the logs of recent embedding pipeline runs
     -> Did the embedding model version change?
  2. Check the Qdrant collection info
     -> Did the vector dimension change?
  3. Root cause: the embedding model was upgraded from text-embedding-3-small
     to text-embedding-3-large, but existing vectors were never re-embedded,
     leaving query and document vectors mismatched.

Resolution:
  1. Always run a full re-index when the embedding model changes
  2. Create a new collection -> embed everything -> switch the alias (blue-green pattern)
  3. Roll back to the previous model until re-indexing completes

Scenario 2: Either the Feature Store lookup or the Vector search fails, but not both

Symptoms: intermittent 500 errors from the unified query API.
     Error log: "TimeoutError: Feast online store read timed out after 50ms"

Cause: the Feature Store (Redis) times out while the Vector Store (Qdrant) is healthy,
     and the one failure fails the entire request.

Resolution:
  1. Add partial-failure handling to the unified query API
  2. On Feature Store failure: use defaults (demographic averages) and log the event
  3. On Vector Store failure: fall back to pre-cached popular documents
  4. If both fail: let the LLM answer without context (with a degraded-quality warning)
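The partial-failure steps above can be sketched with `asyncio.gather(return_exceptions=True)`: each backend's exception is caught independently and replaced by its fallback, and the response records which sources were degraded. The fetchers here are stand-ins for the real Feast/Qdrant calls.

```python
import asyncio

DEFAULT_FEATURES = {"membership_tier": "standard"}          # demographic-average fallback
FALLBACK_DOCS = [{"text": "cached popular answer", "source": "cache"}]

async def fetch_features(fail: bool = False) -> dict:
    # Stand-in for the Feast online store read
    if fail:
        raise TimeoutError("Feast online store read timed out")
    return {"membership_tier": "gold"}

async def search_vectors(fail: bool = False) -> list:
    # Stand-in for the Qdrant search
    if fail:
        raise TimeoutError("Qdrant search timed out")
    return [{"text": "fresh review", "source": "qdrant"}]

async def get_context(feature_fail: bool = False, vector_fail: bool = False) -> dict:
    features, docs = await asyncio.gather(
        fetch_features(feature_fail),
        search_vectors(vector_fail),
        return_exceptions=True,   # exceptions are returned as values, not raised
    )
    degraded = []
    if isinstance(features, Exception):
        features = DEFAULT_FEATURES
        degraded.append("features")
    if isinstance(docs, Exception):
        docs = FALLBACK_DOCS
        degraded.append("retrieval")
    return {"features": features, "documents": docs, "degraded": degraded}

ctx = asyncio.run(get_context(feature_fail=True))
assert ctx["features"] == DEFAULT_FEATURES
assert ctx["degraded"] == ["features"]
assert ctx["documents"][0]["source"] == "qdrant"
```

Surfacing the `degraded` list in the API response lets the serving layer attach the quality warning from step 4 and lets monitoring count partial failures separately from hard errors.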

Blueprint Adoption Roadmap

Do not try to implement this blueprint all at once; roll it out in four phases.

Phase 1 (4 weeks): Foundation

  • Set up Feast + Redis Online Store
  • Deploy a single-node Qdrant
  • Prototype the unified query API
  • Gate: for one use case, Feature Store lookup + Vector search responds in < 100 ms

Phase 2 (4 weeks): Pipeline automation

  • Build the CDC pipeline (meet the Feature Store freshness SLA)
  • Automate the embedding pipeline (change detection -> re-embedding)
  • Gate: feature freshness p95 < 5 minutes, index freshness < 4 hours

Phase 3 (4 weeks): Quality monitoring

  • Dashboard for retrieval quality metrics (recall@k, MRR)
  • Feature Store parity tests integrated into CI
  • Alerting in place
  • Gate: alert received within 30 minutes of a quality regression

Phase 4 (4 weeks): Optimization and scale

  • Vector quantization, TTL tuning, cost dashboard
  • Multi-collection support (split by document type)
  • Automated blue-green index switching
  • Gate: monthly infra cost down 30% vs. Phase 2

Quiz

Q1. Why are a Feature Store's Feature Views and a RAG pipeline's chunking configs symmetric concepts?

||Both are declarative specs of "which transformations turn source data into a servable form," and the consistency of that transformation logic determines train/serve and index/query quality.||

Q2. Why run the Feature Store lookup and the Vector search in parallel in the unified query API, and what is the risk?

||It reduces total response time to max(Feature Store latency, Vector Store latency). The risk is that one failure can fail the whole request, so partial-failure handling is required.||

Q3. What goes wrong if you upgrade the embedding model without re-embedding existing vectors?

||Query embeddings from the new model and document embeddings from the old model no longer live in the same vector space, so similarity computations become meaningless and retrieval recall collapses.||

Q4. What are the key steps of the blue-green index switch pattern?

||Fully re-embed into a new collection -> switch the alias to the new collection -> keep the old collection for a grace period, then delete it. There is no downtime at switch-over.||

Q5. What should you consider when setting an index freshness SLA in RAGOps?

||Document change frequency, embedding pipeline run cost, business requirements (how real-time it must be), and embedding API call cost, taken together. Differentiate by document type: e.g., 24 hours for rarely changing FAQs, 4 hours for product reviews.||

Q6. Why instruct the LLM to prefer structured features when they conflict with RAG document content?

||Feature Store data is refreshed by real-time CDC and stays current, while RAG documents can lag behind the indexing cycle. The real-time data is the more trustworthy source.||

Q7. What is the main reason for adopting the blueprint in four phases?

||Each phase has a quantitative gate to pass before moving on; implementing everything at once makes failures hard to isolate and exceeds the team's learning capacity.||

          - - name: verify-freshness
              template: check-freshness

      - name: detect-document-changes
        container:
          image: ml-platform/doc-change-detector:v1.2.0
          command: [python, detect_changes.py]
          args:
            - --since=4h
            - --output=/tmp/changed_docs.json
          env:
            - name: DOC_STORE_URL
              valueFrom:
                secretKeyRef:
                  name: doc-store-credentials
                  key: url

      - name: run-embedding-pipeline
        container:
          image: ml-platform/embedding-worker:v2.1.0
          command: [python, embed_and_upsert.py]
          args:
            - --docs-file={{inputs.parameters.changed-docs}}
            - --collection=product_knowledge
            - --model=text-embedding-3-large
          resources:
            requests:
              nvidia.com/gpu: 1
              memory: 8Gi

      - name: check-freshness
        container:
          image: ml-platform/rag-monitor:v1.0.0
          command: [python, check_freshness.py]
          args:
            - --collection=product_knowledge
            - --threshold-hours=24
            - --alert-on-failure

Cost and Performance Trade-offs

When operating Feature Store and Vector Store together, you need to understand the cost structure.

| Component | Cost Driver | Savings Strategy | Caveats |
|---|---|---|---|
| Feast Online Store (Redis) | Memory capacity * time | Remove unused entities with TTL | Too-short TTL causes cache miss spikes |
| Vector Store (Qdrant) | Vector dims * doc count * replicas | Dimension reduction (1536 -> 512), quantization | 1-3% recall drop with quantization |
| Embedding API cost | Token count * call count | Re-embed only changed docs, batch processing | Full re-embedding needed on model change |
| CDC Pipeline (Kafka) | Partition count * retention period | Compression, shorter retention | Recovery is impossible if retention is shorter than the backfill window |
| Serving GPU (LLM) | GPU hours * model size | Batch inference, model quantization | Quality benchmark required per quantization level |
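The dimension-reduction and quantization levers can be sanity-checked with a back-of-the-envelope estimate. A sketch only: the 1.5x index-overhead factor and per-value byte sizes are assumptions for illustration, not Qdrant internals:

```python
def vector_store_memory_gb(
    doc_count: int,
    dims: int,
    replicas: int = 1,
    bytes_per_value: int = 4,      # float32; int8 quantization would use 1
    overhead_factor: float = 1.5,  # assumed headroom for HNSW graph + payload
) -> float:
    """Rough footprint: dims * doc count * replicas * bytes, plus index overhead."""
    raw_bytes = doc_count * dims * replicas * bytes_per_value
    return raw_bytes * overhead_factor / (1024 ** 3)

# 10M documents, 2 replicas: full-size float32 vs reduced dims + int8
full = vector_store_memory_gb(10_000_000, 1536, replicas=2)
reduced = vector_store_memory_gb(10_000_000, 512, replicas=2, bytes_per_value=1)
# (1536 * 4 bytes) / (512 * 1 byte) -> a 12x reduction in memory
```

The same function makes the recall trade-off concrete: a 12x smaller footprint is what you buy with the 1-3% recall drop noted in the table.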

Failure Scenario Runbooks

Scenario 1: Vector Store Search Results Suddenly Become Inaccurate

Symptom: RAG chatbot answer quality drops sharply. User feedback of "irrelevant answers" increases.
         Retrieval recall@5 drops from 0.85 to 0.52.

Investigation order:
  1. Check recent embedding pipeline execution logs
     -> Check if embedding model version changed
  2. Check Qdrant collection info
     -> Check if vector dimension changed
  3. Root cause: Upgraded embedding model from text-embedding-3-small to text-embedding-3-large
     but did not re-embed existing vectors, causing dimension mismatch

Resolution:
  1. Always run full re-indexing when changing embedding models
  2. Create new collection -> full embedding -> alias switch (blue-green pattern)
  3. Roll back to previous model until re-indexing is complete
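Because queries always hit an alias rather than a concrete collection, the switch in step 2 is a single pointer update. A minimal sketch of that state machine over a plain alias map; the collection names are hypothetical, and in Qdrant the same move is performed with collection alias operations on the server:

```python
def blue_green_switch(aliases: dict, alias: str, new_collection: str, ready: bool) -> str:
    """Repoint `alias` to `new_collection` only once re-embedding is complete.

    Returns the collection the alias serves after the call. The old collection
    is kept for rollback and deleted after a retention window.
    """
    if not ready:
        # New index not fully built yet: keep serving the old collection.
        return aliases[alias]
    aliases[alias] = new_collection  # the switch itself is one atomic update
    return aliases[alias]

aliases = {"product_knowledge": "product_knowledge_v1"}
blue_green_switch(aliases, "product_knowledge", "product_knowledge_v2", ready=False)  # still v1
blue_green_switch(aliases, "product_knowledge", "product_knowledge_v2", ready=True)   # now v2
```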

Scenario 2: Only One of Feature Store Lookup or Vector Search Fails

Symptom: Intermittent 500 errors from the unified query API.
         Error log: "TimeoutError: Feast online store read timed out after 50ms"

Root cause: Feature Store (Redis) is timing out but Vector Store (Qdrant) is fine.
            One failure causes the entire request to fail.

Resolution:
  1. Add partial failure tolerance logic to the unified query API
  2. On Feature Store failure: use default values (demographic averages), log the event
  3. On Vector Store failure: fall back to pre-cached popular documents
  4. On both failures: have the LLM generate a general response without context (include a quality-degradation warning)
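The resolution steps can be sketched with `asyncio.gather(..., return_exceptions=True)`, which captures per-store failures instead of failing the whole request. The store calls, default values, and document IDs below are placeholders, not the article's actual implementation:

```python
import asyncio

DEFAULT_FEATURES = {"purchase_count_30d": 0, "segment": "unknown"}  # demographic fallback
CACHED_POPULAR_DOCS = ["doc-faq-001", "doc-top-review-017"]         # pre-cached fallback

async def fetch_features(entity_id: str) -> dict:
    # Simulated outage matching the runbook's symptom
    raise TimeoutError("Feast online store read timed out after 50ms")

async def search_vectors(query: str) -> list:
    await asyncio.sleep(0)  # healthy path
    return ["doc-123", "doc-456"]

async def unified_query(entity_id: str, query: str) -> dict:
    # Run both lookups concurrently; exceptions come back as values, not raises.
    feats, docs = await asyncio.gather(
        fetch_features(entity_id), search_vectors(query), return_exceptions=True
    )
    degraded = []
    if isinstance(feats, Exception):
        feats = DEFAULT_FEATURES       # step 2: defaults, and log the event
        degraded.append("feature_store")
    if isinstance(docs, Exception):
        docs = CACHED_POPULAR_DOCS     # step 3: cached popular documents
        degraded.append("vector_store")
    return {"features": feats, "documents": docs, "degraded": degraded}

result = asyncio.run(unified_query("user-42", "return policy"))
```

The `degraded` field lets the serving layer attach the quality-degradation warning from step 4 when both stores fail.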

Blueprint Adoption Roadmap

Do not try to implement this blueprint all at once. Adopt it in 4 phases.

Phase 1 (4 weeks): Foundation Setup

  • Set up Feast + Redis Online Store
  • Deploy Qdrant single node
  • Unified query API prototype
  • Validation: Feature Store lookup + Vector search response time under 100ms for a single use case

Phase 2 (4 weeks): Pipeline Automation

  • Build CDC pipeline (achieve Feature Store freshness SLA)
  • Automate embedding pipeline (change detection -> re-embedding)
  • Validation: Feature freshness p95 under 5 minutes, index freshness under 4 hours

Phase 3 (4 weeks): Quality Monitoring

  • Retrieval quality metrics (recall@k, MRR) dashboard
  • Feature Store parity test CI integration
  • Alerting system setup
  • Validation: Receive alerts within 30 minutes of quality regression

Phase 4 (4 weeks): Optimization and Scaling

  • Vector quantization, TTL optimization, cost dashboard
  • Multi-collection support (separated by document type)
  • Blue-green index switching automation
  • Validation: Monthly infra cost reduced 30% compared to Phase 2

Quiz

Q1. Why are Feature Store's Feature View and RAG pipeline's chunking configuration in a symmetric relationship?

Both are declarative specifications that define "how to transform source data into a servable format," and the consistency of the transformation logic determines training-serving / indexing-retrieval quality.

Q2. What is the reason and risk of executing Feature Store lookup and Vector search in parallel in the unified query API?

To reduce overall response time to max(Feature Store latency, Vector Store latency). The risk is that if one fails, the entire request can fail, so partial failure tolerance logic is needed.

Q3. What problem occurs when upgrading the embedding model without re-embedding existing vectors?

The query embedding generated by the new model and the document embeddings generated by the old model are not in the same vector space, making similarity calculations meaningless and causing retrieval recall to plummet.

Q4. What are the key steps of the blue-green index switching pattern?

Complete full re-embedding in the new collection -> switch the alias to the new collection -> keep the old collection for a rollback window, then delete it. There is no downtime at the switching point.

Q5. What are the key factors to consider when setting an index freshness SLA in RAGOps?

Document change frequency, embedding pipeline execution cost, business requirements (whether near-real-time freshness is needed), and embedding API call cost must be weighed together. Differentiate the SLA by document type: rarely changing documents like FAQs can use 24 hours, while product reviews can use 4 hours.

Q6. Why instruct the LLM to prioritize structured data when Feature Store structured features and RAG document content conflict?

Feature Store data is refreshed via real-time CDC, guaranteeing freshness, while RAG documents may be delayed by the indexing cycle. Real-time data is the more trustworthy source.

Q7. What is the biggest reason for adopting the blueprint in 4 phases?

Each phase requires verifying that quantitative validation criteria are met before proceeding to the next, and implementing everything at once makes it difficult to isolate failure causes and the team cannot absorb the learning curve.

References