Skip to content
Published on

非同期処理パターン完全攻略:CQRS、Saga、Event Sourcing — シニアバックエンド開発者の必須武器

Authors

1. なぜ非同期なのか?大規模システムの生存戦略

現代

( げんだい )

の大規模システムは、

同期式

( どうきしき )

のリクエスト-レスポンスモデルでは

対応

( たいおう )

できない規模のトラフィックを処理しています。

実際の企業事例:

企業規模非同期処理方式
Netflix2.6億サブスクライバー、毎秒数百万イベントApache Kafkaベースのイベントストリーミング
Uber日次ペタバイトのリアルタイムデータApache Kafka + Apache Flink
LinkedIn7兆+メッセージ/日のKafka処理自社開発Kafkaインフラ
Shopifyブラックフライデー毎秒80万リクエスト非同期注文処理パイプライン

同期式

( どうきしき )

アーキテクチャでは、サービスAがサービスBを

( )

び出し、BがCを呼び出すと、全体のレイテンシは各サービス呼び出し時間の合計になります。一つのサービスが遅くなると、チェーン全体がブロッキングされます。

非同期

( ひどうき )

アーキテクチャでは、メッセージブローカーを通じてサービス間の

結合度

( けつごうど )

を下げ、各サービスが独立にスケーリング可能です。

同期式:  Client -> A(50ms) -> B(100ms) -> C(200ms) = 350ms 合計待機
非同期式: Client -> A(50ms) -> Message Broker -> (B, C 並列処理)
         Clientは50ms後に応答、BCは非同期で処理

非同期移行(いこう)の主要メリット:

  1. 時間的結合(じかんてきけつごう)の除去 — プロデューサーとコンシューマーが同時に稼働する必要がない
  2. 独立したスケーリング — 読み取りと書き込みワークロードを別々にスケーリング
  3. 障害(しょうがい)の分離 — 一つのサービスの障害がシステム全体に波及しない
  4. ピークトラフィックの吸収 — メッセージキューがバッファーとして機能

