Skip to content
Published on

Event-Driven Architecture + CQRS + Event Sourcing 実践実装:Kafka/RabbitMQ基盤の分散システム設計

Authors
Event-Driven Architecture with CQRS and Event Sourcing

はじめに

マイクロサービスアーキテクチャが一般化するにつれ、サービス間通信方式はシステムのスケーラビリティとレジリエンスを決定づける核心要素となった。同期REST呼び出しベースのアーキテクチャはサービス間の強い結合を生み、一つのサービス障害がシステム全体に伝播するカスケード障害を引き起こす。

Event-Driven Architecture(EDA)はこの問題を解決する代表的なパターンだ。サービスはイベントを発行(publish)し、関心のあるサービスがそれを購読(subscribe)して非同期で処理する。ここにCQRS(Command Query Responsibility Segregation)とEvent Sourcingを組み合わせると、読み取り/書き込みワークロードを独立してスケーリングし、すべての状態変更の監査証跡(audit trail)を自然に確保できる。

本記事ではEDAのコアパターンからCQRSとEvent Sourcingの実装、メッセージブローカー比較、Sagaパターン、そしてプロダクションで経験する障害事例と対応方法をコードレベルで扱う。

Event-Driven Architecture コアパターン

イベントの種類

EDAにおけるイベントは大きく3つに分類される。

種類説明特徴
Domain Eventビジネスドメインで発生した意味のある事実OrderPlaced, PaymentCompleted過去形命名、不変
Integration Eventサービス境界を越えて伝播するイベントOrderShipped(物流サービスへ)サービス間の契約
Notification Event単純な通知(データ最小化)OrderStatusChanged受信者が詳細データを別途取得

EDA通信パターン比較

パターン結合度配信保証順序保証ユースケース
Pub/SubAt-least-once保証なし通知、キャッシュ無効化
Event StreamingAt-least-onceパーティション内保証ログ集約、リアルタイム分析
Event Sourcing自己参照永続保存アグリゲート内保証監査証跡、状態復元
Request-Reply同期リクエスト-レスポンスペア同期的な確認が必要な場合

TypeScriptイベント基本構造

// ドメインイベント基本インターフェース
interface DomainEvent {
  readonly eventId: string
  readonly eventType: string
  readonly aggregateId: string
  readonly aggregateType: string
  readonly version: number
  readonly timestamp: Date
  readonly metadata: EventMetadata
  readonly payload: Record<string, unknown>
}

interface EventMetadata {
  readonly correlationId: string
  readonly causationId: string
  readonly userId?: string
  readonly traceId?: string
}

// 具体的なイベント例
class OrderPlacedEvent implements DomainEvent {
  readonly eventType = 'OrderPlaced'
  readonly aggregateType = 'Order'

  constructor(
    public readonly eventId: string,
    public readonly aggregateId: string,
    public readonly version: number,
    public readonly timestamp: Date,
    public readonly metadata: EventMetadata,
    public readonly payload: {
      customerId: string
      items: Array<{ productId: string; quantity: number; price: number }>
      totalAmount: number
      currency: string
    }
  ) {}
}

CQRS(Command Query Responsibility Segregation)深掘り

CQRSのコア原則

CQRSは読み取り(Query)モデルと書き込み(Command)モデルを完全に分離するパターンだ。従来のCRUDモデルでは同一のデータモデルで読み取りと書き込みを両方処理するが、CQRSではそれぞれの目的に最適化された別々のモデルを使用する。

// Command側 - 書き込みモデル
interface Command {
  readonly commandId: string
  readonly commandType: string
  readonly timestamp: Date
}

class PlaceOrderCommand implements Command {
  readonly commandType = 'PlaceOrder'

  constructor(
    public readonly commandId: string,
    public readonly timestamp: Date,
    public readonly customerId: string,
    public readonly items: Array<{
      productId: string
      quantity: number
      price: number
    }>,
    public readonly shippingAddress: string
  ) {}
}

// Command Handler
class PlaceOrderHandler {
  constructor(
    private readonly orderRepository: OrderRepository,
    private readonly eventBus: EventBus
  ) {}

