Skip to content
Published on

イベントソーシングとCQRSパターン実践実装ガイド:設計から運用まで

Authors
イベントソーシングとCQRSパターン実践実装ガイド:設計から運用まで

なぜイベントソーシングとCQRSを一緒に扱うのか

イベントソーシング(Event Sourcing)とCQRS(Command Query Responsibility Segregation)は独立したパターンですが、プロダクション環境ではほぼ常に一緒に使用されます。イベントソーシングで状態変更の全履歴をイベントストリームに記録すると、現在の状態はイベントリプレイ(replay)でしか復元できないため、読み取り性能が急激に低下します。CQRSで読み取り専用プロジェクション(Read Model)を分離すれば、この問題が解決されます。

本記事は概念説明にとどまりません。EventStoreDBベースのイベントストア構築、TypeScriptとPythonでのコマンドハンドラーとプロジェクションの実装、スナップショット戦略、イベントスキーマバージョン管理、障害復旧手順まで運用レベルの内容を全て網羅します。2025年以降EventStoreDBがKurrentにリブランディングされた状況と最新のgRPCクライアントAPIも反映しています。

アーキテクチャ全体構造

イベントソーシング + CQRSシステムの全体フローは以下の通りです。

  1. クライアントがコマンド(Command)を送信する
  2. コマンドハンドラーがドメイン不変条件(invariant)を検証する
  3. 検証を通過するとドメインイベントを生成しイベントストアにappendする
  4. イベントストアが**サブスクライバー(Subscriber)**にイベントをプッシュする
  5. プロジェクションハンドラーがイベントを受信して**読み取りモデル(Read Model)**を更新する
  6. クエリハンドラーが読み取りモデルからデータを照会してクライアントに返す

核心原則:書き込みパス(Command Path)と読み取りパス(Query Path)は完全に分離されます。書き込み側はイベントストアにのみ書き込み、読み取り側はプロジェクションされた読み取りモデルからのみ照会します。

イベントストアソリューション比較

プロダクションで使用可能なイベントストアソリューションを比較します。技術スタックと運用環境に合った選択が重要です。

