Skip to content
Published on

CDC (Change Data Capture) 完全ガイド 2025: Debezium, Kafka, データ同期, Outbox Pattern

Authors

TL;DR

  • CDC = データ同期のゲームチェンジャー: ポーリングの代わりに変更イベントストリーム。リアルタイムかつ効率的
  • Debezium が事実上の標準: PostgreSQL, MySQL, MongoDB, SQL Server などをサポート
  • WAL/Binlog ベース: DB 内部ログを読んで変更を捕捉。非侵襲的
  • Outbox Pattern: dual-write 問題を解決。マイクロサービスイベントの標準
  • ユースケース: DW 同期、検索インデックス更新、キャッシュ無効化、イベント駆動アーキテクチャ

1. CDC が必要な理由

1.1 従来のデータ同期

ポーリング:

while True:
    new_records = db.query("SELECT * FROM users WHERE updated_at > ?", last_sync)
    sync_to_warehouse(new_records)
    last_sync = now()
    sleep(60)

問題点:

  1. 遅延: 1分単位、リアルタイムではない
  2. 負荷: 毎回 DB にクエリ
  3. DELETE 検知不可: WHERE updated_at > ? では削除は捕捉できない
  4. 漏れ: トランザクションの時間差で抜ける
  5. スケール不可: データ増加でクエリが遅くなる

1.2 CDC が約束するもの

[DB] -> [WAL/Binlog] -> [CDC tool] -> [Kafka] -> [Consumers]
                            ^
                リアルタイム、非侵襲的

利点:

  1. リアルタイム: ミリ秒単位の latency
  2. DB 負荷ゼロ: ログを読むだけ
  3. 全ての変更: INSERT/UPDATE/DELETE を捕捉
  4. 順序保証: トランザクション順
  5. スケーラブル: データ量に依存しない

1.3 ユースケース

ユースケース説明
データウェアハウス同期OLTP から Snowflake/BigQuery へリアルタイム
検索インデックス更新DB 変更を Elasticsearch に即反映
キャッシュ無効化DB 更新で Redis キャッシュを無効化
イベント駆動DB 変更をドメインイベントに
マイクロサービス同期あるサービスのデータを他が参照
DB マイグレーション無停止での旧から新 DB への移行
監査ログ全ての変更履歴を保持

2. CDC の動作原理

2.1 PostgreSQL の WAL

WAL (Write-Ahead Log): 全ての変更をディスクに記録するログ。障害復旧の基盤

INSERT INTO users (name) VALUES ('Alice');
-> [WAL]: tx_id=123, op=INSERT, table=users, data={name:'Alice'}
-> [Disk page write]

CDC はこの WAL を読んで変更を捕捉する。

2.2 Logical vs Physical Replication

Physical Replication (Streaming):

  • ディスクページ単位の複製
  • Standby は Primary と byte-identical
  • 利点: 高速、正確
  • 欠点: 同一バージョン、同一スキーマ必須

Logical Replication:

  • 論理的な変更 (INSERT/UPDATE/DELETE) を複製
  • 異なるスキーマ・バージョンも可
  • 利点: 柔軟、CDC に適する
  • 欠点: 若干のオーバーヘッド

2.3 PostgreSQL Logical Replication 設定

# postgresql.conf
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
-- ユーザ作成
CREATE USER cdc_user REPLICATION LOGIN PASSWORD 'secret';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;

-- Publication 作成 (どのテーブルを複製するか)
CREATE PUBLICATION my_pub FOR TABLE users, orders;
-- 全テーブル対象
CREATE PUBLICATION my_pub FOR ALL TABLES;

-- Replication slot 作成 (変更を保存する場所)
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');

2.4 MySQL の Binlog

MySQL は Binlog を使う:

# my.cnf
log_bin = mysql-bin
binlog_format = ROW  # CDC には ROW 必須
binlog_row_image = FULL
server_id = 1
expire_logs_days = 7

binlog_format:

  • STATEMENT: SQL 文自体 (再生時に非決定的)
  • ROW: 行単位の変更 (CDC に適する)
  • MIXED: 両方

2.5 MongoDB Change Streams

const changeStream = db.collection('users').watch()
changeStream.on('change', (change) => {
  console.log(change)
  // { operationType: 'insert', fullDocument: {...}, ... }
})

MongoDB は oplog ベース。


3. Debezium — CDC の標準

3.1 Debezium とは

