Skip to content
Published on

Outboxパターンと CDCで実現するマイクロサービスデータ同期:Debezium実践ガイド

Authors
  • Name
    Twitter
Outbox Pattern CDC

はじめに:Dual Write問題

マイクロサービスアーキテクチャで最も頻繁に直面する問題の一つは、データベースとメッセージブローカーに同時にデータを書き込まなければならない状況です。注文サービスが注文を作成すると同時に、在庫サービスにイベントを発行しなければならないケースを考えてみましょう。単純に実装すると、次のようなコードになります。

// 危険な Dual Write パターン
@Transactional
public Order createOrder(OrderRequest request) {
    Order order = orderRepository.save(new Order(request));  // Step 1: DB保存

    // Step 2: メッセージブローカーにイベント発行
    kafkaTemplate.send("order-events", new OrderCreatedEvent(order));

    return order;
}

このコードは致命的な問題を抱えています。DB保存は成功したがKafka発行が失敗した場合、注文は存在するのに他のサービスはそれを知ることができません。逆にKafka発行は成功したがDBトランザクションがロールバックされた場合、存在しない注文に対するイベントが伝播されます。これがまさにDual Write問題です。

分散トランザクション(2PC)で解決することもできますが、Kafkaは従来のXAトランザクションに参加できず、2PC自体が性能ボトルネックと可用性低下を引き起こします。この問題を根本的に解決するパターンがOutboxパターンであり、これを効率的に実現する技術が**CDC(Change Data Capture)**です。

1. Outboxパターンアーキテクチャ

1.1 核心アイデア

Outboxパターンの核心はシンプルです。イベントをメッセージブローカーに直接発行するのではなく、ビジネスデータと同一トランザクションでOutboxテーブルに記録するということです。こうすることで、DBのACIDトランザクションが両方の操作のアトミック性を保証します。その後、別のプロセスがOutboxテーブルを読み取ってメッセージブローカーに転送します。

[サービスコード]
    ├── ビジネスデータ INSERT ─────┐
    │                                │  同一DBトランザクション
    └── Outboxテーブル INSERT ───────┘
                              [Outbox Relay]
                              (CDC or Polling)
                              [メッセージブローカー]
                              (Kafka, RabbitMQ)
                              [他のマイクロサービス]

この方式の利点は明確です。ビジネスデータとイベントが一つのトランザクションにまとめられるため、両方とも保存されるか、両方とも保存されないかのどちらかになります。最低1回(at-least-once)の配信が保証され、コンシューマ側で冪等性を実装すれば、実質的に1回(effectively-once)のセマンティクスも実現できます。

1.2 Outboxパターンの2つの実装方式

Outboxテーブルの変更内容をメッセージブローカーに転送する方式は、大きく2つあります。

  1. Polling方式:定期的にOutboxテーブルをクエリして未発行のイベントを取得します
  2. CDC方式:データベースのトランザクションログ(WAL/binlog)を監視して変更内容をキャプチャします

2. Outboxテーブル設計

2.1 基本スキーマ

-- PostgreSQL Outboxテーブル DDL
CREATE TABLE outbox_events (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type  VARCHAR(255) NOT NULL,   -- イベント所属ドメイン(例:'Order')
    aggregate_id    VARCHAR(255) NOT NULL,   -- ドメインエンティティID(例:注文ID)
    event_type      VARCHAR(255) NOT NULL,   -- イベントタイプ(例:'OrderCreated')
    payload         JSONB NOT NULL,          -- イベント本文
    metadata        JSONB DEFAULT '{}',      -- 追加メタデータ
    created_at      TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMP WITH TIME ZONE,-- Polling方式用:発行完了時刻
    retry_count     INT DEFAULT 0,           -- Polling方式用:リトライ回数
    status          VARCHAR(20) DEFAULT 'PENDING'  -- PENDING, PUBLISHED, FAILED
);