ソリューション言語エコシステムストレージ方式プロジェクション内蔵ライセンス運用複雑度
EventStoreDB (Kurrent)言語非依存 (gRPC)専用ファイルシステムあり (JSベース)BSL (商用無料)中間
Axon ServerJVM (Java/Kotlin)専用ストレージなし (Frameworkで処理)オープンソース + Enterprise中間
Marten.NET (C#)PostgreSQLあり (C#ベース)MIT低い
EventSourcingDB言語非依存 (gRPC)専用エンジンなしオープンソース低い
PostgreSQL + 自作言語非依存RDBMSなし (自作)オープンソース高い
MongoDB + 自作言語非依存Document DBなし (自作)SSPL高い

EventStoreDBはGreg Youngが設計した専用イベントストアで、ストリーム単位のappend-onlyストレージ、内蔵プロジェクション、catch-upサブスクリプションなどイベントソーシングに必要な機能をネイティブで提供します。本記事ではEventStoreDBを基準に実装例を作成します。

ドメインイベント設計

イベントソーシングシステムにおいて、ドメインイベントはシステムの最も重要な契約(contract)です。一度保存されたイベントは絶対に変更・削除しません。したがって、イベントスキーマの設計には慎重を期す必要があります。

イベント設計原則

  • 過去形の動詞で命名する:OrderCreatedPaymentProcessedItemShipped
  • ビジネスの意図を込める:ReservationStatusChangedではなくRoomBooked
  • **自己完結的(self-contained)**であること:イベント1つで何が起こったのかを完全に把握できる必要がある
  • 単純型のみ使用する:string、number、boolean、配列。Value Objectをイベントに直接含めない
  • バージョンフィールドを含む:スキーマ進化に必須

TypeScriptイベント定義

// events/order-events.ts

interface BaseEvent {
  eventType: string
  eventVersion: number
  aggregateId: string
  timestamp: string
  metadata: {
    correlationId: string
    causationId: string
    userId: string
  }
}

interface OrderCreated extends BaseEvent {
  eventType: 'OrderCreated'
  eventVersion: 1
  data: {
    orderId: string
    customerId: string
    items: Array<{
      productId: string
      productName: string
      quantity: number
      unitPrice: number
    }>
    totalAmount: number
    currency: string
    shippingAddress: {
      street: string
      city: string
      zipCode: string
      country: string
    }
  }
}

interface OrderPaymentConfirmed extends BaseEvent {
  eventType: 'OrderPaymentConfirmed'
  eventVersion: 1
  data: {
    orderId: string
    paymentId: string
    amount: number
    method: 'CREDIT_CARD' | 'BANK_TRANSFER' | 'WALLET'
    confirmedAt: string
  }
}

interface OrderCancelled extends BaseEvent {
  eventType: 'OrderCancelled'
  eventVersion: 1
  data: {
    orderId: string
    reason: string
    cancelledBy: string
    refundAmount: number
  }
}

type OrderEvent = OrderCreated | OrderPaymentConfirmed | OrderCancelled

correlationIdcausationIdをメタデータに必ず含めます。分散システムで1つのユーザーリクエストが複数のイベントを発生させる場合、この2つのフィールドがないと障害追跡が不可能になります。

コマンドハンドラーとAggregate実装

コマンドハンドラーはイベントソーシングシステムの書き込みパスを担当します。Aggregateはドメイン不変条件を守る境界であり、イベントをリプレイして現在の状態を復元します。

TypeScript Aggregate実装

// aggregates/order-aggregate.ts

import { EventStoreDBClient, jsonEvent, FORWARDS, START } from '@eventstore/db-client'

interface OrderState {
  orderId: string
  status: 'CREATED' | 'PAID' | 'SHIPPED' | 'CANCELLED'
  customerId: string
  totalAmount: number
  items: Array<{ productId: string; quantity: number; unitPrice: number }>
  version: number
}

class OrderAggregate {
  private state: OrderState
  private pendingEvents: OrderEvent[] = []

  constructor() {
    this.state = {
      orderId: '',
      status: 'CREATED',
      customerId: '',
      totalAmount: 0,
      items: [],
      version: -1,
    }
  }

  // イベントリプレイで状態を復元
  static async load(client: EventStoreDBClient, orderId: string): Promise<OrderAggregate> {
    const aggregate = new OrderAggregate()
    const streamName = `order-${orderId}`

    const events = client.readStream(streamName, {
      direction: FORWARDS,
      fromRevision: START,
    })

    for await (const resolvedEvent of events) {
      const event = resolvedEvent.event
      if (event) {
        aggregate.apply(event.data as OrderEvent, false)
        aggregate.state.version = Number(resolvedEvent.event!.revision)
      }
    }

    return aggregate
  }

  // コマンド:注文作成
  createOrder(command: {
    orderId: string
    customerId: string
    items: Array<{ productId: string; quantity: number; unitPrice: number }>
  }): void {
    // 不変条件の検証
    if (this.state.orderId !== '') {
      throw new Error(`Order ${command.orderId} already exists`)
    }
    if (command.items.length === 0) {
      throw new Error('Order must contain at least one item')
    }

    const totalAmount = command.items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0)

    const event: OrderCreated = {
      eventType: 'OrderCreated',
      eventVersion: 1,
      aggregateId: command.orderId,
      timestamp: new Date().toISOString(),
      metadata: { correlationId: '', causationId: '', userId: '' },
      data: {
        orderId: command.orderId,
        customerId: command.customerId,
        items: command.items.map((i) => ({
          ...i,
          productName: '', // 照会時に補完
        })),
        totalAmount,
        currency: 'KRW',
        shippingAddress: { street: '', city: '', zipCode: '', country: 'KR' },
      },
    }

    this.apply(event, true)
  }

  // コマンド:決済確認
  confirmPayment(
    paymentId: string,
    amount: number,
    method: 'CREDIT_CARD' | 'BANK_TRANSFER' | 'WALLET'
  ): void {
    if (this.state.status !== 'CREATED') {
      throw new Error(`Cannot confirm payment for order in status: ${this.state.status}`)
    }
    if (amount !== this.state.totalAmount) {
      throw new Error(
        `Payment amount ${amount} does not match order total ${this.state.totalAmount}`
      )
    }

    const event: OrderPaymentConfirmed = {
      eventType: 'OrderPaymentConfirmed',
      eventVersion: 1,
      aggregateId: this.state.orderId,
      timestamp: new Date().toISOString(),
      metadata: { correlationId: '', causationId: '', userId: '' },
      data: {
        orderId: this.state.orderId,
        paymentId,
        amount,
        method,
        confirmedAt: new Date().toISOString(),
      },
    }

    this.apply(event, true)
  }

  // イベント適用(状態遷移)
  private apply(event: OrderEvent, isNew: boolean): void {
    switch (event.eventType) {
      case 'OrderCreated':
        this.state.orderId = event.data.orderId
        this.state.customerId = event.data.customerId
        this.state.totalAmount = event.data.totalAmount
        this.state.items = event.data.items
        this.state.status = 'CREATED'
        break
      case 'OrderPaymentConfirmed':
        this.state.status = 'PAID'
        break
      case 'OrderCancelled':
        this.state.status = 'CANCELLED'
        break
    }

    if (isNew) {
      this.pendingEvents.push(event)
    }
  }

  // イベントストアに保存
  async save(client: EventStoreDBClient): Promise<void> {
    if (this.pendingEvents.length === 0) return

    const streamName = `order-${this.state.orderId}`
    const events = this.pendingEvents.map((e) =>
      jsonEvent({ type: e.eventType, data: e.data, metadata: e.metadata })
    )

    await client.appendToStream(streamName, events, {
      expectedRevision: this.state.version === -1 ? 'no_stream' : BigInt(this.state.version),
    })

    this.pendingEvents = []
  }
}

