- Authors

- Name
- Youngju Kim
- @fjvbn20031
데이터베이스 엔지니어링 완전 정복: SQL부터 벡터DB, AI RAG 시스템까지
AI 시대에도 데이터베이스 엔지니어링은 여전히 모든 시스템의 근간입니다. LLM이 아무리 강력해도 데이터를 안전하게 저장하고 빠르게 조회하는 능력 없이는 프로덕션 시스템을 만들 수 없습니다. 특히 벡터 검색과 RAG 시스템의 등장으로 데이터베이스 엔지니어링의 역할은 오히려 더 중요해졌습니다.
이 가이드는 SQL 고급 기법부터 PostgreSQL 실전, NoSQL, 벡터 데이터베이스, 분산 DB 이론, 그리고 LLM + DB 통합 패턴까지 체계적으로 다룹니다.
1. 관계형 DB 핵심: SQL 고급 기법
1.1 Window Functions (윈도우 함수)
윈도우 함수는 집계 함수와 달리 행을 그룹화하지 않고 각 행의 컨텍스트를 유지하면서 계산을 수행합니다. 분석 쿼리에서 필수적입니다.
-- 부서별 급여 순위와 누적 합계
SELECT
employee_id,
name,
department,
salary,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS dept_rank,
SUM(salary) OVER (PARTITION BY department) AS dept_total,
AVG(salary) OVER (
PARTITION BY department
ORDER BY hire_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) AS rolling_avg_3
FROM employees;
주요 윈도우 함수:
ROW_NUMBER(): 중복 없는 순서 번호RANK(): 동점 시 같은 순위, 다음 순위 건너뜀DENSE_RANK(): 동점 시 같은 순위, 다음 순위 연속LAG() / LEAD(): 이전/다음 행 값 참조FIRST_VALUE() / LAST_VALUE(): 윈도우 내 첫/마지막 값
-- 월별 매출 증감률 계산
SELECT
month,
revenue,
LAG(revenue, 1) OVER (ORDER BY month) AS prev_month,
ROUND(
(revenue - LAG(revenue, 1) OVER (ORDER BY month)) * 100.0
/ NULLIF(LAG(revenue, 1) OVER (ORDER BY month), 0),
2
) AS growth_rate_pct
FROM monthly_sales;
1.2 CTE (Common Table Expressions)
CTE는 복잡한 쿼리를 단계별로 분리하여 가독성과 재사용성을 높입니다. 재귀 CTE는 계층 구조 탐색에 강력합니다.
-- 재귀 CTE로 조직도 탐색
WITH RECURSIVE org_tree AS (
-- Base case: 최상위 직원
SELECT
employee_id,
name,
manager_id,
0 AS depth,
name::TEXT AS path
FROM employees
WHERE manager_id IS NULL
UNION ALL
-- Recursive case
SELECT
e.employee_id,
e.name,
e.manager_id,
ot.depth + 1,
ot.path || ' > ' || e.name
FROM employees e
INNER JOIN org_tree ot ON e.manager_id = ot.employee_id
)
SELECT employee_id, name, depth, path
FROM org_tree
ORDER BY path;
-- 복잡한 분석을 단계별 CTE로 분리
WITH
top_customers AS (
SELECT customer_id, SUM(amount) AS total_spent
FROM orders
WHERE created_at >= NOW() - INTERVAL '90 days'
GROUP BY customer_id
HAVING SUM(amount) > 1000
),
customer_details AS (
SELECT c.*, tc.total_spent
FROM customers c
JOIN top_customers tc ON c.id = tc.customer_id
),
ranked AS (
SELECT *,
NTILE(4) OVER (ORDER BY total_spent DESC) AS quartile
FROM customer_details
)
SELECT * FROM ranked WHERE quartile = 1;
1.3 실행 계획 분석 (EXPLAIN ANALYZE)
쿼리 성능 문제의 근본 원인을 파악하려면 실행 계획을 읽을 줄 알아야 합니다.
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT u.name, COUNT(o.id) AS order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at > '2025-01-01'
GROUP BY u.id, u.name
ORDER BY order_count DESC
LIMIT 10;
실행 계획 출력 해석:
Limit (cost=1234.56..1234.57 rows=10 width=72) (actual time=45.123..45.125 rows=10 loops=1)
-> Sort (cost=1234.56..1259.56 rows=10000 width=72) (actual time=45.120..45.121 rows=10 loops=1)
Sort Key: (count(o.id)) DESC
Sort Method: top-N heapsort Memory: 25kB
-> HashAggregate (cost=876.00..976.00 rows=10000 width=72) (actual time=38.456..42.234 rows=8523 loops=1)
Group Key: u.id
Batches: 1 Memory Usage: 1553kB
-> Hash Left Join (cost=345.00..801.00 rows=15000 width=40) (actual time=5.678..28.901 rows=15000 loops=1)
Hash Cond: (o.user_id = u.id)
-> Seq Scan on orders o (cost=0.00..312.00 rows=15000 width=16) (actual time=0.023..8.456 rows=15000 loops=1)
-> Hash (cost=270.00..270.00 rows=6000 width=32) (actual time=5.234..5.234 rows=6000 loops=1)
Buckets: 8192 Batches: 1 Memory Usage: 358kB
-> Index Scan using idx_users_created_at on users u (cost=0.29..270.00 rows=6000 width=32)
Planning Time: 1.234 ms
Execution Time: 45.456 ms
핵심 지표:
Seq ScanvsIndex Scan: Seq Scan이 나오면 인덱스 필요 여부 검토actual rowsvsrows추정치 차이가 크면 통계 업데이트 필요 (ANALYZE)Buffers: shared hitvsread: 캐시 적중률 확인loops: 중첩 루프 횟수, 높으면 성능 저하
1.4 인덱스 설계 전략
-- 복합 인덱스: 선택도 높은 컬럼을 앞에
CREATE INDEX idx_orders_user_status_date
ON orders (user_id, status, created_at DESC);
-- 부분 인덱스: 특정 조건만 인덱싱 (공간 절약)
CREATE INDEX idx_active_users
ON users (email)
WHERE deleted_at IS NULL AND status = 'active';
-- 표현식 인덱스: 함수 결과 인덱싱
CREATE INDEX idx_users_lower_email
ON users (LOWER(email));
-- BRIN 인덱스: 시계열 데이터에 효율적 (크기 매우 작음)
CREATE INDEX idx_logs_timestamp_brin
ON application_logs USING BRIN (created_at);
-- GIN 인덱스: 배열, JSONB, 전문검색에 적합
CREATE INDEX idx_products_tags_gin
ON products USING GIN (tags);
1.5 트랜잭션과 ACID
-- 트랜잭션 격리 수준 설정
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
BEGIN;
-- 계좌 이체: 원자성 보장
UPDATE accounts SET balance = balance - 500 WHERE id = 1;
UPDATE accounts SET balance = balance + 500 WHERE id = 2;
-- 잔액 검증
DO $$
DECLARE
bal NUMERIC;
BEGIN
SELECT balance INTO bal FROM accounts WHERE id = 1;
IF bal < 0 THEN
RAISE EXCEPTION 'Insufficient funds';
END IF;
END $$;
COMMIT;
격리 수준별 발생 가능한 문제:
| 격리 수준 | Dirty Read | Non-repeatable Read | Phantom Read |
|---|---|---|---|
| READ UNCOMMITTED | 가능 | 가능 | 가능 |
| READ COMMITTED | 방지 | 가능 | 가능 |
| REPEATABLE READ | 방지 | 방지 | 가능 |
| SERIALIZABLE | 방지 | 방지 | 방지 |
2. PostgreSQL 실전: 고급 기능
2.1 JSONB와 반정형 데이터
PostgreSQL의 JSONB는 JSON을 바이너리 형식으로 저장하여 빠른 조회와 인덱싱을 지원합니다.
-- JSONB 컬럼 생성과 GIN 인덱스
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'
);
CREATE INDEX idx_products_metadata ON products USING GIN (metadata);
-- JSONB 연산자
-- ->> : 텍스트로 추출
-- -> : JSON으로 추출
-- @> : containment 체크
-- ? : 키 존재 여부
SELECT * FROM products
WHERE metadata @> '{"category": "electronics", "in_stock": true}';
SELECT
name,
metadata->>'brand' AS brand,
(metadata->>'price')::NUMERIC AS price,
metadata->'specs'->>'cpu' AS cpu
FROM products
WHERE metadata ? 'discount_pct'
AND (metadata->>'discount_pct')::NUMERIC > 10;
-- JSONB 업데이트: 특정 키만 변경
UPDATE products
SET metadata = jsonb_set(
metadata,
'{price}',
'29900'::jsonb
)
WHERE id = 42;
-- JSONB 경로 쿼리
SELECT jsonb_path_query(
metadata,
'$.specs.memory ? (@ > 16)'
) FROM products;
2.2 파티셔닝 (Table Partitioning)
-- 시계열 데이터 레인지 파티셔닝
CREATE TABLE events (
id BIGSERIAL,
user_id INT,
event_type TEXT,
payload JSONB,
created_at TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (created_at);
-- 월별 파티션 자동 생성 함수
CREATE OR REPLACE FUNCTION create_monthly_partition(target_date DATE)
RETURNS VOID AS $$
DECLARE
partition_name TEXT;
start_date DATE;
end_date DATE;
BEGIN
start_date := DATE_TRUNC('month', target_date);
end_date := start_date + INTERVAL '1 month';
partition_name := 'events_' || TO_CHAR(start_date, 'YYYY_MM');
EXECUTE FORMAT(
'CREATE TABLE IF NOT EXISTS %I PARTITION OF events
FOR VALUES FROM (%L) TO (%L)',
partition_name, start_date, end_date
);
END;
$$ LANGUAGE plpgsql;
SELECT create_monthly_partition('2026-03-01');
SELECT create_monthly_partition('2026-04-01');
-- 파티션 프루닝 확인
EXPLAIN SELECT * FROM events
WHERE created_at BETWEEN '2026-03-01' AND '2026-03-31';
2.3 논리 복제 (Logical Replication)
-- Publisher 설정
ALTER SYSTEM SET wal_level = logical;
CREATE PUBLICATION app_publication
FOR TABLE users, orders, products
WITH (publish = 'insert, update, delete');
-- Subscriber에서 구독 설정
CREATE SUBSCRIPTION app_subscription
CONNECTION 'host=primary-db port=5432 dbname=myapp user=replicator'
PUBLICATION app_publication;
-- 복제 상태 모니터링
SELECT
subname,
received_lsn,
latest_end_lsn,
latest_end_time
FROM pg_stat_subscription;
3. NoSQL 데이터베이스 실전
3.1 Redis: 캐싱 패턴
Redis는 인메모리 데이터 구조 저장소로, 캐싱, 세션 관리, 메시지 큐에 널리 사용됩니다.
import redis
import json
import hashlib
from functools import wraps
from typing import Any, Optional
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Cache-Aside 패턴 (Lazy Loading)
def get_user_profile(user_id: int) -> dict:
cache_key = f"user:profile:{user_id}"
# 1. 캐시 확인
cached = r.get(cache_key)
if cached:
return json.loads(cached)
# 2. DB에서 조회
user = db.query("SELECT * FROM users WHERE id = %s", user_id)
# 3. 캐시에 저장 (TTL 1시간)
r.setex(cache_key, 3600, json.dumps(user))
return user
# Write-Through 패턴: 쓰기 시 캐시도 동시 업데이트
def update_user_profile(user_id: int, data: dict) -> None:
cache_key = f"user:profile:{user_id}"
# DB 업데이트
db.execute("UPDATE users SET ... WHERE id = %s", user_id)
# 캐시도 즉시 업데이트
updated = get_user_from_db(user_id)
r.setex(cache_key, 3600, json.dumps(updated))
# Write-Behind (Write-Back) 패턴: 비동기 DB 저장
class WriteBehindCache:
def __init__(self):
self.dirty_keys_set = "cache:dirty_keys"
def write(self, key: str, value: dict, ttl: int = 3600):
# 캐시에 즉시 쓰기
r.setex(key, ttl, json.dumps(value))
# 더티 키 목록에 추가 (나중에 DB에 flush)
r.sadd(self.dirty_keys_set, key)
def flush_to_db(self):
dirty_keys = r.smembers(self.dirty_keys_set)
for key in dirty_keys:
data = r.get(key)
if data:
db.upsert(json.loads(data))
r.srem(self.dirty_keys_set, key)
# 분산 락 (Redlock)
import time
def acquire_lock(lock_name: str, timeout: int = 10) -> Optional[str]:
identifier = str(time.time())
lock_key = f"lock:{lock_name}"
acquired = r.set(lock_key, identifier, nx=True, ex=timeout)
return identifier if acquired else None
def release_lock(lock_name: str, identifier: str) -> bool:
lock_key = f"lock:{lock_name}"
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
result = r.eval(lua_script, 1, lock_key, identifier)
return bool(result)
Redis 데이터 구조 활용:
# Sorted Set: 실시간 랭킹 보드
def update_score(player: str, score: int):
r.zadd("leaderboard", {player: score})
def get_top_players(n: int = 10):
return r.zrevrange("leaderboard", 0, n-1, withscores=True)
# HyperLogLog: 유니크 방문자 수 (근사값, 메모리 효율적)
def track_visitor(page: str, user_id: str):
r.pfadd(f"visitors:{page}:{today()}", user_id)
def get_unique_visitors(page: str, date: str) -> int:
return r.pfcount(f"visitors:{page}:{date}")
# Pub/Sub: 실시간 알림
import threading
def publisher():
for i in range(10):
r.publish("notifications", json.dumps({"type": "alert", "msg": f"Event {i}"}))
def subscriber():
pubsub = r.pubsub()
pubsub.subscribe("notifications")
for message in pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])
print(f"Received: {data}")
3.2 MongoDB: 도큐먼트 모델링
from pymongo import MongoClient, ASCENDING, DESCENDING
from datetime import datetime
client = MongoClient('mongodb://localhost:27017/')
db = client['ecommerce']
# 임베디드 도큐먼트 vs 레퍼런스 설계
# 임베디드: 자주 함께 조회되는 데이터
orders_collection = db['orders']
sample_order = {
"_id": "order_12345",
"user_id": "user_67890",
"status": "shipped",
"created_at": datetime.utcnow(),
# 주소는 임베디드 (변경 이력 보존)
"shipping_address": {
"street": "서울시 강남구 테헤란로 123",
"city": "서울",
"zip": "06234"
},
# 상품 스냅샷 임베디드 (가격 변동 보존)
"items": [
{"product_id": "p001", "name": "노트북", "price": 1200000, "qty": 1},
{"product_id": "p002", "name": "마우스", "price": 35000, "qty": 2}
],
"total": 1270000
}
# Aggregation Pipeline
pipeline = [
# 1단계: 최근 30일 주문 필터
{"$match": {
"created_at": {"$gte": datetime(2026, 2, 17)},
"status": {"$in": ["delivered", "shipped"]}
}},
# 2단계: 아이템 언와인드
{"$unwind": "$items"},
# 3단계: 상품별 집계
{"$group": {
"_id": "$items.product_id",
"product_name": {"$first": "$items.name"},
"total_qty": {"$sum": "$items.qty"},
"total_revenue": {"$sum": {"$multiply": ["$items.price", "$items.qty"]}}
}},
# 4단계: 매출 기준 정렬
{"$sort": {"total_revenue": -1}},
# 5단계: 상위 10개
{"$limit": 10},
# 6단계: 결과 재구성
{"$project": {
"product_id": "$_id",
"product_name": 1,
"total_qty": 1,
"total_revenue": 1,
"_id": 0
}}
]
top_products = list(orders_collection.aggregate(pipeline))
3.3 Cassandra: Wide-Column 모델
Cassandra는 쓰기 집약적이고 지리적으로 분산된 대규모 시스템에 적합합니다.
-- Cassandra CQL: 쿼리 패턴 중심 테이블 설계
-- "쿼리를 먼저 정의하고, 테이블을 거기에 맞춘다"
-- 사용자별 타임라인 조회 쿼리를 위한 테이블
CREATE TABLE user_timeline (
user_id UUID,
created_at TIMEUUID,
post_id UUID,
content TEXT,
likes COUNTER,
PRIMARY KEY (user_id, created_at)
) WITH CLUSTERING ORDER BY (created_at DESC)
AND compaction = {
'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': 7
};
-- 태그별 게시물 검색 테이블 (별도로 비정규화)
CREATE TABLE posts_by_tag (
tag TEXT,
created_at TIMEUUID,
post_id UUID,
user_id UUID,
title TEXT,
PRIMARY KEY (tag, created_at, post_id)
) WITH CLUSTERING ORDER BY (created_at DESC);
4. 벡터 데이터베이스: AI 시대의 핵심
4.1 pgvector: PostgreSQL에서 벡터 검색
-- pgvector 확장 설치
CREATE EXTENSION IF NOT EXISTS vector;
-- 임베딩 저장 테이블
CREATE TABLE document_embeddings (
id BIGSERIAL PRIMARY KEY,
content TEXT NOT NULL,
metadata JSONB DEFAULT '{}',
embedding vector(1536), -- OpenAI text-embedding-3-small 차원
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- HNSW 인덱스 (고속 ANN 검색)
CREATE INDEX idx_doc_embeddings_hnsw
ON document_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- 코사인 유사도 검색
SELECT
id,
content,
metadata,
1 - (embedding <=> '[0.1, 0.2, ...]'::vector) AS similarity
FROM document_embeddings
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 5;
-- 메타데이터 필터링과 벡터 검색 결합 (Hybrid Search)
SELECT
id,
content,
metadata->>'source' AS source,
1 - (embedding <=> query_embedding) AS similarity
FROM document_embeddings
WHERE
metadata->>'language' = 'ko'
AND metadata->>'category' = 'technical'
AND (embedding <=> query_embedding) < 0.3 -- 임계값 필터
ORDER BY embedding <=> query_embedding
LIMIT 10;
벡터 거리 연산자:
<=>: 코사인 거리 (텍스트 임베딩에 가장 적합)<->: L2 유클리드 거리 (이미지 특징 등)<#>: 내적 (Inner Product, 정규화된 벡터에서 코사인과 동일)
4.2 IVFFlat vs HNSW 인덱스 비교
-- IVFFlat: 빌드가 빠르고 메모리 효율적
-- 대용량 데이터셋 초기 구축에 적합
CREATE INDEX idx_ivfflat
ON document_embeddings
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100); -- sqrt(행 수) 권장
-- 검색 품질 조정
SET ivfflat.probes = 10; -- 검색 클러스터 수 (높을수록 정확, 느림)
-- HNSW: 검색 속도가 빠르고 recall 높음
-- 빌드 시간이 길고 메모리 많이 사용
CREATE INDEX idx_hnsw
ON document_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
SET hnsw.ef_search = 40; -- 검색 시 탐색 폭 (높을수록 정확)
4.3 Pinecone, Weaviate, Milvus 비교
| 특성 | pgvector | Pinecone | Weaviate | Milvus |
|---|---|---|---|---|
| 설치 | PostgreSQL 확장 | SaaS | 자체 호스팅/클라우드 | 자체 호스팅/클라우드 |
| 필터링 | SQL 완전 지원 | 메타데이터 필터 | GraphQL + 필터 | 풍부한 필터 |
| 스케일 | 수백만 행 | 수억+ | 수억+ | 수십억+ |
| 비용 | PostgreSQL 비용만 | 사용량 기반 | 오픈소스 무료 | 오픈소스 무료 |
| 하이브리드 검색 | BM25 + 벡터 | 기본 지원 | BM25 + 벡터 내장 | 풍부한 지원 |
| 적합 사례 | 기존 PG 인프라 | 빠른 프로토타입 | 지식 그래프 | 초대규모 |
5. 분산 데이터베이스: 이론과 실전
5.1 CAP Theorem
CAP 정리는 분산 시스템에서 다음 세 가지를 동시에 만족할 수 없다는 이론입니다:
- C (Consistency): 모든 노드가 동일한 최신 데이터를 반환
- A (Availability): 모든 요청이 응답을 받음
- P (Partition Tolerance): 네트워크 파티션 시에도 동작
네트워크 파티션은 실제로 항상 발생할 수 있으므로, 사실상 CP 또는 AP 중 선택합니다.
CP 시스템 (일관성 우선):
- 파티션 발생 시 가용성을 포기
- 예: ZooKeeper, HBase, MongoDB (w:majority)
- 적합: 금융 거래, 재고 관리
AP 시스템 (가용성 우선):
- 파티션 발생 시 최신이 아닐 수 있는 데이터 반환
- 예: DynamoDB, Cassandra, CouchDB
- 적합: 소셜 피드, 설정 정보, DNS
5.2 일관성 모델
강한 일관성 (Strong Consistency)
↓ 성능 저하, 지연 증가
순차 일관성 (Sequential Consistency)
↓
인과 일관성 (Causal Consistency)
↓
최종 일관성 (Eventual Consistency)
↓ 성능 향상, 가용성 증가
단조 읽기 (Monotonic Read)
5.3 Sharding 전략
# Range Sharding: 연속 범위로 분할
def range_shard(user_id: int, num_shards: int = 4) -> int:
shard_size = 250_000_000 # 10억 사용자를 4개 샤드
return min(user_id // shard_size, num_shards - 1)
# Hash Sharding: 균등 분배 (핫스팟 방지)
import hashlib
def hash_shard(key: str, num_shards: int = 8) -> int:
hash_val = int(hashlib.md5(key.encode()).hexdigest(), 16)
return hash_val % num_shards
# Consistent Hashing: 노드 추가/제거 시 리밸런싱 최소화
import bisect
class ConsistentHashRing:
def __init__(self, nodes: list, replicas: int = 150):
self.replicas = replicas
self.ring = {}
self.sorted_keys = []
for node in nodes:
self.add_node(node)
def add_node(self, node: str):
for i in range(self.replicas):
key = self._hash(f"{node}:{i}")
self.ring[key] = node
bisect.insort(self.sorted_keys, key)
def get_node(self, key: str) -> str:
if not self.ring:
return None
hash_val = self._hash(key)
idx = bisect.bisect(self.sorted_keys, hash_val)
if idx == len(self.sorted_keys):
idx = 0
return self.ring[self.sorted_keys[idx]]
def _hash(self, key: str) -> int:
return int(hashlib.md5(key.encode()).hexdigest(), 16)
6. 데이터 모델링
6.1 정규화 vs 비정규화
정규화 (3NF까지):
- 데이터 중복 최소화
- 업데이트 이상 방지
- 쓰기 성능 좋음, 읽기는 JOIN 필요
비정규화:
- 읽기 성능 최적화
- 데이터 중복 허용
- OLAP, 데이터 웨어하우스에 적합
-- 정규화된 스키마
CREATE TABLE categories (id SERIAL PRIMARY KEY, name TEXT);
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name TEXT,
category_id INT REFERENCES categories(id)
);
-- 비정규화된 스키마 (읽기 최적화)
CREATE TABLE products_denormalized (
id SERIAL PRIMARY KEY,
name TEXT,
category_id INT,
category_name TEXT -- 중복 저장으로 JOIN 제거
);
6.2 스타 스키마 (Data Warehouse)
-- 팩트 테이블 (측정값)
CREATE TABLE fact_sales (
sale_id BIGINT,
date_key INT,
product_key INT,
customer_key INT,
store_key INT,
quantity INT,
unit_price DECIMAL(10,2),
total_amount DECIMAL(10,2)
);
-- 디멘션 테이블 (맥락 정보)
CREATE TABLE dim_date (
date_key INT PRIMARY KEY,
full_date DATE,
year INT, quarter INT, month INT, week INT, day_of_week INT
);
CREATE TABLE dim_product (
product_key INT PRIMARY KEY,
product_id TEXT,
name TEXT, category TEXT, brand TEXT, unit_cost DECIMAL(10,2)
);
7. AI 연계: LLM + DB 통합 패턴
7.1 LangChain + pgvector로 RAG 시스템 구축
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_postgres import PGVector
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.schema import Document
# 1. 임베딩 모델 설정
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# 2. pgvector 벡터 스토어 연결
CONNECTION_STRING = "postgresql+psycopg://user:password@localhost:5432/vectordb"
COLLECTION_NAME = "documents"
vector_store = PGVector(
embeddings=embeddings,
collection_name=COLLECTION_NAME,
connection=CONNECTION_STRING,
use_jsonb=True, # 메타데이터 JSONB 저장
)
# 3. 문서 청킹 및 임베딩 저장
def ingest_documents(file_path: str, metadata: dict):
with open(file_path, 'r', encoding='utf-8') as f:
raw_text = f.read()
splitter = RecursiveCharacterTextSplitter(
chunk_size=512,
chunk_overlap=50,
separators=["\n\n", "\n", ".", "。", " "]
)
chunks = splitter.split_text(raw_text)
documents = [
Document(page_content=chunk, metadata={**metadata, "chunk_index": i})
for i, chunk in enumerate(chunks)
]
ids = vector_store.add_documents(documents)
print(f"Ingested {len(ids)} chunks from {file_path}")
return ids
# 4. RAG 체인 구성
def build_rag_chain(k: int = 5, score_threshold: float = 0.7):
retriever = vector_store.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"k": k, "score_threshold": score_threshold}
)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
return_source_documents=True,
verbose=True
)
return qa_chain
# 5. 쿼리 실행
rag_chain = build_rag_chain()
result = rag_chain.invoke({"query": "PostgreSQL MVCC는 어떻게 동시성을 처리하나요?"})
print(result['result'])
for doc in result['source_documents']:
print(f"Source: {doc.metadata.get('source')}, Chunk: {doc.metadata.get('chunk_index')}")
7.2 Hybrid Search: BM25 + 벡터 검색 결합
from sqlalchemy import text
def hybrid_search(
query: str,
query_embedding: list,
k: int = 10,
alpha: float = 0.5 # 0=BM25 only, 1=벡터 only
) -> list:
"""RRF (Reciprocal Rank Fusion)로 결과 결합"""
sql = text("""
WITH
vector_search AS (
SELECT id, content, metadata,
ROW_NUMBER() OVER (ORDER BY embedding <=> :embedding) AS rank
FROM document_embeddings
ORDER BY embedding <=> :embedding
LIMIT :k
),
bm25_search AS (
SELECT id, content, metadata,
ROW_NUMBER() OVER (ORDER BY ts_rank(to_tsvector('korean', content),
plainto_tsquery('korean', :query)) DESC) AS rank
FROM document_embeddings
WHERE to_tsvector('korean', content) @@ plainto_tsquery('korean', :query)
LIMIT :k
),
rrf_scores AS (
SELECT
COALESCE(v.id, b.id) AS id,
COALESCE(v.content, b.content) AS content,
COALESCE(v.metadata, b.metadata) AS metadata,
COALESCE(1.0 / (60 + v.rank), 0) * :alpha +
COALESCE(1.0 / (60 + b.rank), 0) * (1 - :alpha) AS rrf_score
FROM vector_search v
FULL OUTER JOIN bm25_search b ON v.id = b.id
)
SELECT id, content, metadata, rrf_score
FROM rrf_scores
ORDER BY rrf_score DESC
LIMIT :k
""")
with engine.connect() as conn:
results = conn.execute(sql, {
"embedding": str(query_embedding),
"query": query,
"k": k,
"alpha": alpha
})
return [dict(row) for row in results]
7.3 임베딩 캐싱 전략
import hashlib
import json
from typing import Optional
class EmbeddingCache:
def __init__(self, redis_client, ttl: int = 86400 * 7): # 7일 캐시
self.redis = redis_client
self.ttl = ttl
def _cache_key(self, text: str, model: str) -> str:
content_hash = hashlib.sha256(f"{model}:{text}".encode()).hexdigest()
return f"embedding:{content_hash}"
def get(self, text: str, model: str) -> Optional[list]:
key = self._cache_key(text, model)
cached = self.redis.get(key)
if cached:
return json.loads(cached)
return None
def set(self, text: str, model: str, embedding: list) -> None:
key = self._cache_key(text, model)
self.redis.setex(key, self.ttl, json.dumps(embedding))
def get_or_compute(self, text: str, model: str, compute_fn) -> list:
cached = self.get(text, model)
if cached:
return cached
embedding = compute_fn(text)
self.set(text, model, embedding)
return embedding
퀴즈: 실력 점검
Q1. B-Tree 인덱스와 Hash 인덱스는 어떤 상황에서 각각 더 적합한가요?
정답: B-Tree는 범위 쿼리, 정렬, LIKE 'prefix%' 검색에 적합합니다. Hash 인덱스는 등치 비교(=)에서만 적합합니다.
설명: B-Tree는 키를 정렬된 트리 구조로 저장하므로 WHERE age > 30, ORDER BY name, BETWEEN 등 범위 연산에서 뛰어납니다. Hash 인덱스는 키를 해시하여 저장하므로 WHERE id = 42처럼 정확한 값 매칭에서 O(1) 성능을 보이지만, 범위 쿼리나 정렬에는 전혀 사용할 수 없습니다. PostgreSQL에서 Hash 인덱스는 WAL 로깅 지원이 추가된 v10 이후 프로덕션에서 사용 가능해졌습니다.
Q2. PostgreSQL MVCC(Multi-Version Concurrency Control)가 동시성을 처리하는 방식은?
정답: 각 트랜잭션이 시작될 때 스냅샷을 찍어, 해당 시점의 데이터 버전만 볼 수 있게 합니다. 읽기는 쓰기를 블록하지 않고, 쓰기도 읽기를 블록하지 않습니다.
설명: PostgreSQL은 행을 업데이트할 때 기존 행을 삭제하지 않고, xmin/xmax 트랜잭션 ID를 함께 저장한 새 버전의 행을 추가합니다. 각 트랜잭션은 시작 시점의 xid를 기준으로 자신이 볼 수 있는 버전을 결정합니다. 오래된 버전은 VACUUM 프로세스가 주기적으로 정리합니다(dead tuple 제거). 이 덕분에 읽기/쓰기 충돌이 최소화되어 높은 동시성을 달성합니다.
Q3. CAP theorem에서 CP 시스템과 AP 시스템의 트레이드오프는 무엇인가요?
정답: CP는 네트워크 파티션 발생 시 일부 요청에 에러를 반환하여 일관성을 보장합니다. AP는 파티션 발생 시에도 응답을 반환하지만 데이터가 최신이 아닐 수 있습니다.
설명: CP 시스템(ZooKeeper, HBase)은 파티션 상황에서 응답을 거부하거나 에러를 반환해 일관성을 유지합니다. 금융 거래, 재고 차감 등 정확성이 중요한 시스템에 적합합니다. AP 시스템(Cassandra, DynamoDB)은 파티션 상황에서도 최선의 응답을 반환하지만, 노드 간 데이터가 잠시 불일치할 수 있으며 최종적 일관성(Eventual Consistency)을 보장합니다. 소셜 미디어 피드, 알림 시스템처럼 약간의 불일치가 허용되는 시스템에 적합합니다.
Q4. 벡터 데이터베이스에서 HNSW 알고리즘이 ANN 검색에 사용되는 이유는?
정답: HNSW(Hierarchical Navigable Small World)는 여러 계층의 그래프를 구성하여 O(log N) 수준의 빠른 근사 최근접 이웃 검색을 가능하게 하며, 높은 recall과 빠른 쿼리 속도를 동시에 달성합니다.
설명: 수백만 개의 벡터에서 정확한 최근접 이웃(KNN)을 찾으려면 모든 벡터와 비교해야 하므로 O(N) 시간이 필요합니다. HNSW는 각 노드가 몇 개의 이웃과만 연결된 소세계 그래프(small-world graph)를 계층적으로 구성합니다. 상위 계층은 대략적 네비게이션에, 하위 계층은 정밀 검색에 사용됩니다. 빌드 파라미터 m(각 노드의 연결 수)과 ef_construction(빌드 시 탐색 폭)이 인덱스 품질을 결정하고, 쿼리 시 ef_search가 recall과 속도의 트레이드오프를 조절합니다.
Q5. Redis의 write-through 전략과 write-behind 전략의 차이는 무엇인가요?
정답: Write-through는 캐시와 DB를 동시에 업데이트하여 일관성을 보장하지만 쓰기 지연이 발생합니다. Write-behind는 캐시만 먼저 업데이트하고 DB 저장을 비동기로 처리하여 쓰기 속도가 빠르지만 데이터 유실 위험이 있습니다.
설명: Write-through는 모든 쓰기 요청에서 캐시와 DB를 동기적으로 업데이트하므로 데이터 일관성이 보장되지만, DB 쓰기 지연이 그대로 사용자에게 전달됩니다. Write-behind(Write-back)는 캐시만 즉시 업데이트하고, 더티(dirty) 데이터를 배치로 DB에 플러시합니다. 쓰기 처리량이 높고 응답 속도가 빠르지만, 플러시 전에 시스템이 장애를 겪으면 캐시의 데이터가 DB에 반영되지 않아 유실될 수 있습니다. 전자상거래의 장바구니, 게임 점수처럼 빈번한 쓰기가 발생하는 곳에 write-behind가 적합합니다.
마무리
데이터베이스 엔지니어링은 단순히 SQL을 작성하는 것을 넘어, 데이터 모델 설계부터 인덱스 전략, 분산 시스템 이론, 그리고 이제는 벡터 검색과 AI 연계까지 광범위한 지식을 요구합니다. 특히 AI 시대에는 임베딩 저장, 시맨틱 검색, LLM과의 통합이 데이터베이스 엔지니어의 핵심 역량이 되었습니다.
이 가이드의 내용을 바탕으로 실제 프로젝트에 적용해보시기 바랍니다. pgvector로 시작하는 RAG 시스템, Redis 캐싱 계층, 그리고 적절한 인덱스 설계만으로도 시스템 성능을 수십 배 향상시킬 수 있습니다.