-- Polling方式で未発行イベント検索用のインデックス
CREATE INDEX idx_outbox_status_created ON outbox_events(status, created_at)
    WHERE status = 'PENDING';

-- CDC方式ではstatus/published_atカラムは不要であり、
-- イベント発行後に行を削除する方式を使用できます

2.2 設計時の重要な考慮事項

aggregate_idをKafkaパーティションキーとして使用:同じaggregate_idを持つイベントが同じパーティションに入るため、そのエンティティに対するイベントの順序が保証されます。例えば、注文123に対する'OrderCreated' -> 'OrderPaid' -> 'OrderShipped'イベントが順序通りに処理されます。

payloadに必要な情報をすべて含める:コンシューマがイベントだけで処理を完了できるよう、payloadに十分な情報を含めます。コンシューマがプロデューサのDBを直接参照することは、サービス間の結合度を高めるため避けるべきです。

テーブルの肥大化防止:CDC方式では、イベントをキャプチャした後、定期的に古い行を削除します。Polling方式では、発行完了したイベントを一定期間後にアーカイブまたは削除します。

2.3 MySQL用トリガーベースOutbox(代替案)

-- MySQLでトリガーを利用した自動Outbox記録
DELIMITER //

CREATE TRIGGER after_order_insert
AFTER INSERT ON orders
FOR EACH ROW
BEGIN
    INSERT INTO outbox_events (
        id, aggregate_type, aggregate_id, event_type, payload, created_at
    ) VALUES (
        UUID(),
        'Order',
        NEW.order_id,
        'OrderCreated',
        JSON_OBJECT(
            'orderId', NEW.order_id,
            'userId', NEW.user_id,
            'totalAmount', NEW.total_amount,
            'status', NEW.status,
            'createdAt', NEW.created_at
        ),
        NOW()
    );
END //

CREATE TRIGGER after_order_update
AFTER UPDATE ON orders
FOR EACH ROW
BEGIN
    IF OLD.status != NEW.status THEN
        INSERT INTO outbox_events (
            id, aggregate_type, aggregate_id, event_type, payload, created_at
        ) VALUES (
            UUID(),
            'Order',
            NEW.order_id,
            CONCAT('OrderStatus', NEW.status),
            JSON_OBJECT(
                'orderId', NEW.order_id,
                'previousStatus', OLD.status,
                'newStatus', NEW.status,
                'updatedAt', NEW.updated_at
            ),
            NOW()
        );
    END IF;
END //

DELIMITER ;

トリガーベースのアプローチはアプリケーションコードを修正しなくてもよいという利点がありますが、トリガーの実行コストがメイントランザクションに含まれるため、性能に影響を与える可能性があります。また、複雑なビジネスロジックをトリガーに入れることは難しく、デバッグも困難であるため、ほとんどの場合はアプリケーションレベルで明示的にOutbox行を挿入することを推奨します。

3. CDCの概念と動作原理

3.1 CDCとは何か

CDC(Change Data Capture)は、データベースの変更内容(INSERT、UPDATE、DELETE)をリアルタイムでキャプチャして外部システムに転送する技術です。CDCの核心はデータベースのトランザクションログを直接読み取ることです。

  • PostgreSQL:WAL(Write-Ahead Log)の論理レプリケーション(logical replication)スロットを使用
  • MySQL:binlog(binary log)を読み取って変更内容をキャプチャ
  • SQL Server:CDC機能が内蔵されており、変更テーブルを自動生成
  • MongoDB:Change Streamsを通じてoplogの変更内容をストリーミング

3.2 CDCの利点

ログベースCDCはPolling方式と比較して、いくつかの利点があります。

  1. 極めて低いレイテンシ:トランザクションコミット後、ミリ秒単位で変更内容をキャプチャ
  2. DB負荷の最小化:インデックスベースのクエリではなくログストリームを読み取るため、DBへの追加負荷がほとんどない
  3. DELETEのキャプチャが可能:Polling方式では削除された行を検知することが困難ですが、ログにはDELETEイベントも記録される
  4. すべての変更を捕捉:中間状態の変更も見逃さない(Pollingはインターバル間の変更を見逃す可能性がある)
  5. スキーマ変更の追跡:テーブル構造の変更もログに記録されるため、スキーマの進化を処理可能