  async handle(command: PlaceOrderCommand): Promise<string> {
    // ビジネス検証
    await this.validateInventory(command.items)
    await this.validateCustomerCredit(command.customerId)

    // アグリゲート生成とイベント発行
    const order = Order.create(command.customerId, command.items, command.shippingAddress)

    await this.orderRepository.save(order)

    // ドメインイベント発行
    for (const event of order.getUncommittedEvents()) {
      await this.eventBus.publish(event)
    }

    return order.id
  }

  private async validateInventory(
    items: Array<{ productId: string; quantity: number; price: number }>
  ): Promise<void> {
    // 在庫確認ロジック
  }

  private async validateCustomerCredit(customerId: string): Promise<void> {
    // 顧客与信確認ロジック
  }
}

Query側 - 読み取りモデル

// Query側 - 読み取りに最適化された非正規化モデル
interface OrderSummaryReadModel {
  orderId: string
  customerName: string
  customerEmail: string
  orderDate: Date
  status: string
  totalAmount: number
  itemCount: number
  lastUpdated: Date
}

// Query Handler
class GetOrderSummaryHandler {
  constructor(private readonly readDb: ReadDatabase) {}

  async handle(orderId: string): Promise<OrderSummaryReadModel | null> {
    // 読み取り専用DBから非正規化データを直接取得
    return this.readDb.query('SELECT * FROM order_summaries WHERE order_id = ?', [orderId])
  }
}

// Projection - イベントを読み取りモデルに変換
class OrderSummaryProjection {
  constructor(private readonly readDb: ReadDatabase) {}

  async handleOrderPlaced(event: OrderPlacedEvent): Promise<void> {
    await this.readDb.execute(
      `INSERT INTO order_summaries (order_id, customer_name, order_date, status, total_amount, item_count, last_updated)
       VALUES (?, ?, ?, 'PLACED', ?, ?, ?)`,
      [
        event.aggregateId,
        event.payload.customerId,
        event.timestamp,
        event.payload.totalAmount,
        event.payload.items.length,
        new Date(),
      ]
    )
  }

  async handleOrderShipped(event: DomainEvent): Promise<void> {
    await this.readDb.execute(
      `UPDATE order_summaries SET status = 'SHIPPED', last_updated = ? WHERE order_id = ?`,
      [new Date(), event.aggregateId]
    )
  }
}

Event Sourcing実装

Event Store設計

Event Sourcingでは状態を直接保存せず、状態変更イベントのシーケンスを保存する。現在の状態はイベントを順番にリプレイ(replay)して復元する。

// Event Storeインターフェース
interface EventStore {
  append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>
  getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]>
  getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]>
}

// PostgreSQLベースのEvent Store実装
class PostgresEventStore implements EventStore {
  constructor(private readonly pool: Pool) {}

  async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
    const client = await this.pool.connect()
    try {
      await client.query('BEGIN')

      // 楽観的同時実行制御(Optimistic Concurrency Control)
      const result = await client.query(
        'SELECT MAX(version) as current_version FROM events WHERE aggregate_id = $1',
        [aggregateId]
      )
      const currentVersion = result.rows[0]?.current_version ?? 0

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but current version is ${currentVersion}`
        )
      }

      // イベントバッチ挿入
      for (const event of events) {
        await client.query(
          `INSERT INTO events (event_id, aggregate_id, aggregate_type, event_type, version, payload, metadata, timestamp)
           VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
          [
            event.eventId,
            event.aggregateId,
            event.aggregateType,
            event.eventType,
            event.version,
            JSON.stringify(event.payload),
            JSON.stringify(event.metadata),
            event.timestamp,
          ]
        )
      }

      await client.query('COMMIT')
    } catch (error) {
      await client.query('ROLLBACK')
      throw error
    } finally {
      client.release()
    }
  }

  async getEvents(aggregateId: string, fromVersion: number = 0): Promise<DomainEvent[]> {
    const result = await this.pool.query(
      'SELECT * FROM events WHERE aggregate_id = $1 AND version > $2 ORDER BY version ASC',
      [aggregateId, fromVersion]
    )
    return result.rows.map(this.deserializeEvent)
  }

  async getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]> {
    const query = fromTimestamp
      ? 'SELECT * FROM events WHERE event_type = $1 AND timestamp > $2 ORDER BY timestamp ASC'
      : 'SELECT * FROM events WHERE event_type = $1 ORDER BY timestamp ASC'
    const params = fromTimestamp ? [eventType, fromTimestamp] : [eventType]
    const result = await this.pool.query(query, params)
    return result.rows.map(this.deserializeEvent)
  }

  private deserializeEvent(row: any): DomainEvent {
    return {
      eventId: row.event_id,
      eventType: row.event_type,
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      version: row.version,
      timestamp: row.timestamp,
      metadata: JSON.parse(row.metadata),
      payload: JSON.parse(row.payload),
    }
  }
}

