Skip to content
Published on

CQRS + Event Sourcingマイクロサービスアーキテクチャ:Kafkaベース実装とSagaパターン実践ガイド

Authors
  • Name
    Twitter
CQRS Event Sourcing Kafka

はじめに -- なぜCQRS + Event Sourcingなのか

マイクロサービスアーキテクチャが成熟するにつれ、サービス間のデータ整合性と状態管理に対する要求水準が変わってきた。従来のCRUDベースアーキテクチャは単一データベースに現在の状態のみを保管するため、「なぜこの状態になったのか」を遡って追跡できない。特に金融、EC、物流のように監査証跡(Audit Trail)が重要なドメインでは、状態変更の全履歴を保存する必要がある。

CQRS(Command Query Responsibility Segregation)は書き込みモデルと読み取りモデルを分離し、それぞれのモデルを独立的に最適化できるようにする。Event Sourcingは現在の状態ではなく状態変更イベントの不変シーケンスを保存し、任意の時点の状態をイベントリプレイで再構成できるようにする。この2つのパターンを組み合わせると、書き込み側はイベントをappend-onlyで記録し、読み取り側はイベントからプロジェクションされたビューを照会する、スケーラビリティとトレーサビリティを同時に確保したアーキテクチャを実現できる。

Apache Kafkaは分散コミットログという本質的な特性のおかげで、イベントストアの役割を自然に果たす。トピックパーティション別の順序保証、コンシューマグループベースの水平スケーリング、log compactionによるスナップショット維持などがEvent Sourcingに適した基盤を提供する。本記事では、Kafkaをイベントストアとして活用してCQRS + Event SourcingアーキテクチャをSpring Boot(Java/Kotlin)ベースで実装し、Sagaパターンによる分散トランザクション管理、プロダクション環境での運用ノウハウと障害復旧手順までを解説する。

CQRSパターンのコア原理

CommandとQueryの分離

CQRSの本質は、Bertrand MeyerのCQS(Command Query Separation)原則をアーキテクチャレベルに拡張したものである。CQSはメソッドレベルで「状態を変更するメソッド(Command)は値を返さず、値を返すメソッド(Query)は状態を変更しない」という原則である。CQRSはこれをシステムレベルに引き上げ、書き込みパスと読み取りパスを完全に独立したモデル(場合によっては独立したデータストア)に分離する。

**Command側(Write Model)**はドメイン不変条件(invariant)を検証し、ビジネスロジックを実行し、状態変更イベントを生成する。Aggregate単位で整合性境界を設定し、イベントストアにイベントをappendする。

**Query側(Read Model)**は事前にプロジェクションされた非正規化ビューからデータを照会する。読み取りパフォーマンスに最適化されたスキーマを自由に設計でき、書き込み側のドメインモデルとは独立して変更可能である。

Write ModelとRead Model分離の利点

書き込みと読み取りを分離すると、以下の利点が得られる。

  • 独立したスケーリング:読み取りトラフィックが書き込みトラフィックの10倍以上のシステムで、読み取り側のみを水平スケーリングできる
  • モデルの最適化:書き込み側は正規化されたドメインモデルで不変条件を強く保護し、読み取り側は非正規化ビューで照会パフォーマンスを最大化する
  • 複数プロジェクション:同一のイベントストリームから、検索用のElasticsearchインデックス、分析用のデータウェアハウス、リアルタイムダッシュボードなど多様な読み取りモデルを同時に構築できる
  • 技術スタックの多様化:書き込み側はPostgreSQL、読み取り側はMongoDBやRedisなど用途に合ったストレージを選択できる

Event Sourcingの概念とイベントストア

不変イベントログ

Event Sourcingにおいて、システムの真実の源泉(Source of Truth)は現在の状態ではなく、ドメインイベントの不変シーケンスである。すべての状態変更はイベントで表現され、一度記録されたイベントは絶対に変更・削除しない。現在の状態は、そのAggregateに属するすべてのイベントを順番にリプレイして再構成する。

OrderCreated { orderId: "ORD-001", customerId: "CUST-42", items: [...] }
OrderItemAdded { orderId: "ORD-001", productId: "PROD-7", qty: 2 }
PaymentReceived { orderId: "ORD-001", amount: 59000, method: "CARD" }
OrderShipped { orderId: "ORD-001", trackingNumber: "KR1234567890" }

上記のイベントストリームを最初からリプレイすると、注文ORD-001の現在の状態(配送完了、決済完了など)を正確に復元できる。OrderItemAddedがどのような文脈で発生したか、決済がいつ行われたかなど、状態変更の全履歴を完全に保存できる。

状態再構成(State Reconstruction)

Aggregateの現在の状態を求めるには、そのAggregateのすべてのイベントを最初から順番に適用(apply)する必要がある。イベント数が増えるとリプレイコストが増加するため、一定間隔でスナップショット(Snapshot)を保存してリプレイ開始点を短縮する。