しかし、非同期はタダではありません。Eventual Consistency(結果整合性(けっかせいごうせい)メッセージ順序保証重複(じゅうふく)処理分散(ぶんさん)トランザクションといった新しい課題が生まれます。本記事で解説するパターンは、まさにこれらの課題への解答です。


2. Event-Driven Architecture 基礎

イベント駆動(くどう)アーキテクチャ(EDA)は非同期パターンの土台です。システムの状態変更を「イベント」として表現し、関心のあるサービスがこれを購読(こうどく)して反応します。

2-1. Domain Events vs Integration Events

区分Domain EventIntegration Event
範囲単一バウンデッドコンテキスト内部バウンデッドコンテキスト間
OrderItemAddedOrderPlaced
配信方式インメモリイベントバスメッセージブローカー(Kafka, RabbitMQ)
スキーマ管理内部的なので自由に変更可能コントラクト管理が必須
// Domain Event - バウンデッドコンテキスト内部
public record OrderItemAdded(
    UUID orderId,
    UUID productId,
    int quantity,
    BigDecimal price,
    Instant occurredAt
) implements DomainEvent {}

// Integration Event - 外部サービスに発行
public record OrderPlacedEvent(
    UUID orderId,
    UUID customerId,
    BigDecimal totalAmount,
    List<OrderLineItem> items,
    Instant occurredAt,
    int version  // スキーマバージョン管理
) implements IntegrationEvent {}

2-2. Event Bus vs Event Store

Event Bus(例:Kafka, RabbitMQ)はイベントを配信(はいしん)するパイプラインです。コンシューマーがイベントを受信すると、通常ブローカーから削除(またはTTL後に失効)されます。

Event Storeは全てのイベントを永続的に保管する貯蔵所(ちょぞうしょ)です。イベントをリプレイ(replay)して、任意の時点の状態を復元できます。

Event Bus (Kafka):
  Producer -> [Topic: orders] -> Consumer Group A (注文サービス)
                               -> Consumer Group B (通知サービス)
                               -> Consumer Group C (分析サービス)

Event Store:
  [Event #1: OrderCreated]
  [Event #2: ItemAdded]
  [Event #3: PaymentReceived]
  [Event #4: OrderShipped]
  -> イベント#1~#4をリプレイすると現在の注文状態を復元可能

2-3. Eventual Consistencyの理解

分散システムで強い一貫性(いっかんせい)(Strong Consistency)を維持するには、分散ロック、2PC(2-Phase Commit)等が必要ですが、パフォーマンスと可用性が大幅に低下します。

CAP定理(ていり)によると、ネットワークパーティション時に一貫性(C)と可用性(A)のどちらかを選択する必要があります。ほとんどの大規模システムは可用性(A)を選択し、Eventual Consistencyを受容します。

Eventual Consistencyとは「今すぐはデータが不整合かもしれないが、十分な時間が経てば全てのノードが同じ状態に収束(しゅうそく)する」ということです。

実務での意味:

  • 注文後に在庫がすぐに減らないことがある(ミリ秒〜数秒の遅延)
  • ユーザーには「注文処理中です」を先に表示し、確定後に更新
  • 一貫性ウィンドウ(Consistency Window)の最小化が鍵

3. CQRS ディープダイブ

CQRS(Command Query Responsibility Segregation)は読み取りと書き込みモデルを完全に分離するパターンです。

3-1. 基本概念

従来

( じゅうらい )

のCRUDモデルでは、一つのデータモデルが読み取りと書き込みの両方を担当します。しかし実際には、読み取りと書き込みは非常に異なる要件を持ちます。

従来のCRUD:
  Client -> [単一モデル] -> Database
            (読み取り/書き込み両方)

CQRS:
  Client --Command--> [Write Model] -> Command DB (正規化)
                            | Events
  Client <--Query---- [Read Model]  <- Read DB (非正規化、最適化)

Write Model(コマンド側):

  • ビジネスルール検証
  • ドメイン不変条件(Invariant)の保証
  • 正規化されたスキーマ
  • 強い一貫性

Read Model(クエリ側):

  • クエリに最適化された非正規化ビュー
  • 多様なストレージの利用可能(Elasticsearch, Redis, MongoDB)
  • Eventual Consistency許容

3-2. いつCQRSを使うべきか?

適している場合:

  • 読み取り/書き込み比率が極端に異なるとき(読み取りが100:1以上)
  • 複雑なドメインロジック + 多様なクエリ要件
  • 読み取りと書き込みのスケーリング要件が異なるとき
  • Event Sourcingと組み合わせて使うとき

適していない場合:

  • 単純なCRUDアプリケーション
  • 強い一貫性が絶対に必要な場合
  • ドメインが単純でクエリパターンが予測可能な場合

3-3. Spring Boot + Kafka 実装

// === Command側:注文作成 ===

@Service
@RequiredArgsConstructor
public class OrderCommandService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @Transactional
    public UUID createOrder(CreateOrderCommand command) {
        // 1. ビジネスルール検証
        validateOrder(command);

        // 2. ドメインモデルで処理
        Order order = Order.create(
            command.customerId(),
            command.items(),
            command.shippingAddress()
        );

        // 3. Write DBに保存
        orderRepository.save(order);

        // 4. イベント発行(Read Model同期用)
        OrderCreatedEvent event = new OrderCreatedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getTotalAmount(),
            order.getItems().stream()
                .map(this::toEventItem)
                .toList(),
            Instant.now()
        );
        kafkaTemplate.send("order-events", order.getId().toString(), event);

        return order.getId();
    }

    private void validateOrder(CreateOrderCommand command) {
        if (command.items().isEmpty()) {
            throw new InvalidOrderException("注文項目が空です");
        }
        // 在庫確認、顧客ステータス確認など
    }
}
// === Query側:注文照会プロジェクション ===

@Service
@RequiredArgsConstructor
public class OrderQueryService {

    private final OrderReadRepository readRepository;

    public OrderDetailView getOrderDetail(UUID orderId) {
        return readRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));
    }

    public Page<OrderSummaryView> getCustomerOrders(
            UUID customerId, Pageable pageable) {
        return readRepository.findByCustomerId(customerId, pageable);
    }
}

// Read Model - クエリに最適化された非正規化ビュー
@Document(collection = "order_views")
public record OrderDetailView(
    UUID orderId,
    UUID customerId,
    String customerName,      // 非正規化:顧客名を含む
    String customerEmail,
    BigDecimal totalAmount,
    String status,
    List<OrderItemView> items,
    AddressView shippingAddress,
    Instant createdAt,
    Instant lastUpdatedAt
) {}
// === イベントハンドラー:Write -> Read 同期 ===

@Component
@RequiredArgsConstructor
public class OrderProjectionHandler {

    private final OrderReadRepository readRepository;
    private final CustomerQueryClient customerClient;

    @KafkaListener(topics = "order-events", groupId = "order-projection")
    public void handleOrderEvent(OrderEvent event) {
        switch (event) {
            case OrderCreatedEvent e -> handleOrderCreated(e);
            case OrderStatusChangedEvent e -> handleStatusChanged(e);
            case OrderCancelledEvent e -> handleOrderCancelled(e);
            default -> log.warn("不明なイベントタイプ: {}", event.getClass());
        }
    }

    private void handleOrderCreated(OrderCreatedEvent event) {
        // 顧客情報を取得して非正規化ビューを生成
        CustomerInfo customer = customerClient.getCustomer(event.customerId());

        OrderDetailView view = new OrderDetailView(
            event.orderId(),
            event.customerId(),
            customer.name(),
            customer.email(),
            event.totalAmount(),
            "CREATED",
            event.items().stream().map(this::toItemView).toList(),
            null,
            event.occurredAt(),
            event.occurredAt()
        );
        readRepository.save(view);
    }
}

3-4. Read Model 同期戦略

戦略遅延一貫性複雑度適している場合
イベントハンドラー(Kafka)数ms〜数秒Eventualほとんどの場合
CDC(Debezium)数秒Eventualレガシーシステム連携
ポーリング(Scheduled)数秒〜数分Eventual単純な場合
同期式更新即時Strongパフォーマンス要件が低い時

4. Sagaパターン完全攻略(こうりゃく)

マイクロサービスで複数のサービスにまたがるビジネストランザクションをどう管理するか?従来のモノリスのDBトランザクションは使用できません。Sagaパターンは分散トランザクションを一連のローカルトランザクションに分解します。

4-1. Choreography(振付(ふりつけ)型)

Choreography方式では中央コーディネーターがありません。各サービスがイベントを発行し、次のサービスがそのイベントを購読して自身のローカルトランザクションを実行します。

ECサイト注文フロー(Choreography):

Order Service --OrderPlaced--> Payment Service
                                    |
                            PaymentCompleted
                                    |
                                    v
                              Inventory Service
                                    |
                            InventoryReserved
                                    |
                                    v
                              Shipping Service
                                    |
                            ShipmentCreated
                                    |
                                    v
                              Order Service (ステータス更新)

失敗時(補償トランザクション):
Inventory Service --InventoryReservationFailed--> Payment Service (返金)
                                                        |
                                                 PaymentRefunded
                                                        |
                                                        v
                                                 Order Service (注文キャンセル)
// === Choreography Saga:注文サービス ===

@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Transactional
    public UUID placeOrder(PlaceOrderCommand command) {
        Order order = Order.create(command);
        order.setStatus(OrderStatus.PENDING);
        orderRepository.save(order);

        // イベント発行 - 決済サービスが購読
        kafkaTemplate.send("order-events", new OrderPlacedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getTotalAmount(),
            order.getItems()
        ));

        return order.getId();
    }

    // 決済完了イベント受信
    @KafkaListener(topics = "payment-events", groupId = "order-service")
    public void onPaymentCompleted(PaymentCompletedEvent event) {
        Order order = orderRepository.findById(event.orderId()).orElseThrow();
        order.setStatus(OrderStatus.PAID);
        orderRepository.save(order);
    }

    // 決済失敗イベント受信
    @KafkaListener(topics = "payment-events", groupId = "order-service")
    public void onPaymentFailed(PaymentFailedEvent event) {
        Order order = orderRepository.findById(event.orderId()).orElseThrow();
        order.setStatus(OrderStatus.CANCELLED);
        order.setCancellationReason(event.reason());
        orderRepository.save(order);
    }
}
// === Choreography Saga:決済サービス ===