4. Debeziumのインストールとコネクタ構成

4.1 Debeziumの概要

DebeziumはRed Hatが主導するオープンソースCDCプラットフォームで、Kafka Connectフレームワーク上で動作します。PostgreSQL、MySQL、MongoDB、SQL Server、Oracle、Cassandra、Vitessなど多様なデータベースをサポートしており、Outbox Event Router SMT(Single Message Transformation)を通じてOutboxパターンをネイティブにサポートしています。

4.2 Docker Composeでフルスタック構成

# docker-compose.yml - Debezium + Kafka フルスタック
version: '3.8'
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: orderdb
      POSTGRES_USER: appuser
      POSTGRES_PASSWORD: secret
    command:
      - 'postgres'
      - '-c'
      - 'wal_level=logical' # CDCに必要な設定
      - '-c'
      - 'max_replication_slots=4'
      - '-c'
      - 'max_wal_senders=4'
    ports:
      - '5432:5432'
    volumes:
      - postgres_data:/var/lib/postgresql/data

  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

  connect:
    image: debezium/connect:2.7
    depends_on:
      - kafka
      - postgres
    ports:
      - '8083:8083'
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: debezium-connect-group
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
      CONFIG_STORAGE_REPLICATION_FACTOR: 1
      OFFSET_STORAGE_REPLICATION_FACTOR: 1
      STATUS_STORAGE_REPLICATION_FACTOR: 1

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - '8080:8080'
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: debezium
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083

volumes:
  postgres_data:

4.3 Debeziumコネクタ登録(Outbox Event Router含む)

{
  "name": "order-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "appuser",
    "database.password": "secret",
    "database.dbname": "orderdb",
    "topic.prefix": "order-service",
    "schema.include.list": "public",
    "table.include.list": "public.outbox_events",

    "tombstones.on.delete": "false",
    "slot.name": "order_outbox_slot",
    "plugin.name": "pgoutput",

    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.fields.additional.placement": "event_type:header:eventType",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.table.field.event.timestamp": "created_at",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "events.${routedByValue}",

    "transforms.outbox.table.expand.json.payload": "true",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",

    "heartbeat.interval.ms": "10000",
    "snapshot.mode": "initial"
  }
}

この設定の重要なポイントは以下の通りです。

  • table.include.listでOutboxテーブルのみを監視し、不要なCDCイベントを防止します
  • transforms.outbox.route.by.fieldaggregate_typeに設定することで、ドメインごとにKafkaトピックが自動生成されます。例えば、aggregate_typeが'Order'であればevents.Orderトピックに発行されます
  • table.field.event.keyaggregate_idに設定することで、同じエンティティのイベントが同じKafkaパーティションに入ります
  • heartbeat.interval.msを設定することで、Outboxテーブルに変更がなくても定期的にオフセットを更新します。この設定がないとWALの保持期間が不必要に長くなる可能性があります
# コネクタ登録
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @order-outbox-connector.json

# コネクタ状態確認
curl -s http://localhost:8083/connectors/order-outbox-connector/status | jq .

# コネクタ再起動
curl -X POST http://localhost:8083/connectors/order-outbox-connector/restart

# トピック一覧確認(Kafka UI または CLI)
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --list

5. Kafka Connectパイプライン

5.1 パイプライン全体の流れ

Kafka ConnectベースのCDCパイプラインの全体フローは以下の通りです。

  1. アプリケーションがビジネスデータとOutboxイベントを同一トランザクションでDBに書き込みます
  2. Debezium Source ConnectorがPostgreSQLのWALを読み取り、Outboxテーブルの変更を検知します
  3. Outbox Event Router SMTが元のCDCイベントをドメインごとのKafkaトピックにルーティングします
  4. コンシューママイクロサービスが該当トピックをサブスクライブしてイベントを処理します
  5. 必要に応じてSink Connectorがイベントを他のデータストア(Elasticsearch、S3など)に転送します