[Snapshot at version 50: { status: "PAID", total: 59000, ... }]
[Event 51] OrderItemRemoved { ... }
[Event 52] RefundInitiated { ... }
... (現在のバージョンまで)

スナップショットはパフォーマンス最適化の手段に過ぎず、イベントログ自体が真実の源泉であるという点は変わらない。

Kafkaをイベントストアとして活用する

なぜKafkaなのか

Apache Kafkaは分散コミットログシステムであり、以下の特性がEvent Sourcingと相性が良い。

  • Append-onlyログ:パーティション内でメッセージの順序が保証され、一度書き込まれたメッセージは変更されない
  • 耐久性(Durability):レプリケーションファクター(replication factor)設定でデータ損失を防止する
  • 水平スケーリング:トピックパーティションを増やしてスループットを線形に拡張できる
  • コンシューマグループ:複数の読み取りモデルが独立的にイベントを消費できる
  • Log Compaction:キーごとに最新メッセージのみを保持して、スナップショットの役割を果たせる

トピック設計戦略

Event SourcingにおけるKafkaトピック設計は、アーキテクチャの核心的な決定事項である。Aggregateタイプ別にトピックを分離し、Aggregate IDをメッセージキーとして使用して、同じAggregateのイベントが同じパーティションに順番に保存されるようにする。

# kafka-topics.yml - トピック設計例
topics:
  # 注文Aggregateイベントトピック
  - name: order-events
    partitions: 12
    replication-factor: 3
    config:
      retention.ms: -1 # 無期限保存(イベントストア)
      cleanup.policy: delete # イベントログは削除ポリシー
      min.insync.replicas: 2 # 最低2つのISRを維持
      max.message.bytes: 1048576 # 1MBメッセージサイズ制限
      message.timestamp.type: CreateTime

  # 注文スナップショットトピック(Log Compaction)
  - name: order-snapshots
    partitions: 12
    replication-factor: 3
    config:
      cleanup.policy: compact # キーごとに最新メッセージのみ保持
      min.compaction.lag.ms: 86400000 # 最低24時間後にcompaction
      delete.retention.ms: 604800000 # 削除マーカー7日保持

  # Saga状態トピック
  - name: order-saga-events
    partitions: 12
    replication-factor: 3
    config:
      retention.ms: 2592000000 # 30日保存
      cleanup.policy: delete

Kafkaプロデューサー/コンシューマー実装(Spring Boot + Kotlin)

Kafkaをイベントストアとして活用するSpring Bootプロジェクトの基本設定とプロデューサー実装である。

// build.gradle.kts
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.kafka:spring-kafka")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
}

// application.yml
// spring:
//   kafka:
//     bootstrap-servers: kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092
//     producer:
//       acks: all
//       retries: 3
//       key-serializer: org.apache.kafka.common.serialization.StringSerializer
//       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
//       properties:
//         enable.idempotence: true
//         max.in.flight.requests.per.connection: 5
//     consumer:
//       group-id: order-projection
//       auto-offset-reset: earliest
//       enable-auto-commit: false
//       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
//       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
// EventPublisher.kt - Kafkaベースのイベント発行
@Component
class KafkaEventPublisher(
    private val kafkaTemplate: KafkaTemplate<String, DomainEvent>,
    private val objectMapper: ObjectMapper
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * ドメインイベントをKafkaトピックに発行する。
     * Aggregate IDをキーとして使用し、パーティション内の順序を保証する。
     */
    suspend fun publish(event: DomainEvent): RecordMetadata {
        val topic = resolveTopicName(event)
        val key = event.aggregateId

        val record = ProducerRecord<String, DomainEvent>(topic, key, event).apply {
            headers().add("eventType", event.eventType.toByteArray())
            headers().add("eventVersion", event.version.toString().toByteArray())
            headers().add("correlationId", event.metadata.correlationId.toByteArray())
            headers().add("timestamp", Instant.now().toString().toByteArray())
        }

        return try {
            val result = kafkaTemplate.send(record).get()
            logger.info(
                "イベント発行完了: topic={}, partition={}, offset={}, key={}",
                result.recordMetadata.topic(),
                result.recordMetadata.partition(),
                result.recordMetadata.offset(),
                key
            )
            result.recordMetadata
        } catch (e: Exception) {
            logger.error("イベント発行失敗: topic={}, key={}, error={}", topic, key, e.message)
            throw EventPublishException("イベント発行に失敗しました: ${e.message}", e)
        }
    }

    private fun resolveTopicName(event: DomainEvent): String {
        return when (event) {
            is OrderEvent -> "order-events"
            is PaymentEvent -> "payment-events"
            is InventoryEvent -> "inventory-events"
            else -> throw IllegalArgumentException("不明なイベントタイプ: ${event::class}")
        }
    }
}

コンシューマグループベースのプロジェクション実装