expectedRevisionパラメータが楽観的同時実行制御(Optimistic Concurrency Control)の核心です。同じAggregateに対して2つのコマンドが同時に実行されると、先に保存した側が成功し、後の側はWrongExpectedVersionErrorを受け取ります。リトライロジックでイベントを再ロードしてコマンドを再実行する必要があります。

プロジェクション設計と実装

プロジェクションはイベントストリームから読み取りモデルを構築するプロセスです。イベントソーシングシステムにおいて、プロジェクションは「望む形のビューを自由に作れる」という核心的な利点を実現する部分です。

プロジェクションパターン分類

  • インラインプロジェクション(Inline Projection):イベント保存と同時に読み取りモデルを更新します。強い一貫性を保証しますが、書き込み性能が低下します。
  • 非同期プロジェクション(Async Projection):サブスクリプション(Subscription)を通じて非同期で読み取りモデルを更新します。結果整合性(Eventual Consistency)ですが、書き込み性能が良好です。ほとんどのプロダクションシステムがこの方式を採用しています。
  • ライブプロジェクション(Live Projection):リクエストごとにイベントをリプレイします。常に最新状態を保証しますが、イベントが多い場合は性能が急激に低下します。

Python非同期プロジェクション実装

以下はPythonでEventStoreDBのPersistent Subscriptionを利用した非同期プロジェクションの実装例です。

# projections/order_summary_projection.py

import asyncio
import json
from datetime import datetime
from esdbclient import EventStoreDBClient, NewEvent, StreamState
from dataclasses import dataclass, asdict
from typing import Optional
import asyncpg

@dataclass
class OrderSummaryReadModel:
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    item_count: int
    created_at: str
    updated_at: str
    payment_method: Optional[str] = None
    cancelled_reason: Optional[str] = None


