Skip to content
Published on

イベントソーシング本番運用のアンチパターン:スキーマ進化、スナップショット、イベントストアスケーリング

Authors
  • Name
    Twitter
イベントソーシング本番運用パターン

この記事の位置づけ

本シリーズの前回の記事では、イベントソーシングとCQRSの基本的な実装(EventStoreDBベースのアーキテクチャやKafkaベースのマイクロサービスパターン、Sagaオーケストレーション)を解説した。本記事は、コアコンセプト(append-onlyイベントストリーム、リプレイによるアグリゲート状態復元、リードモデルプロジェクション、コマンドパスとクエリパスの分離)を理解していることを前提とする。

本記事では、本番環境で発生する問題とそれを防ぐパターンに特化する。具体的には、イベントスキーマの進化とバージョニング戦略、パフォーマンスのためのスナップショット、大規模なプロジェクション再構築、イベントストア技術選定のトレードオフ、本番障害を引き起こすアンチパターン、実際の障害事例と復旧手順を取り上げる。

イベントスキーマの進化とバージョニング

イベントソーシングシステムにおいて、イベントはイミュータブル(不変)である。一度永続化されたイベントは、変更も削除もできない。しかし、ビジネス要件は常に変化する。フィールドが追加、名前変更、削除される。イベント構造は進化する。これがイベントソーシングにおける最も困難な運用上の課題である。

4つのバージョニング戦略

スキーマ変更に対処する主要な戦略は4つあり、それぞれ異なるトレードオフがある。

戦略1: 弱いスキーマ(寛容なリーダー)

古いフィールドを維持しつつ、新しいオプショナルフィールドを追加する。コンシューマーは理解できないフィールドを無視する。

// バージョン1: 元のイベント
interface OrderPlacedV1 {
  type: 'OrderPlaced'
  orderId: string
  customerId: string
  totalAmount: number
}

// バージョン2: shippingAddressを追加(後方互換のためオプショナル)
interface OrderPlacedV2 {
  type: 'OrderPlaced'
  orderId: string
  customerId: string
  totalAmount: number
  shippingAddress?: {
    street: string
    city: string
    zipCode: string
    country: string
  }
  currency?: string // デフォルト: 'USD'
}

// コンシューマーは寛容なリーダーパターンを使用
function handleOrderPlaced(event: OrderPlacedV1 | OrderPlacedV2) {
  const currency = 'currency' in event ? event.currency : 'USD'
  const address = 'shippingAddress' in event ? event.shippingAddress : null // フォールバックとして顧客プロファイルから取得
}

これは追加的な変更にのみ有効である。フィールドの名前変更、構造変更、削除が必要な場合は破綻する。

戦略2: アップキャスティング(読み取り時変換)

古いイベントを読み取り時に最新のスキーマバージョンに変換する。イベントストアには元のバイト列が保持されるが、アプリケーション層が処理前にアップキャスターチェーンを適用する。

from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class EventEnvelope:
    stream_id: str
    event_type: str
    version: int
    data: dict
    metadata: dict

class UpcasterChain:
    """アップキャスターをチェーンしてイベントを任意のバージョンから最新に変換する。"""

    def __init__(self):
        self._upcasters: dict[tuple[str, int], Callable] = {}

    def register(self, event_type: str, from_version: int,
                 upcaster: Callable[[dict], dict]):
        self._upcasters[(event_type, from_version)] = upcaster

    def upcast(self, envelope: EventEnvelope,
               target_version: int) -> EventEnvelope:
        current = envelope
        while current.version < target_version:
            key = (current.event_type, current.version)
            if key not in self._upcasters:
                raise ValueError(
                    f"No upcaster for {current.event_type} "
                    f"v{current.version} -> v{current.version + 1}"
                )
            new_data = self._upcasters[key](current.data)
            current = EventEnvelope(
                stream_id=current.stream_id,
                event_type=current.event_type,
                version=current.version + 1,
                data=new_data,
                metadata=current.metadata,
            )
        return current

# アップキャスターの登録
chain = UpcasterChain()

# V1 -> V2: 'name'を'firstName'と'lastName'に分割
chain.register('CustomerRegistered', 1, lambda d: {
    **d,
    'firstName': d['name'].split(' ')[0],
    'lastName': ' '.join(d['name'].split(' ')[1:]),
})