// OrderProjectionConsumer.kt - 読み取りモデルプロジェクション
@Component
class OrderProjectionConsumer(
    private val orderReadRepository: OrderReadRepository,
    private val acknowledgment: Acknowledgment?  // 手動コミット
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaListener(
        topics = ["order-events"],
        groupId = "order-read-projection",
        containerFactory = "kafkaListenerContainerFactory"
    )
    fun onOrderEvent(
        record: ConsumerRecord<String, DomainEvent>,
        ack: Acknowledgment
    ) {
        val event = record.value()
        val aggregateId = record.key()

        logger.debug(
            "イベント受信: type={}, aggregateId={}, offset={}",
            event.eventType, aggregateId, record.offset()
        )

        try {
            when (event) {
                is OrderCreated -> handleOrderCreated(event)
                is OrderItemAdded -> handleOrderItemAdded(event)
                is PaymentReceived -> handlePaymentReceived(event)
                is OrderShipped -> handleOrderShipped(event)
                is OrderCancelled -> handleOrderCancelled(event)
            }
            // プロジェクション成功後にオフセットを手動コミット
            ack.acknowledge()
        } catch (e: Exception) {
            logger.error(
                "プロジェクション失敗: type={}, aggregateId={}, error={}",
                event.eventType, aggregateId, e.message
            )
            // リトライロジックまたはDLQに送信
            throw e
        }
    }

    private fun handleOrderCreated(event: OrderCreated) {
        val orderView = OrderReadModel(
            orderId = event.aggregateId,
            customerId = event.customerId,
            status = OrderStatus.CREATED,
            totalAmount = BigDecimal.ZERO,
            items = mutableListOf(),
            createdAt = event.timestamp,
            updatedAt = event.timestamp
        )
        orderReadRepository.save(orderView)
    }

    private fun handlePaymentReceived(event: PaymentReceived) {
        orderReadRepository.findById(event.aggregateId)?.let { order ->
            order.status = OrderStatus.PAID
            order.paymentMethod = event.method
            order.paidAmount = event.amount
            order.updatedAt = event.timestamp
            orderReadRepository.save(order)
        }
    }

    // 残りのハンドラー実装は省略
}

AggregateとCommand Handlerの実装

ドメインイベント定義(Kotlin)

// DomainEvent.kt - ドメインイベント基底クラス
sealed class DomainEvent(
    open val aggregateId: String,
    open val eventType: String,
    open val version: Int,
    open val timestamp: Instant,
    open val metadata: EventMetadata
)

data class EventMetadata(
    val correlationId: String = UUID.randomUUID().toString(),
    val causationId: String = "",
    val userId: String = ""
)

// OrderEvents.kt - 注文ドメインイベント
sealed class OrderEvent : DomainEvent()

data class OrderCreated(
    override val aggregateId: String,
    val customerId: String,
    val items: List<OrderItemData>,
    override val version: Int = 1,
    override val timestamp: Instant = Instant.now(),
    override val metadata: EventMetadata = EventMetadata()
) : OrderEvent() {
    override val eventType: String = "OrderCreated"
}

data class PaymentReceived(
    override val aggregateId: String,
    val amount: BigDecimal,
    val method: String,
    val transactionId: String,
    override val version: Int = 1,
    override val timestamp: Instant = Instant.now(),
    override val metadata: EventMetadata = EventMetadata()
) : OrderEvent() {
    override val eventType: String = "PaymentReceived"
}

data class OrderShipped(
    override val aggregateId: String,
    val trackingNumber: String,
    val carrier: String,
    override val version: Int = 1,
    override val timestamp: Instant = Instant.now(),
    override val metadata: EventMetadata = EventMetadata()
) : OrderEvent() {
    override val eventType: String = "OrderShipped"
}

data class OrderCancelled(
    override val aggregateId: String,
    val reason: String,
    val cancelledBy: String,
    override val version: Int = 1,
    override val timestamp: Instant = Instant.now(),
    override val metadata: EventMetadata = EventMetadata()
) : OrderEvent() {
    override val eventType: String = "OrderCancelled"
}

Order Aggregateの実装

// OrderAggregate.kt - Event SourcingベースのAggregate
class OrderAggregate private constructor() {
    lateinit var orderId: String
        private set
    lateinit var customerId: String
        private set
    var status: OrderStatus = OrderStatus.DRAFT
        private set
    var totalAmount: BigDecimal = BigDecimal.ZERO
        private set
    val items: MutableList<OrderItem> = mutableListOf()
    var version: Long = 0
        private set

    // 未コミットイベントリスト
    private val uncommittedEvents: MutableList<DomainEvent> = mutableListOf()

    companion object {
        /**
         * イベントストリームからAggregate状態を復元する。
         */
        fun rehydrate(events: List<DomainEvent>): OrderAggregate {
            val aggregate = OrderAggregate()
            events.forEach { event -> aggregate.apply(event) }
            return aggregate
        }
    }

    // ──────────── Command Handlers ────────────

