Database Engineering Complete Guide: From SQL to Vector DBs and AI RAG Systems

Even in the AI era, database engineering remains the backbone of every system. No matter how powerful an LLM is, you cannot build a production system without storing data safely and retrieving it quickly. If anything, the rise of vector search and RAG systems has made database engineering more important, not less.

This guide works through advanced SQL, practical PostgreSQL, NoSQL, vector databases, distributed DB theory, and LLM + DB integration patterns.


1. Relational DB Core: Advanced SQL

1.1 Window Functions

Unlike aggregate functions, window functions perform calculations while preserving each row's context instead of collapsing rows into groups. They are essential for analytical queries.

-- Salary rank, department total, and rolling average per department
SELECT
    employee_id,
    name,
    department,
    salary,
    RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS dept_rank,
    SUM(salary) OVER (PARTITION BY department) AS dept_total,
    AVG(salary) OVER (
        PARTITION BY department
        ORDER BY hire_date
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) AS rolling_avg_3
FROM employees;

Key window functions:

  • ROW_NUMBER(): unique sequential numbers with no ties
  • RANK(): tied rows share a rank; the next rank skips numbers
  • DENSE_RANK(): tied rows share a rank; the next rank is consecutive
  • LAG() / LEAD(): reference values from the previous / next row
  • FIRST_VALUE() / LAST_VALUE(): first / last value within the window frame
-- Month-over-month revenue growth rate
SELECT
    month,
    revenue,
    LAG(revenue, 1) OVER (ORDER BY month) AS prev_month,
    ROUND(
        (revenue - LAG(revenue, 1) OVER (ORDER BY month)) * 100.0
        / NULLIF(LAG(revenue, 1) OVER (ORDER BY month), 0),
        2
    ) AS growth_rate_pct
FROM monthly_sales;

1.2 CTEs (Common Table Expressions)

CTEs split complex queries into named steps, improving readability and reuse. Recursive CTEs are powerful for traversing hierarchies.

-- Recursive CTE for org chart traversal
WITH RECURSIVE org_tree AS (
    -- Base case: top-level employees
    SELECT
        employee_id,
        name,
        manager_id,
        0 AS depth,
        name::TEXT AS path
    FROM employees
    WHERE manager_id IS NULL

    UNION ALL

    -- Recursive case
    SELECT
        e.employee_id,
        e.name,
        e.manager_id,
        ot.depth + 1,
        ot.path || ' > ' || e.name
    FROM employees e
    INNER JOIN org_tree ot ON e.manager_id = ot.employee_id
)
SELECT employee_id, name, depth, path
FROM org_tree
ORDER BY path;
-- Multi-step analysis split into readable CTEs
WITH
top_customers AS (
    SELECT customer_id, SUM(amount) AS total_spent
    FROM orders
    WHERE created_at >= NOW() - INTERVAL '90 days'
    GROUP BY customer_id
    HAVING SUM(amount) > 1000
),
customer_details AS (
    SELECT c.*, tc.total_spent
    FROM customers c
    JOIN top_customers tc ON c.id = tc.customer_id
),
ranked AS (
    SELECT *,
           NTILE(4) OVER (ORDER BY total_spent DESC) AS quartile
    FROM customer_details
)
SELECT * FROM ranked WHERE quartile = 1;

1.3 Reading Execution Plans (EXPLAIN ANALYZE)

To find the root cause of query performance problems, you need to be able to read execution plans.

EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT u.name, COUNT(o.id) AS order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at > '2025-01-01'
GROUP BY u.id, u.name
ORDER BY order_count DESC
LIMIT 10;

Interpreting the plan output:

Limit  (cost=1234.56..1234.57 rows=10 width=72) (actual time=45.123..45.125 rows=10 loops=1)
  ->  Sort  (cost=1234.56..1259.56 rows=10000 width=72) (actual time=45.120..45.121 rows=10 loops=1)
        Sort Key: (count(o.id)) DESC
        Sort Method: top-N heapsort  Memory: 25kB
        ->  HashAggregate  (cost=876.00..976.00 rows=10000 width=72) (actual time=38.456..42.234 rows=8523 loops=1)
              Group Key: u.id
              Batches: 1  Memory Usage: 1553kB
              ->  Hash Left Join  (cost=345.00..801.00 rows=15000 width=40) (actual time=5.678..28.901 rows=15000 loops=1)
                    Hash Cond: (o.user_id = u.id)
                    ->  Seq Scan on orders o  (cost=0.00..312.00 rows=15000 width=16) (actual time=0.023..8.456 rows=15000 loops=1)
                    ->  Hash  (cost=270.00..270.00 rows=6000 width=32) (actual time=5.234..5.234 rows=6000 loops=1)
                          Buckets: 8192  Batches: 1  Memory Usage: 358kB
                          ->  Index Scan using idx_users_created_at on users u  (cost=0.29..270.00 rows=6000 width=32)
Planning Time: 1.234 ms
Execution Time: 45.456 ms

Key signals:

  • Seq Scan vs Index Scan: a Seq Scan on a large table suggests checking whether an index would help
  • A large gap between estimated rows and actual rows: run ANALYZE to refresh planner statistics
  • Buffers: shared hit vs read: shows the cache hit ratio
  • loops: the number of nested-loop iterations; high values degrade performance

1.4 Index Design Strategies

-- Composite index: put high-selectivity columns first
CREATE INDEX idx_orders_user_status_date
ON orders (user_id, status, created_at DESC);

-- Partial index: index only rows matching a condition (saves space)
CREATE INDEX idx_active_users
ON users (email)
WHERE deleted_at IS NULL AND status = 'active';

-- Expression index: index the result of a function
CREATE INDEX idx_users_lower_email
ON users (LOWER(email));

-- BRIN index: tiny footprint, efficient for naturally ordered time-series data
CREATE INDEX idx_logs_timestamp_brin
ON application_logs USING BRIN (created_at);

-- GIN index: arrays, JSONB, and full-text search
CREATE INDEX idx_products_tags_gin
ON products USING GIN (tags);

1.5 Transactions and ACID

BEGIN;
  -- Set the isolation level (SET TRANSACTION must run inside the transaction block)
  SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

  -- Bank transfer: atomicity guaranteed
  UPDATE accounts SET balance = balance - 500 WHERE id = 1;
  UPDATE accounts SET balance = balance + 500 WHERE id = 2;

  -- Balance check
  DO $$
  DECLARE
    bal NUMERIC;
  BEGIN
    SELECT balance INTO bal FROM accounts WHERE id = 1;
    IF bal < 0 THEN
      RAISE EXCEPTION 'Insufficient funds';
    END IF;
  END $$;

COMMIT;

Anomalies possible at each isolation level:

Isolation Level   | Dirty Read | Non-repeatable Read | Phantom Read
READ UNCOMMITTED  | possible   | possible            | possible
READ COMMITTED    | prevented  | possible            | possible
REPEATABLE READ   | prevented  | prevented           | possible
SERIALIZABLE      | prevented  | prevented           | prevented

(This is the SQL-standard table; PostgreSQL treats READ UNCOMMITTED as READ COMMITTED, and its REPEATABLE READ implementation also prevents phantom reads.)

2. PostgreSQL in Practice: Advanced Features

2.1 JSONB and Semi-Structured Data

PostgreSQL's JSONB stores JSON in a parsed binary format, enabling fast lookups and indexing.

-- JSONB column with a GIN index
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}'
);

CREATE INDEX idx_products_metadata ON products USING GIN (metadata);

-- JSONB operators
-- ->> : extract as text
-- ->  : extract as JSON
-- @>  : containment check
-- ?   : key existence

SELECT * FROM products
WHERE metadata @> '{"category": "electronics", "in_stock": true}';

SELECT
    name,
    metadata->>'brand' AS brand,
    (metadata->>'price')::NUMERIC AS price,
    metadata->'specs'->>'cpu' AS cpu
FROM products
WHERE metadata ? 'discount_pct'
  AND (metadata->>'discount_pct')::NUMERIC > 10;

-- JSONB update: change a single key
UPDATE products
SET metadata = jsonb_set(
    metadata,
    '{price}',
    '29900'::jsonb
)
WHERE id = 42;

-- JSONB path query
SELECT jsonb_path_query(
    metadata,
    '$.specs.memory ? (@ > 16)'
) FROM products;

2.2 Table Partitioning

-- Range partitioning for time-series data
CREATE TABLE events (
    id BIGSERIAL,
    user_id INT,
    event_type TEXT,
    payload JSONB,
    created_at TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (created_at);

-- Function to create monthly partitions automatically
CREATE OR REPLACE FUNCTION create_monthly_partition(target_date DATE)
RETURNS VOID AS $$
DECLARE
    partition_name TEXT;
    start_date DATE;
    end_date DATE;
BEGIN
    start_date := DATE_TRUNC('month', target_date);
    end_date := start_date + INTERVAL '1 month';
    partition_name := 'events_' || TO_CHAR(start_date, 'YYYY_MM');

    EXECUTE FORMAT(
        'CREATE TABLE IF NOT EXISTS %I PARTITION OF events
         FOR VALUES FROM (%L) TO (%L)',
        partition_name, start_date, end_date
    );
END;
$$ LANGUAGE plpgsql;

SELECT create_monthly_partition('2026-03-01');
SELECT create_monthly_partition('2026-04-01');

-- Verify partition pruning
EXPLAIN SELECT * FROM events
WHERE created_at BETWEEN '2026-03-01' AND '2026-03-31';

2.3 Logical Replication

-- Publisher setup (changing wal_level requires a server restart)
ALTER SYSTEM SET wal_level = logical;

CREATE PUBLICATION app_publication
FOR TABLE users, orders, products
WITH (publish = 'insert, update, delete');

-- On the subscriber: create the subscription
CREATE SUBSCRIPTION app_subscription
CONNECTION 'host=primary-db port=5432 dbname=myapp user=replicator'
PUBLICATION app_publication;

-- Monitor replication status
SELECT
    subname,
    received_lsn,
    latest_end_lsn,
    latest_end_time
FROM pg_stat_subscription;

3. NoSQL Databases in Practice

3.1 Redis: Caching Patterns

Redis is an in-memory data structure store, widely used for caching, session management, and message queues.

import redis
import json
from typing import Optional

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Cache-Aside pattern (lazy loading)
def get_user_profile(user_id: int) -> dict:
    cache_key = f"user:profile:{user_id}"

    # 1. Check the cache
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # 2. Miss: load from the database (db is a placeholder for your data layer)
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)

    # 3. Populate the cache (1-hour TTL)
    r.setex(cache_key, 3600, json.dumps(user))
    return user

# Write-Through pattern: update the cache synchronously with every write
def update_user_profile(user_id: int, data: dict) -> None:
    cache_key = f"user:profile:{user_id}"

    # Update the database
    db.execute("UPDATE users SET ... WHERE id = %s", user_id)

    # Refresh the cache immediately (get_user_from_db is a placeholder)
    updated = get_user_from_db(user_id)
    r.setex(cache_key, 3600, json.dumps(updated))

# Write-Behind (Write-Back) pattern: persist to the DB asynchronously
class WriteBehindCache:
    def __init__(self):
        self.dirty_keys_set = "cache:dirty_keys"

    def write(self, key: str, value: dict, ttl: int = 3600):
        # Write to the cache immediately
        r.setex(key, ttl, json.dumps(value))
        # Track the dirty key for a later flush to the DB
        r.sadd(self.dirty_keys_set, key)

    def flush_to_db(self):
        dirty_keys = r.smembers(self.dirty_keys_set)
        for key in dirty_keys:
            data = r.get(key)
            if data:
                db.upsert(json.loads(data))
                r.srem(self.dirty_keys_set, key)

# Distributed lock (single-instance SET NX pattern; the full Redlock algorithm uses multiple Redis nodes)
import time

def acquire_lock(lock_name: str, timeout: int = 10) -> Optional[str]:
    identifier = str(time.time())
    lock_key = f"lock:{lock_name}"
    acquired = r.set(lock_key, identifier, nx=True, ex=timeout)
    return identifier if acquired else None

def release_lock(lock_name: str, identifier: str) -> bool:
    lock_key = f"lock:{lock_name}"
    lua_script = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """
    result = r.eval(lua_script, 1, lock_key, identifier)
    return bool(result)

Using Redis data structures:

# Sorted Set: real-time leaderboard
def update_score(player: str, score: int):
    r.zadd("leaderboard", {player: score})

def get_top_players(n: int = 10):
    return r.zrevrange("leaderboard", 0, n-1, withscores=True)

# HyperLogLog: approximate unique-visitor counts (memory-efficient)
def track_visitor(page: str, user_id: str):
    r.pfadd(f"visitors:{page}:{today()}", user_id)  # today() is a date-string helper (placeholder)

def get_unique_visitors(page: str, date: str) -> int:
    return r.pfcount(f"visitors:{page}:{date}")

# Pub/Sub: real-time notifications
import threading

def publisher():
    for i in range(10):
        r.publish("notifications", json.dumps({"type": "alert", "msg": f"Event {i}"}))

def subscriber():
    pubsub = r.pubsub()
    pubsub.subscribe("notifications")
    for message in pubsub.listen():
        if message['type'] == 'message':
            data = json.loads(message['data'])
            print(f"Received: {data}")

3.2 MongoDB: Document Modeling

from pymongo import MongoClient
from datetime import datetime, timezone

client = MongoClient('mongodb://localhost:27017/')
db = client['ecommerce']

# Embedded documents vs references:
# embed data that is usually read together
orders_collection = db['orders']
sample_order = {
    "_id": "order_12345",
    "user_id": "user_67890",
    "status": "shipped",
    "created_at": datetime.now(timezone.utc),  # datetime.utcnow() is deprecated
    # Address embedded as a snapshot (preserves where the order actually shipped)
    "shipping_address": {
        "street": "123 Teheran-ro, Gangnam-gu",
        "city": "Seoul",
        "zip": "06234"
    },
    # Item snapshots embedded (preserves prices at purchase time)
    "items": [
        {"product_id": "p001", "name": "Laptop", "price": 1200000, "qty": 1},
        {"product_id": "p002", "name": "Mouse", "price": 35000, "qty": 2}
    ],
    "total": 1270000
}

# Aggregation Pipeline
pipeline = [
    # Stage 1: orders from the last 30 days
    {"$match": {
        "created_at": {"$gte": datetime(2026, 2, 17)},
        "status": {"$in": ["delivered", "shipped"]}
    }},
    # Stage 2: unwind line items
    {"$unwind": "$items"},
    # Stage 3: aggregate per product
    {"$group": {
        "_id": "$items.product_id",
        "product_name": {"$first": "$items.name"},
        "total_qty": {"$sum": "$items.qty"},
        "total_revenue": {"$sum": {"$multiply": ["$items.price", "$items.qty"]}}
    }},
    # Stage 4: sort by revenue
    {"$sort": {"total_revenue": -1}},
    # Stage 5: top 10
    {"$limit": 10},
    # Stage 6: reshape the output
    {"$project": {
        "product_id": "$_id",
        "product_name": 1,
        "total_qty": 1,
        "total_revenue": 1,
        "_id": 0
    }}
]

top_products = list(orders_collection.aggregate(pipeline))

3.3 Cassandra: Wide-Column Model

Cassandra suits write-heavy, geographically distributed systems at large scale.

-- Cassandra CQL: design tables around your query patterns
-- "Define the queries first, then shape the tables to fit them"

-- Table serving the per-user timeline query
CREATE TABLE user_timeline (
    user_id UUID,
    created_at TIMEUUID,
    post_id UUID,
    content TEXT,
    likes INT,  -- counter columns cannot be mixed with regular columns; use a dedicated counter table
    PRIMARY KEY (user_id, created_at)
) WITH CLUSTERING ORDER BY (created_at DESC)
  AND compaction = {
    'class': 'TimeWindowCompactionStrategy',
    'compaction_window_unit': 'DAYS',
    'compaction_window_size': 7
  };

-- Tag-based post lookup table (denormalized separately)
CREATE TABLE posts_by_tag (
    tag TEXT,
    created_at TIMEUUID,
    post_id UUID,
    user_id UUID,
    title TEXT,
    PRIMARY KEY (tag, created_at, post_id)
) WITH CLUSTERING ORDER BY (created_at DESC);

4. Vector Databases: The Heart of the AI Era

4.1 pgvector: Vector Search in PostgreSQL

-- Install the pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Table for storing embeddings
CREATE TABLE document_embeddings (
    id BIGSERIAL PRIMARY KEY,
    content TEXT NOT NULL,
    metadata JSONB DEFAULT '{}',
    embedding vector(1536),  -- dimensionality of OpenAI text-embedding-3-small
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- HNSW index (fast ANN search)
CREATE INDEX idx_doc_embeddings_hnsw
ON document_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- Cosine similarity search
SELECT
    id,
    content,
    metadata,
    1 - (embedding <=> '[0.1, 0.2, ...]'::vector) AS similarity
FROM document_embeddings
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 5;

-- Combine metadata filtering with vector search (hybrid; query_embedding is a bound parameter)
SELECT
    id,
    content,
    metadata->>'source' AS source,
    1 - (embedding <=> query_embedding) AS similarity
FROM document_embeddings
WHERE
    metadata->>'language' = 'ko'
    AND metadata->>'category' = 'technical'
    AND (embedding <=> query_embedding) < 0.3  -- distance threshold
ORDER BY embedding <=> query_embedding
LIMIT 10;

Vector distance operators:

  • <=> : cosine distance (usually best for text embeddings)
  • <-> : L2 (Euclidean) distance (e.g., image feature vectors)
  • <#> : negative inner product (on unit-normalized vectors it ranks results the same as cosine distance)
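These relationships are easy to check numerically. A small self-contained sketch in plain Python (no pgvector involved; the helper names are made up for this illustration):

```python
import math

def cosine_distance(a, b):
    # pgvector's <=> : 1 - cosine similarity
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return 1 - dot / (na * nb)

def l2_distance(a, b):
    # pgvector's <-> : Euclidean distance
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def neg_inner_product(a, b):
    # pgvector's <#> : inner product negated, so smaller still means closer
    return -sum(x * y for x, y in zip(a, b))

def normalize(v):
    n = math.sqrt(sum(x * x for x in v))
    return [x / n for x in v]

a, b = normalize([1.0, 2.0, 3.0]), normalize([2.0, 1.0, 3.0])
# On unit vectors, <#> equals cosine distance shifted by -1,
# so both operators produce the same ranking.
assert abs(neg_inner_product(a, b) - (cosine_distance(a, b) - 1)) < 1e-9
```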

4.2 IVFFlat vs HNSW Indexes

-- IVFFlat: fast to build and memory-efficient
-- good for the initial build on large datasets
CREATE INDEX idx_ivfflat
ON document_embeddings
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);  -- rule of thumb: rows / 1000 (sqrt(rows) beyond ~1M rows)

-- Tune search quality
SET ivfflat.probes = 10;  -- clusters probed per query (higher = more accurate, slower)

-- HNSW: fast queries with high recall
-- slower to build and more memory-hungry
CREATE INDEX idx_hnsw
ON document_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

SET hnsw.ef_search = 40;  -- search beam width (higher = more accurate)

4.3 Comparing pgvector, Pinecone, Weaviate, and Milvus

Feature        | pgvector             | Pinecone          | Weaviate               | Milvus
Deployment     | PostgreSQL extension | SaaS              | self-hosted / cloud    | self-hosted / cloud
Filtering      | full SQL             | metadata filters  | GraphQL + filters      | rich filters
Scale          | millions of rows     | hundreds of M+    | hundreds of M+         | billions+
Cost           | PostgreSQL cost only | usage-based       | open source (free)     | open source (free)
Hybrid search  | BM25 + vector        | built-in          | built-in BM25 + vector | rich support
Best for       | existing PG infra    | quick prototypes  | knowledge graphs       | very large scale

5. Distributed Databases: Theory and Practice

5.1 CAP Theorem

The CAP theorem states that a distributed system cannot guarantee all three of the following at once:

  • C (Consistency): every node returns the same, most recent data
  • A (Availability): every request receives a response
  • P (Partition tolerance): the system keeps working through network partitions

Because network partitions can always happen in practice, the real choice is between CP and AP.

CP systems (consistency first):

  • Give up availability during a partition
  • Examples: ZooKeeper, HBase, MongoDB (w: majority)
  • Good fit: financial transactions, inventory management

AP systems (availability first):

  • Keep responding during a partition, possibly with stale data
  • Examples: DynamoDB, Cassandra, CouchDB
  • Good fit: social feeds, configuration data, DNS
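In Dynamo-style stores (Cassandra, DynamoDB) the tradeoff is tunable per request via quorum sizes: with N replicas, writing to W nodes and reading from R nodes guarantees that a read sees the latest write whenever R + W > N, because every read quorum then overlaps every write quorum. A brute-force check of that overlap property (a toy sketch, not any database's actual code):

```python
from itertools import combinations

def quorums_always_overlap(n: int, r: int, w: int) -> bool:
    """True if every read quorum of size r intersects every
    write quorum of size w among n replicas."""
    replicas = range(n)
    return all(
        set(read) & set(write)
        for read in combinations(replicas, r)
        for write in combinations(replicas, w)
    )

# R + W > N  ->  overlap guaranteed (reads observe the newest write)
assert quorums_always_overlap(n=3, r=2, w=2)       # 2 + 2 > 3
# R + W <= N ->  some read quorum can miss the latest write
assert not quorums_always_overlap(n=3, r=1, w=2)   # 1 + 2 <= 3
```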

5.2 Consistency Models

Strong Consistency         (strictest: slower, higher latency)
    ↓
Sequential Consistency
    ↓
Causal Consistency
    ↓
Eventual Consistency       (weakest: faster, more available)

Monotonic Reads is a session-level guarantee (a client never observes data older than what it has already read) that can be layered on top of weaker models.

5.3 Sharding Strategies

# Range sharding: split by contiguous key ranges
def range_shard(user_id: int, num_shards: int = 4) -> int:
    shard_size = 250_000_000  # 1 billion users across 4 shards
    return min(user_id // shard_size, num_shards - 1)

# Hash sharding: even distribution (avoids hotspots)
import hashlib

def hash_shard(key: str, num_shards: int = 8) -> int:
    hash_val = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return hash_val % num_shards

# Consistent hashing: minimal rebalancing when nodes are added or removed
import bisect

class ConsistentHashRing:
    def __init__(self, nodes: list, replicas: int = 150):
        self.replicas = replicas
        self.ring = {}
        self.sorted_keys = []
        for node in nodes:
            self.add_node(node)

    def add_node(self, node: str):
        for i in range(self.replicas):
            key = self._hash(f"{node}:{i}")
            self.ring[key] = node
            bisect.insort(self.sorted_keys, key)

    def get_node(self, key: str) -> "str | None":
        if not self.ring:
            return None
        hash_val = self._hash(key)
        idx = bisect.bisect(self.sorted_keys, hash_val)
        if idx == len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

6. Data Modeling

6.1 Normalization vs Denormalization

Normalization (through 3NF):

  • Minimizes data duplication
  • Prevents update anomalies
  • Fast writes; reads require JOINs

Denormalization:

  • Optimizes read performance
  • Accepts duplicated data
  • Suited to OLAP and data warehouses
-- Normalized schema
CREATE TABLE categories (id SERIAL PRIMARY KEY, name TEXT);
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name TEXT,
    category_id INT REFERENCES categories(id)
);

-- Denormalized schema (read-optimized)
CREATE TABLE products_denormalized (
    id SERIAL PRIMARY KEY,
    name TEXT,
    category_id INT,
    category_name TEXT  -- duplicated to eliminate the JOIN
);

6.2 Star Schema (Data Warehousing)

-- Fact table (measures)
CREATE TABLE fact_sales (
    sale_id BIGINT,
    date_key INT,
    product_key INT,
    customer_key INT,
    store_key INT,
    quantity INT,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(10,2)
);

-- Dimension tables (context)
CREATE TABLE dim_date (
    date_key INT PRIMARY KEY,
    full_date DATE,
    year INT, quarter INT, month INT, week INT, day_of_week INT
);

CREATE TABLE dim_product (
    product_key INT PRIMARY KEY,
    product_id TEXT,
    name TEXT, category TEXT, brand TEXT, unit_cost DECIMAL(10,2)
);
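A typical star-schema query joins the fact table to its dimensions and rolls up the measures. The sketch below shows the shape of such a query on a cut-down version of the schema above (sample values invented), using SQLite so it runs anywhere:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE fact_sales (date_key INT, product_key INT, quantity INT, total_amount REAL);
CREATE TABLE dim_date (date_key INT PRIMARY KEY, year INT, quarter INT);
CREATE TABLE dim_product (product_key INT PRIMARY KEY, category TEXT);

INSERT INTO dim_date VALUES (1, 2026, 1), (2, 2026, 2);
INSERT INTO dim_product VALUES (10, 'electronics'), (20, 'furniture');
INSERT INTO fact_sales VALUES
    (1, 10, 2, 200.0), (1, 20, 1, 500.0), (2, 10, 3, 300.0);
""")

# Revenue per category per quarter: the classic star-schema rollup
rows = conn.execute("""
    SELECT d.year, d.quarter, p.category, SUM(f.total_amount) AS revenue
    FROM fact_sales f
    JOIN dim_date d ON f.date_key = d.date_key
    JOIN dim_product p ON f.product_key = p.product_key
    GROUP BY d.year, d.quarter, p.category
    ORDER BY d.year, d.quarter, p.category
""").fetchall()

print(rows)
# → [(2026, 1, 'electronics', 200.0), (2026, 1, 'furniture', 500.0), (2026, 2, 'electronics', 300.0)]
```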

7. AI Integration: LLM + DB Patterns

7.1 Building a RAG System with LangChain + pgvector

from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_postgres import PGVector
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.schema import Document

# 1. Configure the embedding model
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# 2. Connect the pgvector vector store
CONNECTION_STRING = "postgresql+psycopg://user:password@localhost:5432/vectordb"
COLLECTION_NAME = "documents"

vector_store = PGVector(
    embeddings=embeddings,
    collection_name=COLLECTION_NAME,
    connection=CONNECTION_STRING,
    use_jsonb=True,  # store metadata as JSONB
)

# 3. Chunk documents and store embeddings
def ingest_documents(file_path: str, metadata: dict):
    with open(file_path, 'r', encoding='utf-8') as f:
        raw_text = f.read()

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=512,
        chunk_overlap=50,
        separators=["\n\n", "\n", ".", "。", " "]
    )
    chunks = splitter.split_text(raw_text)

    documents = [
        Document(page_content=chunk, metadata={**metadata, "chunk_index": i})
        for i, chunk in enumerate(chunks)
    ]

    ids = vector_store.add_documents(documents)
    print(f"Ingested {len(ids)} chunks from {file_path}")
    return ids

# 4. Assemble the RAG chain
def build_rag_chain(k: int = 5, score_threshold: float = 0.7):
    retriever = vector_store.as_retriever(
        search_type="similarity_score_threshold",
        search_kwargs={"k": k, "score_threshold": score_threshold}
    )

    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True,
        verbose=True
    )
    return qa_chain

# 5. Run a query
rag_chain = build_rag_chain()
result = rag_chain.invoke({"query": "How does PostgreSQL MVCC handle concurrency?"})

print(result['result'])
for doc in result['source_documents']:
    print(f"Source: {doc.metadata.get('source')}, Chunk: {doc.metadata.get('chunk_index')}")

7.2 Hybrid Search: Combining BM25 and Vector Search

from sqlalchemy import text

def hybrid_search(
    query: str,
    query_embedding: list,
    k: int = 10,
    alpha: float = 0.5  # 0 = BM25 only, 1 = vector only
) -> list:
    """RRF (Reciprocal Rank Fusion)로 결과 결합"""

    sql = text("""
    WITH
    vector_search AS (
        SELECT id, content, metadata,
               ROW_NUMBER() OVER (ORDER BY embedding <=> :embedding) AS rank
        FROM document_embeddings
        ORDER BY embedding <=> :embedding
        LIMIT :k
    ),
    bm25_search AS (
        -- assumes a 'korean' text search configuration is installed;
        -- ts_rank approximates BM25-style relevance scoring
        SELECT id, content, metadata,
               ROW_NUMBER() OVER (ORDER BY ts_rank(to_tsvector('korean', content),
                                  plainto_tsquery('korean', :query)) DESC) AS rank
        FROM document_embeddings
        WHERE to_tsvector('korean', content) @@ plainto_tsquery('korean', :query)
        LIMIT :k
    ),
    rrf_scores AS (
        SELECT
            COALESCE(v.id, b.id) AS id,
            COALESCE(v.content, b.content) AS content,
            COALESCE(v.metadata, b.metadata) AS metadata,
            COALESCE(1.0 / (60 + v.rank), 0) * :alpha +
            COALESCE(1.0 / (60 + b.rank), 0) * (1 - :alpha) AS rrf_score
        FROM vector_search v
        FULL OUTER JOIN bm25_search b ON v.id = b.id
    )
    SELECT id, content, metadata, rrf_score
    FROM rrf_scores
    ORDER BY rrf_score DESC
    LIMIT :k
    """)

    with engine.connect() as conn:  # engine: a SQLAlchemy Engine created elsewhere
        results = conn.execute(sql, {
            "embedding": str(query_embedding),
            "query": query,
            "k": k,
            "alpha": alpha
        })
        return [dict(row._mapping) for row in results]  # Row -> dict in SQLAlchemy 1.4+

7.3 Embedding Caching Strategies

import hashlib
import json
from typing import Optional

class EmbeddingCache:
    def __init__(self, redis_client, ttl: int = 86400 * 7):  # cache for 7 days
        self.redis = redis_client
        self.ttl = ttl

    def _cache_key(self, text: str, model: str) -> str:
        content_hash = hashlib.sha256(f"{model}:{text}".encode()).hexdigest()
        return f"embedding:{content_hash}"

    def get(self, text: str, model: str) -> Optional[list]:
        key = self._cache_key(text, model)
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(self, text: str, model: str, embedding: list) -> None:
        key = self._cache_key(text, model)
        self.redis.setex(key, self.ttl, json.dumps(embedding))

    def get_or_compute(self, text: str, model: str, compute_fn) -> list:
        cached = self.get(text, model)
        if cached:
            return cached
        embedding = compute_fn(text)
        self.set(text, model, embedding)
        return embedding
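Wiring this up needs neither a live Redis nor an embedding API. A minimal in-memory stand-in shows the get_or_compute flow (FakeRedis and fake_embed are test doubles invented for this sketch):

```python
import hashlib
import json

class FakeRedis:
    """In-memory stand-in for redis.Redis (TTLs ignored) - test double only."""
    def __init__(self):
        self.store = {}
    def get(self, key):
        return self.store.get(key)
    def setex(self, key, ttl, value):
        self.store[key] = value

cache_store = FakeRedis()
calls = []

def fake_embed(text):
    calls.append(text)  # counts "API" invocations
    return [float(len(text)), 0.5]

# Same logic as EmbeddingCache: hash(model + text) -> cached JSON list
def get_or_compute(text, model, compute_fn):
    key = "embedding:" + hashlib.sha256(f"{model}:{text}".encode()).hexdigest()
    cached = cache_store.get(key)
    if cached:
        return json.loads(cached)
    emb = compute_fn(text)
    cache_store.setex(key, 3600, json.dumps(emb))
    return emb

first = get_or_compute("hello world", "text-embedding-3-small", fake_embed)
second = get_or_compute("hello world", "text-embedding-3-small", fake_embed)
assert first == second
assert len(calls) == 1  # the second call was served from the cache
```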

Quiz: Check Your Understanding

Q1. When is a B-Tree index the better choice, and when is a Hash index?

Answer: A B-Tree suits range queries, sorting, and LIKE 'prefix%' searches. A Hash index suits only equality comparisons (=).

Explanation: A B-Tree stores keys in a sorted tree, so it excels at range operations such as WHERE age > 30, ORDER BY name, and BETWEEN. A Hash index stores hashed keys, so it delivers O(1) lookups for exact matches like WHERE id = 42 but cannot serve range queries or sorting at all. In PostgreSQL, Hash indexes became production-ready in v10, when WAL logging support was added.
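The tradeoff mirrors the difference between a sorted array and a hash map in ordinary code (a toy illustration, not PostgreSQL internals):

```python
import bisect

ages = sorted([17, 22, 25, 31, 38, 44, 51])   # B-Tree-like: keys kept in order
age_index = {a: f"row_{a}" for a in ages}     # Hash-like: fast equality only

# Range predicate (WHERE age > 30): trivial on the sorted structure...
older = ages[bisect.bisect_right(ages, 30):]
assert older == [31, 38, 44, 51]

# ...but a hash map can only answer it by scanning every key
older_via_hash = sorted(a for a in age_index if a > 30)
assert older_via_hash == older

# Equality (WHERE age = 38): a single probe in the hash map
assert age_index[38] == "row_38"
```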

Q2. How does PostgreSQL's MVCC (Multi-Version Concurrency Control) handle concurrency?

Answer: Each transaction takes a snapshot when it starts and sees only the row versions visible at that point. Readers never block writers, and writers never block readers.

Explanation: When PostgreSQL updates a row, it does not overwrite the existing row; it adds a new row version tagged with xmin/xmax transaction IDs. Each transaction uses its snapshot's xid to decide which versions it may see. Old versions (dead tuples) are cleaned up periodically by the VACUUM process. This minimizes read/write conflicts and yields high concurrency.
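The visibility rule can be sketched as a toy model (a simplification invented for illustration; real PostgreSQL visibility checks also consider commit status, hint bits, and more):

```python
from dataclasses import dataclass

@dataclass
class RowVersion:
    value: str
    xmin: int          # txid that created this version
    xmax: "int | None" # txid that deleted/replaced it (None = still live)

def visible(row: RowVersion, snapshot_xid: int) -> bool:
    # A version is visible if it was created at or before our snapshot
    # and not yet deleted (or deleted after our snapshot).
    created_before = row.xmin <= snapshot_xid
    not_deleted = row.xmax is None or row.xmax > snapshot_xid
    return created_before and not_deleted

# txid 100 wrote 'v1'; txid 200 later updated it to 'v2'
versions = [
    RowVersion("v1", xmin=100, xmax=200),
    RowVersion("v2", xmin=200, xmax=None),
]

def read(snapshot_xid: int) -> "list[str]":
    return [v.value for v in versions if visible(v, snapshot_xid)]

assert read(150) == ["v1"]   # an older snapshot still sees the old version
assert read(250) == ["v2"]   # a newer snapshot sees the update
```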

Q3. What is the tradeoff between CP and AP systems in the CAP theorem?

Answer: A CP system returns errors for some requests during a network partition in order to preserve consistency. An AP system keeps responding during a partition, but its answers may be stale.

Explanation: CP systems (ZooKeeper, HBase) refuse requests or return errors during a partition to stay consistent, which fits systems where correctness is critical, such as financial transactions and inventory decrements. AP systems (Cassandra, DynamoDB) return a best-effort response even during a partition; nodes may briefly disagree and then converge under eventual consistency. They fit workloads like social media feeds and notification systems, where slight staleness is acceptable.

Q4. Why is the HNSW algorithm used for ANN search in vector databases?

Answer: HNSW (Hierarchical Navigable Small World) builds a multi-layer graph that enables approximate nearest-neighbor search in roughly O(log N), achieving high recall and fast queries at the same time.

Explanation: Finding the exact nearest neighbors (KNN) among millions of vectors requires comparing against every vector, i.e., O(N) time. HNSW builds a hierarchy of small-world graphs in which each node links to only a few neighbors. Upper layers provide coarse navigation; lower layers refine the search. The build parameters m (links per node) and ef_construction (candidate list size during build) determine index quality, while ef_search at query time trades recall against speed.

Q5. How do the Redis write-through and write-behind strategies differ?

Answer: Write-through updates the cache and the DB together, guaranteeing consistency at the cost of write latency. Write-behind updates only the cache and persists to the DB asynchronously, giving fast writes but risking data loss.

Explanation: Write-through synchronously updates both the cache and the DB on every write, so the data stays consistent, but the DB's write latency is passed straight to the user. Write-behind (write-back) updates the cache immediately and flushes dirty entries to the DB in batches. It delivers high write throughput and fast responses, but if the system crashes before a flush, cached writes that never reached the DB are lost. Write-behind suits write-heavy features such as e-commerce shopping carts or game scores.


Wrapping Up

Database engineering goes far beyond writing SQL: it spans data model design, indexing strategy, distributed systems theory, and now vector search and AI integration. In the AI era, embedding storage, semantic search, and LLM integration have become core competencies for database engineers.

Apply what's in this guide to real projects. A pgvector-based RAG system, a Redis caching layer, and well-chosen indexes alone can make a system an order of magnitude faster.

Database Engineering Complete Guide: From SQL to Vector DBs and AI RAG Systems

Database Engineering Complete Guide: From SQL to Vector DBs and AI RAG Systems

Even in the AI era, database engineering remains the backbone of every production system. No matter how powerful an LLM is, you cannot build reliable software without efficiently storing and retrieving data. The rise of vector search and RAG architectures has actually made database engineering more important, not less.

This guide takes you from advanced SQL through PostgreSQL internals, NoSQL systems, vector databases, distributed DB theory, and finally into LLM + DB integration patterns — all with production-ready code examples.


1. Relational DB Core: Advanced SQL

1.1 Window Functions

Window functions compute results across a set of rows related to the current row without collapsing them into a single group — indispensable for analytical queries.

-- Salary rank and rolling average per department
SELECT
    employee_id,
    name,
    department,
    salary,
    RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS dept_rank,
    SUM(salary) OVER (PARTITION BY department) AS dept_total,
    AVG(salary) OVER (
        PARTITION BY department
        ORDER BY hire_date
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) AS rolling_avg_3
FROM employees;

Key window functions:

  • ROW_NUMBER(): Unique sequential number with no ties
  • RANK(): Tied rows get the same rank; next rank skips numbers
  • DENSE_RANK(): Tied rows get the same rank; next rank is consecutive
  • LAG() / LEAD(): Access values from previous / next rows
  • FIRST_VALUE() / LAST_VALUE(): First / last value within the window frame
-- Month-over-month revenue growth rate
SELECT
    month,
    revenue,
    LAG(revenue, 1) OVER (ORDER BY month) AS prev_month,
    ROUND(
        (revenue - LAG(revenue, 1) OVER (ORDER BY month)) * 100.0
        / NULLIF(LAG(revenue, 1) OVER (ORDER BY month), 0),
        2
    ) AS growth_rate_pct
FROM monthly_sales;

1.2 CTEs (Common Table Expressions)

CTEs let you break complex queries into named, readable steps. Recursive CTEs are especially powerful for hierarchical traversal.

-- Recursive CTE for org chart traversal
WITH RECURSIVE org_tree AS (
    -- Base case: top-level employees
    SELECT
        employee_id,
        name,
        manager_id,
        0 AS depth,
        name::TEXT AS path
    FROM employees
    WHERE manager_id IS NULL

    UNION ALL

    -- Recursive case
    SELECT
        e.employee_id,
        e.name,
        e.manager_id,
        ot.depth + 1,
        ot.path || ' > ' || e.name
    FROM employees e
    INNER JOIN org_tree ot ON e.manager_id = ot.employee_id
)
SELECT employee_id, name, depth, path
FROM org_tree
ORDER BY path;
-- Multi-step analysis broken into readable CTEs
WITH
top_customers AS (
    SELECT customer_id, SUM(amount) AS total_spent
    FROM orders
    WHERE created_at >= NOW() - INTERVAL '90 days'
    GROUP BY customer_id
    HAVING SUM(amount) > 1000
),
customer_details AS (
    SELECT c.*, tc.total_spent
    FROM customers c
    JOIN top_customers tc ON c.id = tc.customer_id
),
ranked AS (
    SELECT *,
           NTILE(4) OVER (ORDER BY total_spent DESC) AS quartile
    FROM customer_details
)
SELECT * FROM ranked WHERE quartile = 1;

1.3 Reading EXPLAIN ANALYZE

Understanding the query execution plan is the single most powerful skill for diagnosing performance problems.

EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT u.name, COUNT(o.id) AS order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at > '2025-01-01'
GROUP BY u.id, u.name
ORDER BY order_count DESC
LIMIT 10;

Sample plan output:

Limit  (cost=1234.56..1234.57 rows=10 width=72) (actual time=45.123..45.125 rows=10 loops=1)
  ->  Sort  (cost=1234.56..1259.56 rows=10000 width=72) (actual time=45.120..45.121 rows=10 loops=1)
        Sort Key: (count(o.id)) DESC
        Sort Method: top-N heapsort  Memory: 25kB
        ->  HashAggregate  (cost=876.00..976.00 rows=10000 width=72) (actual time=38.456..42.234 rows=8523 loops=1)
              Group Key: u.id
              ->  Hash Left Join  (cost=345.00..801.00 rows=15000 width=40)
                    Hash Cond: (o.user_id = u.id)
                    ->  Seq Scan on orders o  (cost=0.00..312.00 rows=15000 width=16)
                    ->  Hash  (cost=270.00..270.00 rows=6000 width=32)
                          ->  Index Scan using idx_users_created_at on users u
Planning Time: 1.234 ms
Execution Time: 45.456 ms

Key signals to watch:

  • Seq Scan on a large table: investigate whether an index would help
  • Large gap between estimated and actual rows: run ANALYZE to refresh statistics
  • Buffers: shared read vs hit: low cache hit ratio means consider shared_buffers tuning
  • High loops on nested loop joins: consider a hash join or better index

1.4 Index Design

-- Composite index: put high-selectivity columns first
CREATE INDEX idx_orders_user_status_date
ON orders (user_id, status, created_at DESC);

-- Partial index: only index rows meeting a condition
CREATE INDEX idx_active_users
ON users (email)
WHERE deleted_at IS NULL AND status = 'active';

-- Expression index: index the result of a function
CREATE INDEX idx_users_lower_email
ON users (LOWER(email));

-- BRIN index: very small, ideal for naturally ordered time-series
CREATE INDEX idx_logs_timestamp_brin
ON application_logs USING BRIN (created_at);

-- GIN index: arrays, JSONB, full-text search
CREATE INDEX idx_products_tags_gin
ON products USING GIN (tags);

1.5 Transactions and ACID

SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

BEGIN;
  -- Atomic bank transfer
  UPDATE accounts SET balance = balance - 500 WHERE id = 1;
  UPDATE accounts SET balance = balance + 500 WHERE id = 2;

  -- Guard clause
  DO $$
  DECLARE
    bal NUMERIC;
  BEGIN
    SELECT balance INTO bal FROM accounts WHERE id = 1;
    IF bal < 0 THEN
      RAISE EXCEPTION 'Insufficient funds';
    END IF;
  END $$;

COMMIT;

Isolation level anomalies:

Isolation LevelDirty ReadNon-repeatable ReadPhantom Read
READ UNCOMMITTEDPossiblePossiblePossible
READ COMMITTEDPreventedPossiblePossible
REPEATABLE READPreventedPreventedPossible
SERIALIZABLEPreventedPreventedPrevented

2. PostgreSQL in Practice

2.1 JSONB and Semi-Structured Data

PostgreSQL's JSONB stores JSON in a parsed binary format enabling indexing and fast operators.

CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}'
);

CREATE INDEX idx_products_metadata ON products USING GIN (metadata);

-- Containment check (@>), key existence (?), path extraction (->>)
SELECT * FROM products
WHERE metadata @> '{"category": "electronics", "in_stock": true}';

SELECT
    name,
    metadata->>'brand' AS brand,
    (metadata->>'price')::NUMERIC AS price,
    metadata->'specs'->>'cpu' AS cpu
FROM products
WHERE metadata ? 'discount_pct'
  AND (metadata->>'discount_pct')::NUMERIC > 10;

-- Update a single key without replacing the whole document
UPDATE products
SET metadata = jsonb_set(metadata, '{price}', '29900'::jsonb)
WHERE id = 42;

2.2 Table Partitioning

-- Range partitioning for time-series data
CREATE TABLE events (
    id BIGSERIAL,
    user_id INT,
    event_type TEXT,
    payload JSONB,
    created_at TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (created_at);

CREATE OR REPLACE FUNCTION create_monthly_partition(target_date DATE)
RETURNS VOID AS $$
DECLARE
    partition_name TEXT;
    start_date DATE;
    end_date DATE;
BEGIN
    start_date := DATE_TRUNC('month', target_date);
    end_date := start_date + INTERVAL '1 month';
    partition_name := 'events_' || TO_CHAR(start_date, 'YYYY_MM');

    EXECUTE FORMAT(
        'CREATE TABLE IF NOT EXISTS %I PARTITION OF events
         FOR VALUES FROM (%L) TO (%L)',
        partition_name, start_date, end_date
    );
END;
$$ LANGUAGE plpgsql;

SELECT create_monthly_partition('2026-03-01');

2.3 Logical Replication

-- On the primary: enable logical WAL (takes effect after a server restart)
ALTER SYSTEM SET wal_level = logical;

CREATE PUBLICATION app_publication
FOR TABLE users, orders, products
WITH (publish = 'insert, update, delete');

-- On the replica: subscribe
CREATE SUBSCRIPTION app_subscription
CONNECTION 'host=primary-db port=5432 dbname=myapp user=replicator'
PUBLICATION app_publication;

-- Monitor replication lag
SELECT subname, received_lsn, latest_end_lsn, latest_end_time
FROM pg_stat_subscription;

3. NoSQL Databases

3.1 Redis: Caching Patterns

import redis
import json
import hashlib
import time
from typing import Optional

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# NOTE: `db` in the functions below stands in for your relational DB access layer

# Cache-Aside (Lazy Loading)
def get_user_profile(user_id: int) -> dict:
    cache_key = f"user:profile:{user_id}"

    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    r.setex(cache_key, 3600, json.dumps(user))
    return user

# Write-Through: update cache synchronously on every write
def update_user_profile(user_id: int, data: dict) -> None:
    cache_key = f"user:profile:{user_id}"
    db.execute("UPDATE users SET ... WHERE id = %s", user_id)
    updated = get_user_from_db(user_id)
    r.setex(cache_key, 3600, json.dumps(updated))

# Write-Behind (Write-Back): write to cache immediately, flush DB asynchronously
class WriteBehindCache:
    def __init__(self):
        self.dirty_keys_set = "cache:dirty_keys"

    def write(self, key: str, value: dict, ttl: int = 3600):
        r.setex(key, ttl, json.dumps(value))
        r.sadd(self.dirty_keys_set, key)

    def flush_to_db(self):
        dirty_keys = r.smembers(self.dirty_keys_set)
        for key in dirty_keys:
            data = r.get(key)
            if data:
                db.upsert(json.loads(data))
                r.srem(self.dirty_keys_set, key)

# Distributed lock: single-instance SET NX EX pattern
# (full Redlock runs this acquisition against a majority of independent Redis nodes)
import uuid

def acquire_lock(lock_name: str, timeout: int = 10) -> Optional[str]:
    identifier = str(uuid.uuid4())  # unique token so only the owner can release
    acquired = r.set(f"lock:{lock_name}", identifier, nx=True, ex=timeout)
    return identifier if acquired else None

def release_lock(lock_name: str, identifier: str) -> bool:
    lua = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """
    return bool(r.eval(lua, 1, f"lock:{lock_name}", identifier))

Additional Redis data structures:

# Sorted Set: real-time leaderboard
def update_score(player: str, score: int):
    r.zadd("leaderboard", {player: score})

def get_top_players(n: int = 10):
    return r.zrevrange("leaderboard", 0, n - 1, withscores=True)

# HyperLogLog: approximate unique visitor count (very memory-efficient)
def track_visitor(page: str, user_id: str):
    r.pfadd(f"visitors:{page}", user_id)

def get_unique_visitors(page: str) -> int:
    return r.pfcount(f"visitors:{page}")

3.2 MongoDB: Document Modeling

from pymongo import MongoClient
from datetime import datetime, timezone

client = MongoClient('mongodb://localhost:27017/')
db = client['ecommerce']

# Embedded documents vs references
# Embed when data is frequently read together and has limited growth
sample_order = {
    "_id": "order_12345",
    "user_id": "user_67890",
    "status": "shipped",
    "created_at": datetime.now(timezone.utc),  # utcnow() is deprecated since Python 3.12
    "shipping_address": {           # Embedded: snapshot at time of order
        "street": "123 Main St",
        "city": "Seoul",
        "zip": "06234"
    },
    "items": [                      # Embedded: price snapshot preserved
        {"product_id": "p001", "name": "Laptop", "price": 1200, "qty": 1},
        {"product_id": "p002", "name": "Mouse",  "price":   35, "qty": 2}
    ],
    "total": 1270
}

# Aggregation Pipeline
pipeline = [
    {"$match": {
        "created_at": {"$gte": datetime(2026, 2, 17)},
        "status": {"$in": ["delivered", "shipped"]}
    }},
    {"$unwind": "$items"},
    {"$group": {
        "_id": "$items.product_id",
        "product_name": {"$first": "$items.name"},
        "total_qty":     {"$sum": "$items.qty"},
        "total_revenue": {"$sum": {"$multiply": ["$items.price", "$items.qty"]}}
    }},
    {"$sort": {"total_revenue": -1}},
    {"$limit": 10},
    {"$project": {
        "product_id":    "$_id",
        "product_name":  1,
        "total_qty":     1,
        "total_revenue": 1,
        "_id": 0
    }}
]

top_products = list(db['orders'].aggregate(pipeline))

3.3 Cassandra: Wide-Column Design

Cassandra forces you to design tables around query patterns, not entity relationships.

-- Query-first table design
CREATE TABLE user_timeline (
    user_id  UUID,
    created_at TIMEUUID,
    post_id  UUID,
    content  TEXT,
    PRIMARY KEY (user_id, created_at)
) WITH CLUSTERING ORDER BY (created_at DESC)
  AND compaction = {
    'class': 'TimeWindowCompactionStrategy',
    'compaction_window_unit': 'DAYS',
    'compaction_window_size': 7
  };

-- Denormalized table for tag-based search (separate query pattern)
CREATE TABLE posts_by_tag (
    tag        TEXT,
    created_at TIMEUUID,
    post_id    UUID,
    user_id    UUID,
    title      TEXT,
    PRIMARY KEY (tag, created_at, post_id)
) WITH CLUSTERING ORDER BY (created_at DESC);

4. Vector Databases: The AI-Era Core

4.1 pgvector: Vector Search Inside PostgreSQL

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE document_embeddings (
    id         BIGSERIAL PRIMARY KEY,
    content    TEXT NOT NULL,
    metadata   JSONB DEFAULT '{}',
    embedding  vector(1536),
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- HNSW index for fast approximate nearest-neighbor search
CREATE INDEX idx_doc_hnsw
ON document_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- Cosine similarity search (lower distance = more similar)
SELECT
    id,
    content,
    metadata,
    1 - (embedding <=> '[0.1, 0.2, ...]'::vector) AS similarity
FROM document_embeddings
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 5;

-- Hybrid: metadata filter + vector search
SELECT
    id,
    content,
    metadata->>'source' AS source,
    1 - (embedding <=> query_vec) AS similarity
FROM document_embeddings
WHERE metadata->>'language' = 'en'
  AND metadata->>'category' = 'technical'
ORDER BY embedding <=> query_vec
LIMIT 10;

Distance operators in pgvector:

  • <=>: Cosine distance — best for text embeddings
  • <->: L2 (Euclidean) distance — image feature vectors
  • <#>: Negative inner product — normalized vectors (equivalent to cosine)
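The relationship between these operators can be verified in plain Python, independent of pgvector: for unit-normalized vectors, cosine distance, L2 distance, and negative inner product all produce the same nearest-neighbor ranking. A toy illustration, not pgvector code:

```python
import math

def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return 1 - dot / (na * nb)

def l2_distance(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def neg_inner_product(a, b):
    return -sum(x * y for x, y in zip(a, b))

def normalize(v):
    n = math.sqrt(sum(x * x for x in v))
    return [x / n for x in v]

query = normalize([0.3, 0.4, 0.5])
docs = [normalize(v) for v in ([0.1, 0.9, 0.2], [0.3, 0.5, 0.4], [0.9, 0.1, 0.1])]

# For unit vectors: cos_dist = 1 - dot, -ip = -dot, L2^2 = 2 - 2*dot,
# so all three are monotone in the dot product and rank identically.
by_cos = sorted(range(3), key=lambda i: cosine_distance(query, docs[i]))
by_ip = sorted(range(3), key=lambda i: neg_inner_product(query, docs[i]))
by_l2 = sorted(range(3), key=lambda i: l2_distance(query, docs[i]))
```

This is why OpenAI-style embeddings (which come pre-normalized) can use whichever operator has the best index support.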

4.2 IVFFlat vs HNSW

-- IVFFlat: faster build, lower memory, good for large initial datasets
CREATE INDEX idx_ivfflat
ON document_embeddings
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);         -- rule of thumb: sqrt(row count)

SET ivfflat.probes = 10;   -- more probes = higher recall, slower query

-- HNSW: higher recall, faster queries, more memory / slower build
CREATE INDEX idx_hnsw
ON document_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

SET hnsw.ef_search = 40;   -- higher = more accurate, slower

4.3 Choosing a Vector Database

| Property      | pgvector          | Pinecone         | Weaviate             | Milvus             |
|---------------|-------------------|------------------|----------------------|--------------------|
| Deployment    | PG extension      | SaaS             | Self-hosted / cloud  | Self-hosted / cloud|
| Filtering     | Full SQL          | Metadata filters | GraphQL + filters    | Rich filters       |
| Scale         | Millions          | Billions+        | Hundreds of millions | Billions+          |
| Cost          | PG infra only     | Usage-based      | Open-source free     | Open-source free   |
| Hybrid search | BM25 + vector     | Built-in         | BM25 + vector native | Rich support       |
| Best for      | Existing PG stack | Fast prototypes  | Knowledge graphs     | Massive scale      |

5. Distributed Databases: Theory and Practice

5.1 CAP Theorem

The CAP theorem states that a distributed system can satisfy at most two of:

  • C (Consistency): Every node returns the latest data
  • A (Availability): Every request gets a response
  • P (Partition Tolerance): The system keeps working despite network splits

Because network partitions are inevitable in distributed systems, the real choice is between CP and AP.

CP systems (consistency over availability):

  • Refuse requests during a partition to preserve correctness
  • Examples: ZooKeeper, HBase, MongoDB with w: majority
  • Use cases: financial transactions, inventory management

AP systems (availability over consistency):

  • Return possibly stale data during a partition
  • Examples: DynamoDB, Cassandra, CouchDB
  • Use cases: social feeds, DNS, notification systems

5.2 Consistency Models

Strong Consistency        (high latency, low throughput)
  ↓
Sequential Consistency
  ↓
Causal Consistency
  ↓
Eventual Consistency      (low latency, high throughput)

Session guarantees such as Monotonic Reads (a client never observes older data than it has already seen) are layered on top of eventual consistency rather than being a point on this spectrum.
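Eventual consistency is easy to see in a toy model: writes land on one replica and propagate asynchronously, so another replica serves stale reads until an anti-entropy pass converges them. A simplified sketch, not any real database's protocol:

```python
# Toy AP store: two replicas, last-writer-wins by version number
class Replica:
    def __init__(self):
        self.data = {}  # key -> (value, version)

    def write(self, key, value, version):
        current = self.data.get(key)
        if current is None or version > current[1]:  # last-writer-wins
            self.data[key] = (value, version)

replicas = [Replica(), Replica()]

replicas[0].write("x", "v1", version=1)   # client writes to replica 0 only
stale_read = replicas[1].data.get("x")    # replica 1 has not caught up: None

# Anti-entropy pass: replicas exchange versions until they agree
for src in replicas:
    for dst in replicas:
        for key, (value, version) in list(src.data.items()):
            dst.write(key, value, version)

converged = all(r.data.get("x") == ("v1", 1) for r in replicas)
```

Real systems (Cassandra, DynamoDB) replace the version counter with vector clocks or timestamps, but the stale-then-converge behavior is the same.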

5.3 Sharding Strategies

# Range sharding: consecutive ID ranges per shard
def range_shard(user_id: int, num_shards: int = 4) -> int:
    shard_size = 250_000_000
    return min(user_id // shard_size, num_shards - 1)

# Hash sharding: uniform distribution, prevents hotspots
import hashlib

def hash_shard(key: str, num_shards: int = 8) -> int:
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % num_shards

# Consistent hashing: minimal rehashing when nodes are added/removed
import bisect

class ConsistentHashRing:
    def __init__(self, nodes: list, replicas: int = 150):
        self.replicas = replicas
        self.ring: dict = {}
        self.sorted_keys: list = []
        for node in nodes:
            self.add_node(node)

    def add_node(self, node: str):
        for i in range(self.replicas):
            key = self._hash(f"{node}:{i}")
            self.ring[key] = node
            bisect.insort(self.sorted_keys, key)

    def get_node(self, key: str) -> str | None:
        if not self.ring:
            return None
        h = self._hash(key)
        idx = bisect.bisect(self.sorted_keys, h)
        if idx == len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
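The payoff of consistent hashing is that adding a node remaps only the keys the new node takes over, roughly 1/N of them, whereas naive `hash % n` sharding remaps about (n-1)/n of all keys. This standalone demo restates a minimal ring (md5 for illustration only) and measures the movement:

```python
import bisect
import hashlib

def h(s: str) -> int:
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

def build_ring(nodes, replicas=150):
    # Each physical node gets `replicas` virtual points on the ring
    return sorted((h(f"{n}:{i}"), n) for n in nodes for i in range(replicas))

def lookup(ring, hashes, key):
    idx = bisect.bisect(hashes, h(key)) % len(ring)  # clockwise successor
    return ring[idx][1]

keys = [f"user:{i}" for i in range(5000)]

ring3 = build_ring(["a", "b", "c"])
hashes3 = [p for p, _ in ring3]
ring4 = build_ring(["a", "b", "c", "d"])
hashes4 = [p for p, _ in ring4]

before = {k: lookup(ring3, hashes3, k) for k in keys}
after = {k: lookup(ring4, hashes4, k) for k in keys}

# Every key that moved must have moved TO the new node "d", since the
# old nodes' ring points are all still in place.
moved = [k for k in keys if before[k] != after[k]]
moved_fraction = len(moved) / len(keys)
```

With 150 virtual nodes per physical node, `moved_fraction` lands close to the ideal 1/4 when going from three nodes to four.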

6. Data Modeling

6.1 Normalization vs Denormalization

Normalization (through 3NF):

  • Eliminates redundancy and update anomalies
  • Excellent write performance
  • Requires JOINs for reads

Denormalization:

  • Optimizes read performance at the cost of redundancy
  • Well-suited for OLAP and data warehouses

-- Normalized schema
CREATE TABLE categories (id SERIAL PRIMARY KEY, name TEXT);
CREATE TABLE products (
    id          SERIAL PRIMARY KEY,
    name        TEXT,
    category_id INT REFERENCES categories(id)
);

-- Denormalized schema (eliminates the JOIN)
CREATE TABLE products_denormalized (
    id            SERIAL PRIMARY KEY,
    name          TEXT,
    category_id   INT,
    category_name TEXT   -- duplicated to avoid JOIN
);

6.2 Star Schema for Data Warehouses

-- Fact table (measurements / events)
CREATE TABLE fact_sales (
    sale_id      BIGINT,
    date_key     INT,
    product_key  INT,
    customer_key INT,
    store_key    INT,
    quantity     INT,
    unit_price   DECIMAL(10,2),
    total_amount DECIMAL(10,2)
);

-- Dimension tables (descriptive context)
CREATE TABLE dim_date (
    date_key    INT PRIMARY KEY,
    full_date   DATE,
    year        INT, quarter INT, month INT, week INT, day_of_week INT
);

CREATE TABLE dim_product (
    product_key INT PRIMARY KEY,
    product_id  TEXT,
    name        TEXT, category TEXT, brand TEXT, unit_cost DECIMAL(10,2)
);

7. AI Integration: LLM + DB Patterns

7.1 Building a RAG System with LangChain + pgvector

from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_postgres import PGVector
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.schema import Document

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

CONNECTION_STRING = "postgresql+psycopg://user:password@localhost:5432/vectordb"

vector_store = PGVector(
    embeddings=embeddings,
    collection_name="documents",
    connection=CONNECTION_STRING,
    use_jsonb=True,
)

# Ingest: chunk, embed, and store
def ingest_documents(file_path: str, metadata: dict):
    with open(file_path, 'r', encoding='utf-8') as f:
        raw_text = f.read()

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=512,
        chunk_overlap=50,
        separators=["\n\n", "\n", ".", " "]
    )
    chunks = splitter.split_text(raw_text)
    documents = [
        Document(page_content=chunk, metadata={**metadata, "chunk_index": i})
        for i, chunk in enumerate(chunks)
    ]
    return vector_store.add_documents(documents)

# Build the RAG chain
def build_rag_chain(k: int = 5, score_threshold: float = 0.7):
    retriever = vector_store.as_retriever(
        search_type="similarity_score_threshold",
        search_kwargs={"k": k, "score_threshold": score_threshold}
    )
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    return RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True,
    )

rag_chain = build_rag_chain()
result = rag_chain.invoke({"query": "How does PostgreSQL MVCC handle concurrency?"})
print(result['result'])

7.2 Hybrid Search: BM25 + Vector via RRF

from sqlalchemy import text

def hybrid_search(
    query: str,
    query_embedding: list,
    k: int = 10,
    alpha: float = 0.5,   # 0 = BM25 only, 1 = vector only
) -> list:
    sql = text("""
    WITH
    vector_results AS (
        SELECT id, content, metadata,
               ROW_NUMBER() OVER (ORDER BY embedding <=> CAST(:embedding AS vector)) AS rank
        FROM document_embeddings
        ORDER BY embedding <=> CAST(:embedding AS vector)
        LIMIT :k
    ),
    bm25_results AS (
        SELECT id, content, metadata,
               ROW_NUMBER() OVER (ORDER BY ts_rank(
                   to_tsvector('english', content),
                   plainto_tsquery('english', :query)
               ) DESC) AS rank
        FROM document_embeddings
        WHERE to_tsvector('english', content) @@ plainto_tsquery('english', :query)
        LIMIT :k
    ),
    rrf AS (
        SELECT
            COALESCE(v.id, b.id)           AS id,
            COALESCE(v.content, b.content) AS content,
            COALESCE(v.metadata, b.metadata) AS metadata,
            COALESCE(1.0 / (60 + v.rank), 0) * :alpha +
            COALESCE(1.0 / (60 + b.rank), 0) * (1 - :alpha) AS score
        FROM vector_results v
        FULL OUTER JOIN bm25_results b ON v.id = b.id
    )
    SELECT id, content, metadata, score
    FROM rrf
    ORDER BY score DESC
    LIMIT :k
    """)

    # `engine` is assumed to be a SQLAlchemy engine created elsewhere
    with engine.connect() as conn:
        rows = conn.execute(sql, {
            "embedding": str(query_embedding),
            "query": query,
            "k": k,
            "alpha": alpha,
        })
        return [dict(row) for row in rows.mappings()]
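The RRF scoring in the SQL above, score = alpha/(60 + vector_rank) + (1 - alpha)/(60 + bm25_rank), can be sanity-checked with a few lines of plain Python before wiring it into the database:

```python
def rrf_fuse(vector_ranked, bm25_ranked, alpha=0.5, k=60):
    """Reciprocal Rank Fusion over two ranked lists of document ids."""
    scores = {}
    for rank, doc_id in enumerate(vector_ranked, start=1):
        scores[doc_id] = scores.get(doc_id, 0.0) + alpha / (k + rank)
    for rank, doc_id in enumerate(bm25_ranked, start=1):
        scores[doc_id] = scores.get(doc_id, 0.0) + (1 - alpha) / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# A doc ranked well in BOTH lists beats one that tops only a single list:
# d2 (ranks 2 and 1) overtakes d1 (ranks 1 and 3).
fused = rrf_fuse(
    vector_ranked=["d1", "d2", "d3"],
    bm25_ranked=["d2", "d4", "d1"],
)
```

The constant 60 damps the advantage of the very top ranks; it is the value from the original RRF paper and is what the SQL above hard-codes.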

7.3 Embedding Cache with Redis

import hashlib, json
from typing import Optional

class EmbeddingCache:
    def __init__(self, redis_client, ttl: int = 86400 * 7):
        self.redis = redis_client
        self.ttl = ttl

    def _key(self, text: str, model: str) -> str:
        h = hashlib.sha256(f"{model}:{text}".encode()).hexdigest()
        return f"embedding:{h}"

    def get(self, text: str, model: str) -> Optional[list]:
        raw = self.redis.get(self._key(text, model))
        return json.loads(raw) if raw else None

    def set(self, text: str, model: str, embedding: list):
        self.redis.setex(self._key(text, model), self.ttl, json.dumps(embedding))

    def get_or_compute(self, text: str, model: str, compute_fn) -> list:
        cached = self.get(text, model)
        if cached:
            return cached
        embedding = compute_fn(text)
        self.set(text, model, embedding)
        return embedding
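The cache's behavior can be exercised without a Redis server by injecting a dict-backed stand-in for the client (implementing only `get`/`setex`, TTL ignored). The class is restated here so the snippet runs standalone; `fake_embed` is a hypothetical stand-in for a real embedding call:

```python
import hashlib
import json

# Restating the cache class above so this snippet is self-contained
class EmbeddingCache:
    def __init__(self, redis_client, ttl: int = 86400 * 7):
        self.redis = redis_client
        self.ttl = ttl

    def _key(self, text: str, model: str) -> str:
        return "embedding:" + hashlib.sha256(f"{model}:{text}".encode()).hexdigest()

    def get_or_compute(self, text, model, compute_fn):
        raw = self.redis.get(self._key(text, model))
        if raw:
            return json.loads(raw)
        embedding = compute_fn(text)
        self.redis.setex(self._key(text, model), self.ttl, json.dumps(embedding))
        return embedding

class FakeRedis:
    """Dict-backed stand-in for the Redis client; TTL is ignored."""
    def __init__(self):
        self.store = {}
    def get(self, key):
        return self.store.get(key)
    def setex(self, key, ttl, value):
        self.store[key] = value

calls = {"n": 0}
def fake_embed(text):
    calls["n"] += 1                  # count model invocations
    return [float(len(text)), 0.5]   # stand-in embedding vector

cache = EmbeddingCache(FakeRedis())
e1 = cache.get_or_compute("hello world", "test-model", fake_embed)
e2 = cache.get_or_compute("hello world", "test-model", fake_embed)
# The second call is served from the cache, so the "model" ran only once
```

Since embedding APIs bill per token, this single-flight-per-text property is where the cache pays for itself.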

Quiz: Test Your Knowledge

Q1. When should you use a B-Tree index versus a Hash index?

Answer: B-Tree is best for range queries, sorting, and prefix LIKE patterns. Hash indexes only support equality comparisons (=).

Explanation: B-Tree stores keys in a sorted tree, making it ideal for WHERE age > 30, ORDER BY name, and BETWEEN operations. Hash indexes compute a hash of the key for O(1) point lookups (WHERE id = 42) but are completely useless for range queries or sorting. In PostgreSQL, Hash indexes gained full WAL logging support in v10, making them production-safe.
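The difference can be felt with plain Python data structures: a sorted list (B-Tree-like) answers range queries via binary search plus a scan, while a dict (hash-like) answers only equality lookups. A toy illustration, not a claim about the engines' internals:

```python
import bisect

ages = sorted([17, 22, 22, 30, 31, 45, 60])  # B-Tree-like: kept in sorted order

# Range query "age > 30": binary-search to the boundary, then scan rightward
start = bisect.bisect_right(ages, 30)
over_30 = ages[start:]

by_id = {42: "alice", 7: "bob"}              # hash-like: O(1) equality only
alice = by_id[42]
# A range query over the dict would require touching every entry
```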

Q2. How does PostgreSQL MVCC handle concurrent reads and writes?

Answer: Each transaction takes a snapshot at its start time and only sees row versions that were committed before that snapshot. Readers never block writers and writers never block readers.

Explanation: When a row is updated, PostgreSQL writes a new version of the row tagged with xmin and xmax transaction IDs rather than overwriting the old version. Each transaction uses its own xid snapshot to determine which row version is visible. Old versions (dead tuples) are cleaned up by the VACUUM background process. This is why PostgreSQL can achieve very high read concurrency with minimal locking.
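The xmin/xmax visibility check can be sketched in a few lines of Python. This is a deliberate simplification of PostgreSQL's real rules (it ignores in-progress and aborted transactions), but it captures why a snapshot keeps seeing the old row version:

```python
# Each row version records the txid that created it (xmin) and the txid
# that superseded it (xmax; None while the version is still current).
versions = [
    {"value": "balance=100", "xmin": 5, "xmax": 8},    # old version
    {"value": "balance=80",  "xmin": 8, "xmax": None}, # current version
]

def visible(version, snapshot_xid):
    """Simplified MVCC visibility: created at or before our snapshot,
    and not yet superseded as of our snapshot."""
    created_before = version["xmin"] <= snapshot_xid
    not_yet_deleted = version["xmax"] is None or version["xmax"] > snapshot_xid
    return created_before and not_yet_deleted

# A transaction that took its snapshot at txid 6 still sees the old version,
# even though txid 8 has since rewritten the row; a later snapshot sees the new one.
seen_at_6 = [v["value"] for v in versions if visible(v, 6)]
seen_at_9 = [v["value"] for v in versions if visible(v, 9)]
```

Once no live snapshot can see the old version, it becomes a dead tuple and VACUUM reclaims it.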

Q3. What are the trade-offs between CP and AP systems in the CAP theorem?

Answer: CP systems sacrifice availability during a network partition to preserve data consistency. AP systems stay available during a partition but may return stale data, offering only eventual consistency.

Explanation: A CP system like ZooKeeper will refuse requests or return errors during a partition, ensuring every response reflects the latest committed state — critical for distributed coordination, financial ledgers, and inventory. An AP system like Cassandra will always return a response even if some nodes are unreachable, but different clients might temporarily see different versions of data. The right choice depends on whether inconsistency or downtime is the bigger risk for your use case.

Q4. Why does HNSW outperform brute-force KNN for approximate nearest-neighbor search in vector databases?

Answer: HNSW builds a hierarchical small-world graph where each node connects to a small number of neighbors. Search navigates from coarse upper layers to fine-grained lower layers in O(log N) time, achieving high recall with far fewer distance computations than brute-force O(N).

Explanation: Exact KNN over millions of vectors requires computing the distance to every vector — prohibitively slow at scale. HNSW (Hierarchical Navigable Small World) constructs multiple graph layers: upper layers enable fast long-range navigation while the bottom layer provides precise local search. The build parameters m (edges per node) and ef_construction (beam width during construction) determine index quality. At query time, ef_search controls the recall/speed trade-off. For most production workloads, HNSW achieves 95%+ recall with millisecond latency.

Q5. What is the difference between write-through and write-behind caching in Redis?

Answer: Write-through updates the cache and the database synchronously on every write, guaranteeing consistency at the cost of write latency. Write-behind updates the cache immediately and flushes dirty entries to the database asynchronously, improving throughput at the risk of data loss on failure.

Explanation: In write-through, every write goes to both the cache and the DB before acknowledging the client, so the cache is never stale but writes are as slow as the DB. In write-behind (write-back), the application writes only to the cache and a background process batches DB writes later. This gives much lower write latency and higher throughput, but if the cache node crashes before flushing, those writes are lost. Write-behind is appropriate for high-frequency, loss-tolerant workloads like shopping carts, game state, and analytics counters.


Conclusion

Database engineering extends far beyond writing SQL. It encompasses data modeling, index strategy, distributed systems theory, and now — increasingly — vector search and AI pipeline integration. In the AI era, the engineer who can design a schema for embedding storage, tune HNSW parameters, implement a Redis caching layer, and wire everything into a LangChain RAG pipeline holds a rare and valuable skill set.

Start with pgvector for vector search inside your existing PostgreSQL infrastructure, add Redis for caching hot data, and apply the principles of proper index design and query plan analysis to keep everything fast. The foundations in this guide will serve you well regardless of which specific tools the ecosystem moves to next.