# V2 -> V3: デフォルト値で'tier'フィールドを追加
chain.register('CustomerRegistered', 2, lambda d: {
    **d,
    'tier': 'standard',
})

アップキャスティングは、ほとんどのスキーマ進化シナリオで推奨されるアプローチである。元のイベントはストレージ上でそのまま保持され、インメモリ表現のみが変化する。

戦略3: 新しいイベントタイプ

イベントのセマンティクスが根本的に変わる場合は、既存のイベントをバージョニングするのではなく、新しいイベントタイプを導入する。

// OrderPlacedV3で破壊的変更を加える代わりに、
// 完全に新しいイベントタイプを作成
interface OrderSubmitted {
  type: 'OrderSubmitted' // 新しい名前が新しいセマンティクスを示す
  orderId: string
  customer: {
    id: string
    tier: 'standard' | 'premium' | 'enterprise'
  }
  lineItems: Array<{
    productId: string
    quantity: number
    unitPrice: number
    discount: number
  }>
  pricing: {
    subtotal: number
    tax: number
    shipping: number
    total: number
    currency: string
  }
  submittedAt: string
}

バージョン番号の増殖を避けられるが、移行期間中はプロジェクションが古いイベントタイプと新しいイベントタイプの両方を処理する必要がある。

戦略4: コピーアンドトランスフォーム(イベントストア移行)

すべてのイベントに変換を適用して、イベントストア全体を書き直す。これは最終手段である。アップキャスティングが読み取り時にコスト過大になった場合、またはGDPR削除権などの規制コンプライアンスのためにデータを消去する必要がある場合にのみ使用する。

-- 警告: これは破壊的操作。必ず完全バックアップを取得すること。
-- ステップ1: 新しいeventsテーブルを作成
CREATE TABLE events_v2 (
    global_position BIGSERIAL PRIMARY KEY,
    stream_id UUID NOT NULL,
    stream_position INTEGER NOT NULL,
    event_type VARCHAR(256) NOT NULL,
    data JSONB NOT NULL,
    metadata JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (stream_id, stream_position)
);

-- ステップ2: 変換を伴う移行
INSERT INTO events_v2 (stream_id, stream_position, event_type, data, metadata, created_at)
SELECT
    stream_id,
    stream_position,
    event_type,
    CASE
        WHEN event_type = 'CustomerRegistered' AND data->>'version' = '1'
        THEN jsonb_build_object(
            'version', '3',
            'firstName', split_part(data->>'name', ' ', 1),
            'lastName', regexp_replace(data->>'name', '^[^ ]+ ', ''),
            'email', data->>'email',
            'tier', 'standard'
        )
        ELSE data
    END,
    metadata,
    created_at
FROM events
ORDER BY global_position;

-- ステップ3: テーブルのスワップ(メンテナンスウィンドウ中に実施)
ALTER TABLE events RENAME TO events_deprecated;
ALTER TABLE events_v2 RENAME TO events;

スキーマ進化の判断マトリクス

変更タイプ弱いスキーマアップキャスティング新しいイベントタイプコピーアンドトランスフォーム
オプショナルフィールド追加最適過剰過剰過剰
フィールド名変更不可最適良好大規模
フィールド分割不可最適良好大規模
フィールド削除リスクあり良好最適良好
セマンティクス変更不可不可最適良好
規制対応の消去不可不可不可必須

スナップショット戦略

アグリゲートのライフタイム中に数千のイベントが蓄積されると、コマンドのたびにすべてのイベントをリプレイするのは許容できないほど遅くなる。スナップショットは、アグリゲートの現在の状態を定期的に保存することで、将来のロードではスナップショット以降のイベントのみをリプレイすればよくなる。

スナップショット実装パターン

パターン1: 定期スナップショット(N件ごと)

from dataclasses import dataclass, field
from typing import Optional
import json

@dataclass
class Snapshot:
    stream_id: str
    version: int  # スナップショット時のイベントバージョン
    state: dict
    created_at: str