Debezium = オープンソースの CDC プラットフォーム。Red Hat 発。

アーキテクチャ:

[PostgreSQL] -> [Debezium connector] -> [Kafka Connect] -> [Kafka topics] -> [Consumers]

主要機能:

  • 幅広い DB 対応 (PG, MySQL, MongoDB, Oracle, SQL Server, Cassandra)
  • Kafka Connect 上に構築
  • Exactly-once 配信 (Kafka の保証)
  • Schema 変更検知
  • フィルタ、変換

3.2 Debezium インストール (Docker Compose)

version: '3.7'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092

  connect:
    image: debezium/connect:2.5
    depends_on: [kafka, postgres]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses

  postgres:
    image: debezium/example-postgres:2.5
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres

3.3 PostgreSQL Connector 登録

curl -X POST http://connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "cdc_user",
      "database.password": "secret",
      "database.dbname": "mydb",
      "database.server.name": "mydb",
      "table.include.list": "public.users,public.orders",
      "plugin.name": "pgoutput",
      "publication.name": "my_pub",
      "slot.name": "my_slot"
    }
  }'

これで Debezium が PostgreSQL の変更を自動的に Kafka に発行する。

3.4 Kafka トピック構造

Debezium は テーブル毎にトピック を作成:

mydb.public.users   <- users テーブルの変更
mydb.public.orders  <- orders テーブルの変更

メッセージ形式:

{
  "before": null,
  "after": {
    "id": 123,
    "name": "Alice",
    "email": "alice@example.com"
  },
  "source": {
    "version": "2.5",
    "connector": "postgresql",
    "name": "mydb",
    "ts_ms": 1681545600000,
    "snapshot": "false",
    "db": "mydb",
    "schema": "public",
    "table": "users",
    "txId": 12345,
    "lsn": 23456789
  },
  "op": "c",
  "ts_ms": 1681545600000
}

op の値: c=create, u=update, d=delete, r=read (Snapshot)。

3.5 Kafka Consumer

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'mydb.public.users',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    event = message.value
    op = event['op']
    if op == 'c':
        print(f"new user: {event['after']}")
    elif op == 'u':
        print(f"update: {event['before']} -> {event['after']}")
    elif op == 'd':
        print(f"delete: {event['before']}")

4. Outbox Pattern — Dual-Write 問題の解決

4.1 Dual-Write 問題

シナリオ: 注文作成時に DB 保存 + Kafka イベント発行。

def create_order(order):
    db.save(order)              # 1
    kafka.send("orders", order) # 2

問題: 1 と 2 の間でクラッシュしたら?

  • 1 のみ成功: DB にはあるがイベント未送信 → 下流で不整合
  • 2 のみ成功: イベントは送ったが DB に無い → ゴーストイベント

dual-write 問題: 2つのシステムに対する原子的書き込みは不可能。

4.2 Outbox Pattern

要点: DB トランザクション内で outbox テーブルにもイベントを保存する。

CREATE TABLE outbox (
  id UUID PRIMARY KEY,
  aggregate_type VARCHAR(255),
  aggregate_id VARCHAR(255),
  event_type VARCHAR(255),
  payload JSONB,
  created_at TIMESTAMP DEFAULT NOW()
);
def create_order(order):
    with db.transaction():
        db.save(order)
        db.save(OutboxEvent(
            aggregate_type='Order',
            aggregate_id=order.id,
            event_type='OrderCreated',
            payload=order.to_json()
        ))

原子性保証: 2つの INSERT が同一トランザクション、共に成功か失敗。

4.3 Outbox -> Kafka

方法 1: Polling Publisher (シンプル)

while True:
    events = db.query("""
        SELECT * FROM outbox
        WHERE published = false
        ORDER BY created_at
        LIMIT 100
    """)
    for event in events:
        kafka.send(event.aggregate_type, event.payload)
        db.execute("UPDATE outbox SET published = true WHERE id = ?", event.id)
    sleep(1)

欠点: ポーリングのオーバーヘッド、多少の遅延。

方法 2: CDC (推奨)

Debezium が outbox テーブルを監視し、自動で Kafka に発行。

curl -X POST http://connect:8083/connectors \
  -d '{
    "name": "outbox-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "table.include.list": "public.outbox",
      "transforms": "outbox",
      "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
      "transforms.outbox.route.by.field": "aggregate_type",
      "transforms.outbox.table.field.event.id": "id",
      "transforms.outbox.table.field.event.payload": "payload"
    }
  }'