5.2 Spring Boot Outboxパターン実装

// OutboxEvent.java - Outboxエンティティ
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
    @Id
    @GeneratedValue(strategy = GenerationType.UUID)
    private UUID id;

    @Column(name = "aggregate_type", nullable = false)
    private String aggregateType;

    @Column(name = "aggregate_id", nullable = false)
    private String aggregateId;

    @Column(name = "event_type", nullable = false)
    private String eventType;

    @Column(name = "payload", columnDefinition = "jsonb", nullable = false)
    private String payload;

    @Column(name = "created_at", nullable = false)
    private Instant createdAt;

    // デフォルトコンストラクタ、getter、setter省略

    public static OutboxEvent create(String aggregateType, String aggregateId,
                                     String eventType, Object payload) {
        OutboxEvent event = new OutboxEvent();
        event.aggregateType = aggregateType;
        event.aggregateId = aggregateId;
        event.eventType = eventType;
        event.payload = toJson(payload);
        event.createdAt = Instant.now();
        return event;
    }

    private static String toJson(Object obj) {
        try {
            return new ObjectMapper()
                .registerModule(new JavaTimeModule())
                .writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("JSONシリアライズ失敗", e);
        }
    }
}

// OrderService.java - Outboxパターン適用注文サービス
@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxEventRepository outboxRepository;

    @Transactional  // 一つのトランザクションでビジネスデータとイベントを同時に保存
    public Order createOrder(OrderRequest request) {
        // 1. ビジネスロジック実行
        Order order = Order.builder()
            .userId(request.getUserId())
            .items(request.getItems())
            .totalAmount(calculateTotal(request.getItems()))
            .status(OrderStatus.CREATED)
            .build();

        Order savedOrder = orderRepository.save(order);

        // 2. Outboxテーブルにイベント記録(同一トランザクション)
        OrderCreatedPayload eventPayload = OrderCreatedPayload.builder()
            .orderId(savedOrder.getId().toString())
            .userId(savedOrder.getUserId())
            .totalAmount(savedOrder.getTotalAmount())
            .items(savedOrder.getItems())
            .createdAt(savedOrder.getCreatedAt())
            .build();

        OutboxEvent outboxEvent = OutboxEvent.create(
            "Order",                              // aggregate_type -> Kafkaトピック決定
            savedOrder.getId().toString(),         // aggregate_id -> Kafkaパーティションキー
            "OrderCreated",                        // event_type
            eventPayload                           // payload
        );

        outboxRepository.save(outboxEvent);

        return savedOrder;
    }

    @Transactional
    public Order cancelOrder(UUID orderId) {
        Order order = orderRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));

        if (order.getStatus() != OrderStatus.CREATED) {
            throw new IllegalStateException(
                "CREATED状態の注文のみキャンセルできます。現在の状態: " + order.getStatus()
            );
        }

        order.setStatus(OrderStatus.CANCELLED);
        Order savedOrder = orderRepository.save(order);

        OutboxEvent outboxEvent = OutboxEvent.create(
            "Order",
            savedOrder.getId().toString(),
            "OrderCancelled",
            Map.of(
                "orderId", savedOrder.getId().toString(),
                "reason", "ユーザーリクエスト",
                "cancelledAt", Instant.now().toString()
            )
        );

        outboxRepository.save(outboxEvent);

        return savedOrder;
    }
}

6. イベント順序保証と冪等性処理

6.1 イベント順序保証

Outboxパターンにおけるイベント順序保証は、以下の原則に従います。

  • 同一Aggregateに対する順序保証aggregate_idをKafkaパーティションキーとして使用すると、同じエンティティに対するイベントが同じパーティションに順序通りに保存されます
  • 異なるAggregate間では順序が保証されない:これは意図的な設計であり、マイクロサービス間の疎結合に合致します
  • パーティション数変更時の注意:Kafkaトピックのパーティション数を変更すると、既存のキーとパーティションのマッピングが崩れるため、運用中はパーティション数を変更しないことが安全です