class SnapshotStore:
    """PostgreSQLバックエンドのスナップショットストア。"""

    def __init__(self, conn):
        self._conn = conn

    async def save_snapshot(self, snapshot: Snapshot):
        await self._conn.execute("""
            INSERT INTO snapshots (stream_id, version, state, created_at)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (stream_id) DO UPDATE
            SET version = $4, state = $3, created_at = $4
        """, snapshot.stream_id, snapshot.version,
             json.dumps(snapshot.state), snapshot.created_at)

    async def load_snapshot(self, stream_id: str) -> Optional[Snapshot]:
        row = await self._conn.fetchrow("""
            SELECT stream_id, version, state, created_at
            FROM snapshots WHERE stream_id = $1
        """, stream_id)
        if row:
            return Snapshot(
                stream_id=row['stream_id'],
                version=row['version'],
                state=json.loads(row['state']),
                created_at=row['created_at'],
            )
        return None

class AggregateRepository:
    SNAPSHOT_INTERVAL = 100  # 100件ごとにスナップショット

    def __init__(self, event_store, snapshot_store):
        self._event_store = event_store
        self._snapshot_store = snapshot_store

    async def load(self, stream_id: str):
        snapshot = await self._snapshot_store.load_snapshot(stream_id)

        if snapshot:
            aggregate = self._rebuild_from_snapshot(snapshot)
            start_version = snapshot.version + 1
        else:
            aggregate = self._create_empty()
            start_version = 0

        events = await self._event_store.read_stream(
            stream_id, from_version=start_version
        )
        for event in events:
            aggregate.apply(event)

        return aggregate

    async def save(self, aggregate):
        new_events = aggregate.uncommitted_events
        current_version = aggregate.version

        await self._event_store.append(
            aggregate.stream_id, new_events, current_version
        )

        # 閾値に達したらスナップショットを作成
        if current_version % self.SNAPSHOT_INTERVAL == 0:
            await self._snapshot_store.save_snapshot(Snapshot(
                stream_id=aggregate.stream_id,
                version=current_version,
                state=aggregate.to_dict(),
                created_at=self._now(),
            ))

パターン2: 時間ベースのスナップショット

イベント数ではなく時間間隔でスナップショットを取得する。アグリゲート間でイベント頻度が大きく異なる場合に有効。

パターン3: オンデマンドスナップショット

ロードレイテンシが閾値を超えた場合にのみスナップショットを作成する。アクセス頻度が低いアグリゲートに対して不要なスナップショット書き込みを避けられる。

スナップショットのアンチパターン

  • 早すぎるスナップショット導入: パフォーマンス問題を計測で確認する前にスナップショットを導入してはならない。ほとんどのアグリゲートは100件未満のイベントであり、リプレイは瞬時に完了する。
  • スナップショットのバージョン結合: スナップショットにはバージョンが必要。アグリゲートの状態構造を変更すると、古いスナップショットが読み取り不能になる。スナップショットには必ずスキーマバージョンを含める。
  • 古いスナップショットを削除しない: スナップショットはストレージを蓄積する。アグリゲートごとに最新のスナップショットのみを保持するリテンションポリシーを実装する。

プロジェクションの再構築

プロジェクション(リードモデル)はイベントストリームから派生する。プロジェクションにバグがある場合、または新しいプロジェクションを導入する場合、関連するすべてのイベントをリプレイしてゼロから再構築する必要がある。

大規模な再構築

数百万のイベントに対するプロジェクション再構築は自明ではない。ポジション0からイベントを1件ずつ処理するナイーブなアプローチは、数時間から数日かかる可能性がある。

import asyncio
from dataclasses import dataclass

@dataclass
class ProjectionRebuildConfig:
    batch_size: int = 1000
    max_concurrent_batches: int = 5
    checkpoint_interval: int = 10_000
    progress_log_interval: int = 50_000

class ProjectionRebuilder:
    """バッチ処理とチェックポイントによる最適化されたプロジェクション再構築。"""

    def __init__(self, event_store, projection_handler, checkpoint_store,
                 config: ProjectionRebuildConfig = None):
        self._event_store = event_store
        self._handler = projection_handler
        self._checkpoint_store = checkpoint_store
        self._config = config or ProjectionRebuildConfig()

    async def rebuild(self, projection_name: str):
        # 再構築が中断された場合、最後のチェックポイントから再開
        last_position = await self._checkpoint_store.get(
            f"rebuild:{projection_name}"
        )
        start_position = last_position + 1 if last_position else 0

        total_processed = 0
        position = start_position

        while True:
            events = await self._event_store.read_all(
                from_position=position,
                count=self._config.batch_size,
            )
            if not events:
                break

            # バッチを処理
            await self._handler.handle_batch(events)
            position = events[-1].global_position + 1
            total_processed += len(events)

            # 定期的にチェックポイント
            if total_processed % self._config.checkpoint_interval == 0:
                await self._checkpoint_store.save(
                    f"rebuild:{projection_name}",
                    events[-1].global_position,
                )

            if total_processed % self._config.progress_log_interval == 0:
                print(
                    f"再構築進捗: {total_processed}件処理済み, "
                    f"ポジション: {position}"
                )

        print(f"再構築完了: 合計{total_processed}件処理済み")