@Service
@RequiredArgsConstructor
public class PaymentService {

    private final PaymentRepository paymentRepository;
    private final PaymentGateway paymentGateway;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @KafkaListener(topics = "order-events", groupId = "payment-service")
    @Transactional
    public void onOrderPlaced(OrderPlacedEvent event) {
        try {
            // 決済処理
            PaymentResult result = paymentGateway.charge(
                event.customerId(),
                event.totalAmount()
            );

            Payment payment = Payment.create(
                event.orderId(),
                event.customerId(),
                event.totalAmount(),
                result.transactionId()
            );
            paymentRepository.save(payment);

            // 成功イベント発行 - 在庫サービスが購読
            kafkaTemplate.send("payment-events", new PaymentCompletedEvent(
                event.orderId(),
                payment.getId(),
                result.transactionId()
            ));

        } catch (PaymentException e) {
            // 失敗イベント発行 - 注文サービスが購読してキャンセル
            kafkaTemplate.send("payment-events", new PaymentFailedEvent(
                event.orderId(),
                e.getMessage()
            ));
        }
    }
}

Choreographyの長所(ちょうしょ)短所(たんしょ)

長所短所
サービス間の疎結合全体フローの把握が困難
各サービスが独立にデプロイ可能デバッグが複雑
単一障害点(SPOF)なし循環依存の発生可能性
実装が比較的シンプルサービス増加でイベント追跡の難易度上昇

4-2. Orchestration(指揮(しき)型)

Orchestration方式では中央コーディネーター(Orchestrator)が全体のワークフローを管理します。各サービスにコマンドを送り、レスポンスに応じて次のステップを決定します。

ECサイト注文フロー(Orchestration):

                    Order Saga Orchestrator
                   /          |          \
                  v           v           v
          Payment Service  Inventory  Shipping
          (charge)         (reserve)  (create)
                  v           v           v
          (result)        (result)    (result)
                   \          |          /
                    Order Saga Orchestrator
                         (最終決定)
// === Orchestration Saga:Temporalフレームワーク使用 ===

@WorkflowInterface
public interface OrderSagaWorkflow {

    @WorkflowMethod
    OrderResult processOrder(PlaceOrderCommand command);
}

@WorkflowImpl(workflows = OrderSagaWorkflow.class)
public class OrderSagaWorkflowImpl implements OrderSagaWorkflow {

    // 各サービスのActivityスタブ
    private final PaymentActivities paymentActivities =
        Workflow.newActivityStub(PaymentActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .setRetryOptions(RetryOptions.newBuilder()
                    .setMaximumAttempts(3)
                    .build())
                .build());

    private final InventoryActivities inventoryActivities =
        Workflow.newActivityStub(InventoryActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .build());

    private final ShippingActivities shippingActivities =
        Workflow.newActivityStub(ShippingActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(60))
                .build());

    @Override
    public OrderResult processOrder(PlaceOrderCommand command) {
        String paymentId = null;
        String reservationId = null;

        try {
            // Step 1:決済
            paymentId = paymentActivities.processPayment(
                command.customerId(),
                command.totalAmount()
            );

            // Step 2:在庫予約
            reservationId = inventoryActivities.reserveInventory(
                command.items()
            );

            // Step 3:配送作成
            String shipmentId = shippingActivities.createShipment(
                command.orderId(),
                command.shippingAddress()
            );

            return OrderResult.success(command.orderId(), shipmentId);

        } catch (Exception e) {
            // 補償トランザクション実行
            compensate(paymentId, reservationId);
            return OrderResult.failed(command.orderId(), e.getMessage());
        }
    }

    private void compensate(String paymentId, String reservationId) {
        // 逆順で補償トランザクションを実行
        if (reservationId != null) {
            inventoryActivities.cancelReservation(reservationId);
        }
        if (paymentId != null) {
            paymentActivities.refundPayment(paymentId);
        }
    }
}
// === Activity実装 ===

@ActivityInterface
public interface PaymentActivities {
    String processPayment(UUID customerId, BigDecimal amount);
    void refundPayment(String paymentId);
}

@Component
@RequiredArgsConstructor
public class PaymentActivitiesImpl implements PaymentActivities {

    private final PaymentGateway paymentGateway;
    private final PaymentRepository paymentRepository;

    @Override
    public String processPayment(UUID customerId, BigDecimal amount) {
        PaymentResult result = paymentGateway.charge(customerId, amount);
        Payment payment = Payment.create(customerId, amount, result.transactionId());
        paymentRepository.save(payment);
        return payment.getId().toString();
    }

    @Override
    public void refundPayment(String paymentId) {
        Payment payment = paymentRepository.findById(UUID.fromString(paymentId))
            .orElseThrow();
        paymentGateway.refund(payment.getTransactionId());
        payment.setStatus(PaymentStatus.REFUNDED);
        paymentRepository.save(payment);
    }
}

Orchestrationの長所と短所:

長所短所
ワークフロー全体が一箇所で管理されるオーケストレーターがSPOFになる可能性
デバッグとモニタリングが容易Choreographyより結合度が高い
複雑な補償ロジックの管理が容易オーケストレーターにロジックが集中するリスク
Temporal/Cadence等のフレームワークサポート追加インフラ(ワークフローエンジン)が必要

4-3. Choreography vs Orchestration 選択ガイド

基準ChoreographyOrchestration
サービス数3〜4個以下5個以上
ワークフロー複雑度線形的、シンプル条件分岐、並列処理
可観測性要件
チーム構成独立したチーム中央プラットフォームチーム
補償ロジックシンプル複雑(複数経路)

4-4. Compensating Transactions(補償(ほしょう)トランザクション)