6.2 冪等性処理(Pythonコンシューマ例)

import json
import uuid
from datetime import datetime
from kafka import KafkaConsumer
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

# 冪等性保証のための処理履歴テーブル
# CREATE TABLE processed_events (
#     event_id UUID PRIMARY KEY,
#     processed_at TIMESTAMP NOT NULL DEFAULT NOW()
# );

engine = create_engine('postgresql://user:pass@localhost:5432/inventorydb')

consumer = KafkaConsumer(
    'events.Order',
    bootstrap_servers=['localhost:9092'],
    group_id='inventory-service',
    auto_offset_reset='earliest',
    enable_auto_commit=False,          # 手動コミットによる正確な制御
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    max_poll_records=100,
)

def process_order_event(event_data: dict, event_id: str) -> None:
    """冪等性を保証するイベント処理"""
    with Session(engine) as session:
        with session.begin():
            # 1. 既に処理済みのイベントか確認
            result = session.execute(
                text("SELECT 1 FROM processed_events WHERE event_id = :id"),
                {"id": event_id}
            )
            if result.fetchone():
                print(f"既に処理済みのイベントをスキップ: {event_id}")
                return

            # 2. ビジネスロジック実行
            event_type = event_data.get('eventType', '')

            if event_type == 'OrderCreated':
                items = event_data.get('items', [])
                for item in items:
                    session.execute(
                        text("""
                            UPDATE inventory
                            SET reserved_quantity = reserved_quantity + :qty
                            WHERE product_id = :product_id
                              AND available_quantity >= :qty
                        """),
                        {"qty": item['quantity'], "product_id": item['productId']}
                    )

            elif event_type == 'OrderCancelled':
                order_id = event_data.get('orderId')
                session.execute(
                    text("""
                        UPDATE inventory i
                        SET reserved_quantity = reserved_quantity - oi.quantity
                        FROM order_items oi
                        WHERE oi.order_id = :order_id
                          AND oi.product_id = i.product_id
                    """),
                    {"order_id": order_id}
                )

            # 3. 処理履歴記録(同一トランザクション)
            session.execute(
                text("""
                    INSERT INTO processed_events (event_id, processed_at)
                    VALUES (:id, :now)
                """),
                {"id": event_id, "now": datetime.utcnow()}
            )

    print(f"イベント処理完了: {event_id} ({event_type})")


# メインコンシューマループ
print("イベントコンシューマ起動...")
try:
    for message in consumer:
        try:
            event_data = message.value
            # Debezium Outbox Event Routerが設定したヘッダーからevent_idを抽出
            headers = {k: v.decode('utf-8') for k, v in message.headers}
            event_id = headers.get('id', str(uuid.uuid4()))

            process_order_event(event_data, event_id)

            # 処理成功時のみオフセットコミット
            consumer.commit()

        except Exception as e:
            print(f"イベント処理失敗: {e}")
            # 失敗時はDLQ(Dead Letter Queue)に送信するかリトライロジックを適用
            # ここでは単純にログ出力して次のイベントに進む

except KeyboardInterrupt:
    print("コンシューマ終了")
finally:
    consumer.close()

冪等性処理の核心は、イベント処理と処理履歴の記録を同一DBトランザクションにまとめることです。こうすることで、イベント処理中に障害が発生しても処理履歴が記録されないため、再起動時に同じイベントを再処理できます。既に処理済みのイベントは処理履歴テーブルで確認してスキップします。

7. Polling方式 vs CDC方式の比較