class OrderSummaryProjection:
    def __init__(self, esdb_client: EventStoreDBClient, pg_pool: asyncpg.Pool):
        self.esdb = esdb_client
        self.pg_pool = pg_pool
        self._checkpoint_interval = 100
        self._processed_count = 0

    async def start(self):
        """catch-upサブスクリプションでプロジェクションを開始"""
        last_position = await self._load_checkpoint()

        subscription = self.esdb.subscribe_to_all(
            from_position=last_position,
            filter_include=[r"order-.*"],  # order-で始まるストリームのみサブスクライブ
        )

        for event in subscription:
            try:
                await self._handle_event(event)
                self._processed_count += 1

                # 定期的にチェックポイントを保存
                if self._processed_count % self._checkpoint_interval == 0:
                    await self._save_checkpoint(event.commit_position)

            except Exception as e:
                print(f"Projection error at position {event.commit_position}: {e}")
                # エラー発生時にチェックポイントを保存して再起動
                await self._save_checkpoint(event.commit_position)
                raise

    async def _handle_event(self, event):
        """イベントタイプ別ハンドラーにルーティング"""
        handler_map = {
            'OrderCreated': self._on_order_created,
            'OrderPaymentConfirmed': self._on_payment_confirmed,
            'OrderCancelled': self._on_order_cancelled,
        }

        handler = handler_map.get(event.type)
        if handler:
            data = json.loads(event.data)
            await handler(data)

    async def _on_order_created(self, data: dict):
        """注文作成イベント処理 - 読み取りモデルINSERT"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO order_summary (
                    order_id, customer_id, status, total_amount,
                    item_count, created_at, updated_at
                ) VALUES ($1, $2, $3, $4, $5, $6, $7)
                ON CONFLICT (order_id) DO UPDATE SET
                    status = EXCLUDED.status,
                    updated_at = EXCLUDED.updated_at
            """,
                data['orderId'],
                data['customerId'],
                'CREATED',
                data['totalAmount'],
                len(data['items']),
                datetime.fromisoformat(data.get('createdAt', datetime.now().isoformat())),
                datetime.now(),
            )

    async def _on_payment_confirmed(self, data: dict):
        """決済確認イベント処理 - 読み取りモデルUPDATE"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                UPDATE order_summary
                SET status = 'PAID',
                    payment_method = $2,
                    updated_at = $3
                WHERE order_id = $1
            """, data['orderId'], data['method'], datetime.now())

    async def _on_order_cancelled(self, data: dict):
        """注文キャンセルイベント処理"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                UPDATE order_summary
                SET status = 'CANCELLED',
                    cancelled_reason = $2,
                    updated_at = $3
                WHERE order_id = $1
            """, data['orderId'], data['reason'], datetime.now())

    async def _load_checkpoint(self) -> Optional[int]:
        """最後の処理位置をロード"""
        async with self.pg_pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT position FROM projection_checkpoints WHERE name = $1",
                'order_summary'
            )
            return row['position'] if row else None

    async def _save_checkpoint(self, position: int):
        """処理位置を保存"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO projection_checkpoints (name, position, updated_at)
                VALUES ($1, $2, NOW())
                ON CONFLICT (name) DO UPDATE SET
                    position = EXCLUDED.position,
                    updated_at = NOW()
            """, 'order_summary', position)

プロジェクション実装で最も重要なのは**冪等性(Idempotency)**です。上記コードのON CONFLICT ... DO UPDATE構文が核心です。プロジェクションが途中で失敗して再起動すると、すでに処理したイベントを再び受け取る可能性があります。冪等に処理しなければデータが破損します。

読み取りモデルテーブルスキーマ

-- プロジェクションが使用する読み取りモデルテーブル
CREATE TABLE order_summary (
    order_id        VARCHAR(36) PRIMARY KEY,
    customer_id     VARCHAR(36) NOT NULL,
    status          VARCHAR(20) NOT NULL,
    total_amount    DECIMAL(15,2) NOT NULL,
    item_count      INTEGER NOT NULL,
    payment_method  VARCHAR(20),
    cancelled_reason TEXT,
    created_at      TIMESTAMP NOT NULL,
    updated_at      TIMESTAMP NOT NULL
);

-- 顧客別注文照会最適化インデックス
CREATE INDEX idx_order_summary_customer ON order_summary(customer_id, status);

-- ステータス別フィルタリングインデックス
CREATE INDEX idx_order_summary_status ON order_summary(status, created_at DESC);

-- プロジェクションチェックポイントテーブル
CREATE TABLE projection_checkpoints (
    name        VARCHAR(100) PRIMARY KEY,
    position    BIGINT NOT NULL,
    updated_at  TIMESTAMP NOT NULL
);

読み取りモデルはプロジェクションの要件に合わせて自由に設計します。正規化する必要はありません。顧客ダッシュボード用、管理者統計用、検索エンジンインデックス用など、複数のプロジェクションを同時に運用できます。新しいビューが必要になれば、新しいプロジェクションを追加してイベントを最初からリプレイするだけです。

スナップショット戦略

Aggregateに数千、数万件のイベントが蓄積されると、毎回全イベントをリプレイするのは非効率です。スナップショットは特定時点のAggregate状態を保存してリプレイ範囲を縮小します。

スナップショット適用基準

スナップショットを早すぎる段階で導入すると複雑度だけが上がります。以下の基準で判断します。

  • Aggregateあたりの平均イベント数が50以上であればスナップショット導入を検討する
  • イベントリプレイ時間が100msを超過する場合はスナップショットが必要
  • スナップショットの間隔は通常100〜500イベントごとに生成する

TypeScriptスナップショット実装

// snapshots/snapshot-store.ts