補償トランザクションは、既にコミットされたトランザクションを意味論的(いみろんてき)に取り消す操作です。物理的なロールバックではなく、逆作用(reverse action)を実行します。

// 補償トランザクションの例
public enum CompensationAction {
    PAYMENT_CHARGE    -> PAYMENT_REFUND,
    INVENTORY_RESERVE -> INVENTORY_RELEASE,
    SHIPPING_CREATE   -> SHIPPING_CANCEL,
    COUPON_APPLY      -> COUPON_RESTORE,
    POINTS_DEDUCT     -> POINTS_RESTORE
}

Semantic Compensation vs Exact Compensation:

  • Exact Compensation:正確に元の状態に復元(例:決済キャンセル → 全額返金)
  • Semantic Compensation:ビジネス的に意味のある補償(例:航空券キャンセル → 一部返金 + クレジット)

実務ではSemantic Compensationがより一般的です。時間が経つと正確な復元が不可能な場合が多いためです。


5. Event Sourcing

Event Sourcingは、エンティティの現在の状態を保存する代わりに、状態変更を引き起こした全てのイベントを順序通りに保存するパターンです。

5-1. 核心(かくしん)概念

従来のCRUD:
  Accountテーブル: id=1, balance=750

Event Sourcing:
  Event #1: AccountCreated(id=1, initialBalance=1000)
  Event #2: MoneyWithdrawn(id=1, amount=200)
  Event #3: MoneyDeposited(id=1, amount=150)
  Event #4: MoneyWithdrawn(id=1, amount=200)
  -> リプレイ: 1000 - 200 + 150 - 200 = 750 (現在の残高)

全ての状態変更が不変(ふへん)(immutable)なイベントログに追記(append-only)されます。削除や変更はありません。

5-2. Event Store 実装

// === Event Store インターフェース ===

public interface EventStore {
    void saveEvents(UUID aggregateId, List<DomainEvent> events, int expectedVersion);
    List<DomainEvent> getEvents(UUID aggregateId);
    List<DomainEvent> getEvents(UUID aggregateId, int fromVersion);
}

// === PostgreSQLベースEvent Store実装 ===

@Repository
@RequiredArgsConstructor
public class PostgresEventStore implements EventStore {

    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper;

    @Override
    @Transactional
    public void saveEvents(UUID aggregateId, List<DomainEvent> events,
                          int expectedVersion) {
        // 楽観的同時実行制御
        Integer currentVersion = jdbcTemplate.queryForObject(
            "SELECT COALESCE(MAX(version), 0) FROM event_store WHERE aggregate_id = ?",
            Integer.class, aggregateId
        );

        if (currentVersion != expectedVersion) {
            throw new ConcurrencyException(
                "同時実行の衝突: 期待バージョン " + expectedVersion +
                ", 実際バージョン " + currentVersion
            );
        }

        int version = expectedVersion;
        for (DomainEvent event : events) {
            version++;
            jdbcTemplate.update(
                """
                INSERT INTO event_store
                (event_id, aggregate_id, aggregate_type, event_type,
                 event_data, version, created_at)
                VALUES (?, ?, ?, ?, ?::jsonb, ?, ?)
                """,
                UUID.randomUUID(),
                aggregateId,
                event.getAggregateType(),
                event.getClass().getSimpleName(),
                objectMapper.writeValueAsString(event),
                version,
                Instant.now()
            );
        }
    }

    @Override
    public List<DomainEvent> getEvents(UUID aggregateId) {
        return jdbcTemplate.query(
            "SELECT * FROM event_store WHERE aggregate_id = ? ORDER BY version",
            this::mapRowToEvent,
            aggregateId
        );
    }
}
-- Event Storeテーブルスキーマ
CREATE TABLE event_store (
    event_id       UUID PRIMARY KEY,
    aggregate_id   UUID NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    event_type     VARCHAR(255) NOT NULL,
    event_data     JSONB NOT NULL,
    version        INTEGER NOT NULL,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    UNIQUE (aggregate_id, version)  -- 楽観的同時実行制御
);

CREATE INDEX idx_event_store_aggregate ON event_store (aggregate_id, version);
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);

5-3. Aggregateでのイベント適用

// === Event-Sourced Aggregate ===

public class OrderAggregate {

    private UUID id;
    private UUID customerId;
    private OrderStatus status;
    private BigDecimal totalAmount;
    private List<OrderItem> items = new ArrayList<>();
    private int version = 0;

    // 未コミットイベントリスト
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    // イベントリプレイで状態復元
    public static OrderAggregate fromHistory(List<DomainEvent> history) {
        OrderAggregate aggregate = new OrderAggregate();
        for (DomainEvent event : history) {
            aggregate.apply(event, false);
        }
        return aggregate;
    }

    // 新しいコマンド処理
    public void placeOrder(UUID customerId, List<OrderItem> items) {
        if (this.status != null) {
            throw new IllegalStateException("既に作成された注文です");
        }

        BigDecimal total = items.stream()
            .map(item -> item.price().multiply(BigDecimal.valueOf(item.quantity())))
            .reduce(BigDecimal.ZERO, BigDecimal::add);

        apply(new OrderPlacedEvent(this.id, customerId, total, items, Instant.now()), true);
    }

    public void cancelOrder(String reason) {
        if (this.status == OrderStatus.SHIPPED) {
            throw new IllegalStateException("配送済みの注文はキャンセルできません");
        }
        apply(new OrderCancelledEvent(this.id, reason, Instant.now()), true);
    }

    // イベント適用(状態変更)
    private void apply(DomainEvent event, boolean isNew) {
        when(event);  // 状態変更
        this.version++;
        if (isNew) {
            uncommittedEvents.add(event);
        }
    }

    // イベント別の状態変更ロジック
    private void when(DomainEvent event) {
        switch (event) {
            case OrderPlacedEvent e -> {
                this.id = e.orderId();
                this.customerId = e.customerId();
                this.totalAmount = e.totalAmount();
                this.items = new ArrayList<>(e.items());
                this.status = OrderStatus.PLACED;
            }
            case OrderCancelledEvent e -> {
                this.status = OrderStatus.CANCELLED;
            }
            case OrderShippedEvent e -> {
                this.status = OrderStatus.SHIPPED;
            }
            default -> throw new IllegalArgumentException("不明なイベント: " + event);
        }
    }