    fun createOrder(command: CreateOrderCommand): OrderCreated {
        require(status == OrderStatus.DRAFT) {
            "すでに作成された注文です: orderId=$orderId"
        }
        val event = OrderCreated(
            aggregateId = command.orderId,
            customerId = command.customerId,
            items = command.items,
            metadata = EventMetadata(
                correlationId = command.correlationId,
                userId = command.userId
            )
        )
        applyAndRecord(event)
        return event
    }

    fun receivePayment(command: ReceivePaymentCommand): PaymentReceived {
        require(status == OrderStatus.CREATED) {
            "決済可能な状態ではありません: status=$status"
        }
        require(command.amount >= totalAmount) {
            "決済金額が不足しています: required=$totalAmount, received=${command.amount}"
        }
        val event = PaymentReceived(
            aggregateId = orderId,
            amount = command.amount,
            method = command.method,
            transactionId = command.transactionId
        )
        applyAndRecord(event)
        return event
    }

    fun cancelOrder(command: CancelOrderCommand): OrderCancelled {
        require(status != OrderStatus.SHIPPED && status != OrderStatus.CANCELLED) {
            "キャンセルできない状態です: status=$status"
        }
        val event = OrderCancelled(
            aggregateId = orderId,
            reason = command.reason,
            cancelledBy = command.cancelledBy
        )
        applyAndRecord(event)
        return event
    }

    // ──────────── Event Handlers(状態変更) ────────────

    private fun apply(event: DomainEvent) {
        when (event) {
            is OrderCreated -> {
                orderId = event.aggregateId
                customerId = event.customerId
                status = OrderStatus.CREATED
                totalAmount = event.items.sumOf { it.price * it.quantity.toBigDecimal() }
                items.addAll(event.items.map { OrderItem(it.productId, it.quantity, it.price) })
            }
            is PaymentReceived -> {
                status = OrderStatus.PAID
            }
            is OrderShipped -> {
                status = OrderStatus.SHIPPED
            }
            is OrderCancelled -> {
                status = OrderStatus.CANCELLED
            }
            else -> { /* 不明なイベントは無視 */ }
        }
        version++
    }

    private fun applyAndRecord(event: DomainEvent) {
        apply(event)
        uncommittedEvents.add(event)
    }

    fun getUncommittedEvents(): List<DomainEvent> = uncommittedEvents.toList()

    fun markEventsAsCommitted() {
        uncommittedEvents.clear()
    }
}

enum class OrderStatus {
    DRAFT, CREATED, PAID, SHIPPED, DELIVERED, CANCELLED
}

Command Handlerサービス

// OrderCommandHandler.java - コマンド処理サービス(Java例)
@Service
@Transactional
public class OrderCommandHandler {

    private final KafkaEventPublisher eventPublisher;
    private final OrderEventStore eventStore;
    private final SnapshotStore snapshotStore;

    private static final int SNAPSHOT_INTERVAL = 50;

    public OrderCommandHandler(
            KafkaEventPublisher eventPublisher,
            OrderEventStore eventStore,
            SnapshotStore snapshotStore) {
        this.eventPublisher = eventPublisher;
        this.eventStore = eventStore;
        this.snapshotStore = snapshotStore;
    }

    public String handleCreateOrder(CreateOrderCommand command) {
        // 1. 新しいAggregateを生成
        OrderAggregate aggregate = OrderAggregate.rehydrate(Collections.emptyList());

        // 2. コマンド実行(ドメイン不変条件の検証を含む)
        OrderCreated event = aggregate.createOrder(command);

        // 3. イベントをKafkaに発行
        eventPublisher.publish(event);

        // 4. イベントコミット完了をマーク
        aggregate.markEventsAsCommitted();

        return event.getAggregateId();
    }

    public void handleReceivePayment(ReceivePaymentCommand command) {
        // 1. イベントストアからAggregateを復元
        OrderAggregate aggregate = loadAggregate(command.getOrderId());

        // 2. コマンド実行
        PaymentReceived event = aggregate.receivePayment(command);

        // 3. イベント発行
        eventPublisher.publish(event);

        // 4. スナップショット条件の確認
        if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
            snapshotStore.saveSnapshot(aggregate);
        }

        aggregate.markEventsAsCommitted();
    }

    private OrderAggregate loadAggregate(String aggregateId) {
        // スナップショットがあればスナップショットからロード
        Optional<Snapshot> snapshot = snapshotStore.findLatest(aggregateId);
        long fromVersion = snapshot.map(Snapshot::getVersion).orElse(0L);

        // スナップショット以降のイベントのみロード
        List<DomainEvent> events = eventStore.loadEvents(aggregateId, fromVersion);

        if (snapshot.isPresent()) {
            OrderAggregate aggregate = snapshot.get().toAggregate();
            events.forEach(aggregate::apply);
            return aggregate;
        }

        return OrderAggregate.rehydrate(events);
    }
}

Sagaパターンで分散トランザクションを管理