プロジェクション再構築の主要原則

  1. 冪等性: プロジェクションは冪等でなければならない。同じイベントを2回処理しても同じ結果を生む必要がある。盲目的なINSERTではなくUPSERTまたはINSERT ... ON CONFLICTを使用する。
  2. チェックポイント: 最後に処理したグローバルポジションを保存し、障害後に再構築を再開できるようにする。
  3. ブルーグリーンプロジェクション: 新しいプロジェクションを別のテーブルまたはデータベースに構築し、アトミックにスワップする。再構築中のダウンタイムを回避できる。
  4. バックプレッシャー: プロジェクションが外部システム(Elasticsearch、Redis)に書き込む場合、リプレイ中にそれらを圧倒しないようレート制限を実装する。

イベントストア技術比較

適切なイベントストアの選定は、重要なアーキテクチャ上の決定である。以下の表は、主要な4つの選択肢を重要なディメンションで比較する。

ディメンションEventStoreDBApache KafkaPostgreSQLDynamoDB
主な用途専用イベントストア分散ストリーミング基盤汎用RDBMSマネージドNoSQL(AWS)
ストリーム単位の読み取りネイティブ(ストリームIDでインデックス)コンシューマーのキーフィルタリングが必要stream_idカラムのカスタムクエリパーティションキークエリ
グローバル順序組み込みグローバルポジションパーティション内のみ、グローバル順序なしBIGSERIALグローバルポジションカラムネイティブのグローバル順序なし
楽観的同時実行制御ネイティブの期待バージョンサポートなしカスタムバージョンカラムチェック条件式(Condition Expressions)
組み込みプロジェクションあり(JavaScriptベース)なし(Kafka StreamsまたはksqlDBを使用)なし(カスタムコード)なし(DynamoDB Streams + Lambda)
最大スループット約15,000書き込み/秒(ノードあたり)100,000+書き込み/秒(クラスターあたり)10,000-50,000書き込み/秒(チューニング後)実質無制限(オンデマンド)
サブスクリプションモデルキャッチアップ・永続サブスクリプションコンシューマーグループ(オフセット追跡)ポーリングまたはLISTEN/NOTIFYDynamoDB Streams(CDC)
運用複雑度中(専用クラスター)高(ZooKeeper/KRaft、ブローカー、スキーマレジストリ)低(既存インフラ)低(フルマネージド)
データ保持無制限(append-only)設定可能(リテンションポリシー)無制限(手動管理)設定可能(TTL)
GDPR削除暗号シュレッディングまたはストリーム削除トピックコンパクション(トゥームストーン)SQL DELETE(不変性を破壊)TTLまたは条件付き削除
最適な用途専用ESシステム高スループットイベントストリーミング既存PostgreSQLを持つチームAWSネイティブサーバーレス

使い分けの指針

  • EventStoreDB: 専用のイベントソーシングシステムを構築し、ストリーム、サブスクリプション、プロジェクションのネイティブサポートが必要な場合。書き込みスループットが秒間15,000イベント未満の場合。
  • Apache Kafka: 主な要件がマイクロサービス間の高スループットイベントストリーミングである場合。イベントソーシングのプリミティブ(ストリーム読み取り、同時実行制御)をKafka上に自前またはフレームワークで構築する意思がある場合。
  • PostgreSQL: すでにPostgreSQLを運用しており、新しいインフラを導入せずにイベントソーシングを始めたい場合。強いトランザクション保証が必要で、標準SQLでストリーム読み取りを実装できる場合。
  • DynamoDB: AWS上で運用しており、フルマネージドのサーバーレスイベントストアが必要な場合。DynamoDB StreamsがプロジェクションのためのCDCを提供し、従量課金モデルが変動ワークロードに適している場合。