    public List<DomainEvent> getUncommittedEvents() {
        return Collections.unmodifiableList(uncommittedEvents);
    }

    public void clearUncommittedEvents() {
        uncommittedEvents.clear();
    }
}

5-4. Snapshotパターン

イベントが数万個蓄積(ちくせき)されると、毎回全てをリプレイするのは非効率です。Snapshotを使って特定時点の状態を保存しておけば、その後のイベントだけをリプレイすれば済みます。

// Snapshot保存と復元
@Service
public class SnapshotService {

    private static final int SNAPSHOT_INTERVAL = 100; // 100イベントごと

    public OrderAggregate loadAggregate(UUID aggregateId) {
        // 1. 最新スナップショットを取得
        Optional<Snapshot> snapshot = snapshotRepository
            .findLatest(aggregateId);

        if (snapshot.isPresent()) {
            // 2. スナップショットから状態復元
            OrderAggregate aggregate = deserialize(snapshot.get().getData());
            int fromVersion = snapshot.get().getVersion();

            // 3. スナップショット以降のイベントだけリプレイ
            List<DomainEvent> newEvents = eventStore
                .getEvents(aggregateId, fromVersion + 1);
            for (DomainEvent event : newEvents) {
                aggregate.apply(event, false);
            }
            return aggregate;

        } else {
            // スナップショットなし、全てリプレイ
            List<DomainEvent> allEvents = eventStore.getEvents(aggregateId);
            return OrderAggregate.fromHistory(allEvents);
        }
    }

    public void saveIfNeeded(OrderAggregate aggregate) {
        if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
            snapshotRepository.save(new Snapshot(
                aggregate.getId(),
                aggregate.getVersion(),
                serialize(aggregate),
                Instant.now()
            ));
        }
    }
}

5-5. Event Sourcing vs CRUD 比較

基準CRUDEvent Sourcing
保存方式現在の状態のみ保存全ての状態変更イベントを保存
監査証跡別途実装が必要内蔵(イベント自体が監査ログ)
タイムトラベル不可能可能(任意時点の状態を復元)
ストレージ容量相対的に少ないイベント蓄積で増加
複雑度低い高い(イベント設計、スナップショット等)
同時実行制御悲観的/楽観的ロックイベントバージョンベースの楽観的制御
パフォーマンス読み取り最適化が容易リプレイコスト(スナップショットで緩和)
スキーマ変更マイグレーションが必要イベントアップキャスティング
CQRS連携選択事項自然な組み合わせ

5-6. Event Sourcingを使うべきでないとき

  • 単純なCRUDドメイン(ブログ、設定管理)
  • 強い一貫性が必須の場合(リアルタイム残高確認が必要なATM)
  • イベント設計が不安定な初期段階(頻繁な変更時にアップキャスティングコスト)
  • チームの経験が不足している場合(学習曲線が急)
  • 規制要件でデータ削除が必要な場合(GDPRの忘れられる権利と衝突)

6. Outbox Pattern(信頼性の核心)

6-1. 問題:二重書込(にじゅうかきこみ)(Dual Write)

分散システムで最も頻繁(ひんぱん)なミスの一つは、データベース保存とメッセージ発行を別々に実行することです。

// 危険なコード:二重書込問題
@Transactional
public void createOrder(Order order) {
    orderRepository.save(order);           // Step 1: DB保存
    kafkaTemplate.send("orders", event);   // Step 2: メッセージ発行
    // Step 1成功 + Step 2失敗 = DBにはあるがイベントは未発行
    // Step 1成功 + Step 2成功 + DBコミット失敗 = イベントは発行されたがDBにない
}

DBトランザクションとメッセージブローカー送信は異なるインフラであるため、アトミックに処理できません。

6-2. 解決:Outbox Pattern

Outboxパターンは、イベントをDBトランザクション内でoutboxテーブルに保存し、別プロセスがこれをメッセージブローカーに配信します。

1. ビジネスロジック + outbox INSERT = 単一DBトランザクション(原子性保証)
2. 別プロセス(CDC/Poller)がoutboxテーブルを読んでKafkaに発行
3. 発行成功後、outboxレコードを処理完了にマーク
-- Outboxテーブルスキーマ
CREATE TABLE outbox (
    id              UUID PRIMARY KEY,
    aggregate_type  VARCHAR(255) NOT NULL,
    aggregate_id    UUID NOT NULL,
    event_type      VARCHAR(255) NOT NULL,
    payload         JSONB NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMPTZ,
    status          VARCHAR(20) DEFAULT 'PENDING'
);

CREATE INDEX idx_outbox_pending ON outbox (status, created_at)
    WHERE status = 'PENDING';
// === Outboxパターン実装 ===

@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;

    @Transactional  // 単一トランザクションで保証
    public UUID createOrder(CreateOrderCommand command) {
        // 1. ビジネスロジック実行
        Order order = Order.create(command);
        orderRepository.save(order);

        // 2. 同じトランザクションでoutboxにイベント保存
        OutboxMessage message = OutboxMessage.builder()
            .aggregateType("Order")
            .aggregateId(order.getId())
            .eventType("OrderCreated")
            .payload(objectMapper.writeValueAsString(
                new OrderCreatedEvent(order.getId(), order.getTotalAmount())
            ))
            .build();
        outboxRepository.save(message);

        return order.getId();
        // トランザクションコミット時にorderとoutboxの両方が一緒に保存される
    }
}

6-3. CDC(Debezium)方式

DebeziumはDBのトランザクションログ(WAL)を監視して変更をキャプチャし、Kafkaに配信します。

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.dbname": "orderdb",
    "database.server.name": "order-service",
    "table.include.list": "public.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "events.order"
  }
}

6-4. Polling vs CDC 比較