効果: ポーリングコード不要、ほぼリアルタイム。


5. データウェアハウス同期

5.1 シナリオ

OLTP (PostgreSQL) -> DW (Snowflake/BigQuery) のリアルタイム同期。

5.2 従来 ETL の限界

[Extract] -> [Transform] -> [Load]
(バッチ・日次)  (遅い)    (大容量)

問題: 24時間遅延、DB への大負荷、全データ再処理。

5.3 CDC ベースの ELT

[PostgreSQL] -> [Debezium] -> [Kafka] -> [Stream Processor] -> [Snowflake]

利点: リアルタイム (秒単位)、DB 負荷最小 (WAL だけ読む)、変更のみ処理。

5.4 ツール

ツール特徴
Debezium + Kafka ConnectOSS、セルフホスト
Fivetranマネージド、高価、簡単
AirbyteOSS + マネージド
Stitchシンプル、マネージド
AWS DMSAWS 統合
Striimエンタープライズ
Hevoノーコード

5.5 Schema Evolution

問題: ソース DB のカラム追加/削除にどう対応するか。

Debezium の挙動:

  • 新カラム: 自動検知、イベントに含める
  • カラム削除: イベント発行 (DELETE スタイル)
  • 型変更: 新スキーマで発行

Schema Registry (Confluent): 全スキーマバージョン保存、互換性検証、コンシューマが安全に読める。


6. 検索インデックス同期

6.1 シナリオ

PostgreSQL -> Elasticsearch のリアルタイムインデックス更新。

6.2 アーキテクチャ

[PostgreSQL] -> [Debezium] -> [Kafka] -> [Elasticsearch Sink Connector] -> [Elasticsearch]

6.3 Elasticsearch Sink Connector

curl -X POST http://connect:8083/connectors \
  -d '{
    "name": "es-sink",
    "config": {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "topics": "mydb.public.products",
      "connection.url": "http://elasticsearch:9200",
      "type.name": "_doc",
      "key.ignore": "false",
      "schema.ignore": "true",
      "behavior.on.malformed.documents": "warn"
    }
  }'

6.4 効果

  • 新商品追加 -> 1-2秒で検索可能
  • 価格更新 -> 検索結果が自動更新
  • 削除 -> インデックスから即時除去

従来のバッチ reindex は時間単位・数時間かかる。CDC ならリアルタイム。


7. キャッシュ無効化

7.1 問題

def get_user(user_id):
    user = cache.get(f"user:{user_id}")
    if user is None:
        user = db.query(...)
        cache.set(f"user:{user_id}", user, ttl=3600)
    return user

def update_user(user_id, data):
    db.execute("UPDATE users SET ... WHERE id=?", user_id)
    cache.delete(f"user:{user_id}")  # 忘れたら? 他サービスが更新したら?

解決策: CDC による自動無効化。

7.2 CDC ベース

consumer = KafkaConsumer('mydb.public.users')

for message in consumer:
    event = message.value
    user_id = event['after']['id'] if event['op'] != 'd' else event['before']['id']
    cache.delete(f"user:{user_id}")

利点: コード変更不要、全変更を自動捕捉、中央管理。


8. 無停止マイグレーション

8.1 シナリオ

レガシー MySQL -> 新規 PostgreSQL へのマイグレーション。

8.2 手順

Step 1: CDC 設定

[MySQL (現行)] -> [Debezium] -> [Kafka] -> [PostgreSQL ()]

Step 2: 初期 Snapshot — Debezium が MySQL の全データを PG に一括コピー。

Step 3: リアルタイム同期 — 以降の変更のみ PG に反映、両 DB がほぼリアルタイムで一致。

Step 4: 読み込みトラフィック切替 — 一部読み込みを PG へ (検証)、徐々に100%へ。

Step 5: 書き込みトラフィック切替 — CDC 方向反転 (PG -> MySQL) でロールバック対応、書き込みを PG に、期間経過後 MySQL 廃止。

8.3 利点

  • 無停止: ユーザへの影響なし
  • ロールバック可能: 問題時は即復帰
  • 段階的: リスク分散
  • 検証: 両 DB 比較で整合性確認

9. よくある落とし穴と対策

9.1 Replication Slot の蓄積

使われない PostgreSQL replication slot は WAL が無限に蓄積 してディスクが溢れる。

-- アクティブなスロット確認
SELECT slot_name, active, restart_lsn
FROM pg_replication_slots;