マイクロサービス環境において、注文処理は注文サービス、決済サービス、在庫サービス、配送サービスなど複数のサービスにまたがる分散トランザクションを必要とする。従来の2PC(Two-Phase Commit)はパフォーマンス低下と可用性の問題からマイクロサービスには適していない。Sagaパターンは各サービスのローカルトランザクションを順次実行し、失敗時に補償トランザクション(Compensating Transaction)を逆順に実行して整合性を維持する。

Orchestration vs Choreography

Saga実装方式には2つのアプローチがある。

**Choreography(コレオグラフィー)**方式は中央の調整者なしに、各サービスがイベントを発行し、他のサービスがイベントを購読して自身のタスクを実行する。サービス間の結合度は低いが、Saga全体のフローを把握しにくく、複雑なシナリオでのデバッグが困難である。

**Orchestration(オーケストレーション)**方式は中央のSaga Orchestratorが各ステップの実行と補償を調整する。Saga全体のフローが一箇所に集中するため可視性が高く、複雑なビジネスロジックの管理が容易である。

比較項目ChoreographyOrchestration
結合度低い(イベントベース)中程度(オーケストレーター依存)
可視性低い(フロー追跡が困難)高い(中央集中制御)
複雑度管理シンプルなSagaに適している複雑なSagaに適している
単一障害点なしオーケストレーターがSPOFになり得る
デバッグイベントチェーンの追跡が必要オーケストレーターのログで容易に追跡
テスト容易性統合テストが複雑オーケストレーターの単体テストが可能
適したサービス数2-4個4個以上

KafkaベースのSaga Orchestrator実装

// OrderSagaOrchestrator.kt - 注文処理Saga
@Component
class OrderSagaOrchestrator(
    private val kafkaTemplate: KafkaTemplate<String, SagaCommand>,
    private val sagaStateStore: SagaStateStore
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Saga開始:注文作成イベントを受信するとSagaを開始する。
     */
    @KafkaListener(topics = ["order-events"], groupId = "order-saga-orchestrator")
    fun onOrderCreated(record: ConsumerRecord<String, DomainEvent>, ack: Acknowledgment) {
        val event = record.value()
        if (event !is OrderCreated) return

        val sagaId = UUID.randomUUID().toString()
        val sagaState = SagaState(
            sagaId = sagaId,
            orderId = event.aggregateId,
            currentStep = SagaStep.RESERVE_INVENTORY,
            status = SagaStatus.STARTED,
            startedAt = Instant.now()
        )
        sagaStateStore.save(sagaState)

        // Step 1:在庫予約コマンド発行
        val reserveCommand = ReserveInventoryCommand(
            sagaId = sagaId,
            orderId = event.aggregateId,
            items = event.items
        )
        kafkaTemplate.send("inventory-commands", event.aggregateId, reserveCommand)
        logger.info("Saga開始: sagaId={}, orderId={}, step=RESERVE_INVENTORY", sagaId, event.aggregateId)
        ack.acknowledge()
    }

    /**
     * Step 2:在庫予約成功時に決済要求
     */
    @KafkaListener(topics = ["inventory-events"], groupId = "order-saga-orchestrator")
    fun onInventoryReserved(record: ConsumerRecord<String, DomainEvent>, ack: Acknowledgment) {
        val event = record.value()
        if (event !is InventoryReserved) return

        val sagaState = sagaStateStore.findBySagaId(event.sagaId) ?: return
        sagaState.currentStep = SagaStep.PROCESS_PAYMENT
        sagaStateStore.save(sagaState)

        val paymentCommand = ProcessPaymentCommand(
            sagaId = event.sagaId,
            orderId = sagaState.orderId,
            amount = sagaState.totalAmount
        )
        kafkaTemplate.send("payment-commands", sagaState.orderId, paymentCommand)
        logger.info("Saga進行: sagaId={}, step=PROCESS_PAYMENT", event.sagaId)
        ack.acknowledge()
    }

    /**
     * 補償トランザクション:決済失敗時に在庫予約をキャンセル
     */
    @KafkaListener(topics = ["payment-events"], groupId = "order-saga-orchestrator")
    fun onPaymentFailed(record: ConsumerRecord<String, DomainEvent>, ack: Acknowledgment) {
        val event = record.value()
        if (event !is PaymentFailed) return

        val sagaState = sagaStateStore.findBySagaId(event.sagaId) ?: return
        sagaState.status = SagaStatus.COMPENSATING
        sagaStateStore.save(sagaState)

        // 補償:在庫予約キャンセル
        val cancelReservation = CancelInventoryReservationCommand(
            sagaId = event.sagaId,
            orderId = sagaState.orderId
        )
        kafkaTemplate.send("inventory-commands", sagaState.orderId, cancelReservation)

        // 補償:注文キャンセル
        val cancelOrder = CancelOrderCommand(
            orderId = sagaState.orderId,
            reason = "決済失敗: ${event.failureReason}",
            cancelledBy = "SAGA_ORCHESTRATOR"
        )
        kafkaTemplate.send("order-commands", sagaState.orderId, cancelOrder)

        logger.warn("Saga補償実行: sagaId={}, reason={}", event.sagaId, event.failureReason)
        ack.acknowledge()
    }
}