基準PollingCDC (Debezium)
遅延ポーリング間隔に依存(数秒〜数分)ほぼリアルタイム(数ms)
DB負荷定期的クエリで負荷発生WAL読み取りで負荷最小
実装複雑度低い(スケジューラー + クエリ)高い(Debeziumインフラが必要)
順序保証outboxテーブルの順序WAL順序のまま
運用負荷低いKafka Connectクラスターの運用が必要
スケーラビリティ単一ポーラーがボトルネック可能高いスケーラビリティ

7. 冪等性(べきとうせい)パターン

分散システムでは、メッセージは最低1回(at-least-once)配信されます。ネットワーク障害、ブローカー再起動、コンシューマー再デプロイ等により重複受信が発生します。冪等性は同じメッセージを複数回処理しても、結果が1回処理した場合と同じになることを保証します。

7-1. Producer冪等性(Kafka)

# Kafka Producer設定
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.properties.acks=all
spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
spring.kafka.producer.properties.retries=2147483647

Kafkaの冪等プロデューサーは各メッセージにシーケンス番号を付与し、ブローカーが重複を検出します。

7-2. Consumer冪等性

// === 冪等コンシューマー実装 ===

@Component
@RequiredArgsConstructor
public class IdempotentOrderEventHandler {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderProjectionRepository orderProjectionRepository;

    @KafkaListener(topics = "order-events", groupId = "order-projection")
    @Transactional
    public void handleOrderEvent(
            @Payload OrderEvent event,
            @Header(KafkaHeaders.RECEIVED_KEY) String key,
            @Header("eventId") String eventId) {

        // 1. 既に処理済みのイベントか確認
        if (processedEventRepository.existsById(eventId)) {
            log.info("イベント重複受信を無視: eventId={}", eventId);
            return;
        }

        // 2. ビジネスロジック処理
        processEvent(event);

        // 3. 処理完了を記録(同一トランザクション)
        processedEventRepository.save(new ProcessedEvent(
            eventId,
            event.getClass().getSimpleName(),
            Instant.now()
        ));
    }

    private void processEvent(OrderEvent event) {
        switch (event) {
            case OrderCreatedEvent e -> createOrderProjection(e);
            case OrderStatusChangedEvent e -> updateOrderStatus(e);
            default -> log.warn("不明なイベントタイプ");
        }
    }
}
-- イベント処理履歴テーブル
CREATE TABLE processed_events (
    event_id    VARCHAR(255) PRIMARY KEY,
    event_type  VARCHAR(255) NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

7-3. Unique Constraint vs Redis SET NX

アプローチ長所短所
DB Unique Constraintトランザクションと共にアトミックDB負荷が増加
Redis SET NX非常に高速な照会Redis障害時に保証されない
DB + Redis組み合わせ高速な一次フィルター + 確実な保証実装が複雑
// Redis SET NXを利用した高速重複チェック
@Component
public class RedisIdempotencyGuard {

    private final StringRedisTemplate redisTemplate;
    private static final Duration TTL = Duration.ofHours(24);

    public boolean tryAcquire(String idempotencyKey) {
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(
                "idempotency:" + idempotencyKey,
                "processed",
                TTL
            );
        return Boolean.TRUE.equals(result);
    }
}

8. Dead Letter Queue 戦略

メッセージ処理に失敗した場合、無限リトライはシステムを麻痺(まひ)させます。DLQ(Dead Letter Queue)は処理不能なメッセージを別キューに隔離してシステムの安定性を保証します。

8-1. Retry with Exponential Backoff

// === Spring Kafka リトライ + DLQ設定 ===

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public DefaultErrorHandler errorHandler(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // DLQに送るrecoverer
        DeadLetterPublishingRecoverer recoverer =
            new DeadLetterPublishingRecoverer(kafkaTemplate,
                (record, ex) -> new TopicPartition(
                    record.topic() + ".DLQ", record.partition()));

        // Exponential Backoff リトライ設定
        ExponentialBackOff backOff = new ExponentialBackOff();
        backOff.setInitialInterval(1000L);    // 1秒
        backOff.setMultiplier(2.0);            // 2倍ずつ増加
        backOff.setMaxInterval(60000L);        // 最大60秒
        backOff.setMaxElapsedTime(300000L);    // 最大5分

        DefaultErrorHandler errorHandler =
            new DefaultErrorHandler(recoverer, backOff);

        // リトライしない例外(ビジネスエラー)
        errorHandler.addNotRetryableExceptions(
            InvalidOrderException.class,
            DuplicateEventException.class
        );

        return errorHandler;
    }
}

8-2. DLQ モニタリングとアラート

// === DLQモニタリングサービス ===

@Component
@RequiredArgsConstructor
public class DlqMonitoringService {

    private final MeterRegistry meterRegistry;
    private final AlertService alertService;

    @KafkaListener(topics = "order-events.DLQ", groupId = "dlq-monitor")
    public void monitorDlq(
            ConsumerRecord<String, Object> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String errorMessage,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic) {

        // メトリクス記録
        meterRegistry.counter("dlq.messages.received",
            "originalTopic", originalTopic,
            "errorType", classifyError(errorMessage)
        ).increment();

        // アラート送信
        alertService.sendAlert(DlqAlert.builder()
            .originalTopic(originalTopic)
            .messageKey(record.key())
            .errorMessage(errorMessage)
            .timestamp(Instant.ofEpochMilli(record.timestamp()))
            .severity(classifySeverity(errorMessage))
            .build()
        );

        log.error("DLQメッセージ受信: topic={}, key={}, error={}",
            originalTopic, record.key(), errorMessage);
    }
}

8-3. DLQ 再処理パイプライン

// === DLQメッセージ再処理 ===

@RestController
@RequestMapping("/api/dlq")
@RequiredArgsConstructor
public class DlqReprocessController {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final DlqMessageRepository dlqRepository;

    // 単件再処理
    @PostMapping("/reprocess/single")
    public ResponseEntity<String> reprocessSingle(
            @RequestParam String messageId) {
        DlqMessage message = dlqRepository.findById(messageId).orElseThrow();

        // 元のトピックに再発行
        kafkaTemplate.send(
            message.getOriginalTopic(),
            message.getKey(),
            message.getValue()
        );

        message.setStatus("REPROCESSED");
        message.setReprocessedAt(Instant.now());
        dlqRepository.save(message);

        return ResponseEntity.ok("再処理リクエスト完了: " + messageId);
    }