-- 使ってないスロット削除
SELECT pg_drop_replication_slot('unused_slot');

監視必須: WAL サイズのアラート。

9.2 大規模トランザクション

数百万行を1トランザクションで処理すると Debezium のメモリが爆発。

対策: トランザクション分割、max.queue.size チューニング、Heartbeat 活用。

9.3 Schema 変更

ALTER TABLE が CDC を壊すことがある。

予防: 追加のみ (additive)、DDL を小さな単位で、Debezium の schema history を使う。

9.4 Exactly-Once Delivery

Debezium は at-least-once (重複あり得る)。

対策: コンシューマ側の冪等性確保、Kafka のトランザクション API、メッセージ ID 追跡。

9.5 モニタリング

# Connector 状態
curl http://connect:8083/connectors/postgres-connector/status

# メトリクス (JMX)
debezium_metrics_QueueRemainingCapacity
debezium_metrics_NumberOfEventsFiltered

主要指標: Lag (ソース DB との差分)、Queue sizeError rate


10. CDC vs 代替

10.1 CDC vs Polling

CDCPolling
遅延リアルタイム (ms)分/時間
DB 負荷非常に低い高い
DELETE 検知ありなし
複雑度
インフラKafka 等が必要不要

10.2 CDC vs Triggers

Triggers: DB トリガで別テーブルに書く。シンプルだが DB 性能に影響、トランザクション負荷。

CDC: 外部ツールが WAL を読む。非侵襲的だが別インフラが必要。

10.3 CDC vs アプリケーションイベント

App-level: コードが直接イベント発行。ビジネスロジックと統合できるが dual-write 問題 (Outbox で解決)。

CDC: DB から直接。安全・完全だが、データモデルレベルのみ。

組み合わせ: Outbox + CDC = 両方の良いとこ取り。


クイズ

1. Logical Replication と Physical Replication の違い?

答え: Physical Replication はディスクページ単位の複製で、Standby は Primary と byte-identical。高速だが同一バージョン・同一スキーマ必須、主に HA 用途。Logical Replication は論理変更 (INSERT/UPDATE/DELETE) の複製で、異なるスキーマ・バージョン可能。CDC に適し、PostgreSQL 10+ でサポート。CDC は logical replication ベースで、wal_level = logical で有効化。

2. Outbox Pattern はどのように dual-write 問題を解決するか?

答え: DB トランザクション内で outbox テーブルにもイベントを一緒に保存する。2つの INSERT が同じトランザクション → 原子的に一緒に成功または失敗。その後、別プロセス (Polling または CDC) が outbox から Kafka へイベントを転送。結果: DB とメッセージブローカ間の原子性を保証。Debezium + Outbox Event Router transformation が標準実装。マイクロサービスでほぼ必須のパターン。

3. Debezium が事実上の標準になった理由?

答え: (1) 広い DB サポート — PostgreSQL, MySQL, MongoDB, Oracle, SQL Server, Cassandra、(2) Kafka Connect 上に構築 — 実績あるインフラ、(3) オープンソース — 無料、(4) 機能充実 — Schema 変更検知、フィルタ、transformation、(5) Red Hat バックアップ — 安定した開発。代替 (Maxwell, Canal) は特定 DB のみ、または機能不足。クラウドマネージド (Fivetran, Airbyte) は高いが運用負担は少ない。

4. CDC ベースのキャッシュ無効化の利点?

答え: (1) 自動cache.delete() を忘れない、(2) 完全 — DB の全変更 (他サービスも含む) を捕捉、(3) 中央管理 — 無効化ロジックが一箇所、(4) 信頼性 — DB トランザクション commit 時点で発火。欠点: 若干の遅延 (通常1秒以内)、別インフラ必要。複数のマイクロサービスが同じ DB を共有 する場合に特に有効。

5. 無停止 DB マイグレーションにおける CDC の役割?

答え: (1) 初期 Snapshot: Debezium が全データを新 DB に一括コピー。(2) リアルタイム同期: WAL を読んで新 DB に即反映、両 DB がほぼリアルタイムに一致。(3) 段階的トラフィック切替: 読み込みを徐々に新 DB へ、次に書き込み。(4) ロールバック可能: 問題時に CDC 方向反転。無停止 で数ヶ月かけた安全なマイグレーションが可能。Stripe や Airbnb が PostgreSQL マイグレーションでこの方式を採用。


参考資料