本番運用のアンチパターン

アンチパターン1: プロパティソーシング

NameChangedEmailChangedAddressChangedのようなイベントを保存する代わりに、CustomerRelocatedCustomerContactUpdatedのようなドメイン的に意味のあるイベントを保存すべきである。プロパティソーシングされたイベントはビジネス意図を持たず、変更が行われた理由を理解することが不可能になる。

// 悪い例: プロパティソーシング - ビジネス意図がない
const badEvents = [
  { type: 'FieldUpdated', field: 'status', value: 'suspended' },
  { type: 'FieldUpdated', field: 'reason', value: 'payment_failed' },
]

// 良い例: ビジネス意味を持つドメインイベント
const goodEvents = [
  {
    type: 'AccountSuspendedDueToPaymentFailure',
    accountId: 'ACC-123',
    failedPaymentId: 'PAY-456',
    suspendedAt: '2026-03-07T10:30:00Z',
    retryScheduledAt: '2026-03-10T10:30:00Z',
  },
]

アンチパターン2: サービス間通信としてのイベントソーシング

イベントソーシングは単一の境界づけられたコンテキスト内のローカルな決定である。内部ドメインイベントを他のサービスに直接公開すべきではない。代わりに、内部イベントをパブリックコントラクトにマッピングした統合イベントをメッセージバスに公開する。

アンチパターン3: 大きなイベント(ファットイベント)

すべてのイベントにアグリゲートの状態全体を格納すると、イベントソーシングの目的が失われる。イベントにはデルタ(何が変わったか、なぜ変わったか)のみを含めるべきである。

アンチパターン4: イベントハンドラの冪等性の欠如

プロジェクションハンドラがバッチ処理中にクラッシュすると、イベントは再配信される。冪等性がなければ、リードモデルにデータが重複する。

-- 悪い例: 盲目的なINSERT(再配信時に重複)
INSERT INTO order_summary (order_id, status, total)
VALUES ('ORD-001', 'placed', 150.00);

-- 良い例: 冪等なUPSERT
INSERT INTO order_summary (order_id, status, total, last_event_position)
VALUES ('ORD-001', 'placed', 150.00, 42)
ON CONFLICT (order_id) DO UPDATE
SET status = EXCLUDED.status,
    total = EXCLUDED.total,
    last_event_position = EXCLUDED.last_event_position
WHERE order_summary.last_event_position < EXCLUDED.last_event_position;

アンチパターン5: 相関IDと因果IDの欠如

相関IDがないと、イベントストリーム間の本番障害デバッグはほぼ不可能になる。すべてのイベントにcorrelationId(発信元リクエスト)とcausationId(このイベントを直接引き起こしたイベント)を含めるべきである。

実際の障害事例

事例1: プロジェクション遅延による古い読み取り

シナリオ: あるECプラットフォームで、ピーク時に在庫プロジェクションがイベントストアから30秒遅延し、チェックアウト失敗が発生した。リードモデルでは商品が「在庫あり」と表示されていたが、ライトモデルでは在庫が既に枯渇していたため注文が拒否された。

根本原因: プロジェクションハンドラが並列化なしで順次処理していた。フラッシュセール中にイベント書き込み速度がプロジェクション処理速度を超えた。

復旧策:

  1. パーティション化されたプロジェクション処理を実装 -- アグリゲートIDで並列化する。
  2. リードモデルクエリにプロジェクション遅延を返す「鮮度」インジケータを追加する。
  3. チェックアウトのようなクリティカルパスでは、プロジェクションではなくイベントストリームから直接読み取る(より強い整合性)。

事例2: アグリゲートリファクタリング後のスナップショット破損

シナリオ: あるチームがOrderアグリゲートをリファクタリングし、shippingInfoを別々のaddresscarrierフィールドに分割した。デプロイ後、既存のスナップショットを持つすべての注文が、古いスナップショット形式でのデシリアライゼーションエラーにより読み込みに失敗した。

復旧策:

  1. 影響を受けたアグリゲートタイプの既存スナップショットをすべて無効化する。
  2. デシリアライゼーション失敗時にフルリプレイにフォールバックするtry-catchでスナップショットロードをラップしてデプロイする。
  3. マイグレーションロジック付きのスナップショットバージョニングを実装する。