    // バッチ再処理
    @PostMapping("/reprocess/batch")
    public ResponseEntity<String> reprocessBatch(
            @RequestParam String originalTopic,
            @RequestParam(defaultValue = "100") int batchSize) {
        List<DlqMessage> messages = dlqRepository
            .findByOriginalTopicAndStatus(originalTopic, "PENDING",
                PageRequest.of(0, batchSize));

        int count = 0;
        for (DlqMessage message : messages) {
            kafkaTemplate.send(message.getOriginalTopic(),
                message.getKey(), message.getValue());
            message.setStatus("REPROCESSED");
            count++;
        }
        dlqRepository.saveAll(messages);

        return ResponseEntity.ok(count + "件の再処理リクエスト完了");
    }
}

9. Exactly-Once Semantics

メッセージ配信保証レベルには3種類あります。

保証レベル説明複雑度
At-most-once最大1回配信(紛失の可能性)
At-least-once最低1回配信(重複の可能性)
Exactly-once正確に1回配信

9-1. Kafka Exactly-Once 実装

Kafkaは0.11からExactly-Once Semantics(EOS)をサポートしています。

// === Kafka Exactly-Once設定 ===

@Configuration
public class KafkaExactlyOnceConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(config);
    }
}

9-2. End-to-End Exactly-Once

真のEnd-to-End Exactly-Onceは、ProducerからConsumerの最終状態保存まで保証する必要があります。

End-to-End Exactly-Onceチェーン:

Producer (idempotent + transactions)
    -> Kafka Broker (transaction log)
        -> Consumer (read_committed + manual offset)
            -> Database (idempotency key + business logic)

チェーンの全リンクが連携してExactly-Onceを保証
// === End-to-End Exactly-Once処理 ===

@Component
@RequiredArgsConstructor
public class ExactlyOnceOrderProcessor {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final OrderRepository orderRepository;
    private final ProcessedOffsetRepository offsetRepository;

    @KafkaListener(topics = "payment-completed")
    @Transactional("chainedKafkaTransactionManager")
    public void processPaymentCompleted(
            ConsumerRecord<String, PaymentCompletedEvent> record) {

        String offsetKey = record.topic() + "-" +
            record.partition() + "-" + record.offset();

        // 既に処理済みのオフセットか確認
        if (offsetRepository.existsById(offsetKey)) {
            return;
        }

        // ビジネスロジック
        PaymentCompletedEvent event = record.value();
        Order order = orderRepository.findById(event.orderId()).orElseThrow();
        order.markAsPaid(event.paymentId());
        orderRepository.save(order);

        // 次のステップのイベント発行(同じKafkaトランザクション)
        kafkaTemplate.send("order-paid",
            new OrderPaidEvent(order.getId(), event.paymentId()));

        // 処理済みオフセットを記録(同じDBトランザクション)
        offsetRepository.save(new ProcessedOffset(offsetKey, Instant.now()));
    }
}

10. Reactive Streams vs Virtual Threads(2025)

10-1. Project Reactor:背圧(はいあつ)(Backpressure)が核心

リアクティブストリームはノンブロッキングI/Oと背圧(Backpressure)メカニズムを核心設計原則とします。

// === Reactorベースの非同期処理 ===

@Service
public class ReactiveOrderService {

    public Flux<OrderEvent> processOrderStream() {
        return kafkaReceiver.receive()
            .flatMap(record -> {
                OrderEvent event = record.value();
                return processEvent(event)
                    .then(record.receiverOffset().commit())
                    .thenReturn(event);
            }, 16)  // 同時処理数制限(背圧)
            .onErrorResume(e -> {
                log.error("処理失敗", e);
                return Mono.empty();
            })
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
    }

    private Mono<Void> processEvent(OrderEvent event) {
        return Mono.fromCallable(() -> {
            // ビジネスロジック
            return null;
        }).subscribeOn(Schedulers.boundedElastic()).then();
    }
}

10-2. Java 21 Virtual Threads:よりシンプルなコード

Virtual Threadsは従来の同期コードスタイルを維持(いじ)しながら、高い同時実行性を実現します。

// === Virtual Threadsベースの非同期処理 ===

@Configuration
public class VirtualThreadConfig {

    @Bean
    public TomcatProtocolHandlerCustomizer<?> protocolHandlerCustomizer() {
        return protocolHandler -> {
            protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
        };
    }
}

@Service
public class VirtualThreadOrderService {

    // 同期コードだがVirtual Thread上で実行
    // -> ブロッキングI/O時にOSスレッドではなくVirtual Threadのみブロッキング
    public OrderResult processOrder(UUID orderId) {
        // 並列処理:StructuredTaskScope活用
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

            Subtask<PaymentResult> paymentTask = scope.fork(() ->
                paymentService.processPayment(orderId)
            );
            Subtask<InventoryResult> inventoryTask = scope.fork(() ->
                inventoryService.checkInventory(orderId)
            );

            scope.join();
            scope.throwIfFailed();

            return new OrderResult(
                paymentTask.get(),
                inventoryTask.get()
            );
        }
    }
}

10-3. 選択ガイド:意思決定マトリクス

基準Reactive (Reactor/WebFlux)Virtual Threads
コード複雑度高い(リアクティブ演算子の学習)低い(既存の同期コード)
背圧制御内蔵(Flux/Mono)手動実装が必要
デバッグ困難(複雑なスタックトレース)容易(従来のスタックトレース)
メモリ効率非常に良い良い(OSスレッドより)
I/O集約ワークロード最適非常に良い
CPU集約ワークロード不向き不向き(プラットフォームスレッド使用)
レガシーコード移行全面書き換えが必要設定変更のみで可能
エコシステム成熟度高い(5年以上)成長中(2023〜)