interface Snapshot<T> {
  aggregateId: string
  aggregateType: string
  version: number // スナップショット時点のAggregateバージョン
  schemaVersion: number // スナップショットスキーマバージョン(アップグレード検出用)
  state: T
  createdAt: string
}

class SnapshotStore {
  private client: EventStoreDBClient
  private snapshotInterval: number

  constructor(client: EventStoreDBClient, snapshotInterval = 200) {
    this.client = client
    this.snapshotInterval = snapshotInterval
  }

  async saveSnapshot<T>(snapshot: Snapshot<T>): Promise<void> {
    const streamName = `snapshot-${snapshot.aggregateType}-${snapshot.aggregateId}`
    const event = jsonEvent({
      type: 'Snapshot',
      data: snapshot,
    })

    // スナップショットストリームは最新1つだけ保持(maxCount設定)
    await this.client.appendToStream(streamName, [event])
    await this.client.setStreamMetadata(streamName, {
      maxCount: 3, // 直近3つを保持してロールバック可能性を確保
    })
  }

  async loadSnapshot<T>(
    aggregateType: string,
    aggregateId: string,
    currentSchemaVersion: number
  ): Promise<Snapshot<T> | null> {
    const streamName = `snapshot-${aggregateType}-${aggregateId}`

    try {
      const events = this.client.readStream(streamName, {
        direction: BACKWARDS,
        fromRevision: END,
        maxCount: 1,
      })

      for await (const resolved of events) {
        const snapshot = resolved.event!.data as Snapshot<T>

        // スキーマバージョンが異なる場合はスナップショットを無視(全イベントリプレイ)
        if (snapshot.schemaVersion !== currentSchemaVersion) {
          console.warn(
            `Snapshot schema mismatch for ${aggregateId}: ` +
              `expected ${currentSchemaVersion}, got ${snapshot.schemaVersion}. ` +
              `Replaying all events.`
          )
          return null
        }

        return snapshot
      }
    } catch (error) {
      // スナップショットストリームがない場合はnullを返す
      return null
    }

    return null
  }

  shouldTakeSnapshot(currentVersion: number, lastSnapshotVersion: number): boolean {
    return currentVersion - lastSnapshotVersion >= this.snapshotInterval
  }
}

スナップショットのschemaVersionフィールドに注目する必要があります。Aggregateの構造が変更されると、既存のスナップショットはもはや有効ではありません。schemaVersionが不一致の場合、スナップショットを無視して全イベントをリプレイし、新しいスナップショットを生成します。これがイベントソーシングの利点です。イベントは不変なので、いつでも再リプレイできます。

注意:スナップショットは性能最適化技法であり、必須要素ではありません。スナップショットなしでもシステムは正常に動作する必要があります。スナップショットが破損したり無効化された場合、全イベントリプレイへの自動フォールバックを実装してください。

イベントバージョン管理(Event Versioning)

プロダクションシステムが進化すると、イベントスキーマも変更されます。イベントソーシングでは保存されたイベントは絶対に変更しないため、スキーマ変更を安全に処理する戦略が必要です。

バージョン管理戦略比較

戦略説明適した状況リスク
Weak Schema新フィールド追加時にデフォルト値を使用、既存イベントと互換フィールド追加のみ必要な場合低い
Upcastingイベントデシリアライズ時にミドルウェアで変換フィールド名変更、型変更中間
New Event Type完全に新しいイベントタイプを定義イベントの意味自体が変更低い
Copy-Replace Streamストリームを新スキーマで複製後に置換大規模スキーママイグレーション高い

Upcasting実装例

最も実用的な戦略はUpcastingです。イベントをデシリアライズする過程で以前のバージョンを現在のバージョンに変換するミドルウェアを配置します。

// versioning/event-upcaster.ts

type Upcaster = (event: any) => any

// バージョン別アップキャスター登録
const upcasters: Map<string, Map<number, Upcaster>> = new Map()

// OrderCreated v1 -> v2: shippingAddressを構造化されたオブジェクトに変更
upcasters.set(
  'OrderCreated',
  new Map([
    [
      1,
      (event: any) => {
        // v1でshippingAddressが単一文字列だった場合
        const address =
          typeof event.data.shippingAddress === 'string'
            ? {
                street: event.data.shippingAddress,
                city: 'UNKNOWN',
                zipCode: 'UNKNOWN',
                country: 'KR',
              }
            : event.data.shippingAddress

        return {
          ...event,
          eventVersion: 2,
          data: {
            ...event.data,
            shippingAddress: address,
            // v2で追加されたフィールドにデフォルト値を適用
            orderSource: event.data.orderSource ?? 'WEB',
          },
        }
      },
    ],
  ])
)

