Skip to content
Published on

データベースエンジニアリング完全攻略: SQLからベクターDB、AI RAGシステムまで

Authors

データベースエンジニアリング完全攻略: SQLからベクターDB、AI RAGシステムまで

AI時代においても、データベースエンジニアリングはすべてのシステムの根幹です。どれほど強力なLLMがあっても、データを安全に保存し高速に検索する能力なしにはプロダクションシステムは構築できません。ベクター検索とRAGアーキテクチャの台頭により、データベースエンジニアリングの重要性はむしろ高まっています。

このガイドでは、SQL高度テクニック、PostgreSQL実践、NoSQL、ベクターデータベース、分散DBの理論、そしてLLM + DB統合パターンまでを体系的に解説します。


1. リレーショナルDB核心: SQL高度テクニック

1.1 ウィンドウ関数

ウィンドウ関数は、行をグループ化せずに各行のコンテキストを維持しながら計算を行います。分析クエリには不可欠です。

-- 部署別給与ランキングと累積合計
SELECT
    employee_id,
    name,
    department,
    salary,
    RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS dept_rank,
    SUM(salary) OVER (PARTITION BY department) AS dept_total,
    AVG(salary) OVER (
        PARTITION BY department
        ORDER BY hire_date
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) AS rolling_avg_3
FROM employees;

主要なウィンドウ関数:

  • ROW_NUMBER(): 重複のない連番
  • RANK(): 同率は同じ順位、次の順位はスキップ
  • DENSE_RANK(): 同率は同じ順位、次の順位は連続
  • LAG() / LEAD(): 前/次の行の値を参照
  • FIRST_VALUE() / LAST_VALUE(): ウィンドウ内の最初/最後の値
-- 月次売上成長率の計算
SELECT
    month,
    revenue,
    LAG(revenue, 1) OVER (ORDER BY month) AS prev_month,
    ROUND(
        (revenue - LAG(revenue, 1) OVER (ORDER BY month)) * 100.0
        / NULLIF(LAG(revenue, 1) OVER (ORDER BY month), 0),
        2
    ) AS growth_rate_pct
FROM monthly_sales;

1.2 CTE (共通テーブル式)

CTEは複雑なクエリをステップごとに分離し、可読性と再利用性を高めます。再帰CTEは階層構造の探索に強力です。

-- 再帰CTEで組織図を探索
WITH RECURSIVE org_tree AS (
    -- ベースケース: 最上位の従業員
    SELECT
        employee_id,
        name,
        manager_id,
        0 AS depth,
        name::TEXT AS path
    FROM employees
    WHERE manager_id IS NULL

    UNION ALL

    -- 再帰ケース
    SELECT
        e.employee_id,
        e.name,
        e.manager_id,
        ot.depth + 1,
        ot.path || ' > ' || e.name
    FROM employees e
    INNER JOIN org_tree ot ON e.manager_id = ot.employee_id
)
SELECT employee_id, name, depth, path
FROM org_tree
ORDER BY path;
-- 複雑な分析をステップ別CTEで分離
WITH
top_customers AS (
    SELECT customer_id, SUM(amount) AS total_spent
    FROM orders
    WHERE created_at >= NOW() - INTERVAL '90 days'
    GROUP BY customer_id
    HAVING SUM(amount) > 1000
),
customer_details AS (
    SELECT c.*, tc.total_spent
    FROM customers c
    JOIN top_customers tc ON c.id = tc.customer_id
),
ranked AS (
    SELECT *,
           NTILE(4) OVER (ORDER BY total_spent DESC) AS quartile
    FROM customer_details
)
SELECT * FROM ranked WHERE quartile = 1;

1.3 実行計画の分析 (EXPLAIN ANALYZE)

クエリパフォーマンスの根本原因を特定するには、実行計画を読む能力が不可欠です。

EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT u.name, COUNT(o.id) AS order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at > '2025-01-01'
GROUP BY u.id, u.name
ORDER BY order_count DESC
LIMIT 10;

実行計画の出力例:

Limit  (cost=1234.56..1234.57 rows=10 width=72) (actual time=45.123..45.125 rows=10 loops=1)
  ->  Sort  (cost=1234.56..1259.56 rows=10000 width=72) (actual time=45.120..45.121 rows=10 loops=1)
        Sort Key: (count(o.id)) DESC
        Sort Method: top-N heapsort  Memory: 25kB
        ->  HashAggregate  (cost=876.00..976.00 rows=10000 width=72)
              Group Key: u.id
              ->  Hash Left Join  (cost=345.00..801.00 rows=15000 width=40)
                    Hash Cond: (o.user_id = u.id)
                    ->  Seq Scan on orders o
                    ->  Hash
                          ->  Index Scan using idx_users_created_at on users u
Planning Time: 1.234 ms
Execution Time: 45.456 ms

チェックポイント:

  • Seq Scan: 大きなテーブルに対するSeq Scanはインデックスの必要性を検討
  • 推定行数と実際の行数の差が大きい場合はANALYZEで統計を更新
  • Buffers: shared read vs hit: キャッシュヒット率を確認
  • loopsが高い: ネストループの問題、ハッシュジョインやインデックス改善を検討

1.4 インデックス設計戦略

-- 複合インデックス: 選択度の高いカラムを先頭に
CREATE INDEX idx_orders_user_status_date
ON orders (user_id, status, created_at DESC);

-- 部分インデックス: 特定条件のみインデックス化 (領域節約)
CREATE INDEX idx_active_users
ON users (email)
WHERE deleted_at IS NULL AND status = 'active';

-- 式インデックス: 関数の結果をインデックス化
CREATE INDEX idx_users_lower_email
ON users (LOWER(email));

-- BRINインデックス: 時系列データに効率的 (非常に小さい)
CREATE INDEX idx_logs_timestamp_brin
ON application_logs USING BRIN (created_at);

-- GINインデックス: 配列、JSONB、全文検索に最適
CREATE INDEX idx_products_tags_gin
ON products USING GIN (tags);

1.5 トランザクションとACID

SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

BEGIN;
  -- 口座振替: 原子性の保証
  UPDATE accounts SET balance = balance - 500 WHERE id = 1;
  UPDATE accounts SET balance = balance + 500 WHERE id = 2;

  -- 残高検証
  DO $$
  DECLARE
    bal NUMERIC;
  BEGIN
    SELECT balance INTO bal FROM accounts WHERE id = 1;
    IF bal < 0 THEN
      RAISE EXCEPTION 'Insufficient funds';
    END IF;
  END $$;

COMMIT;

分離レベルと発生しうる問題:

分離レベルダーティリード非繰り返し読み取りファントムリード
READ UNCOMMITTED可能可能可能
READ COMMITTED防止可能可能
REPEATABLE READ防止防止可能
SERIALIZABLE防止防止防止

2. PostgreSQL実践: 高度な機能

2.1 JSONBと半構造化データ

PostgreSQLのJSONBはJSONをバイナリ形式で保存し、高速なクエリとインデックス作成をサポートします。

CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}'
);

CREATE INDEX idx_products_metadata ON products USING GIN (metadata);

-- JSONBの演算子
SELECT * FROM products
WHERE metadata @> '{"category": "electronics", "in_stock": true}';

SELECT
    name,
    metadata->>'brand' AS brand,
    (metadata->>'price')::NUMERIC AS price,
    metadata->'specs'->>'cpu' AS cpu
FROM products
WHERE metadata ? 'discount_pct'
  AND (metadata->>'discount_pct')::NUMERIC > 10;

-- 特定キーのみ更新
UPDATE products
SET metadata = jsonb_set(metadata, '{price}', '29900'::jsonb)
WHERE id = 42;

2.2 テーブルパーティショニング

