Skip to content
Published on

Event Sourcing + CQRS アーキテクチャ実践実装ガイド:イベントストアからプロジェクション・Sagaパターンまで

Authors
  • Name
    Twitter
Event Sourcing CQRS

はじめに

従来のCRUDシステムはデータの現在の状態のみを保存する。UPDATEが実行されると以前の値は失われ、DELETEが実行されるとデータ自体が消滅する。銀行口座の残高が100万円であることは分かるが、その残高がどのような入出金プロセスを経て形成されたかは分からない。

Event Sourcingはこの問題を根本的に解決する。状態変更そのものをイベントとして記録し、現在の状態はイベントの順次再生によって復元する。CQRSはこの構造の上で書き込み(Command)と読み取り(Query)を完全に分離し、それぞれの要件に最適化する。

本記事では、Event SourcingとCQRSを組み合わせたアーキテクチャの全体実装プロセスを扱う。イベントストア設計からAggregate実装、プロジェクション構築、Sagaパターンを活用した分散トランザクション管理、スナップショット最適化、そして運用時に発生する障害事例と復旧手順まで、実践コードとともに解説する。

1. Event Sourcing コア原理

1.1 イベントとは何か

Event Sourcingにおけるイベント(Event)とは、**過去に発生した変更不可能な事実(Immutable Fact)**である。イベントは過去形で命名され、ビジネスの意図を明確に表現しなければならない。

不適切なイベント設計と適切なイベント設計の比較は以下の通りである。

  • 不適切な例:NameChangedEmailChanged - プロパティ単位のイベントはビジネス的意味を持たない
  • 適切な例:CustomerRelocatedOrderConfirmed - ドメインの行為を表現するイベント

1.2 状態復元の原理

現在の状態は、初期状態にすべてのイベントを順次適用(replay)することで計算される。

現在の状態 = fold(初期状態, [イベント1, イベント2, ..., イベントN])

例えば、銀行口座の状態復元プロセスは以下の通りである。

初期残高: 0  -> AccountOpened(金額: 0)         => 残高: 0  -> MoneyDeposited(金額: 100万円)    => 残高: 100万円
  -> MoneyWithdrawn(金額: 30万円)     => 残高: 70万円
  -> MoneyDeposited(金額: 50万円)     => 残高: 120万円

1.3 イベントの不変性原則

一度保存されたイベントは絶対に修正・削除してはならない。誤ったイベントが発行された場合は、**補償イベント(Compensating Event)**を新たに発行して論理的に取り消す。

// 誤った出金が発生した場合 - イベントを削除しない
// 代わりに補償イベントを発行する
interface MoneyWithdrawnCompensated {
  type: 'MoneyWithdrawnCompensated'
  originalEventId: string
  amount: number
  reason: string
  timestamp: Date
}

2. CQRS アーキテクチャパターン

2.1 CommandとQueryの分離

CQRSの核心はシンプルである。データを変更するモデルとデータを照会するモデルを分離することだ。

[クライアント]
    |
    |-- Command(書込) --> [Command Handler] --> [Write Model / Event Store]
    |                                                     |
    |                                              イベント発行
    |                                                     |
    |                                                     v
    |                                           [Projection Engine]
    |                                                     |
    |                                                     v
    +-- Query(読取) ----> [Query Handler] ----> [Read Model / View DB]

2.2 なぜ分離するのか

読み取りと書き込みの要件は根本的に異なる。

  • 書き込み(Command):ビジネスルール検証、ドメイン不変条件(invariant)保証、トランザクション整合性が必要
  • 読み取り(Query):多様なビュー対応、低レイテンシ、高スループットが必要

一つのモデルで両方を満たそうとすると、双方に妥協が生じる。CQRSはそれぞれのモデルを独立に最適化できるようにする。

2.3 Command Handler 実装

// Command定義
interface CreateOrderCommand {
  orderId: string
  customerId: string
  items: Array<{ productId: string; quantity: number; price: number }>
}