function upcastEvent(event: any): any {
  const eventUpcasters = upcasters.get(event.eventType)
  if (!eventUpcasters) return event

  let current = event
  const targetVersion = Math.max(...Array.from(eventUpcasters.keys())) + 1

  // 現在のバージョンから最新バージョンまで順次適用
  for (let v = current.eventVersion; v < targetVersion; v++) {
    const upcaster = eventUpcasters.get(v)
    if (upcaster) {
      current = upcaster(current)
    }
  }

  return current
}

// 使用例
// const rawEvent = loadFromEventStore();
// const currentEvent = upcastEvent(rawEvent);
// aggregate.apply(currentEvent);

イベントバージョン管理で最も重要な原則は既存のイベントを絶対に変更しないことです。イベントストアに保存されたデータは不変です。代わりに読み取り時点で変換(Upcasting)するか、新しいイベントタイプを定義します。

障害復旧手順

イベントソーシングシステムでの障害復旧は従来のシステムとは異なります。イベントストアが真実の源泉(Single Source of Truth)であるため、プロジェクションはいつでも再構築できます。

プロジェクション破損時の復旧

プロジェクションに誤ったデータが含まれている場合や、プロジェクションロジックにバグがあった場合に使用する手順です。

  1. プロジェクションサービスの停止:該当プロジェクションのサブスクリプションを停止する
  2. 読み取りモデルテーブルのDROPまたはTRUNCATE:既存の誤ったデータを削除する
  3. チェックポイントの初期化projection_checkpointsテーブルから該当プロジェクションのpositionを削除する
  4. プロジェクションサービスの再起動:イベントストアの最初からすべてのイベントをリプレイして読み取りモデルを再構築する
  5. 再構築完了の確認:プロジェクションが現在位置まで追いついた(caught-up)ことを確認する

警告:イベントが数億件ある場合、再構築には数時間かかる可能性があります。並列処理可能なプロジェクションアーキテクチャを事前に設計するか、パーティション単位で再構築できるように準備してください。

イベントストアクラスター障害時

EventStoreDBはリーダー-フォロワーアーキテクチャを使用します。リーダーノードがダウンすると、フォロワーの1つが自動的にリーダーに昇格します。この過程での注意点は以下の通りです。

  • 書き込み失敗処理:リーダー切り替え中に書き込みが失敗した場合はリトライする。expectedRevisionで冪等性が保証される
  • サブスクリプション再接続:Persistent Subscriptionは自動再接続されるが、Catch-up Subscriptionは最後のチェックポイントから手動で再接続する必要がある
  • スプリットブレイン防止:最低3ノードクラスターを運用して、過半数(quorum)ベースのリーダー選出を可能にする

一般的な障害シナリオと対応

障害シナリオ原因対応方法
プロジェクションデータ不整合プロジェクションロジックバグプロジェクション再構築(全イベントリプレイ)
Aggregateロード失敗イベントストリーム破損スナップショットフォールバック後に部分リプレイ、クラスターレプリカを確認
同時書き込み競合同じAggregateの同時修正WrongExpectedVersionErrorをキャッチしてリトライ
サブスクリプション遅延プロジェクション処理速度不足パーティショニングまたはプロジェクションインスタンスの水平スケーリング
イベントストアディスク満杯イベント増加アーカイブポリシーの適用、ストリームmaxAge/maxCount設定
スナップショットスキーマ不一致Aggregate構造変更後のデプロイ全イベントリプレイへの自動フォールバック

運用チェックリスト

プロダクションデプロイ前に必ず確認すべき項目です。

設計段階チェックリスト

  • イベントスキーマにeventVersionフィールドが含まれているか
  • イベントメタデータにcorrelationIdcausationIdが含まれているか
  • Aggregateの不変条件検証ロジックが完全か
  • コマンド失敗時のエラー応答が明確か
  • CQRSが本当に必要なドメインか(単純なCRUDではないか再確認)