-- 時系列データのレンジパーティショニング
CREATE TABLE events (
    id BIGSERIAL,
    user_id INT,
    event_type TEXT,
    payload JSONB,
    created_at TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (created_at);

CREATE OR REPLACE FUNCTION create_monthly_partition(target_date DATE)
RETURNS VOID AS $$
DECLARE
    partition_name TEXT;
    start_date DATE;
    end_date DATE;
BEGIN
    start_date := DATE_TRUNC('month', target_date);
    end_date := start_date + INTERVAL '1 month';
    partition_name := 'events_' || TO_CHAR(start_date, 'YYYY_MM');

    EXECUTE FORMAT(
        'CREATE TABLE IF NOT EXISTS %I PARTITION OF events
         FOR VALUES FROM (%L) TO (%L)',
        partition_name, start_date, end_date
    );
END;
$$ LANGUAGE plpgsql;

SELECT create_monthly_partition('2026-03-01');

2.3 論理レプリケーション

-- プライマリでパブリケーションを作成
ALTER SYSTEM SET wal_level = logical;

CREATE PUBLICATION app_publication
FOR TABLE users, orders, products
WITH (publish = 'insert, update, delete');

-- レプリカでサブスクライブ
CREATE SUBSCRIPTION app_subscription
CONNECTION 'host=primary-db port=5432 dbname=myapp user=replicator'
PUBLICATION app_publication;

-- レプリケーション状態の監視
SELECT subname, received_lsn, latest_end_lsn, latest_end_time
FROM pg_stat_subscription;

3. NoSQLデータベース実践

3.1 Redis: キャッシュパターン

import redis
import json
import hashlib
import time
from typing import Optional

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Cache-Aside (遅延読み込み)
def get_user_profile(user_id: int) -> dict:
    cache_key = f"user:profile:{user_id}"

    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    r.setex(cache_key, 3600, json.dumps(user))
    return user

# Write-Through: 書き込み時にキャッシュとDBを同期更新
def update_user_profile(user_id: int, data: dict) -> None:
    cache_key = f"user:profile:{user_id}"
    db.execute("UPDATE users SET ... WHERE id = %s", user_id)
    updated = get_user_from_db(user_id)
    r.setex(cache_key, 3600, json.dumps(updated))

# Write-Behind: キャッシュのみ即時更新、DBへは非同期フラッシュ
class WriteBehindCache:
    def __init__(self):
        self.dirty_keys_set = "cache:dirty_keys"

    def write(self, key: str, value: dict, ttl: int = 3600):
        r.setex(key, ttl, json.dumps(value))
        r.sadd(self.dirty_keys_set, key)

    def flush_to_db(self):
        dirty_keys = r.smembers(self.dirty_keys_set)
        for key in dirty_keys:
            data = r.get(key)
            if data:
                db.upsert(json.loads(data))
                r.srem(self.dirty_keys_set, key)

# 分散ロック (Redlockパターン)
def acquire_lock(lock_name: str, timeout: int = 10) -> Optional[str]:
    identifier = str(time.time())
    acquired = r.set(f"lock:{lock_name}", identifier, nx=True, ex=timeout)
    return identifier if acquired else None

def release_lock(lock_name: str, identifier: str) -> bool:
    lua = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """
    return bool(r.eval(lua, 1, f"lock:{lock_name}", identifier))

Redisデータ構造の活用:

# Sorted Set: リアルタイムランキング
def update_score(player: str, score: int):
    r.zadd("leaderboard", {player: score})

def get_top_players(n: int = 10):
    return r.zrevrange("leaderboard", 0, n - 1, withscores=True)

# HyperLogLog: ユニーク訪問者数 (近似値、非常にメモリ効率的)
def track_visitor(page: str, user_id: str):
    r.pfadd(f"visitors:{page}", user_id)

def get_unique_visitors(page: str) -> int:
    return r.pfcount(f"visitors:{page}")

3.2 MongoDB: ドキュメントモデリング

from pymongo import MongoClient
from datetime import datetime

client = MongoClient('mongodb://localhost:27017/')
db = client['ecommerce']

# 埋め込みドキュメント vs 参照
sample_order = {
    "_id": "order_12345",
    "user_id": "user_67890",
    "status": "shipped",
    "created_at": datetime.utcnow(),
    "shipping_address": {           # 埋め込み: 注文時のスナップショット
        "street": "東京都渋谷区渋谷1-1-1",
        "city": "東京",
        "zip": "150-0002"
    },
    "items": [                      # 埋め込み: 価格スナップショット保持
        {"product_id": "p001", "name": "ノートPC", "price": 150000, "qty": 1},
        {"product_id": "p002", "name": "マウス",   "price":   5000, "qty": 2}
    ],
    "total": 160000
}

# アグリゲーションパイプライン
pipeline = [
    {"$match": {
        "created_at": {"$gte": datetime(2026, 2, 17)},
        "status": {"$in": ["delivered", "shipped"]}
    }},
    {"$unwind": "$items"},
    {"$group": {
        "_id": "$items.product_id",
        "product_name":  {"$first": "$items.name"},
        "total_qty":     {"$sum": "$items.qty"},
        "total_revenue": {"$sum": {"$multiply": ["$items.price", "$items.qty"]}}
    }},
    {"$sort": {"total_revenue": -1}},
    {"$limit": 10},
    {"$project": {
        "product_id":   "$_id",
        "product_name": 1,
        "total_qty":    1,
        "total_revenue": 1,
        "_id": 0
    }}
]

top_products = list(db['orders'].aggregate(pipeline))

3.3 Cassandra: ワイドカラム設計

Cassandraでは「クエリを先に設計し、テーブルをそれに合わせる」のが鉄則です。

-- クエリファーストのテーブル設計
CREATE TABLE user_timeline (
    user_id    UUID,
    created_at TIMEUUID,
    post_id    UUID,
    content    TEXT,
    PRIMARY KEY (user_id, created_at)
) WITH CLUSTERING ORDER BY (created_at DESC)
  AND compaction = {
    'class': 'TimeWindowCompactionStrategy',
    'compaction_window_unit': 'DAYS',
    'compaction_window_size': 7
  };

-- タグ別検索用に非正規化した別テーブル
CREATE TABLE posts_by_tag (
    tag        TEXT,
    created_at TIMEUUID,
    post_id    UUID,
    user_id    UUID,
    title      TEXT,
    PRIMARY KEY (tag, created_at, post_id)
) WITH CLUSTERING ORDER BY (created_at DESC);

4. ベクターデータベース: AI時代の核心

4.1 pgvector: PostgreSQLでベクター検索

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE document_embeddings (
    id         BIGSERIAL PRIMARY KEY,
    content    TEXT NOT NULL,
    metadata   JSONB DEFAULT '{}',
    embedding  vector(1536),
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- HNSW インデックス (高速ANN検索)
CREATE INDEX idx_doc_hnsw
ON document_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

-- コサイン類似度検索
SELECT
    id,
    content,
    metadata,
    1 - (embedding <=> '[0.1, 0.2, ...]'::vector) AS similarity
FROM document_embeddings
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector
LIMIT 5;

-- メタデータフィルタリングとベクター検索の組み合わせ
SELECT
    id,
    content,
    metadata->>'source' AS source,
    1 - (embedding <=> query_vec) AS similarity
FROM document_embeddings
WHERE metadata->>'language' = 'ja'
  AND metadata->>'category' = 'technical'
ORDER BY embedding <=> query_vec
LIMIT 10;

距離演算子:

  • <=>: コサイン距離 — テキスト埋め込みに最適
  • <->: L2ユークリッド距離 — 画像特徴量ベクター
  • <#>: 内積 — 正規化済みベクターではコサインと同等

4.2 IVFFlat vs HNSW インデックスの比較

-- IVFFlat: 構築が速く、メモリ効率的
CREATE INDEX idx_ivfflat
ON document_embeddings
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);       -- 目安: sqrt(行数)

SET ivfflat.probes = 10;  -- 多いほど精度向上、速度低下

-- HNSW: 高リコール、高速クエリ、構築時間とメモリが多め
CREATE INDEX idx_hnsw
ON document_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

SET hnsw.ef_search = 40;  -- 高いほど精度向上

4.3 ベクターDBの比較

特性pgvectorPineconeWeaviateMilvus
デプロイPG拡張SaaSセルフホスト/クラウドセルフホスト/クラウド
フィルタリングフルSQLメタデータフィルタGraphQL+フィルタ豊富なフィルタ
スケール数百万件数十億件超数億件数十億件超
コストPostgreSQLコストのみ従量課金OSS無料OSS無料
ハイブリッド検索BM25+ベクター標準サポートBM25+ベクター内蔵豊富なサポート
最適ユースケース既存PGインフラ迅速なプロト知識グラフ超大規模

5. 分散データベース: 理論と実践

5.1 CAP定理

CAP定理は、分散システムが以下の3つを同時に満たすことはできないという理論です:

  • C (Consistency): すべてのノードが最新の同一データを返す
  • A (Availability): すべてのリクエストがレスポンスを受け取る
  • P (Partition Tolerance): ネットワーク分断時でも動作する

ネットワーク分断は実際に発生しうるため、事実上CPかAPの選択になります。

CPシステム (一貫性優先):

  • 分断時は可用性を犠牲にして正確性を維持
  • 例: ZooKeeper, HBase, MongoDB (w:majority)
  • 適用: 金融取引、在庫管理

APシステム (可用性優先):

  • 分断時でも応答するが、データが最新でない場合がある
  • 例: DynamoDB, Cassandra, CouchDB
  • 適用: ソーシャルフィード、DNS、通知システム

5.2 一貫性モデルのスペクトル

強い一貫性 (Strong Consistency)   ← 高レイテンシ、低スループット
順次一貫性 (Sequential Consistency)
因果一貫性 (Causal Consistency)
最終的一貫性 (Eventual Consistency) ← 低レイテンシ、高スループット

5.3 シャーディング戦略

# レンジシャーディング: 連続範囲でシャードに分割
def range_shard(user_id: int, num_shards: int = 4) -> int:
    shard_size = 250_000_000
    return min(user_id // shard_size, num_shards - 1)

# ハッシュシャーディング: 均等分散、ホットスポット防止
import hashlib

def hash_shard(key: str, num_shards: int = 8) -> int:
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % num_shards

# 一貫性ハッシュ: ノード追加/削除時のリバランスを最小化
import bisect

class ConsistentHashRing:
    def __init__(self, nodes: list, replicas: int = 150):
        self.replicas = replicas
        self.ring: dict = {}
        self.sorted_keys: list = []
        for node in nodes:
            self.add_node(node)

    def add_node(self, node: str):
        for i in range(self.replicas):
            key = self._hash(f"{node}:{i}")
            self.ring[key] = node
            bisect.insort(self.sorted_keys, key)

    def get_node(self, key: str) -> str:
        if not self.ring:
            return None
        h = self._hash(key)
        idx = bisect.bisect(self.sorted_keys, h)
        if idx == len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

6. データモデリング

6.1 正規化 vs 非正規化

正規化 (第3正規形まで):

  • データの重複を最小化
  • 更新異常を防止
  • 書き込みパフォーマンスが良い
  • 読み取りにはJOINが必要

非正規化:

  • 読み取りパフォーマンスを最適化
  • データの重複を許容
  • OLAP、データウェアハウスに適合
-- 正規化スキーマ
CREATE TABLE categories (id SERIAL PRIMARY KEY, name TEXT);
CREATE TABLE products (
    id          SERIAL PRIMARY KEY,
    name        TEXT,
    category_id INT REFERENCES categories(id)
);

-- 非正規化スキーマ (JOIN不要で高速読み取り)
CREATE TABLE products_denormalized (
    id            SERIAL PRIMARY KEY,
    name          TEXT,
    category_id   INT,
    category_name TEXT  -- 重複保存でJOINを排除
);

6.2 スタースキーマ (データウェアハウス)

-- ファクトテーブル (計測値)
CREATE TABLE fact_sales (
    sale_id      BIGINT,
    date_key     INT,
    product_key  INT,
    customer_key INT,
    store_key    INT,
    quantity     INT,
    unit_price   DECIMAL(10,2),
    total_amount DECIMAL(10,2)
);

-- ディメンションテーブル (文脈情報)
CREATE TABLE dim_date (
    date_key    INT PRIMARY KEY,
    full_date   DATE,
    year        INT, quarter INT, month INT, week INT, day_of_week INT
);

CREATE TABLE dim_product (
    product_key INT PRIMARY KEY,
    product_id  TEXT,
    name TEXT, category TEXT, brand TEXT, unit_cost DECIMAL(10,2)
);

7. AI連携: LLM + DB統合パターン

7.1 LangChain + pgvectorでRAGシステムを構築

from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_postgres import PGVector
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.schema import Document

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

CONNECTION_STRING = "postgresql+psycopg://user:password@localhost:5432/vectordb"

vector_store = PGVector(
    embeddings=embeddings,
    collection_name="documents",
    connection=CONNECTION_STRING,
    use_jsonb=True,
)

# ドキュメントのチャンク化・埋め込み・保存
def ingest_documents(file_path: str, metadata: dict):
    with open(file_path, 'r', encoding='utf-8') as f:
        raw_text = f.read()

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=512,
        chunk_overlap=50,
        separators=["\n\n", "\n", "。", ".", " "]
    )
    chunks = splitter.split_text(raw_text)
    documents = [
        Document(page_content=chunk, metadata={**metadata, "chunk_index": i})
        for i, chunk in enumerate(chunks)
    ]
    return vector_store.add_documents(documents)

# RAGチェーンの構築
def build_rag_chain(k: int = 5, score_threshold: float = 0.7):
    retriever = vector_store.as_retriever(
        search_type="similarity_score_threshold",
        search_kwargs={"k": k, "score_threshold": score_threshold}
    )
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    return RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True,
    )

rag_chain = build_rag_chain()
result = rag_chain.invoke({"query": "PostgreSQL MVCCはどのように同時実行を処理しますか?"})
print(result['result'])

7.2 ハイブリッド検索: BM25 + ベクター検索の融合

from sqlalchemy import text

def hybrid_search(
    query: str,
    query_embedding: list,
    k: int = 10,
    alpha: float = 0.5,   # 0=BM25のみ、1=ベクターのみ
) -> list:
    sql = text("""
    WITH
    vector_results AS (
        SELECT id, content, metadata,
               ROW_NUMBER() OVER (ORDER BY embedding <=> :embedding) AS rank
        FROM document_embeddings
        ORDER BY embedding <=> :embedding
        LIMIT :k
    ),
    bm25_results AS (
        SELECT id, content, metadata,
               ROW_NUMBER() OVER (ORDER BY ts_rank(
                   to_tsvector('japanese', content),
                   plainto_tsquery('japanese', :query)
               ) DESC) AS rank
        FROM document_embeddings
        WHERE to_tsvector('japanese', content) @@ plainto_tsquery('japanese', :query)
        LIMIT :k
    ),
    rrf AS (
        SELECT
            COALESCE(v.id, b.id)             AS id,
            COALESCE(v.content, b.content)   AS content,
            COALESCE(v.metadata, b.metadata) AS metadata,
            COALESCE(1.0 / (60 + v.rank), 0) * :alpha +
            COALESCE(1.0 / (60 + b.rank), 0) * (1 - :alpha) AS score
        FROM vector_results v
        FULL OUTER JOIN bm25_results b ON v.id = b.id
    )
    SELECT id, content, metadata, score
    FROM rrf
    ORDER BY score DESC
    LIMIT :k
    """)

    with engine.connect() as conn:
        rows = conn.execute(sql, {
            "embedding": str(query_embedding),
            "query": query,
            "k": k,
            "alpha": alpha,
        })
        return [dict(row) for row in rows]

7.3 Redisを使った埋め込みキャッシュ

import hashlib, json
from typing import Optional

class EmbeddingCache:
    def __init__(self, redis_client, ttl: int = 86400 * 7):  # 7日間キャッシュ
        self.redis = redis_client
        self.ttl = ttl

    def _key(self, text: str, model: str) -> str:
        h = hashlib.sha256(f"{model}:{text}".encode()).hexdigest()
        return f"embedding:{h}"

    def get(self, text: str, model: str) -> Optional[list]:
        raw = self.redis.get(self._key(text, model))
        return json.loads(raw) if raw else None

    def set(self, text: str, model: str, embedding: list):
        self.redis.setex(self._key(text, model), self.ttl, json.dumps(embedding))

    def get_or_compute(self, text: str, model: str, compute_fn) -> list:
        cached = self.get(text, model)
        if cached:
            return cached
        embedding = compute_fn(text)
        self.set(text, model, embedding)
        return embedding

クイズ: 実力確認

Q1. B-TreeインデックスとHashインデックスはそれぞれどのような状況に適していますか?

答え: B-Treeは範囲クエリ、ソート、LIKEプレフィックス検索に適しています。Hashインデックスは等値比較(=)のみに適しています。

解説: B-Treeはキーをソートされたツリー構造で保存するため、WHERE age > 30ORDER BY nameBETWEENなどの範囲演算子に優れています。Hashインデックスはキーをハッシュ化するため、WHERE id = 42のような正確な値の一致ではO(1)のパフォーマンスを発揮しますが、範囲クエリやソートには全く使用できません。PostgreSQLでは、WALログ完全サポートはv10以降で、プロダクション使用が安全になりました。

Q2. PostgreSQL MVCCが同時実行を処理する仕組みは?

答え: 各トランザクションは開始時にスナップショットを取得し、そのスナップショット時点のデータバージョンのみを参照します。読み取りは書き込みをブロックせず、書き込みも読み取りをブロックしません。

解説: PostgreSQLは行の更新時に既存の行を削除せず、xminxmaxトランザクションIDをタグした新バージョンの行を追加します。各トランザクションは自身のxidスナップショットを基に、どのバージョンが参照可能かを決定します。古いバージョン(デッドタプル)はVACUUMバックグラウンドプロセスが定期的にクリーンアップします。これにより、読み取り/書き込みの競合が最小化され、高い同時実行性を実現します。

Q3. CAP定理におけるCPシステムとAPシステムのトレードオフは何ですか?

答え: CPシステムはネットワーク分断時に一部のリクエストにエラーを返して一貫性を保証します。APシステムは分断時でも応答を返しますが、データが最新でない場合があります。

解説: CPシステム(ZooKeeper, HBase)は分断状況でリクエストを拒否したりエラーを返したりして一貫性を維持します。金融取引、在庫管理など正確性が重要なシステムに適しています。APシステム(Cassandra, DynamoDB)は分断時でも最善の応答を返しますが、ノード間データが一時的に不一致になる場合があり、最終的一貫性(Eventual Consistency)を保証します。ソーシャルメディアフィード、通知システムのように多少の不一致が許容されるシステムに適しています。

Q4. ベクターデータベースでHNSWアルゴリズムがANN検索に使われる理由は?

答え: HNSW(Hierarchical Navigable Small World)は複数階層のグラフを構築し、O(log N)レベルの高速な近似最近傍検索を可能にします。高いリコール率と高速なクエリ速度を同時に実現します。

解説: 数百万のベクターから正確な最近傍(KNN)を見つけるには、すべてのベクターと比較する必要がありO(N)時間が必要です。HNSWは各ノードが少数の隣接ノードとのみ接続された小世界グラフ(small-world graph)を階層的に構築します。上位層は大まかなナビゲーション、下位層は精密な検索に使用されます。構築パラメータのm(各ノードの接続数)とef_construction(構築時の探索幅)がインデックスの品質を決定し、クエリ時のef_searchがリコールと速度のトレードオフを調整します。

Q5. Redisのwrite-through戦略とwrite-behind戦略の違いは何ですか?

答え: write-throughはキャッシュとDBを同期的に同時更新して一貫性を保証しますが、書き込みレイテンシが発生します。write-behindはキャッシュのみを先に更新し、DBへの保存を非同期で行うため書き込み速度が速いですが、データ消失のリスクがあります。

解説: write-throughはすべての書き込みリクエストでキャッシュとDBを同期的に更新するため、データの一貫性が保証されますが、DBの書き込みレイテンシがそのままユーザーに伝わります。write-behind(write-back)はキャッシュのみを即時更新し、ダーティ(dirty)なデータをバッチでDBにフラッシュします。書き込みスループットが高く応答速度が速いですが、フラッシュ前にシステム障害が発生すると、キャッシュのデータがDBに反映されず消失する可能性があります。ECサイトのカート、ゲームスコアのような頻繁な書き込みが発生する場所にwrite-behindが適しています。


まとめ

データベースエンジニアリングは単にSQLを書くことを超え、データモデル設計からインデックス戦略、分散システム理論、そして今やベクター検索とAI連携まで幅広い知識を要求します。特にAI時代には、埋め込み保存、セマンティック検索、LLMとの統合がデータベースエンジニアの核心スキルとなっています。

このガイドの内容を実際のプロジェクトに適用してみてください。pgvectorを使ったRAGシステム、Redisキャッシングレイヤー、そして適切なインデックス設計だけでも、システムパフォーマンスを大幅に向上させることができます。