Read Modelプロジェクションと同期

Materialized Viewパターン

CQRSの読み取り側はイベントを消費してMaterialized View(マテリアライズドビュー)を構築する。このプロジェクションは特定のクエリパターンに最適化された非正規化データ構造である。1つのイベントストリームから複数のプロジェクションを同時に維持でき、各プロジェクションは独自のコンシューマグループとストレージを持つ。

注文イベントストリームから生成できるプロジェクションの例は以下の通りである。

  • 注文詳細ビュー:MongoDBに注文のすべての情報を非正規化して保存。注文詳細画面で単一クエリですべてのデータを返却
  • 顧客別注文リストビュー:Redis Sorted Setで顧客別注文リストを最新順に維持。マイページの注文履歴に最適化
  • 検索インデックス:Elasticsearchに注文データをインデキシング。複合条件検索と全文検索をサポート
  • 分析ビュー:ClickHouseのようなOLAPデータベースに注文統計を蓄積。売上分析ダッシュボード用

プロジェクション同期とEventual Consistency

CQRSにおいて読み取りモデルは書き込みモデルと結果整合性(Eventually Consistent)を維持する。イベントが発行されてプロジェクションが更新されるまでの時間差が存在する。この遅延は通常ミリ秒から秒単位だが、コンシューマラグ(Consumer Lag)が発生するとさらに大きくなりうる。

Eventual Consistencyに対処する戦略は以下の通りである。

  • Read-your-writes保証:書き込み後の読み取りリクエストにイベントバージョンを含め、読み取り側で当該バージョン以上のプロジェクションが完了するまで待機するか、書き込み側のデータを直接返却
  • UI楽観的更新(Optimistic Update):クライアントでコマンド送信後、サーバーの応答を待たずにUIを先行更新。その後プロジェクション結果でUIを同期
  • Polling/WebSocket:プロジェクション更新完了時にクライアントに通知

アーキテクチャ比較:CQRS vs 従来型CRUD vs Event-Driven

各アーキテクチャパターンの特性を詳細に比較する。プロジェクトの要件とチームの能力に合ったパターンを選択すべきである。

比較項目従来型CRUDEvent-Driven(イベント駆動)CQRS + Event Sourcing
データ保存現在の状態のみ保存現在の状態 + 一部イベントログ全イベント履歴(不変ログ)
読み取り/書き込みモデル単一モデルを共有単一または分離完全分離
監査証跡(Audit)別途実装が必要部分的にサポートネイティブサポート(イベント自体が監査ログ)
スケーリング主に垂直スケーリング水平スケーリング可能読み取り/書き込み独立した水平スケーリング
状態復元不可(現在の状態のみ)部分的に可能任意の時点の状態を完全復元可能
実装の複雑度低い中程度高い
学習曲線低い中程度高い
デバッグ直感的(DBクエリ)イベント追跡が必要イベントリプレイで状態追跡
Eventual Consistency該当なし(即時整合性)発生しうる読み取りモデルで常に発生
適したドメインシンプルなCRUD、社内管理システムサービス間非同期通信金融、EC、物流、監査が必要なドメイン
ストレージコスト低い中程度高い(全イベントを保管)
トランザクション管理ACID(ローカルトランザクション)Eventually ConsistentSagaパターンが必要

いつCQRS + Event Sourcingを導入すべきか

CQRS + Event Sourcingは強力だが、すべてのシステムに必要なわけではない。導入を検討すべき状況は以下の通りである。

  • 状態変更の全履歴保存が法的/ビジネス的に要求される場合(金融取引、医療記録など)
  • 読み取りと書き込みのトラフィックパターンが極端に異なる場合(読み取りが書き込みの100倍以上)
  • 複数のサービスが同じイベントを基に異なるビューを生成する必要がある場合
  • タイムトラベル(Time Travel)クエリが必要な場合(特定時点の状態照会)
  • テンポラルクエリがコアビジネス機能である場合(過去の価格履歴、在庫変動履歴)

逆に、シンプルなCRUD中心のアプリケーションや、チームにEvent Sourcing経験がなくスケジュールが厳しい場合は、従来型アーキテクチャがより現実的な選択である。

運用上の注意事項

Eventual Consistencyへの対処

CQRSシステムにおいてEventual Consistencyは本質的な特性である。以下の状況に備える必要がある。

  • 読み取りモデルの同期遅延:書き込み完了後、読み取りモデルが即座に更新されない可能性がある。コンシューマグループのConsumer Lagモニタリングは必須である
  • ユーザーの混乱防止:注文後すぐに注文一覧を照会した際に直前の注文が表示されなければ、ユーザーからのクレームが発生する。Read-your-writes保証またはUI楽観的更新を必ず実装すべきである
  • データ不整合の検知:定期的にイベントストアと読み取りモデル間の整合性検証バッチを実行する必要がある