アグリゲートとイベントリプレイ

// Event Sourcedアグリゲート
abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  protected version: number = 0

  abstract get id(): string

  protected apply(event: DomainEvent): void {
    this.when(event)
    this.version = event.version
    this.uncommittedEvents.push(event)
  }

  protected abstract when(event: DomainEvent): void

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = []
  }

  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this.version = event.version
    }
  }
}

// Orderアグリゲート
class Order extends EventSourcedAggregate {
  private _id: string = ''
  private _customerId: string = ''
  private _status: OrderStatus = OrderStatus.DRAFT
  private _items: OrderItem[] = []
  private _totalAmount: number = 0

  get id(): string {
    return this._id
  }

  static create(
    customerId: string,
    items: Array<{ productId: string; quantity: number; price: number }>,
    shippingAddress: string
  ): Order {
    const order = new Order()
    const orderId = generateUUID()
    const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0)

    order.apply({
      eventId: generateUUID(),
      eventType: 'OrderPlaced',
      aggregateId: orderId,
      aggregateType: 'Order',
      version: 1,
      timestamp: new Date(),
      metadata: { correlationId: generateUUID(), causationId: generateUUID() },
      payload: { customerId, items, totalAmount, shippingAddress, currency: 'KRW' },
    })

    return order
  }

  confirm(): void {
    if (this._status !== OrderStatus.PLACED) {
      throw new Error('Order can only be confirmed when in PLACED status')
    }

    this.apply({
      eventId: generateUUID(),
      eventType: 'OrderConfirmed',
      aggregateId: this._id,
      aggregateType: 'Order',
      version: this.version + 1,
      timestamp: new Date(),
      metadata: { correlationId: generateUUID(), causationId: generateUUID() },
      payload: { confirmedAt: new Date().toISOString() },
    })
  }

  protected when(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced':
        this._id = event.aggregateId
        this._customerId = event.payload.customerId as string
        this._status = OrderStatus.PLACED
        this._items = event.payload.items as OrderItem[]
        this._totalAmount = event.payload.totalAmount as number
        break
      case 'OrderConfirmed':
        this._status = OrderStatus.CONFIRMED
        break
      case 'OrderShipped':
        this._status = OrderStatus.SHIPPED
        break
      case 'OrderCancelled':
        this._status = OrderStatus.CANCELLED
        break
    }
  }
}

enum OrderStatus {
  DRAFT = 'DRAFT',
  PLACED = 'PLACED',
  CONFIRMED = 'CONFIRMED',
  SHIPPED = 'SHIPPED',
  CANCELLED = 'CANCELLED',
}

Python Event Store実装

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Protocol
from uuid import uuid4
import json
import asyncpg


@dataclass(frozen=True)
class DomainEvent:
    event_id: str
    event_type: str
    aggregate_id: str
    aggregate_type: str
    version: int
    timestamp: datetime
    payload: dict[str, Any]
    metadata: dict[str, str] = field(default_factory=dict)


class EventStore(Protocol):
    async def append(
        self,
        aggregate_id: str,
        events: list[DomainEvent],
        expected_version: int,
    ) -> None: ...

    async def get_events(
        self, aggregate_id: str, from_version: int = 0
    ) -> list[DomainEvent]: ...