比較項目Polling方式CDC方式(Debezium)
レイテンシポーリング周期に依存(秒〜分)ミリ秒レベルのニアリアルタイム
DB負荷定期的なSELECTクエリ負荷WAL/binlog読み取りにより負荷最小
実装の複雑さ低い(CRON + SQL)高い(Kafka Connect + Debezium)
インフラ要件DBのみ必要Kafka + Kafka Connect + Debezium
DELETE検知不可(soft delete必要)可能(ログに記録)
中間状態キャプチャ不可(最後の状態のみ)可能(すべての変更をキャプチャ)
順序保証タイムスタンプベース(脆弱)ログベース(堅牢)
スケーラビリティDBコネクション負担増加Kafka Connect水平スケーリング可能
運用の複雑さ低い高い(Kafkaクラスタ管理が必要)
障害復旧ステータスカラムベースの簡易リトライKafkaオフセットベースの精密な復旧
適合環境小規模、低レイテンシ許容大規模、リアルタイム処理が必要

最初にOutboxパターンを導入する際はPolling方式から始め、トラフィック増加に応じてCDC方式に移行する段階的なアプローチも有効です。Polling方式であってもDual Write問題は完全に解決でき、レイテンシ要件が秒単位であれば十分な場合があります。

8. Debezium vs Maxwell vs Canal 比較

比較項目DebeziumMaxwellCanal
対応DBPostgreSQL, MySQL, MongoDB, SQL Server, Oracle, Cassandra等MySQLのみMySQLのみ
アーキテクチャKafka Connectベースの分散型単一Javaプロセス単一Javaプロセス
メッセージブローカーKafka(デフォルト)、Pulsar、NATS等Kafka、RabbitMQ、Redis等Kafka、RocketMQ等
OutboxサポートEventRouter SMTネイティブサポート別途実装が必要別途実装が必要
スキーマ進化Schema Registry統合サポート限定的限定的
スケーラビリティKafka Connect分散モードで水平スケーリング単一プロセスの限界単一プロセスの限界
設定の複雑さ高い(Kafka Connectの理解が必要)低い(シンプルな設定)中程度
コミュニティ非常に活発(Red Hat後援)小規模中国コミュニティ中心(Alibaba)
運用成熟度高い中程度中程度
適合環境大規模、複数DB、エンタープライズMySQLのみ使用する小規模サービスMySQLベースの中国エコシステム

選択ガイド:ほとんどのプロダクション環境ではDebeziumを推奨します。多様なデータベースをサポートし、Kafka Connectの分散モードで高可用性を確保でき、Outbox Event Routerをネイティブにサポートしています。MySQLのみ使用していて迅速なPoCが必要な場合は、Maxwellのシンプルさが魅力的です。CanalはAlibabaが開発したもので、中国のエコシステムで主に使用されており、海外での採用は比較的少ないです。

9. 障害シナリオと復旧手順

9.1 シナリオ1:Debeziumコネクタ障害

症状:Kafka Connectのコネクタ状態がFAILEDに変わり、新しいイベントがKafkaに発行されなくなります。

主な原因:DB接続切断、WALスロット削除、スキーマ変更の互換性問題、Kafkaブローカー接続不可など

復旧手順

# 1. コネクタ状態確認
curl -s http://localhost:8083/connectors/order-outbox-connector/status | jq .

# 2. コネクタタスクの再起動を試行
curl -X POST http://localhost:8083/connectors/order-outbox-connector/tasks/0/restart

# 3. 再起動できない場合はコネクタを削除して再登録
curl -X DELETE http://localhost:8083/connectors/order-outbox-connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @order-outbox-connector.json

# 4. PostgreSQL WALスロット状態確認
psql -c "SELECT * FROM pg_replication_slots;"

# 5. 必要に応じてスロットを再作成
psql -c "SELECT pg_drop_replication_slot('order_outbox_slot');"
# コネクタ再登録時に自動的にスロットが作成される

注意事項:WALスロットが有効な状態でDebeziumが長期間停止すると、PostgreSQLが該当スロット以降のWALファイルを削除できず、ディスクが満杯になる可能性があります。必ずWALサイズを監視し、max_slot_wal_keep_sizeパラメータで上限を設定する必要があります。