冪等性(Idempotency)の保証

ネットワーク障害、コンシューマリバランシング、Kafkaのat-least-once配信保証などにより、同一イベントが重複配信される可能性がある。プロジェクションハンドラーは必ず冪等に実装する必要がある。

冪等性保証の戦略は以下の通りである。

  • イベントIDベースの重複防止:処理済みイベントIDを別テーブルに記録し、重複イベント受信時は無視する
  • Upsertパターン:読み取りモデル更新時にINSERT OR UPDATE(UPSERT)を使用して、同一イベントを複数回適用しても結果が同じになるようにする
  • オフセットベースの検証:コンシューマが最後に処理したオフセットをストレージに一緒に記録し、以前のオフセットのイベントはスキップする

イベントバージョン管理(Event Versioning)

イベントスキーマは時間の経過とともに必ず変更される。不変イベントログでスキーマを変更するには、既存のイベントを修正するのではなく、互換性のある方法で進化させる必要がある。

  • 後方互換(Backward Compatible)変更:新しいフィールドをオプショナルとして追加するのは安全である。以前のイベントにはそのフィールドがないため、デフォルト値を使用する
  • Upcasterパターン:以前のバージョンのイベントを最新バージョンに変換するUpcasterを登録する。イベントロード時に自動的に変換が適用される
  • 新しいイベントタイプの追加:既存イベントタイプの意味が大きく変わる場合、新しいイベントタイプを定義してコンシューマで両タイプを処理する
  • バージョンフィールドの必須化:すべてのイベントにversionフィールドを含めて、コンシューマがイベントバージョンに応じて分岐処理できるようにする

Kafka運用関連の注意事項

  • retention.ms設定:イベントストアトピックはretention.ms=-1(無期限保存)に設定する必要がある。デフォルト値(7日)が適用されるとイベントが削除され、状態復元が不可能になる
  • パーティション数の決定:パーティション数は最大コンシューマ並列度を決定する。一度増やしたパーティションは減らせないため、慎重に決定する必要がある。Aggregate IDベースのパーティショニングでパーティション数を変更すると、同じAggregateのイベントが異なるパーティションに分散され、順序保証が破綻する可能性がある
  • min.insync.replicas:プロダクションでは必ず2以上に設定する。acks=allと併用してデータ損失を防止する
  • Consumer Lagモニタリングkafka-consumer-groups.shまたはBurrowなどのツールでコンシューマラグを継続的にモニタリングする。ラグが継続的に増加する場合、コンシューマの処理能力を拡張する必要がある
# Consumer Lagモニタリングスクリプト
#!/bin/bash

BOOTSTRAP_SERVERS="kafka-broker-1:9092,kafka-broker-2:9092"
CONSUMER_GROUPS=("order-read-projection" "order-saga-orchestrator" "search-indexer")
ALERT_THRESHOLD=10000

for GROUP in "${CONSUMER_GROUPS[@]}"; do
    echo "=== Consumer Group: $GROUP ==="

    # コンシューマグループ状態照会
    kafka-consumer-groups.sh \
        --bootstrap-server "$BOOTSTRAP_SERVERS" \
        --describe \
        --group "$GROUP" 2>/dev/null

    # 総Lag計算
    TOTAL_LAG=$(kafka-consumer-groups.sh \
        --bootstrap-server "$BOOTSTRAP_SERVERS" \
        --describe \
        --group "$GROUP" 2>/dev/null \
        | tail -n +3 \
        | awk '{sum += $6} END {print sum}')

    echo "Total Lag: $TOTAL_LAG"

    # しきい値超過時にアラート
    if [ "$TOTAL_LAG" -gt "$ALERT_THRESHOLD" ]; then
        echo "[ALERT] Consumer group $GROUP lag ($TOTAL_LAG) exceeds threshold ($ALERT_THRESHOLD)"
        # SlackウェブフックまたはPagerDuty呼び出し
        curl -s -X POST "$SLACK_WEBHOOK_URL" \
            -H 'Content-Type: application/json' \
            -d "{\"text\":\"[ALERT] Kafka Consumer Lagしきい値超過\\nGroup: $GROUP\\nLag: $TOTAL_LAG\\nThreshold: $ALERT_THRESHOLD\"}"
    fi

    echo ""
done

障害事例と復旧手順

事例1:プロジェクションバグによる読み取りモデルの汚染

プロジェクションハンドラーのバグにより読み取りモデルが誤って更新されるのは、CQRSシステムで最も一般的な障害シナリオである。Event Sourcingの核心的な利点は、この場合にイベントストリームから読み取りモデルを完全に再構築できることである。