実装段階チェックリスト

  • プロジェクションハンドラーが冪等(Idempotent)か
  • 楽観的同時実行制御が実装されているか(expectedRevision
  • イベントシリアライズ/デシリアライズテストが作成されているか
  • Upcasterが以前のバージョンのイベントを正しく変換するか
  • スナップショットスキーマバージョン不一致時に全イベントリプレイにフォールバックするか

運用段階チェックリスト

  • イベントストアクラスターが3ノード以上で構成されているか
  • プロジェクションconsumer lagモニタリングが設定されているか
  • イベントストアディスク使用量アラートが設定されているか
  • プロジェクション再構築手順がドキュメント化されているか
  • スナップショット生成周期と保持ポリシーが設定されているか
  • Dead Letter Queue(DLQ)が設定されているか(処理不可イベントの隔離)
  • 障害時の手動イベント補正スクリプトが準備されているか

よくあるミスとアンチパターン

実践で頻繁に発生するミスを整理します。イベントソーシングプロジェクトが失敗するほとんどの原因がここにあります。

アンチパターン1:イベントに現在の状態全体を保存

// BAD:イベントに全状態を入れるのはCRUDと変わらない
interface OrderUpdated {
  eventType: 'OrderUpdated'
  data: {
    order: Order // 全Orderオブジェクト
  }
}

// GOOD:変更された事実のみをイベントとして記録
interface OrderItemAdded {
  eventType: 'OrderItemAdded'
  data: {
    orderId: string
    productId: string
    quantity: number
    unitPrice: number
  }
}

アンチパターン2:プロジェクションで外部サービスを呼び出す

プロジェクションハンドラーで外部APIを呼び出すと、イベントリプレイ(replay)時に副作用が発生します。プロジェクションは純粋関数のようにイベントデータだけで読み取りモデルを更新すべきです。外部サービス呼び出しが必要なロジックは別のPolicyハンドラーやSagaに分離します。

アンチパターン3:すべてのドメインにイベントソーシングを適用

イベントソーシングは状態変更履歴がビジネス価値を持つドメインに適用します。ユーザー設定、コードテーブルなど単純なCRUDドメインにイベントソーシングを適用すると複雑度だけが上がり、得るものはありません。システムのコアドメイン(Core Domain)にのみ選択的に適用するのが正しい戦略です。

アンチパターン4:イベントストリームに10万件以上蓄積

1つのAggregateストリームにイベントが10万件以上蓄積されると、ロード時間が数秒以上かかります。スナップショットで緩和できますが、根本的な原因はAggregate境界の設計が間違っていることです。Aggregateをより小さな単位に分割するか、定期的なイベント生成パターンを見直してください。

イベントソーシング vs 従来型CRUD判断基準

最後に、イベントソーシング導入の可否を判断する際に参考になる比較表です。

基準従来型CRUDイベントソーシング
現在状態の照会シンプル(直接読み取り)複雑(イベントリプレイまたはプロジェクション)
変更履歴の追跡別途監査ログが必要自動確保(イベント自体が履歴)
スキーマ変更ALTER TABLEマイグレーションイベントバージョン管理 + Upcasting
デバッグ現在状態のみ確認可能全履歴リプレイで問題原因追跡可能
データ整合性保証ACIDトランザクションAggregate単位の一貫性 + 結果整合性
ストレージ容量現在状態のみ保存すべてのイベントが蓄積(保存コスト高い)
読み取り性能インデックス最適化で十分プロジェクション設計が必須
初期開発速度速い遅い(学習曲線、インフラ構成)
複雑ドメイン対応力ドメイン複雑度増加時に限界イベント中心モデリングで拡張性確保
チーム能力要件一般的DDD、イベントモデリング経験が必要

金融取引、物流追跡、医療記録、コラボレーションツールのように変更履歴自体がビジネス価値であるドメインでは、イベントソーシングは卓越した選択です。それ以外の場合は従来のCRUDを維持し、必要な部分にのみ監査ログを追加するのが現実的です。

まとめ

イベントソーシングとCQRSは強力ですが複雑なパターンです。成功裏に導入するには以下を覚えてください。

  • イベントスキーマの設計に最も多くの時間を投資すること。イベントは永遠に残ります。
  • プロジェクションは必ず冪等に実装すること。イベントリプレイが安全でなければ運用が成り立ちません。
  • スナップショットは性能最適化ツールであり、アーキテクチャの必須要素ではありません。必要な時に導入してください。
  • イベントバージョン管理戦略をプロジェクト初期に確立すること。後から追加するのは困難です。
  • すべてのドメインに適用しないこと。コアドメインにのみ選択的に適用してください。

References