9.2 シナリオ2:Kafkaブローカー障害

症状:Debeziumコネクタがイベントを発行できず、コンシューマもイベントを消費できなくなります。

復旧手順

  1. Kafkaブローカーを復旧します
  2. Debeziumは最後に成功したオフセットから自動的に再開します(Kafka Connectのオフセット管理)
  3. コンシューマも最後にコミットされたオフセットから再開します
  4. CDC方式では、DBのWALが保持されている限りデータ損失は発生しません

9.3 シナリオ3:コンシューマ処理失敗

症状:特定のイベントが繰り返し処理に失敗し、コンシューマが進行できなくなります(poison pill)。

復旧手順

  1. リトライポリシーを適用します(最大3〜5回、指数バックオフ)
  2. 最大リトライ回数超過時にDLQ(Dead Letter Queue)トピックに送信します
  3. DLQのメッセージを手動で分析し、修正後に元のトピックに再発行します

9.4 シナリオ4:Outboxテーブルの肥大化

症状:Outboxテーブルが数百万行に膨れ上がり、DB性能が低下します。

復旧手順

  1. CDC方式では、イベントがキャプチャされた後、一定時間経過した行を削除します
  2. PostgreSQLの場合、パーティショニングを適用して古いパーティションをDROPします
  3. 定期的なVACUUMを実行してデッドタプル(dead tuple)を整理します
-- 日別パーティショニングされたOutboxテーブル
CREATE TABLE outbox_events (
    id              UUID NOT NULL DEFAULT gen_random_uuid(),
    aggregate_type  VARCHAR(255) NOT NULL,
    aggregate_id    VARCHAR(255) NOT NULL,
    event_type      VARCHAR(255) NOT NULL,
    payload         JSONB NOT NULL,
    created_at      TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
) PARTITION BY RANGE (created_at);

-- 日別パーティション自動生成(pg_partman活用)
SELECT partman.create_parent(
    'public.outbox_events',
    'created_at',
    'native',
    'daily'
);

-- 7日以上経過したパーティションを自動削除
UPDATE partman.part_config
SET retention = '7 days', retention_keep_table = false
WHERE parent_table = 'public.outbox_events';

10. 運用時の注意事項チェックリスト