復旧手順は以下の通りである。

  1. 問題のあるプロジェクションコンシューマを停止する
  2. プロジェクションハンドラーのバグを修正してデプロイする
  3. 読み取りモデルストレージ(MongoDB、Elasticsearchなど)の該当インデックス/コレクションを削除する
  4. コンシューマグループのオフセットをearliestにリセットする
  5. 修正されたプロジェクションコンシューマを再起動して、イベントを最初からリプレイする
# コンシューマグループオフセットリセット(注意:コンシューマが停止した状態でのみ実行)
# --to-earliest:トピックの最初からリプレイ
kafka-consumer-groups.sh \
    --bootstrap-server kafka-broker-1:9092 \
    --group order-read-projection \
    --topic order-events \
    --reset-offsets \
    --to-earliest \
    --execute

# 実行結果の確認
kafka-consumer-groups.sh \
    --bootstrap-server kafka-broker-1:9092 \
    --describe \
    --group order-read-projection

イベント数が数百万件以上の場合、フルリプレイにはかなりの時間がかかる。この期間中、読み取りモデルは最新状態と差異が生じるため、リプレイの進行状況をモニタリングし、ユーザーにデータ更新中であることを案内する必要がある。

事例2:スナップショット破損(Snapshot Corruption)

スナップショットストレージに破損したスナップショットが保存されると、Aggregateロード時に誤った状態からイベントリプレイが始まり、ビジネスロジックエラーが発生する。

症状:特定のAggregate IDに対するコマンドが予想と異なる不変条件検証エラーを発生させる。例えば、すでに決済済みの注文に対して「決済可能な状態ではありません」エラーの代わりに「すでに作成された注文です」エラーが発生する。

復旧手順は以下の通りである。

  1. 該当Aggregateのスナップショットを削除する
  2. イベントストアから該当Aggregateの全イベントを最初からリプレイして状態を復元する
  3. 正常な状態が確認できたら新しいスナップショットを生成する
  4. スナップショット生成ロジックのバグを調査して修正する

事例3:Kafkaパーティションリバランシング中のイベント順序逆転

コンシューマグループのリバランシングが発生すると、特定パーティションの所有者が変更される。手動コミットモードで処理完了済みイベントのオフセットがコミットされる前にリバランシングが発生すると、新しいコンシューマがすでに処理されたイベントを再度受信する可能性がある。

対応戦略は以下の通りである。

  • プロジェクションハンドラーを冪等に実装して、重複イベント処理でも最終状態が同一になるよう保証する
  • ConsumerRebalanceListenerを実装して、リバランシング時に処理中のイベントのオフセットを即座にコミットする
  • max.poll.interval.msを適切に設定して、イベント処理時間が長い場合の不必要なリバランシングを防止する

事例4:イベントスキーマ変更時の後方互換性違反

イベントの必須フィールドを削除したり型を変更したりすると、既存イベントのデシリアライズが失敗する。イベントログは不変であるため、既存イベントのスキーマを変更することはできない。

予防原則は以下の通りである。

  • フィールドの追加は常にOptional(オプショナル)にする
  • フィールド削除の代わりにdeprecated処理し、コンシューマで無視するようにする
  • Schema Registry(Confluent Schema Registry)を導入して互換性を自動検証する
  • CI/CDパイプラインでイベントスキーマ互換性テストを必須ゲートに設定する

チェックリスト

CQRS + Event Sourcing + Kafkaアーキテクチャをプロダクションに導入する際に点検すべき項目である。

設計段階

  • Aggregate境界がビジネス不変条件を基準に正しく設定されているか
  • イベント名が過去形動詞でビジネスの意図を明確に表現しているか
  • イベントスキーマにバージョンフィールドが含まれているか
  • Sagaが必要な分散トランザクションが特定されているか
  • 各Sagaの補償トランザクションがすべてのステップについて定義されているか
  • 読み取りモデルのクエリパターンが事前定義されているか

Kafka設定

  • イベントストアトピックのretention.ms-1(無期限)に設定されているか
  • min.insync.replicasが2以上で、acks=allが設定されているか
  • プロデューサーのenable.idempotence=trueが設定されているか
  • パーティション数が将来の拡張を考慮して十分に設定されているか
  • Aggregate IDをメッセージキーとして使用してパーティション内の順序を保証しているか

プロジェクションとコンシューマ

  • プロジェクションハンドラーが冪等に実装されているか
  • 手動オフセットコミットが設定されているか(enable.auto.commit=false
  • DLQ(Dead Letter Queue)が設定されて失敗したイベントを隔離しているか
  • Consumer Lagモニタリングとアラートが構成されているか
  • フルリプレイ(Full Replay)手順がドキュメント化されテストされているか

運用

  • イベントスキーマ互換性検証がCI/CDに含まれているか
  • スナップショット戦略(間隔、ストレージ)が決定されているか
  • Eventual Consistencyに対するユーザーエクスペリエンス対策が講じられているか(UI楽観的更新など)
  • Kafkaクラスターのディスク容量計画が策定されているか(無期限保存を考慮)
  • 障害復旧ランブック(Runbook)が作成されているか

参考資料