// Command Handler
class OrderCommandHandler {
  constructor(
    private readonly eventStore: EventStore,
    private readonly orderRepository: EventSourcedRepository<Order>
  ) {}

  async handle(command: CreateOrderCommand): Promise<void> {
    // 1. Aggregateロード(イベント再生)
    const order = await this.orderRepository.load(command.orderId)

    // 2. ビジネスロジック実行(イベント生成)
    order.create(command.customerId, command.items)

    // 3. イベント保存
    await this.orderRepository.save(order)
  }
}

3. Event Store 設計と実装

3.1 イベントストアスキーマ

イベントストアのコアテーブル構造は以下の通りである。

CREATE TABLE events (
    -- グローバル一意識別子
    event_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    -- イベントが属するストリーム(Aggregate)識別子
    stream_id       VARCHAR(255) NOT NULL,
    -- ストリーム内のイベント順序(楽観的同時実行制御に使用)
    stream_version  BIGINT NOT NULL,
    -- イベントタイプ(デシリアライズに使用)
    event_type      VARCHAR(255) NOT NULL,
    -- イベントペイロード(JSON)
    event_data      JSONB NOT NULL,
    -- メタデータ(相関ID、ユーザー情報など)
    metadata        JSONB DEFAULT '{}',
    -- イベント発生時刻
    created_at      TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    -- グローバル順序(プロジェクションで全体順序追跡に使用)
    global_position BIGSERIAL NOT NULL,

    -- 同一ストリーム内でのバージョン重複防止(楽観的同時実行制御)
    UNIQUE(stream_id, stream_version)
);

-- ストリーム別イベント照会インデックス
CREATE INDEX idx_events_stream ON events(stream_id, stream_version);
-- 全体イベント順序インデックス(プロジェクション用)
CREATE INDEX idx_events_global ON events(global_position);
-- イベントタイプ別照会インデックス
CREATE INDEX idx_events_type ON events(event_type);

3.2 TypeScript Event Store 実装

import { Pool } from 'pg'

interface DomainEvent {
  eventType: string
  data: Record<string, unknown>
  metadata?: Record<string, unknown>
}

interface StoredEvent extends DomainEvent {
  eventId: string
  streamId: string
  streamVersion: number
  globalPosition: number
  createdAt: Date
}

class PostgresEventStore {
  constructor(private readonly pool: Pool) {}

  /**
   * イベントをストリームに追加する。
   * expectedVersionで楽観的同時実行制御を行う。
   */
  async appendToStream(
    streamId: string,
    expectedVersion: number,
    events: DomainEvent[]
  ): Promise<StoredEvent[]> {
    const client = await this.pool.connect()
    try {
      await client.query('BEGIN')

      // 楽観的同時実行検査
      const versionCheck = await client.query(
        'SELECT MAX(stream_version) as current_version FROM events WHERE stream_id = $1',
        [streamId]
      )
      const currentVersion = versionCheck.rows[0].current_version ?? -1

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but current version is ${currentVersion}`
        )
      }

      const storedEvents: StoredEvent[] = []
      for (let i = 0; i < events.length; i++) {
        const event = events[i]
        const version = expectedVersion + i + 1

        const result = await client.query(
          `INSERT INTO events (stream_id, stream_version, event_type, event_data, metadata)
           VALUES ($1, $2, $3, $4, $5)
           RETURNING event_id, global_position, created_at`,
          [
            streamId,
            version,
            event.eventType,
            JSON.stringify(event.data),
            JSON.stringify(event.metadata ?? {}),
          ]
        )

        storedEvents.push({
          eventId: result.rows[0].event_id,
          streamId,
          streamVersion: version,
          globalPosition: result.rows[0].global_position,
          createdAt: result.rows[0].created_at,
          ...event,
        })
      }

      await client.query('COMMIT')
      return storedEvents
    } catch (error) {
      await client.query('ROLLBACK')
      throw error
    } finally {
      client.release()
    }
  }

  /**
   * ストリームのすべてのイベントを順序通りに読み取る。
   */
  async readStream(streamId: string, fromVersion = 0): Promise<StoredEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events
       WHERE stream_id = $1 AND stream_version >= $2
       ORDER BY stream_version ASC`,
      [streamId, fromVersion]
    )
    return result.rows.map(this.mapToStoredEvent)
  }