class PostgresEventStore:
    def __init__(self, pool: asyncpg.Pool):
        self._pool = pool

    async def append(
        self,
        aggregate_id: str,
        events: list[DomainEvent],
        expected_version: int,
    ) -> None:
        async with self._pool.acquire() as conn:
            async with conn.transaction():
                # 楽観的同時実行制御
                row = await conn.fetchrow(
                    "SELECT MAX(version) AS cur FROM events WHERE aggregate_id = $1",
                    aggregate_id,
                )
                current = row["cur"] or 0
                if current != expected_version:
                    raise ConcurrencyError(
                        f"Expected {expected_version}, got {current}"
                    )

                for event in events:
                    await conn.execute(
                        """
                        INSERT INTO events
                            (event_id, aggregate_id, aggregate_type,
                             event_type, version, payload, metadata, timestamp)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                        """,
                        event.event_id,
                        event.aggregate_id,
                        event.aggregate_type,
                        event.event_type,
                        event.version,
                        json.dumps(event.payload),
                        json.dumps(event.metadata),
                        event.timestamp,
                    )

    async def get_events(
        self, aggregate_id: str, from_version: int = 0
    ) -> list[DomainEvent]:
        async with self._pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT * FROM events
                WHERE aggregate_id = $1 AND version > $2
                ORDER BY version ASC
                """,
                aggregate_id,
                from_version,
            )
            return [self._deserialize(row) for row in rows]

    @staticmethod
    def _deserialize(row) -> DomainEvent:
        return DomainEvent(
            event_id=row["event_id"],
            event_type=row["event_type"],
            aggregate_id=row["aggregate_id"],
            aggregate_type=row["aggregate_type"],
            version=row["version"],
            timestamp=row["timestamp"],
            payload=json.loads(row["payload"]),
            metadata=json.loads(row["metadata"]),
        )

メッセージブローカー比較:Kafka vs RabbitMQ vs NATS

主要特性比較

項目Apache KafkaRabbitMQNATS JetStream
モデル分散ログ(Append-only)メッセージキュー(Push型)ストリーミング(Pull/Push)
メッセージ保存設定期間中永久保存消費後削除(デフォルト)設定期間保存
順序保証パーティション内保証キュー単位保証ストリーム内保証
スループット秒間数百万件秒間数万件秒間数十万件
レイテンシms単位(バッチ)us単位(単件)us単位
コンシューマグループネイティブサポート競合コンシューマパターンネイティブサポート
リプレイオフセットベース自由移動限定的(dead letter)シーケンスベース移動
運用複雑度高(ZooKeeper/KRaft)
適したケースイベントストリーミング、ログ集約タスクキュー、RPC軽量メッセージング、IoT

Kafka Producer/Consumer例

import { Kafka, logLevel } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  logLevel: logLevel.WARN,
  retry: {
    initialRetryTime: 100,
    retries: 8,
  },
})

// Producer - イベント発行
class KafkaEventPublisher {
  private producer = kafka.producer({
    idempotent: true, // 冪等性保証
    maxInFlightRequests: 5,
    transactionalId: 'order-service-producer',
  })

  async publish(event: DomainEvent): Promise<void> {
    await this.producer.send({
      topic: `events.${event.aggregateType.toLowerCase()}`,
      messages: [
        {
          key: event.aggregateId, // パーティショニングキー = アグリゲートID
          value: JSON.stringify(event),
          headers: {
            'event-type': event.eventType,
            'correlation-id': event.metadata.correlationId,
            'content-type': 'application/json',
          },
        },
      ],
    })
  }

  async publishBatch(events: DomainEvent[]): Promise<void> {
    const transaction = await this.producer.transaction()
    try {
      for (const event of events) {
        await transaction.send({
          topic: `events.${event.aggregateType.toLowerCase()}`,
          messages: [
            {
              key: event.aggregateId,
              value: JSON.stringify(event),
              headers: {
                'event-type': event.eventType,
                'correlation-id': event.metadata.correlationId,
              },
            },
          ],
        })
      }
      await transaction.commit()
    } catch (error) {
      await transaction.abort()
      throw error
    }
  }
}

// Consumer - イベント消費(冪等性保証付き)
class KafkaEventConsumer {
  private consumer = kafka.consumer({ groupId: 'projection-service' })
  private processedEvents = new Set<string>()

  async start(): Promise<void> {
    await this.consumer.subscribe({
      topics: ['events.order'],
      fromBeginning: false,
    })

    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const event: DomainEvent = JSON.parse(message.value!.toString())

        // 冪等性チェック - 処理済みイベントをスキップ
        if (await this.isAlreadyProcessed(event.eventId)) {
          console.log(`Skipping duplicate event: ${event.eventId}`)
          return
        }

        await this.handleEvent(event)
        await this.markAsProcessed(event.eventId)
      },
    })
  }

  private async isAlreadyProcessed(eventId: string): Promise<boolean> {
    return this.processedEvents.has(eventId)
  }

  private async markAsProcessed(eventId: string): Promise<void> {
    this.processedEvents.add(eventId)
  }

  private async handleEvent(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderPlaced':
        await this.handleOrderPlaced(event)
        break
      case 'OrderConfirmed':
        await this.handleOrderConfirmed(event)
        break
    }
  }

  private async handleOrderPlaced(event: DomainEvent): Promise<void> {
    console.log(`Processing OrderPlaced for ${event.aggregateId}`)
  }

  private async handleOrderConfirmed(event: DomainEvent): Promise<void> {
    console.log(`Processing OrderConfirmed for ${event.aggregateId}`)
  }
}

Sagaパターン:分散トランザクション管理

Orchestration Saga vs Choreography Saga

項目OrchestrationChoreography
制御方式中央オーケストレーターが調整各サービスが自律的に反応
結合度オーケストレーターに依存サービス間の疎結合
複雑性オーケストレーターに集中サービスに分散
追跡中央で状態確認可能分散された状態追跡が必要
エラー処理中央で補償トランザクション実行各サービスが補償イベントを発行
適した場合複雑なワークフロー(5ステップ以上)単純なワークフロー(3ステップ以下)

Orchestration Saga実装

// Saga状態マシン
enum SagaStatus {
  STARTED = 'STARTED',
  ORDER_CREATED = 'ORDER_CREATED',
  PAYMENT_PROCESSED = 'PAYMENT_PROCESSED',
  INVENTORY_RESERVED = 'INVENTORY_RESERVED',
  COMPLETED = 'COMPLETED',
  COMPENSATING = 'COMPENSATING',
  FAILED = 'FAILED',
}

interface SagaStep {
  name: string
  execute: (context: SagaContext) => Promise<void>
  compensate: (context: SagaContext) => Promise<void>
}

interface SagaContext {
  sagaId: string
  orderId: string
  customerId: string
  items: Array<{ productId: string; quantity: number; price: number }>
  paymentId?: string
  reservationId?: string
  [key: string]: unknown
}

class OrderSagaOrchestrator {
  private steps: SagaStep[] = []
  private completedSteps: SagaStep[] = []

  constructor(
    private readonly sagaRepository: SagaRepository,
    private readonly eventBus: EventBus
  ) {
    this.steps = [
      {
        name: 'CreateOrder',
        execute: async (ctx) => {
          await this.eventBus.publish({
            eventType: 'CreateOrderRequested',
            aggregateId: ctx.orderId,
            payload: { customerId: ctx.customerId, items: ctx.items },
          } as DomainEvent)
        },
        compensate: async (ctx) => {
          await this.eventBus.publish({
            eventType: 'CancelOrderRequested',
            aggregateId: ctx.orderId,
            payload: { reason: 'Saga compensation' },
          } as DomainEvent)
        },
      },
      {
        name: 'ProcessPayment',
        execute: async (ctx) => {
          const totalAmount = ctx.items.reduce((sum, item) => sum + item.price * item.quantity, 0)
          await this.eventBus.publish({
            eventType: 'ProcessPaymentRequested',
            aggregateId: ctx.orderId,
            payload: { customerId: ctx.customerId, amount: totalAmount },
          } as DomainEvent)
        },
        compensate: async (ctx) => {
          await this.eventBus.publish({
            eventType: 'RefundPaymentRequested',
            aggregateId: ctx.orderId,
            payload: { paymentId: ctx.paymentId },
          } as DomainEvent)
        },
      },
      {
        name: 'ReserveInventory',
        execute: async (ctx) => {
          await this.eventBus.publish({
            eventType: 'ReserveInventoryRequested',
            aggregateId: ctx.orderId,
            payload: { items: ctx.items },
          } as DomainEvent)
        },
        compensate: async (ctx) => {
          await this.eventBus.publish({
            eventType: 'ReleaseInventoryRequested',
            aggregateId: ctx.orderId,
            payload: { reservationId: ctx.reservationId },
          } as DomainEvent)
        },
      },
    ]
  }

  async start(context: SagaContext): Promise<void> {
    await this.sagaRepository.save(context.sagaId, SagaStatus.STARTED, context)
    await this.executeNextStep(context, 0)
  }

  private async executeNextStep(context: SagaContext, stepIndex: number): Promise<void> {
    if (stepIndex >= this.steps.length) {
      await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPLETED)
      return
    }

    const step = this.steps[stepIndex]
    try {
      await step.execute(context)
      this.completedSteps.push(step)
      await this.executeNextStep(context, stepIndex + 1)
    } catch (error) {
      console.error(`Saga step '${step.name}' failed:`, error)
      await this.compensate(context)
    }
  }

  private async compensate(context: SagaContext): Promise<void> {
    await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPENSATING)

    // 完了済みステップを逆順で補償
    for (const step of [...this.completedSteps].reverse()) {
      try {
        await step.compensate(context)
      } catch (error) {
        console.error(`Compensation for '${step.name}' failed:`, error)
        // 補償失敗時は手動介入が必要 - アラート送信
      }
    }

    await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.FAILED)
  }
}

Snapshot戦略:イベントリプレイ最適化

イベント数が多くなるとリプレイ時間が長くなる。Snapshotは特定時点のアグリゲート状態を保存してこの問題を解決する。

// Snapshotベースのアグリゲートロード
class EventSourcedRepository<T extends EventSourcedAggregate> {
  private readonly SNAPSHOT_INTERVAL = 50 // 50イベントごとにスナップショット

  constructor(
    private readonly eventStore: EventStore,
    private readonly snapshotStore: SnapshotStore,
    private readonly factory: () => T
  ) {}

  async load(aggregateId: string): Promise<T> {
    const aggregate = this.factory()

    // 1. 最新スナップショットをロード
    const snapshot = await this.snapshotStore.getLatest(aggregateId)

    if (snapshot) {
      aggregate.restoreFromSnapshot(snapshot.state)
      // 2. スナップショット以降のイベントのみリプレイ
      const events = await this.eventStore.getEvents(aggregateId, snapshot.version)
      aggregate.loadFromHistory(events)
    } else {
      // 3. 全イベントリプレイ
      const events = await this.eventStore.getEvents(aggregateId)
      aggregate.loadFromHistory(events)
    }

    return aggregate
  }

  async save(aggregate: T): Promise<void> {
    const events = aggregate.getUncommittedEvents()
    await this.eventStore.append(aggregate.id, events, aggregate.version - events.length)

    // スナップショット生成条件確認
    if (aggregate.version % this.SNAPSHOT_INTERVAL === 0) {
      await this.snapshotStore.save({
        aggregateId: aggregate.id,
        version: aggregate.version,
        state: aggregate.toSnapshot(),
        timestamp: new Date(),
      })
    }

    aggregate.clearUncommittedEvents()
  }
}

運用上の注意事項

必ず知っておくべき重要な警告

  1. イベントスキーマ変更はDBマイグレーションより危険:Event Storeのイベントは不変であるため、スキーマを変更すると過去のイベントリプレイが壊れる。必ず下位互換性(backward compatibility)を維持しなければならない。

  2. CQRS導入は複雑度を最低2倍にする:読み取りモデルと書き込みモデルの同期遅延(eventual consistency)をUIとビジネスロジックの両方で必ず処理しなければならない。

  3. Saga補償トランザクション失敗は手動介入が必要:補償トランザクションも失敗する可能性があり、その場合Dead Letter Queueと手動復旧プロセスが必ず必要だ。

  4. イベント順序逆転はデータ整合性を破壊する:Kafkaのパーティショニングキーが不適切に設定されると、同一アグリゲートのイベントが異なるパーティションに分散され、順序が逆転する。

  5. Event Storeの無限成長を管理すべき:SnapshotとアーカイビングなしではEvent Storeが無限に大きくなり、リプレイ性能が急激に低下する。

障害事例と復旧手順

事例1:イベント順序逆転

症状:OrderPlacedの前にOrderShippedイベントが到着し、プロジェクションが失敗する。

原因:Kafkaパーティショニングキーを設定しなかったため、イベントが異なるパーティションに分散された。

復旧

# 1. コンシューマグループオフセットリセット
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group projection-service \
  --topic events.order \
  --reset-offsets --to-earliest --execute

# 2. プロジェクションテーブル初期化
psql -c "TRUNCATE TABLE order_summaries;"

# 3. プロジェクションサービス再起動(完全リプレイ)
kubectl rollout restart deployment/projection-service

予防:必ずアグリゲートIDをKafkaパーティショニングキーとして使用する。

事例2:重複イベント処理

症状:注文が二重に生成されたり、決済が2回処理される。

原因:コンシューマ障害後の再起動時に、すでに処理されたメッセージを再度消費する(at-least-once特性)。

復旧

// 冪等性キーテーブルベースの重複防止
class IdempotencyGuard {
  constructor(private readonly db: Database) {}

  async executeOnce<T>(idempotencyKey: string, operation: () => Promise<T>): Promise<T | null> {
    try {
      // UNIQUE制約で重複挿入を防止
      await this.db.execute('INSERT INTO processed_events (event_id, processed_at) VALUES (?, ?)', [
        idempotencyKey,
        new Date(),
      ])
    } catch (error) {
      // 処理済みイベント
      console.log(`Event ${idempotencyKey} already processed, skipping`)
      return null
    }

    return operation()
  }
}

事例3:スキーマ進化の失敗

症状:新バージョンのイベントハンドラーが過去のイベントをデシリアライズできず、プロジェクションが中断される。

原因:イベントに必須フィールドを追加したが、過去のイベントにはそのフィールドがない。

復旧

// イベントアップキャスター(Upcaster)パターン
class EventUpcaster {
  private upcasters: Map<string, (event: any) => DomainEvent> = new Map()

  register(eventType: string, fromVersion: number, upcaster: (event: any) => any): void {
    this.upcasters.set(`${eventType}:v${fromVersion}`, upcaster)
  }

  upcast(event: any): DomainEvent {
    const key = `${event.eventType}:v${event.schemaVersion || 1}`
    const upcaster = this.upcasters.get(key)

    if (upcaster) {
      return this.upcast(upcaster(event)) // 再帰的に最新バージョンまで
    }

    return event
  }
}

// 使用例:v1 -> v2変換
const upcaster = new EventUpcaster()
upcaster.register('OrderPlaced', 1, (event) => ({
  ...event,
  schemaVersion: 2,
  payload: {
    ...event.payload,
    currency: event.payload.currency || 'KRW', // デフォルト値追加
    shippingMethod: 'STANDARD', // 新フィールドにデフォルト値設定
  },
}))

プロダクションチェックリスト

Event Store

  • イベントテーブルにaggregate_id + versionユニークインデックス設定
  • 楽観的同時実行制御(Optimistic Concurrency)実装確認
  • Snapshot戦略設定(50〜100イベントごと)
  • イベントアーカイビングポリシー策定(コールドストレージ移行)
  • Event Storeバックアップと復旧手順の検証

CQRS

  • Read Model再構築(Rebuild)プロセス自動化
  • 読み取り/書き込みDB分離とレプリケーション遅延モニタリング
  • Projection失敗時のアラートと自動リトライ設定
  • Eventual Consistency処理(UIでの楽観的更新)

メッセージブローカー

  • Kafka:パーティショニングキーをアグリゲートIDに設定
  • Consumer Group lagモニタリング(BurrowまたはKafka Exporter)
  • Dead Letter Queue(DLQ)設定とモニタリング
  • メッセージシリアライゼーション形式決定(Avro + Schema Registry推奨)
  • ブローカークラスタレプリケーションファクター3以上設定

Saga

  • 補償トランザクションの定義とテスト完了
  • Saga状態永続化(DB保存)確認
  • Sagaタイムアウト設定(無限待機防止)
  • 補償失敗時の手動復旧プロセス文書化

イベントスキーマ

  • Schema Registry導入(Confluent Schema Registry、AWS Glue)
  • 下位互換性検証自動化(CIパイプラインに含める)
  • イベントアップキャスター(Upcaster)実装
  • イベントカタログ文書の維持

モニタリング

  • イベント処理レイテンシメトリクス収集
  • Consumer Group lagアラート設定
  • イベント処理失敗率ダッシュボード構成
  • Correlation IDベースのエンドツーエンドトレーシング

参考資料