- Authors

- Name
- Youngju Kim
- @fjvbn20031
データベースエンジニアリング完全攻略: SQLからベクターDB、AI RAGシステムまで
AI時代においても、データベースエンジニアリングはすべてのシステムの根幹です。どれほど強力なLLMがあっても、データを安全に保存し高速に検索する能力なしにはプロダクションシステムは構築できません。ベクター検索とRAGアーキテクチャの台頭により、データベースエンジニアリングの重要性はむしろ高まっています。
このガイドでは、SQL高度テクニック、PostgreSQL実践、NoSQL、ベクターデータベース、分散DBの理論、そしてLLM + DB統合パターンまでを体系的に解説します。
1. リレーショナルDB核心: SQL高度テクニック
1.1 ウィンドウ関数
ウィンドウ関数は、行をグループ化せずに各行のコンテキストを維持しながら計算を行います。分析クエリには不可欠です。
-- 部署別給与ランキングと累積合計
SELECT
employee_id,
name,
department,
salary,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS dept_rank,
SUM(salary) OVER (PARTITION BY department) AS dept_total,
AVG(salary) OVER (
PARTITION BY department
ORDER BY hire_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) AS rolling_avg_3
FROM employees;
主要なウィンドウ関数:
ROW_NUMBER(): 重複のない連番RANK(): 同率は同じ順位、次の順位はスキップDENSE_RANK(): 同率は同じ順位、次の順位は連続LAG() / LEAD(): 前/次の行の値を参照FIRST_VALUE() / LAST_VALUE(): ウィンドウ内の最初/最後の値
-- 月次売上成長率の計算
SELECT
month,
revenue,
LAG(revenue, 1) OVER (ORDER BY month) AS prev_month,
ROUND(
(revenue - LAG(revenue, 1) OVER (ORDER BY month)) * 100.0
/ NULLIF(LAG(revenue, 1) OVER (ORDER BY month), 0),
2
) AS growth_rate_pct
FROM monthly_sales;
1.2 CTE (共通テーブル式)
CTEは複雑なクエリをステップごとに分離し、可読性と再利用性を高めます。再帰CTEは階層構造の探索に強力です。
-- 再帰CTEで組織図を探索
WITH RECURSIVE org_tree AS (
-- ベースケース: 最上位の従業員
SELECT
employee_id,
name,
manager_id,
0 AS depth,
name::TEXT AS path
FROM employees
WHERE manager_id IS NULL
UNION ALL
-- 再帰ケース
SELECT
e.employee_id,
e.name,
e.manager_id,
ot.depth + 1,
ot.path || ' > ' || e.name
FROM employees e
INNER JOIN org_tree ot ON e.manager_id = ot.employee_id
)
SELECT employee_id, name, depth, path
FROM org_tree
ORDER BY path;
-- 複雑な分析をステップ別CTEで分離
WITH
top_customers AS (
SELECT customer_id, SUM(amount) AS total_spent
FROM orders
WHERE created_at >= NOW() - INTERVAL '90 days'
GROUP BY customer_id
HAVING SUM(amount) > 1000
),
customer_details AS (
SELECT c.*, tc.total_spent
FROM customers c
JOIN top_customers tc ON c.id = tc.customer_id
),
ranked AS (
SELECT *,
NTILE(4) OVER (ORDER BY total_spent DESC) AS quartile
FROM customer_details
)
SELECT * FROM ranked WHERE quartile = 1;
1.3 実行計画の分析 (EXPLAIN ANALYZE)
クエリパフォーマンスの根本原因を特定するには、実行計画を読む能力が不可欠です。
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT u.name, COUNT(o.id) AS order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at > '2025-01-01'
GROUP BY u.id, u.name
ORDER BY order_count DESC
LIMIT 10;
実行計画の出力例:
Limit (cost=1234.56..1234.57 rows=10 width=72) (actual time=45.123..45.125 rows=10 loops=1)
-> Sort (cost=1234.56..1259.56 rows=10000 width=72) (actual time=45.120..45.121 rows=10 loops=1)
Sort Key: (count(o.id)) DESC
Sort Method: top-N heapsort Memory: 25kB
-> HashAggregate (cost=876.00..976.00 rows=10000 width=72)
Group Key: u.id
-> Hash Left Join (cost=345.00..801.00 rows=15000 width=40)
Hash Cond: (o.user_id = u.id)
-> Seq Scan on orders o
-> Hash
-> Index Scan using idx_users_created_at on users u
Planning Time: 1.234 ms
Execution Time: 45.456 ms
チェックポイント:
- Seq Scan: 大きなテーブルに対するSeq Scanはインデックスの必要性を検討
- 推定行数と実際の行数の差が大きい場合は
ANALYZEで統計を更新 - Buffers: shared read vs hit: キャッシュヒット率を確認
- loopsが高い: ネストループの問題、ハッシュジョインやインデックス改善を検討
1.4 インデックス設計戦略
-- 複合インデックス: 選択度の高いカラムを先頭に
CREATE INDEX idx_orders_user_status_date
ON orders (user_id, status, created_at DESC);
-- 部分インデックス: 特定条件のみインデックス化 (領域節約)
CREATE INDEX idx_active_users
ON users (email)
WHERE deleted_at IS NULL AND status = 'active';
-- 式インデックス: 関数の結果をインデックス化
CREATE INDEX idx_users_lower_email
ON users (LOWER(email));
-- BRINインデックス: 時系列データに効率的 (非常に小さい)
CREATE INDEX idx_logs_timestamp_brin
ON application_logs USING BRIN (created_at);
-- GINインデックス: 配列、JSONB、全文検索に最適
CREATE INDEX idx_products_tags_gin
ON products USING GIN (tags);
1.5 トランザクションとACID
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
BEGIN;
-- 口座振替: 原子性の保証
UPDATE accounts SET balance = balance - 500 WHERE id = 1;
UPDATE accounts SET balance = balance + 500 WHERE id = 2;
-- 残高検証
DO $$
DECLARE
bal NUMERIC;
BEGIN
SELECT balance INTO bal FROM accounts WHERE id = 1;
IF bal < 0 THEN
RAISE EXCEPTION 'Insufficient funds';
END IF;
END $$;
COMMIT;
分離レベルと発生しうる問題:
| 分離レベル | ダーティリード | 非繰り返し読み取り | ファントムリード |
|---|---|---|---|
| READ UNCOMMITTED | 可能 | 可能 | 可能 |
| READ COMMITTED | 防止 | 可能 | 可能 |
| REPEATABLE READ | 防止 | 防止 | 可能 |
| SERIALIZABLE | 防止 | 防止 | 防止 |
2. PostgreSQL実践: 高度な機能
2.1 JSONBと半構造化データ
PostgreSQLのJSONBはJSONをバイナリ形式で保存し、高速なクエリとインデックス作成をサポートします。
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'
);
CREATE INDEX idx_products_metadata ON products USING GIN (metadata);
-- JSONBの演算子
SELECT * FROM products
WHERE metadata @> '{"category": "electronics", "in_stock": true}';
SELECT
name,
metadata->>'brand' AS brand,
(metadata->>'price')::NUMERIC AS price,
metadata->'specs'->>'cpu' AS cpu
FROM products
WHERE metadata ? 'discount_pct'
AND (metadata->>'discount_pct')::NUMERIC > 10;
-- 特定キーのみ更新
UPDATE products
SET metadata = jsonb_set(metadata, '{price}', '29900'::jsonb)
WHERE id = 42;
2.2 テーブルパーティショニング
-- 時系列データのレンジパーティショニング
CREATE TABLE events (
id BIGSERIAL,
user_id INT,
event_type TEXT,
payload JSONB,
created_at TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (created_at);
CREATE OR REPLACE FUNCTION create_monthly_partition(target_date DATE)
RETURNS VOID AS $$
DECLARE
partition_name TEXT;
start_date DATE;
end_date DATE;
BEGIN
start_date := DATE_TRUNC('month', target_date);
end_date := start_date + INTERVAL '1 month';
partition_name := 'events_' || TO_CHAR(start_date, 'YYYY_MM');
EXECUTE FORMAT(
'CREATE TABLE IF NOT EXISTS %I PARTITION OF events
FOR VALUES FROM (%L) TO (%L)',
partition_name, start_date, end_date
);
END;
$$ LANGUAGE plpgsql;
SELECT create_monthly_partition('2026-03-01');
2.3 論理レプリケーション
-- プライマリでパブリケーションを作成
ALTER SYSTEM SET wal_level = logical;
CREATE PUBLICATION app_publication
FOR TABLE users, orders, products
WITH (publish = 'insert, update, delete');
-- レプリカでサブスクライブ
CREATE SUBSCRIPTION app_subscription
CONNECTION 'host=primary-db port=5432 dbname=myapp user=replicator'
PUBLICATION app_publication;
-- レプリケーション状態の監視
SELECT subname, received_lsn, latest_end_lsn, latest_end_time
FROM pg_stat_subscription;
3. NoSQLデータベース実践
3.1 Redis: キャッシュパターン
import redis
import json
import hashlib
import time
from typing import Optional
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Cache-Aside (遅延読み込み)
def get_user_profile(user_id: int) -> dict:
cache_key = f"user:profile:{user_id}"
cached = r.get(cache_key)
if cached:
return json.loads(cached)
user = db.query("SELECT * FROM users WHERE id = %s", user_id)
r.setex(cache_key, 3600, json.dumps(user))
return user
# Write-Through: 書き込み時にキャッシュとDBを同期更新
def update_user_profile(user_id: int, data: dict) -> None:
cache_key = f"user:profile:{user_id}"
db.execute("UPDATE users SET ... WHERE id = %s", user_id)
updated = get_user_from_db(user_id)
r.setex(cache_key, 3600, json.dumps(updated))
# Write-Behind: キャッシュのみ即時更新、DBへは非同期フラッシュ
class WriteBehindCache:
def __init__(self):
self.dirty_keys_set = "cache:dirty_keys"
def write(self, key: str, value: dict, ttl: int = 3600):
r.setex(key, ttl, json.dumps(value))
r.sadd(self.dirty_keys_set, key)
def flush_to_db(self):
dirty_keys = r.smembers(self.dirty_keys_set)
for key in dirty_keys:
data = r.get(key)
if data:
db.upsert(json.loads(data))
r.srem(self.dirty_keys_set, key)
# 分散ロック (Redlockパターン)
def acquire_lock(lock_name: str, timeout: int = 10) -> Optional[str]:
identifier = str(time.time())
acquired = r.set(f"lock:{lock_name}", identifier, nx=True, ex=timeout)
return identifier if acquired else None
def release_lock(lock_name: str, identifier: str) -> bool:
lua = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
return bool(r.eval(lua, 1, f"lock:{lock_name}", identifier))
Redisデータ構造の活用:
# Sorted Set: リアルタイムランキング
def update_score(player: str, score: int):
r.zadd("leaderboard", {player: score})
def get_top_players(n: int = 10):
return r.zrevrange("leaderboard", 0, n - 1, withscores=True)
# HyperLogLog: ユニーク訪問者数 (近似値、非常にメモリ効率的)
def track_visitor(page: str, user_id: str):
r.pfadd(f"visitors:{page}", user_id)
def get_unique_visitors(page: str) -> int:
return r.pfcount(f"visitors:{page}")
3.2 MongoDB: ドキュメントモデリング
from pymongo import MongoClient
from datetime import datetime
client = MongoClient('mongodb://localhost:27017/')
db = client['ecommerce']
# 埋め込みドキュメント vs 参照
sample_order = {
"_id": "order_12345",
"user_id": "user_67890",
"status": "shipped",
"created_at": datetime.utcnow(),
"shipping_address": { # 埋め込み: 注文時のスナップショット
"street": "東京都渋谷区渋谷1-1-1",
"city": "東京",
"zip": "150-0002"
},
"items": [ # 埋め込み: 価格スナップショット保持
{"product_id": "p001", "name": "ノートPC", "price": 150000, "qty": 1},
{"product_id": "p002", "name": "マウス", "price": 5000, "qty": 2}
],
"total": 160000
}
# アグリゲーションパイプライン
pipeline = [
{"$match": {
"created_at": {"$gte": datetime(2026, 2, 17)},
"status": {"$in": ["delivered", "shipped"]}
}},
{"$unwind": "$items"},
{"$group": {
"_id": "$items.product_id",
"product_name": {"$first": "$items.name"},
"total_qty": {"$sum": "$items.qty"},
"total_revenue": {"$sum": {"$multiply": ["$items.price", "$items.qty"]}}
}},
{"$sort": {"total_revenue": -1}},
{"$limit": 10},
{"$project": {
"product_id": "$_id",
"product_name": 1,
"total_qty": 1,
"total_revenue": 1,
"_id": 0
}}
]
top_products = list(db['orders'].aggregate(pipeline))
3.3 Cassandra: ワイドカラム設計
Cassandraでは「クエリを先に設計し、テーブルをそれに合わせる」のが鉄則です。
-- クエリファーストのテーブル設計
CREATE TABLE user_timeline (
user_id UUID,
created_at TIMEUUID,
post_id UUID,
content TEXT,
PRIMARY KEY (user_id, created_at)
) WITH CLUSTERING ORDER BY (created_at DESC)
AND compaction = {
'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': 7
};
-- タグ別検索用に非正規化した別テーブル
CREATE TABLE posts_by_tag (
tag TEXT,
created_at TIMEUUID,
post_id UUID,
user_id UUID,
title TEXT,
PRIMARY KEY (tag, created_at, post_id)
) WITH CLUSTERING ORDER BY (created_at DESC);
4. ベクターデータベース: AI時代の核心
4.1 pgvector: PostgreSQLでベクター検索
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE document_embeddings (
id BIGSERIAL PRIMARY KEY,
content TEXT NOT NULL,
metadata JSONB DEFAULT '{}',
embedding vector(1536),
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- HNSW インデックス (高速ANN検索)
CREATE INDEX idx_doc_hnsw
ON document_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- コサイン類似度検索
SELECT
id,
content,
metadata,
1 - (embedding <=> '[0.1, 0.2, ...]'::vector) AS similarity
FROM document_embeddings
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 5;
-- メタデータフィルタリングとベクター検索の組み合わせ
SELECT
id,
content,
metadata->>'source' AS source,
1 - (embedding <=> query_vec) AS similarity
FROM document_embeddings
WHERE metadata->>'language' = 'ja'
AND metadata->>'category' = 'technical'
ORDER BY embedding <=> query_vec
LIMIT 10;
距離演算子:
<=>: コサイン距離 — テキスト埋め込みに最適<->: L2ユークリッド距離 — 画像特徴量ベクター<#>: 内積 — 正規化済みベクターではコサインと同等
4.2 IVFFlat vs HNSW インデックスの比較
-- IVFFlat: 構築が速く、メモリ効率的
CREATE INDEX idx_ivfflat
ON document_embeddings
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100); -- 目安: sqrt(行数)
SET ivfflat.probes = 10; -- 多いほど精度向上、速度低下
-- HNSW: 高リコール、高速クエリ、構築時間とメモリが多め
CREATE INDEX idx_hnsw
ON document_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
SET hnsw.ef_search = 40; -- 高いほど精度向上
4.3 ベクターDBの比較
| 特性 | pgvector | Pinecone | Weaviate | Milvus |
|---|---|---|---|---|
| デプロイ | PG拡張 | SaaS | セルフホスト/クラウド | セルフホスト/クラウド |
| フィルタリング | フルSQL | メタデータフィルタ | GraphQL+フィルタ | 豊富なフィルタ |
| スケール | 数百万件 | 数十億件超 | 数億件 | 数十億件超 |
| コスト | PostgreSQLコストのみ | 従量課金 | OSS無料 | OSS無料 |
| ハイブリッド検索 | BM25+ベクター | 標準サポート | BM25+ベクター内蔵 | 豊富なサポート |
| 最適ユースケース | 既存PGインフラ | 迅速なプロト | 知識グラフ | 超大規模 |
5. 分散データベース: 理論と実践
5.1 CAP定理
CAP定理は、分散システムが以下の3つを同時に満たすことはできないという理論です:
- C (Consistency): すべてのノードが最新の同一データを返す
- A (Availability): すべてのリクエストがレスポンスを受け取る
- P (Partition Tolerance): ネットワーク分断時でも動作する
ネットワーク分断は実際に発生しうるため、事実上CPかAPの選択になります。
CPシステム (一貫性優先):
- 分断時は可用性を犠牲にして正確性を維持
- 例: ZooKeeper, HBase, MongoDB (w:majority)
- 適用: 金融取引、在庫管理
APシステム (可用性優先):
- 分断時でも応答するが、データが最新でない場合がある
- 例: DynamoDB, Cassandra, CouchDB
- 適用: ソーシャルフィード、DNS、通知システム
5.2 一貫性モデルのスペクトル
強い一貫性 (Strong Consistency) ← 高レイテンシ、低スループット
↓
順次一貫性 (Sequential Consistency)
↓
因果一貫性 (Causal Consistency)
↓
最終的一貫性 (Eventual Consistency) ← 低レイテンシ、高スループット
5.3 シャーディング戦略
# レンジシャーディング: 連続範囲でシャードに分割
def range_shard(user_id: int, num_shards: int = 4) -> int:
shard_size = 250_000_000
return min(user_id // shard_size, num_shards - 1)
# ハッシュシャーディング: 均等分散、ホットスポット防止
import hashlib
def hash_shard(key: str, num_shards: int = 8) -> int:
h = int(hashlib.md5(key.encode()).hexdigest(), 16)
return h % num_shards
# 一貫性ハッシュ: ノード追加/削除時のリバランスを最小化
import bisect
class ConsistentHashRing:
def __init__(self, nodes: list, replicas: int = 150):
self.replicas = replicas
self.ring: dict = {}
self.sorted_keys: list = []
for node in nodes:
self.add_node(node)
def add_node(self, node: str):
for i in range(self.replicas):
key = self._hash(f"{node}:{i}")
self.ring[key] = node
bisect.insort(self.sorted_keys, key)
def get_node(self, key: str) -> str:
if not self.ring:
return None
h = self._hash(key)
idx = bisect.bisect(self.sorted_keys, h)
if idx == len(self.sorted_keys):
idx = 0
return self.ring[self.sorted_keys[idx]]
def _hash(self, key: str) -> int:
return int(hashlib.md5(key.encode()).hexdigest(), 16)
6. データモデリング
6.1 正規化 vs 非正規化
正規化 (第3正規形まで):
- データの重複を最小化
- 更新異常を防止
- 書き込みパフォーマンスが良い
- 読み取りにはJOINが必要
非正規化:
- 読み取りパフォーマンスを最適化
- データの重複を許容
- OLAP、データウェアハウスに適合
-- 正規化スキーマ
CREATE TABLE categories (id SERIAL PRIMARY KEY, name TEXT);
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name TEXT,
category_id INT REFERENCES categories(id)
);
-- 非正規化スキーマ (JOIN不要で高速読み取り)
CREATE TABLE products_denormalized (
id SERIAL PRIMARY KEY,
name TEXT,
category_id INT,
category_name TEXT -- 重複保存でJOINを排除
);
6.2 スタースキーマ (データウェアハウス)
-- ファクトテーブル (計測値)
CREATE TABLE fact_sales (
sale_id BIGINT,
date_key INT,
product_key INT,
customer_key INT,
store_key INT,
quantity INT,
unit_price DECIMAL(10,2),
total_amount DECIMAL(10,2)
);
-- ディメンションテーブル (文脈情報)
CREATE TABLE dim_date (
date_key INT PRIMARY KEY,
full_date DATE,
year INT, quarter INT, month INT, week INT, day_of_week INT
);
CREATE TABLE dim_product (
product_key INT PRIMARY KEY,
product_id TEXT,
name TEXT, category TEXT, brand TEXT, unit_cost DECIMAL(10,2)
);
7. AI連携: LLM + DB統合パターン
7.1 LangChain + pgvectorでRAGシステムを構築
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_postgres import PGVector
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.schema import Document
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
CONNECTION_STRING = "postgresql+psycopg://user:password@localhost:5432/vectordb"
vector_store = PGVector(
embeddings=embeddings,
collection_name="documents",
connection=CONNECTION_STRING,
use_jsonb=True,
)
# ドキュメントのチャンク化・埋め込み・保存
def ingest_documents(file_path: str, metadata: dict):
with open(file_path, 'r', encoding='utf-8') as f:
raw_text = f.read()
splitter = RecursiveCharacterTextSplitter(
chunk_size=512,
chunk_overlap=50,
separators=["\n\n", "\n", "。", ".", " "]
)
chunks = splitter.split_text(raw_text)
documents = [
Document(page_content=chunk, metadata={**metadata, "chunk_index": i})
for i, chunk in enumerate(chunks)
]
return vector_store.add_documents(documents)
# RAGチェーンの構築
def build_rag_chain(k: int = 5, score_threshold: float = 0.7):
retriever = vector_store.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"k": k, "score_threshold": score_threshold}
)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
return RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
return_source_documents=True,
)
rag_chain = build_rag_chain()
result = rag_chain.invoke({"query": "PostgreSQL MVCCはどのように同時実行を処理しますか?"})
print(result['result'])
7.2 ハイブリッド検索: BM25 + ベクター検索の融合
from sqlalchemy import text
def hybrid_search(
query: str,
query_embedding: list,
k: int = 10,
alpha: float = 0.5, # 0=BM25のみ、1=ベクターのみ
) -> list:
sql = text("""
WITH
vector_results AS (
SELECT id, content, metadata,
ROW_NUMBER() OVER (ORDER BY embedding <=> :embedding) AS rank
FROM document_embeddings
ORDER BY embedding <=> :embedding
LIMIT :k
),
bm25_results AS (
SELECT id, content, metadata,
ROW_NUMBER() OVER (ORDER BY ts_rank(
to_tsvector('japanese', content),
plainto_tsquery('japanese', :query)
) DESC) AS rank
FROM document_embeddings
WHERE to_tsvector('japanese', content) @@ plainto_tsquery('japanese', :query)
LIMIT :k
),
rrf AS (
SELECT
COALESCE(v.id, b.id) AS id,
COALESCE(v.content, b.content) AS content,
COALESCE(v.metadata, b.metadata) AS metadata,
COALESCE(1.0 / (60 + v.rank), 0) * :alpha +
COALESCE(1.0 / (60 + b.rank), 0) * (1 - :alpha) AS score
FROM vector_results v
FULL OUTER JOIN bm25_results b ON v.id = b.id
)
SELECT id, content, metadata, score
FROM rrf
ORDER BY score DESC
LIMIT :k
""")
with engine.connect() as conn:
rows = conn.execute(sql, {
"embedding": str(query_embedding),
"query": query,
"k": k,
"alpha": alpha,
})
return [dict(row) for row in rows]
7.3 Redisを使った埋め込みキャッシュ
import hashlib, json
from typing import Optional
class EmbeddingCache:
def __init__(self, redis_client, ttl: int = 86400 * 7): # 7日間キャッシュ
self.redis = redis_client
self.ttl = ttl
def _key(self, text: str, model: str) -> str:
h = hashlib.sha256(f"{model}:{text}".encode()).hexdigest()
return f"embedding:{h}"
def get(self, text: str, model: str) -> Optional[list]:
raw = self.redis.get(self._key(text, model))
return json.loads(raw) if raw else None
def set(self, text: str, model: str, embedding: list):
self.redis.setex(self._key(text, model), self.ttl, json.dumps(embedding))
def get_or_compute(self, text: str, model: str, compute_fn) -> list:
cached = self.get(text, model)
if cached:
return cached
embedding = compute_fn(text)
self.set(text, model, embedding)
return embedding
クイズ: 実力確認
Q1. B-TreeインデックスとHashインデックスはそれぞれどのような状況に適していますか?
答え: B-Treeは範囲クエリ、ソート、LIKEプレフィックス検索に適しています。Hashインデックスは等値比較(=)のみに適しています。
解説: B-Treeはキーをソートされたツリー構造で保存するため、WHERE age > 30やORDER BY name、BETWEENなどの範囲演算子に優れています。Hashインデックスはキーをハッシュ化するため、WHERE id = 42のような正確な値の一致ではO(1)のパフォーマンスを発揮しますが、範囲クエリやソートには全く使用できません。PostgreSQLでは、WALログ完全サポートはv10以降で、プロダクション使用が安全になりました。
Q2. PostgreSQL MVCCが同時実行を処理する仕組みは?
答え: 各トランザクションは開始時にスナップショットを取得し、そのスナップショット時点のデータバージョンのみを参照します。読み取りは書き込みをブロックせず、書き込みも読み取りをブロックしません。
解説: PostgreSQLは行の更新時に既存の行を削除せず、xminとxmaxトランザクションIDをタグした新バージョンの行を追加します。各トランザクションは自身のxidスナップショットを基に、どのバージョンが参照可能かを決定します。古いバージョン(デッドタプル)はVACUUMバックグラウンドプロセスが定期的にクリーンアップします。これにより、読み取り/書き込みの競合が最小化され、高い同時実行性を実現します。
Q3. CAP定理におけるCPシステムとAPシステムのトレードオフは何ですか?
答え: CPシステムはネットワーク分断時に一部のリクエストにエラーを返して一貫性を保証します。APシステムは分断時でも応答を返しますが、データが最新でない場合があります。
解説: CPシステム(ZooKeeper, HBase)は分断状況でリクエストを拒否したりエラーを返したりして一貫性を維持します。金融取引、在庫管理など正確性が重要なシステムに適しています。APシステム(Cassandra, DynamoDB)は分断時でも最善の応答を返しますが、ノード間データが一時的に不一致になる場合があり、最終的一貫性(Eventual Consistency)を保証します。ソーシャルメディアフィード、通知システムのように多少の不一致が許容されるシステムに適しています。
Q4. ベクターデータベースでHNSWアルゴリズムがANN検索に使われる理由は?
答え: HNSW(Hierarchical Navigable Small World)は複数階層のグラフを構築し、O(log N)レベルの高速な近似最近傍検索を可能にします。高いリコール率と高速なクエリ速度を同時に実現します。
解説: 数百万のベクターから正確な最近傍(KNN)を見つけるには、すべてのベクターと比較する必要がありO(N)時間が必要です。HNSWは各ノードが少数の隣接ノードとのみ接続された小世界グラフ(small-world graph)を階層的に構築します。上位層は大まかなナビゲーション、下位層は精密な検索に使用されます。構築パラメータのm(各ノードの接続数)とef_construction(構築時の探索幅)がインデックスの品質を決定し、クエリ時のef_searchがリコールと速度のトレードオフを調整します。
Q5. Redisのwrite-through戦略とwrite-behind戦略の違いは何ですか?
答え: write-throughはキャッシュとDBを同期的に同時更新して一貫性を保証しますが、書き込みレイテンシが発生します。write-behindはキャッシュのみを先に更新し、DBへの保存を非同期で行うため書き込み速度が速いですが、データ消失のリスクがあります。
解説: write-throughはすべての書き込みリクエストでキャッシュとDBを同期的に更新するため、データの一貫性が保証されますが、DBの書き込みレイテンシがそのままユーザーに伝わります。write-behind(write-back)はキャッシュのみを即時更新し、ダーティ(dirty)なデータをバッチでDBにフラッシュします。書き込みスループットが高く応答速度が速いですが、フラッシュ前にシステム障害が発生すると、キャッシュのデータがDBに反映されず消失する可能性があります。ECサイトのカート、ゲームスコアのような頻繁な書き込みが発生する場所にwrite-behindが適しています。
まとめ
データベースエンジニアリングは単にSQLを書くことを超え、データモデル設計からインデックス戦略、分散システム理論、そして今やベクター検索とAI連携まで幅広い知識を要求します。特にAI時代には、埋め込み保存、セマンティック検索、LLMとの統合がデータベースエンジニアの核心スキルとなっています。
このガイドの内容を実際のプロジェクトに適用してみてください。pgvectorを使ったRAGシステム、Redisキャッシングレイヤー、そして適切なインデックス設計だけでも、システムパフォーマンスを大幅に向上させることができます。