Debezium/CDC設定段階

  • PostgreSQLのwal_levellogicalに設定して再起動したか確認します
  • max_replication_slotsmax_wal_sendersをコネクタ数より余裕を持って設定します
  • max_slot_wal_keep_sizeを設定してWALディスク暴走を防止します
  • Debeziumコネクタのheartbeat.interval.msを必ず設定してオフセット更新を保証します
  • snapshot.modeを慎重に選択します(初回はinitial、以降の再起動時はschema_only

イベント設計段階

  • イベントpayloadにコンシューマが必要とするすべての情報を含めます(自己完結型イベント)
  • スキーマの進化を考慮して、フィールドの追加は許容しつつ既存フィールドの削除は避けます
  • aggregate_idをKafkaパーティションキーとして使用し、同じエンティティのイベント順序を保証します
  • イベントサイズを1MB以下に維持します(Kafkaデフォルトのメッセージ最大サイズ)

コンシューマ実装段階

  • すべてのコンシューマに冪等性を実装します(processed_eventsテーブル活用)
  • auto.commitを無効化し、手動でオフセットをコミットします
  • DLQを構成してpoison pillメッセージによる処理停滞を防止します
  • コンシューマグループのセッションタイムアウトとハートビート間隔を適切に設定します

モニタリング段階

  • Debeziumコネクタ状態を定期的に確認します(RUNNING/FAILED)
  • WALスロットサイズとディスク使用量を監視します
  • イベント発行レイテンシ(CDC lag)を監視します
  • コンシューマラグ(consumer lag)を監視します
  • Outboxテーブルのサイズと行数を監視します
  • DLQトピックのメッセージ数にアラートを設定します

障害対策

  • Kafka Connectを分散モードで運用し、ワーカーノード障害時にタスクが自動再割り当てされるようにします
  • 定期的にコネクタの削除・再登録の復旧手順をテストします
  • WALスロット関連の障害シナリオを運用マニュアルに文書化します
  • パイプライン全体のエンドツーエンドテストを自動化します

参考資料

  1. Debezium公式ドキュメント - Outbox Event Router - Outbox Event Router SMTの全設定オプションと使用方法
  2. Debeziumブログ - Reliable Microservices Data Exchange With the Outbox Pattern - OutboxパターンとDebeziumを活用したマイクロサービスデータ交換の原理
  3. Thorben Janssen - Implementing the Outbox Pattern with CDC using Debezium - JPA/Hibernate環境でのOutboxパターン実装実践ガイド
  4. Decodable - Revisiting the Outbox Pattern - 2024年の視点からのOutboxパターンの再評価と代替案分析
  5. Upsolver - Debezium vs Maxwell - CDCツール比較と選択基準の詳細分析
  6. RisingWave - Debezium vs Other CDC Tools - Debeziumと競合CDCツールの総合比較
  7. DEV Community - CDC Maxwell vs Debezium - MaxwellとDebeziumのアーキテクチャおよび機能比較
  8. Medium - Change Data Capture vs Outbox Pattern - CDCとOutboxパターンの違いと適切な使用シナリオの分析

クイズ

Q1: 「Outboxパターンと CDCで実現するマイクロサービスデータ同期:Debezium実践ガイド」の主なトピックは何ですか?

Outboxパターンと CDC(Change Data Capture)を活用したマイクロサービスデータ同期の総合ガイド。Dual Write問題の分析、Outboxテーブル設計、Debeziumコネクタ構成、Kafka Connectパイプライン、イベント順序保証、冪等性処理、障害復旧まで実践的に解説します。

Q2: Outboxパターンアーキテクチャについて説明してください。 1.1 核心アイデア Outboxパターンの核心はシンプルです。イベントをメッセージブローカーに直接発行するのではなく、ビジネスデータと同一トランザクションでOutboxテーブルに記録するということです。こうすることで、DBのACIDトランザクションが両方の操作のアトミック性を保証します。その後、別のプロセスがOutboxテーブルを読み取ってメッセージブローカーに転送します。 この方式の利点は明確です。ビジネスデータとイベントが一つのトランザクションにまとめられるため、両方とも保存されるか、両方とも保存されないかのどちらかになります。

Q3: Outboxテーブル設計について説明してください。 2.1 基本スキーマ 2.2 設計時の重要な考慮事項 aggregate_idをKafkaパーティションキーとして使用:同じaggregate_idを持つイベントが同じパーティションに入るため、そのエンティティに対するイベントの順序が保証されます。例えば、注文123に対する'OrderCreated' -> 'OrderPaid' -> 'OrderShipped'イベントが順序通りに処理されます。 payloadに必要な情報をすべて含める:コンシューマがイベントだけで処理を完了できるよう、payloadに十分な情報を含めます。

Q4: CDCの概念と動作原理の主な特徴は何ですか? 3.1 CDCとは何か CDC(Change Data Capture)は、データベースの変更内容(INSERT、UPDATE、DELETE)をリアルタイムでキャプチャして外部システムに転送する技術です。CDCの核心はデータベースのトランザクションログを直接読み取ることです。

Q5: Debeziumのインストールとコネクタ構成の主な手順は何ですか? 4.1 Debeziumの概要 DebeziumはRed Hatが主導するオープンソースCDCプラットフォームで、Kafka Connectフレームワーク上で動作します。PostgreSQL、MySQL、MongoDB、SQL Server、Oracle、Cassandra、Vitessなど多様なデータベースをサポートしており、Outbox Event Router SMT(Single Message Transformation)を通じてOutboxパターンをネイティブにサポートしています。