事例3: イベントストアのディスク枯渇

シナリオ: 大量のIoTプラットフォームが生のセンサー読み取り値をイベントとして保存していた。50,000台のセンサーが毎秒レポートし、イベントストアは1日あたり200GB増加した。3ヶ月後の週末に、EventStoreDBクラスターのディスク容量が枯渇し、完全な障害が発生した。

復旧策:

  1. スケジュールジョブで古いイベントストリームをコールドストレージ(S3)にアーカイブする。
  2. リテンションウィンドウを超えたストリームにストリームトランケーションを使用する。
  3. 生のセンサー読み取り値がイベントストアに属するかどうかを再検討する -- 高頻度テレメトリには時系列データベースがより適切な可能性がある。

運用チェックリスト

イベントソーシングで本番環境に移行する前に、以下を確認する。

  • バージョン追跡付きのイベントスキーマレジストリが整備されている。
  • すべての歴史的なイベントバージョンに対してアップキャスターが登録・テストされている。
  • バージョニングとリテンションポリシーを含むスナップショット戦略が定義されている。
  • ダウンタイムなしでプロジェクション再構築をトリガーできる(ブルーグリーンデプロイメント)。
  • すべてのイベントハンドラが重複排除チェック付きで冪等である。
  • すべてのイベントに相関IDと因果IDが存在する。
  • イベントストア監視に以下が含まれる:追記レイテンシ、サブスクリプション遅延、ディスク使用量、ストリーム数。
  • 災害復旧計画に以下が含まれる:イベントストアのバックアップ/リストア手順、プロジェクション再構築の運用手順書。
  • GDPRコンプライアンス戦略が文書化されている(暗号シュレッディングまたはストリーム削除)。

アーキテクチャ図: 本番イベントソーシングシステム

+------------------+     +-----------------+     +------------------+
|   API Gateway    |---->| Command Handler |---->| Domain Aggregate |
+------------------+     +-----------------+     +--------+---------+
                                                          |
                              +---------------------------+
                              | Domain Events
                              v
                    +--------------------+
                    |    Event Store     |
                    | (append-only log)  |
                    +--------+-----------+
                             |
              +--------------+--------------+
              |              |              |
              v              v              v
     +--------+---+  +------+------+  +----+--------+
     | Projection |  | Projection  |  | Projection  |
     | Handler A  |  | Handler B   |  | Handler C   |
     +--------+---+  +------+------+  +----+--------+
              |              |              |
              v              v              v
     +--------+---+  +------+------+  +----+--------+
     | PostgreSQL |  |Elasticsearch|  |    Redis     |
     | Read Model |  |Search Index |  |   Cache      |
     +------------+  +-------------+  +-------------+
イベントスキーマ進化パイプライン:

  Event Store (生のバイト列)
       |
       v
  +----------+     +-----------+     +-----------+
  | Upcaster |---->| Upcaster  |---->| Upcaster  |
  |  V1->V2  |     |  V2->V3   |     |  V3->V4   |
  +----------+     +-----------+     +-----------+
                                          |
                                          v
                                  +---------------+
                                  | Current Event |
                                  | (Version 4)   |
                                  +---------------+
                                          |
                              +-----------+-----------+
                              |                       |
                              v                       v
                     +--------+------+      +---------+-----+
                     | Command Side  |      | Projection    |
                     | (Aggregate)   |      | (Read Model)  |
                     +---------------+      +---------------+

まとめ

イベントソーシングは、完全な監査証跡、テンポラルクエリ、読み書きの独立したスケーラビリティを必要とするシステムに絶大な価値を提供する。しかし、運用上の複雑さは現実のものである。スキーマ進化、スナップショット、プロジェクション管理、イベントストアスケーリングはオプションの関心事ではなく、イベントソーシングシステムが成功するか運用負荷になるかを決定する核心的な課題である。

重要なポイントは以下の通りである:スキーマ進化の主要戦略としてアップキャスティングを使用すること、計測されたパフォーマンス要求がある場合にのみスナップショットを導入すること、冪等なリプレイのためにプロジェクションを設計すること、実際のスループット要件と運用能力に基づいてイベントストア技術を選定すること。

参考文献