推奨ガイド:

  • 新プロジェクト + シンプルなI/O:Virtual Threads
  • ストリーミングデータ処理:Reactive Streams
  • 既存Spring MVCプロジェクト:Virtual Threads(移行コスト最小)
  • リアルタイムデータパイプライン:Reactive Streams

11. 実践アーキテクチャ:決済システム設計

全てのパターンを統合(とうごう)したECサイト決済システムの全体アーキテクチャを設計します。

                     ECサイト決済システム全体アーキテクチャ
                     ====================================

[Client] -> [API Gateway]
                |
                v
        [Order Service] --- Write DB (PostgreSQL)
           |                     |
           | (Outbox Pattern)    | (CDC - Debezium)
           |                     v
           |              [Kafka: order-events]
           |                /        |        \
           v               v        v         v
    [Saga Orchestrator]  [Payment] [Inventory] [Notification]
    (Temporal)           Service   Service     Service
           |                |        |
           | commands       |        |
           v                v        v
    [payment-commands]  [payment-  [inventory-
                        events]    events]
                           |
                    [DLQ + Monitoring]
                           |
                    [Alerting (PagerDuty)]

Read Side:
    [Kafka: order-events] -> [Order Projection] -> Read DB (MongoDB)
                           -> [Analytics Service] -> Data Warehouse
                           -> [Search Service] -> Elasticsearch

適用パターンまとめ

パターン適用箇所目的
CQRSOrder Service注文の読み取り/書き込み分離
Saga (Orchestration)Temporal Orchestrator決済-在庫-配送トランザクション管理
Event SourcingOrder Aggregate注文状態変更履歴の管理
Outbox PatternOrder Service → KafkaDB-メッセージの原子性保証
冪等性全Consumer重複メッセージ処理の防止
DLQ全Kafka Consumer処理失敗メッセージの隔離
Exactly-Once決済処理二重決済の防止

クイズ

Q1. CQRSでWrite ModelとRead Modelが異なるデータベースを使用する場合、2つのモデル間のデータ一貫性を保証するために最も一般的に使用されるアプローチは?

正解:イベントベースの非同期同期(Eventual Consistency)

Write Modelが状態を変更する際にイベントを発行し、Read Modelのプロジェクションハンドラーがこのイベントを購読してRead DBを更新します。2つのモデル間には一時的な不整合(数ms〜数秒)が存在し得ますが、最終的に一貫性が保証されます。

強い一貫性が必要な場合は、CQRSの適合性を再検討すべきです。

Q2. Choreography SagaでサービスAがイベントを発行し、サービスBが処理に失敗した場合、サービスAはどのようにこれを認知できますか?

正解: サービスBが失敗イベント(例:PaymentFailed)を発行し、サービスAがこれを購読して認知します。Choreographyでは各サービスが成功/失敗イベントの両方を発行する必要があり、関連サービスがこれを購読して適切に対応します。

これがChoreographyの短所の一つです。サービスが増えるほどイベントフローの追跡が複雑になり、タイムアウトベースの検知が追加で必要になることがあります。

Q3. Event Sourcingでイベントが100万個蓄積されたAggregateをロードする際のパフォーマンス問題を解決する核心技法は?

正解:Snapshotパターン

定期的に(例:100イベントごと)Aggregateの現在の状態をスナップショットとして保存します。ロード時に最新のスナップショットから開始し、その後のイベントだけをリプレイします。

例:100万個のイベント中、最後のスナップショットが999,900番目であれば、100個のイベントだけリプレイすれば良いです。

Q4. Outbox PatternでCDC(Change Data Capture)方式がPolling方式より好まれる主な理由は?

正解: CDCはDBのトランザクションログ(WAL)を直接読み取るため、ほぼリアルタイム(数ms)で変更を検知します。Pollingは定期的にクエリを実行するため遅延が大きく(数秒〜数分)、頻繁なクエリでDBに負荷をかけます。

また、CDCはWALの順序そのままでイベントを配信するため順序保証が自然で、DBトランザクションログを読むため追加のクエリ負荷がほとんどありません。

Q5. KafkaでExactly-Once Semanticsを達成するには、Producer、Broker、Consumerそれぞれでどのような設定が必要ですか?

正解:

  • Producer:enable.idempotence=true + transactional.id設定(冪等プロデューサー + トランザクション)
  • Broker:トランザクションログの維持(デフォルトで有効)
  • Consumer:isolation.level=read_committed + 手動オフセットコミット + 冪等処理

End-to-End Exactly-Onceのためには、ConsumerがDBに結果を保存する際にも冪等性を保証する必要があります。Kafkaトランザクションはkafka内部のExactly-Onceのみを保証し、外部システム(DB)まで含めるにはConsumer側の冪等性が必須です。


参考資料

  1. Martin Fowler — "Event Sourcing" (martinfowler.com)
  2. Chris Richardson — "Microservices Patterns" (Manning, 2018)
  3. Vaughn Vernon — "Implementing Domain-Driven Design" (Addison-Wesley, 2013)
  4. Greg Young — "CQRS Documents" (cqrs.files.wordpress.com)
  5. Apache Kafka Documentation — "Exactly-Once Semantics" (kafka.apache.org)
  6. Debezium Documentation — "Outbox Event Router" (debezium.io)
  7. Temporal.io Documentation — "Saga Pattern with Temporal" (docs.temporal.io)
  8. Netflix Tech Blog — "Evolution of the Netflix Data Pipeline" (netflixtechblog.com)
  9. Uber Engineering Blog — "Building Reliable Reprocessing and Dead Letter Queues" (eng.uber.com)
  10. Confluent Blog — "Exactly-Once Semantics Are Possible" (confluent.io)
  11. AWS Architecture Blog — "Saga Orchestration Pattern" (aws.amazon.com)
  12. Microsoft Azure Docs — "CQRS Pattern" (learn.microsoft.com)
  13. Spring for Apache Kafka Reference — "Error Handling and DLQ" (docs.spring.io)
  14. JEP 444 — "Virtual Threads" (openjdk.org)
  15. Project Reactor Reference — "Backpressure" (projectreactor.io)
  16. Pat Helland — "Idempotence Is Not a Medical Condition" (ACM Queue, 2012)