  /**
   * 全イベントをグローバル順序で読み取る(プロジェクション用)。
   */
  async readAll(fromPosition = 0, limit = 1000): Promise<StoredEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events
       WHERE global_position > $1
       ORDER BY global_position ASC
       LIMIT $2`,
      [fromPosition, limit]
    )
    return result.rows.map(this.mapToStoredEvent)
  }

  private mapToStoredEvent(row: any): StoredEvent {
    return {
      eventId: row.event_id,
      streamId: row.stream_id,
      streamVersion: row.stream_version,
      globalPosition: row.global_position,
      eventType: row.event_type,
      data: row.event_data,
      metadata: row.metadata,
      createdAt: row.created_at,
    }
  }
}

3.3 専用Event Storeソリューション比較

項目EventStoreDB (Kurrent)Axon ServerPostgreSQL 自前実装
タイプ専用イベントDBCQRSフレームワークサーバー汎用RDBMS
プロジェクションサーバーサイドJavaScriptTracking Processor自前実装が必要
サブスクリプションモデルCatch-up, PersistentTracking, SubscribingPolling, LISTEN/NOTIFY
クラスタリング内蔵(Gossipプロトコル)Axon Server Enterprise外部ソリューション必要
学習コスト高(フレームワーク全体)低(SQL基盤)
柔軟性低(Axonエコシステム依存)
運用複雑度低~中

4. AggregateとDomain Eventの実装

4.1 Aggregate Base Class

Event SourcingにおけるAggregateは、イベントを発行しイベントを再生して状態を復元する中核単位である。

abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  private _version: number = -1

  get version(): number {
    return this._version
  }

  /**
   * 外部から呼び出して状態を変更する。
   * 内部的にイベントを生成し適用する。
   */
  protected apply(event: DomainEvent): void {
    this.when(event)
    this.uncommittedEvents.push(event)
    this._version++
  }

  /**
   * イベントに基づいて内部状態を変更するハンドラ。
   * サブクラスで実装する。
   */
  protected abstract when(event: DomainEvent): void

  /**
   * 保存済みイベントを再生して状態を復元する。
   */
  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this._version++
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = []
  }
}

4.2 Order Aggregate 実装

// Domain Events
interface OrderCreated {
  eventType: 'OrderCreated'
  data: {
    orderId: string
    customerId: string
    items: Array<{ productId: string; quantity: number; unitPrice: number }>
    totalAmount: number
  }
}

interface OrderConfirmed {
  eventType: 'OrderConfirmed'
  data: {
    orderId: string
    confirmedAt: string
  }
}

interface OrderCancelled {
  eventType: 'OrderCancelled'
  data: {
    orderId: string
    reason: string
    cancelledAt: string
  }
}

type OrderEvent = OrderCreated | OrderConfirmed | OrderCancelled

// Order Aggregate
class Order extends EventSourcedAggregate {
  private orderId!: string
  private customerId!: string
  private items: Array<{ productId: string; quantity: number; unitPrice: number }> = []
  private totalAmount: number = 0
  private status: 'CREATED' | 'CONFIRMED' | 'CANCELLED' = 'CREATED'

  static create(
    orderId: string,
    customerId: string,
    items: Array<{ productId: string; quantity: number; unitPrice: number }>
  ): Order {
    const order = new Order()
    const totalAmount = items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0)

    // ビジネスルール検証
    if (items.length === 0) {
      throw new Error('注文には最低1つ以上の商品が必要です')
    }
    if (totalAmount <= 0) {
      throw new Error('注文合計は0より大きくなければなりません')
    }

    order.apply({
      eventType: 'OrderCreated',
      data: { orderId, customerId, items, totalAmount },
    })

    return order
  }

  confirm(): void {
    if (this.status !== 'CREATED') {
      throw new Error('CREATED状態の注文のみ確認できます')
    }
    this.apply({
      eventType: 'OrderConfirmed',
      data: { orderId: this.orderId, confirmedAt: new Date().toISOString() },
    })
  }

  cancel(reason: string): void {
    if (this.status === 'CANCELLED') {
      throw new Error('既にキャンセルされた注文です')
    }
    this.apply({
      eventType: 'OrderCancelled',
      data: { orderId: this.orderId, reason, cancelledAt: new Date().toISOString() },
    })
  }

  protected when(event: DomainEvent): void {
    const orderEvent = event as OrderEvent
    switch (orderEvent.eventType) {
      case 'OrderCreated':
        this.orderId = orderEvent.data.orderId
        this.customerId = orderEvent.data.customerId
        this.items = orderEvent.data.items
        this.totalAmount = orderEvent.data.totalAmount
        this.status = 'CREATED'
        break
      case 'OrderConfirmed':
        this.status = 'CONFIRMED'
        break
      case 'OrderCancelled':
        this.status = 'CANCELLED'
        break
    }
  }
}

5. プロジェクションとRead Model

5.1 プロジェクションの役割

プロジェクション(Projection)はイベントストリームを購読し、**読み取りに最適化されたビュー(Read Model)**を構築するプロセスである。Event Sourcingのイベントストリームから直接照会するのは非効率的であるため、様々な照会要件に合わせた非正規化ビューを別途維持する。

5.2 Projection Engine 実装

interface ProjectionCheckpoint {
  projectionName: string
  lastProcessedPosition: number
}

abstract class Projection {
  abstract readonly name: string

  abstract handle(event: StoredEvent): Promise<void>

  canHandle(eventType: string): boolean {
    return this.handledEventTypes().includes(eventType)
  }

  abstract handledEventTypes(): string[]
}

class ProjectionEngine {
  private projections: Projection[] = []

  constructor(
    private readonly eventStore: PostgresEventStore,
    private readonly checkpointStore: CheckpointStore
  ) {}

  register(projection: Projection): void {
    this.projections.push(projection)
  }

  /**
   * 登録された全プロジェクションを実行する。
   * 各プロジェクションは最後に処理した位置から再開する。
   */
  async run(): Promise<void> {
    while (true) {
      for (const projection of this.projections) {
        const checkpoint = await this.checkpointStore.get(projection.name)
        const lastPosition = checkpoint?.lastProcessedPosition ?? 0

        const events = await this.eventStore.readAll(lastPosition, 100)

        for (const event of events) {
          if (projection.canHandle(event.eventType)) {
            try {
              await projection.handle(event)
            } catch (error) {
              console.error(
                `Projection ${projection.name} failed at position ${event.globalPosition}:`,
                error
              )
              break
            }
          }
          await this.checkpointStore.save({
            projectionName: projection.name,
            lastProcessedPosition: event.globalPosition,
          })
        }
      }
      // ポーリング間隔
      await new Promise((resolve) => setTimeout(resolve, 500))
    }
  }
}

5.3 注文ダッシュボード Read Model

class OrderDashboardProjection extends Projection {
  readonly name = 'order-dashboard'

  constructor(private readonly db: Pool) {
    super()
  }

  handledEventTypes(): string[] {
    return ['OrderCreated', 'OrderConfirmed', 'OrderCancelled']
  }

  async handle(event: StoredEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderCreated':
        await this.db.query(
          `INSERT INTO order_dashboard (order_id, customer_id, total_amount, status, created_at)
           VALUES ($1, $2, $3, 'CREATED', $4)
           ON CONFLICT (order_id) DO NOTHING`,
          [event.data.orderId, event.data.customerId, event.data.totalAmount, event.createdAt]
        )
        break

      case 'OrderConfirmed':
        await this.db.query(
          `UPDATE order_dashboard SET status = 'CONFIRMED', confirmed_at = $2 WHERE order_id = $1`,
          [event.data.orderId, event.data.confirmedAt]
        )
        break

      case 'OrderCancelled':
        await this.db.query(
          `UPDATE order_dashboard SET status = 'CANCELLED', cancel_reason = $2 WHERE order_id = $1`,
          [event.data.orderId, event.data.reason]
        )
        break
    }
  }
}

6. Sagaパターンと分散トランザクション

6.1 Sagaとは何か

Sagaパターンは、複数サービスにまたがるトランザクションを一連のローカルトランザクションと補償トランザクションで管理するパターンである。従来の2PC(Two-Phase Commit)と異なり、長時間ロックを保持しないため、拡張性に優れている。

二つの実装方式がある。

  • Choreography(コレオグラフィ):各サービスがイベントを発行し、他のサービスが反応する。中央調整者が存在しない。
  • Orchestration(オーケストレーション):中央オーケストレーターが全体フローを管理し、各サービスにコマンドを送信する。

6.2 Orchestration Saga 実装

// Saga状態定義
type OrderSagaStatus =
  | 'STARTED'
  | 'PAYMENT_PENDING'
  | 'PAYMENT_COMPLETED'
  | 'INVENTORY_RESERVED'
  | 'COMPLETED'
  | 'COMPENSATING'
  | 'FAILED'

interface SagaStep {
  name: string
  execute: () => Promise<void>
  compensate: () => Promise<void>
}

class OrderSaga {
  private status: OrderSagaStatus = 'STARTED'
  private completedSteps: SagaStep[] = []

  constructor(
    private readonly orderId: string,
    private readonly paymentService: PaymentService,
    private readonly inventoryService: InventoryService,
    private readonly sagaLog: SagaLogStore
  ) {}

  async execute(orderData: {
    customerId: string
    items: any[]
    totalAmount: number
  }): Promise<void> {
    const steps: SagaStep[] = [
      {
        name: 'ProcessPayment',
        execute: async () => {
          await this.paymentService.processPayment(
            this.orderId,
            orderData.customerId,
            orderData.totalAmount
          )
          this.status = 'PAYMENT_COMPLETED'
        },
        compensate: async () => {
          await this.paymentService.refundPayment(this.orderId)
        },
      },
      {
        name: 'ReserveInventory',
        execute: async () => {
          await this.inventoryService.reserve(this.orderId, orderData.items)
          this.status = 'INVENTORY_RESERVED'
        },
        compensate: async () => {
          await this.inventoryService.releaseReservation(this.orderId)
        },
      },
    ]

    for (const step of steps) {
      try {
        await this.sagaLog.record(this.orderId, step.name, 'EXECUTING')
        await step.execute()
        await this.sagaLog.record(this.orderId, step.name, 'COMPLETED')
        this.completedSteps.push(step)
      } catch (error) {
        await this.sagaLog.record(this.orderId, step.name, 'FAILED')
        console.error(`Saga step ${step.name} failed:`, error)

        // 補償トランザクション実行(逆順)
        this.status = 'COMPENSATING'
        await this.compensate()
        this.status = 'FAILED'
        return
      }
    }

    this.status = 'COMPLETED'
    await this.sagaLog.record(this.orderId, 'Saga', 'COMPLETED')
  }

  private async compensate(): Promise<void> {
    // 完了した段階を逆順で補償
    const stepsToCompensate = [...this.completedSteps].reverse()
    for (const step of stepsToCompensate) {
      try {
        await this.sagaLog.record(this.orderId, step.name, 'COMPENSATING')
        await step.compensate()
        await this.sagaLog.record(this.orderId, step.name, 'COMPENSATED')
      } catch (compensateError) {
        // 補償失敗時は手動介入が必要
        await this.sagaLog.record(this.orderId, step.name, 'COMPENSATION_FAILED')
        console.error(`Compensation failed for step ${step.name}:`, compensateError)
      }
    }
  }
}

6.3 Choreography vs Orchestration 比較

項目ChoreographyOrchestration
結合度低い(イベント駆動)中程度(オーケストレーター依存)
可視性低い(フロー追跡が困難)高い(中央でフロー管理)
複雑度サービス数増加で急激に上昇オーケストレーターに集中
単一障害点なしオーケストレーター
テスト容易性困難比較的容易
適したケース2~3サービスの単純なフロー4サービス以上の複雑なフロー

7. スナップショットとイベント圧縮

7.1 スナップショットの必要性

Aggregateのイベントが数千、数万件に達すると、状態復元時間が急激に増加する。スナップショットは特定時点のAggregate状態をシリアライズして保存し、以降のイベントのみ再生する最適化手法である。

interface Snapshot {
  streamId: string
  version: number
  state: Record<string, unknown>
  createdAt: Date
}

class SnapshotRepository {
  constructor(private readonly pool: Pool) {}

  async save(snapshot: Snapshot): Promise<void> {
    await this.pool.query(
      `INSERT INTO snapshots (stream_id, version, state, created_at)
       VALUES ($1, $2, $3, $4)
       ON CONFLICT (stream_id) DO UPDATE SET version = $2, state = $3, created_at = $4`,
      [snapshot.streamId, snapshot.version, JSON.stringify(snapshot.state), snapshot.createdAt]
    )
  }

  async load(streamId: string): Promise<Snapshot | null> {
    const result = await this.pool.query('SELECT * FROM snapshots WHERE stream_id = $1', [streamId])
    return result.rows.length > 0 ? result.rows[0] : null
  }
}

class EventSourcedRepository<T extends EventSourcedAggregate> {
  private readonly SNAPSHOT_INTERVAL = 100 // 100イベントごとにスナップショット

  constructor(
    private readonly eventStore: PostgresEventStore,
    private readonly snapshotRepo: SnapshotRepository,
    private readonly factory: () => T
  ) {}

  async load(streamId: string): Promise<T> {
    const aggregate = this.factory()

    // 1. スナップショットロード試行
    const snapshot = await this.snapshotRepo.load(streamId)
    let fromVersion = 0

    if (snapshot) {
      ;(aggregate as any).restoreFromSnapshot(snapshot.state)
      ;(aggregate as any)._version = snapshot.version
      fromVersion = snapshot.version + 1
    }

    // 2. スナップショット以降のイベントのみ再生
    const events = await this.eventStore.readStream(streamId, fromVersion)
    aggregate.loadFromHistory(events)

    return aggregate
  }

  async save(aggregate: T, streamId: string): Promise<void> {
    const uncommittedEvents = aggregate.getUncommittedEvents()
    if (uncommittedEvents.length === 0) return

    await this.eventStore.appendToStream(
      streamId,
      aggregate.version - uncommittedEvents.length,
      uncommittedEvents
    )

    // スナップショット作成要否確認
    if (aggregate.version > 0 && aggregate.version % this.SNAPSHOT_INTERVAL === 0) {
      await this.snapshotRepo.save({
        streamId,
        version: aggregate.version,
        state: (aggregate as any).toSnapshot(),
        createdAt: new Date(),
      })
    }

    aggregate.clearUncommittedEvents()
  }
}

7.2 スナップショット戦略ガイドライン

  • 作成頻度:100~1,000イベントごと(ドメイン特性に応じて調整)
  • 保存場所:同一ストリームまたは別ストレージ(別ストレージ推奨)
  • 作成タイミング:非同期バックグラウンドで作成し、書き込みレイテンシに影響を与えないようにする
  • リプレイ時間が100msを超える場合:スナップショットの導入を積極的に検討する

8. Event Sourcing vs 従来のCRUD比較

項目Event Sourcing従来のCRUD
データ保存イベント(変更履歴)を保存現在の状態のみ保存
履歴追跡完全な監査証跡を自動提供別途監査テーブルが必要
タイムトラベルクエリ特定時点の状態復元が可能別途スナップショットテーブルが必要
クエリ複雑度高い(Read Modelの別途構築)低い(SQL直接照会)
整合性モデル主に結果整合性即時整合性
スキーマ変更イベントバージョン管理が必要ALTER TABLE
デバッグイベント再生で問題を再現現在の状態のみ確認可能
学習コスト高い低い
適したドメイン金融、注文、物流など履歴重要ドメイン設定管理、カタログなど単純CRUD

9. 障害事例と復旧手順

9.1 イベントストア障害

症状:イベント保存失敗によりCommand処理が停止

復旧手順

  1. DB接続状態の確認と障害原因の特定
  2. イベントストア復旧後、未処理Commandのリトライ
  3. グローバルポジションのギャップ確認 -- ギャップが発生した場合はプロジェクションの整合性検証

9.2 プロジェクション同期遅延

症状:Read Modelが最新状態を反映していない

復旧手順

  1. プロジェクションチェックポイントとイベントストアの最新ポジションを比較
  2. 遅延原因の特定(処理速度、障害、ポイズンイベントなど)
  3. 必要に応じてプロジェクションを初期化し、最初から再構築(rebuild)

9.3 Saga補償失敗

症状:補償トランザクション実行中に外部サービス障害が発生

復旧手順

  1. SagaログからCOMPENSATION_FAILED状態を照会
  2. 失敗した補償ステップを手動でリトライ
  3. リトライ回数超過時はDead Letter Queueに記録し、手動介入を実施

9.4 イベントスキーマ変更

症状:イベント構造が変更され、既存イベントのデシリアライズが失敗

復旧手順

  1. Upcasterパターンで旧バージョンイベントを新バージョンに変換
  2. プロジェクションで両方のバージョンを処理できるようにハンドラを修正
  3. 全プロジェクションのrebuildを実行

10. 運用チェックリスト

導入前レビュー

  • ドメインにEvent Sourcingが本当に必要か?(履歴追跡、監査、タイムトラベルクエリの必要性)
  • 結果整合性を受容できるか?
  • チームがDDDとイベント駆動の思考に慣れているか?

設計フェーズ

  • イベントがビジネスの意図を含んでいるか?(プロパティ変更ではなくドメイン行為)
  • Aggregate境界は適切か?(大きすぎず小さすぎないか)
  • イベントスキーマのバージョン管理戦略は策定されているか?
  • プロジェクションrebuild戦略は準備されているか?

運用フェーズ

  • イベントストア容量モニタリングとアーカイブ戦略
  • プロジェクション遅延時間(lag)のモニタリング
  • Saga失敗率と補償トランザクション成功率の追跡
  • スナップショット作成周期とディスク使用量の管理
  • ストリーム別イベント数のモニタリング(異常増加の検知)

障害対策

  • プロジェクション全体rebuildの手順と想定所要時間の文書化
  • Saga Dead Letter Queue処理プロセスの策定
  • イベントストアのバックアップと復旧手順の検証
  • Upcasterテストの自動化

まとめ

Event SourcingとCQRSは強力なアーキテクチャパターンであるが、すべてのシステムに適しているわけではない。履歴追跡、監査ログ、タイムトラベルクエリが核心的な要件であるドメイン(金融、注文管理、物流など)で最大の価値を発揮する。

導入を検討する場合、以下を推奨する。

  1. コアドメインにのみ選択的に適用せよ。すべてのサービスにEvent Sourcingを適用する必要はない。
  2. プロジェクションrebuildを最初から設計せよ。運用中のプロジェクション再構築は必ず必要になる。
  3. イベントスキーマ進化戦略を先に策定せよ。イベントは永久に保存されるため、スキーマ変更コストが最も高い。
  4. Saga補償失敗に備えよ。分散システムにおいて補償が失敗するのは時間の問題である。

結果整合性を受容し、イベント駆動の思考に転換する準備ができていれば、Event Sourcing + CQRSはシステムの復元力と拡張性を根本的に向上させるアーキテクチャ選択となるだろう。