Skip to content
Published on

CQRSとイベントソーシング実践実装:Axon FrameworkとEventStoreDB 2026

Authors
  • Name
    Twitter
CQRSとイベントソーシング実践実装:Axon FrameworkとEventStoreDB 2026

概要

CQRS(Command Query Responsibility Segregation)とイベントソーシング(Event Sourcing)は、複雑なドメインを扱うシステムで書き込みと読み込みを分離し、状態変更の完全な履歴を保存するアーキテクチャパターンである。概念自体はシンプルだが、実際の本番環境に適用すると、イベントスキーマの進化、プロジェクションの再構築、スナップショット戦略、障害復旧など、予想外の問題が噴出する。

この記事では、JVMエコシステムの代表的なCQRS/ESフレームワークであるAxon Framework 4.xと、専用イベントストアであるEventStoreDB(現在KurrentDBにリブランディング)を中心に、概念説明から実践コード、運用レベルのトラブルシューティングまでを扱う。イベント駆動ガバナンスやSagaパターンのトレードオフは別記事で深く扱うため、ここではCQRSとイベントソーシングの実装自体に集中する。

CQRSのコア概念

従来のCRUDとCQRSの根本的な違い

従来のCRUDアーキテクチャでは、一つのドメインモデルが書き込みと読み込みの両方を担当する。注文サービスを例にすると、Orderエンティティが注文作成、状態変更、キャンセルなどの書き込みロジックと、注文一覧照会、詳細照会などの読み込みロジックを同時に実行する。データが少なくドメインが単純な場合は問題ないが、システムが複雑になると、一つのモデルに読み込み最適化と書き込み整合性保証を同時に入れることがますます難しくなる。

CQRSはこの問題を構造的に解決する。コマンド側(Write Side)はドメイン不変条件(invariant)を検証しイベントを発行することに集中し、クエリ側(Read Side)はユーザーに最適化された読み込みモデルを提供する。

項目従来のCRUDCQRS
モデル単一モデル(読み書き共有)コマンドモデル + クエリモデル分離
データストア単一DB書き込みDB + 読み込みDB(分離可能)
スケーラビリティ読み書きが同じ比率でスケール読み書きが独立してスケール
複雑さ低い(初期段階)高い(初期学習曲線)
一貫性強い一貫性結果整合性(Eventual Consistency)
監査ログ別途実装が必要イベントソーシングと組み合わせで自動確保
適切なシナリオシンプルなCRUDアプリ読み書き比率の差が大きい複雑なドメイン

CQRSを適用すべきタイミング

CQRSはすべてのシステムに適しているわけではない。以下の条件のうち3つ以上に該当すれば、導入を検討する価値がある。

  • 読み込みトラフィックが書き込みトラフィックの10倍以上である
  • 読み込みと書き込みに異なるデータモデルが必要(例:書き込みは正規化、読み込みは非正規化)
  • ドメインロジックが複雑でDDD Aggregateパターンを使用している
  • 状態変更の完全な履歴がビジネス要件である(金融、医療、物流)
  • 複数のチームが同じデータを異なる観点から消費している

逆に、シンプルなCRUDアプリやトラフィックの少ない内部管理ツールにCQRSを適用するのはオーバーエンジニアリングである。

イベントソーシングの原理

状態保存 vs イベント保存

従来の方式は現在の状態をDBに保存する。注文状態が「配送中」なら、ordersテーブルのstatusカラムがSHIPPINGに更新される。以前の状態は失われる。

イベントソーシングは状態変更を引き起こしたイベントそのものを保存する。注文の現在の状態はOrderCreatedPaymentCompletedShippingStartedなどのイベントを順番にリプレイ(replay)して導出する。イベントは不変(immutable)なので、絶対に削除や変更をしない。

# 従来の状態保存
orders テーブル: id=123, status=SHIPPING, total=50000

# イベントソーシング
イベントストリーム [order-123]:
  1. OrderCreated    (total=50000, items=[...])
  2. PaymentCompleted(method=CARD, amount=50000)
  3. ShippingStarted (trackingNo=KR123456)

現在の状態 = fold(events) => Order(status=SHIPPING, total=50000)

イベントソーシングのメリットとコスト

メリットは明確である。完全な監査ログを自動的に取得でき、タイムトラベル(Time Travel)クエリが可能になり、デバッグ時に正確にどのイベントが問題を引き起こしたかを追跡できる。特に金融ドメインでは、規制遵守のためにイベントソーシングが事実上必須である。

コストも存在する。イベントストリームが長くなるとAggregate読み込み時間が増加し(スナップショットで緩和)、イベントスキーマの変更が厄介になり(アップキャスティングで解決)、チーム全体が結果整合性モデルを理解する必要がある。これら3つの問題を解決する具体的な方法は後述する。

Axon Framework実装

Axon Frameworkアーキテクチャ概要

Axon Frameworkは、DDD、CQRS、イベントソーシングをJVM上で実装するためのフレームワークである。主要コンポーネントは以下の通りである。

  • Command Bus:コマンドを適切なハンドラーにルーティングする。高性能が必要な場合はDisruptorCommandBusを使用する。
  • Event Bus / Event Store:イベントを発行し保存する。Axon Serverを使えば、分散イベントバスとイベントストアを一度に解決できる。
  • Query Bus:クエリを適切なハンドラーにルーティングする。
  • Aggregate:ドメイン不変条件を保護するコマンドモデルの中核単位である。
  • Saga:長時間実行トランザクションを管理する。

AggregateとCommand Handlerの実装

注文ドメインをKotlinで実装してみよう。Aggregateクラスの中にコマンドハンドラーとイベントソーシングハンドラーを一緒に配置する。

// コマンド定義
data class CreateOrderCommand(
    @TargetAggregateIdentifier
    val orderId: String,
    val customerId: String,
    val items: List<OrderItem>,
    val totalAmount: Long
)

data class ConfirmPaymentCommand(
    @TargetAggregateIdentifier
    val orderId: String,
    val paymentMethod: String,
    val transactionId: String
)

data class CancelOrderCommand(
    @TargetAggregateIdentifier
    val orderId: String,
    val reason: String
)

// イベント定義
data class OrderCreatedEvent(
    val orderId: String,
    val customerId: String,
    val items: List<OrderItem>,
    val totalAmount: Long,
    val createdAt: Instant = Instant.now()
)

data class PaymentConfirmedEvent(
    val orderId: String,
    val paymentMethod: String,
    val transactionId: String,
    val confirmedAt: Instant = Instant.now()
)

data class OrderCancelledEvent(
    val orderId: String,
    val reason: String,
    val cancelledAt: Instant = Instant.now()
)
// Aggregate実装
@Aggregate
class OrderAggregate {

    @AggregateIdentifier
    private lateinit var orderId: String
    private lateinit var customerId: String
    private var totalAmount: Long = 0
    private var status: OrderStatus = OrderStatus.DRAFT
    private var items: List<OrderItem> = emptyList()

    // デフォルトコンストラクタ(Axonフレームワークがリフレクションで使用)
    constructor()

    // 作成コマンドハンドラー — 新しいAggregateインスタンスを作成する
    @CommandHandler
    constructor(cmd: CreateOrderCommand) {
        require(cmd.items.isNotEmpty()) { "注文項目は空にできない" }
        require(cmd.totalAmount > 0) { "注文金額は0より大きくなければならない" }

        AggregateLifecycle.apply(
            OrderCreatedEvent(
                orderId = cmd.orderId,
                customerId = cmd.customerId,
                items = cmd.items,
                totalAmount = cmd.totalAmount
            )
        )
    }

    // 決済確認コマンドハンドラー
    @CommandHandler
    fun handle(cmd: ConfirmPaymentCommand) {
        check(status == OrderStatus.CREATED) {
            "決済確認はCREATED状態でのみ可能。現在の状態: $status"
        }
        AggregateLifecycle.apply(
            PaymentConfirmedEvent(
                orderId = cmd.orderId,
                paymentMethod = cmd.paymentMethod,
                transactionId = cmd.transactionId
            )
        )
    }

    // 注文キャンセルコマンドハンドラー
    @CommandHandler
    fun handle(cmd: CancelOrderCommand) {
        check(status != OrderStatus.CANCELLED) {
            "既にキャンセルされた注文は再度キャンセルできない"
        }
        check(status != OrderStatus.SHIPPED) {
            "配送が開始された注文はキャンセルできない"
        }
        AggregateLifecycle.apply(
            OrderCancelledEvent(
                orderId = cmd.orderId,
                reason = cmd.reason
            )
        )
    }

    // イベントソーシングハンドラー — 状態を更新する(副作用なく純粋に)
    @EventSourcingHandler
    fun on(event: OrderCreatedEvent) {
        orderId = event.orderId
        customerId = event.customerId
        items = event.items
        totalAmount = event.totalAmount
        status = OrderStatus.CREATED
    }

    @EventSourcingHandler
    fun on(event: PaymentConfirmedEvent) {
        status = OrderStatus.PAID
    }

    @EventSourcingHandler
    fun on(event: OrderCancelledEvent) {
        status = OrderStatus.CANCELLED
    }
}

enum class OrderStatus {
    DRAFT, CREATED, PAID, SHIPPED, DELIVERED, CANCELLED
}

上記のコードで注目すべき2つの核心原則がある。第一に、@CommandHandlerではビジネスルールを検証しイベントを発行するだけである。状態を直接変更しない。第二に、@EventSourcingHandlerではイベントに基づいて状態を更新するだけである。検証ロジックは入れない。この分離が崩れると、イベントリプレイ時に予期せぬエラーが発生する。

Axon Frameworkと自前実装の比較

項目Axon Framework自前実装(Hand-rolled)
学習曲線アノテーションベースで比較的緩やかすべてのインフラを自分で理解する必要あり
コマンドルーティング自動(Command Bus)ルーティングを自前で実装する必要あり
イベントストアAxon ServerまたはJPAベースで内蔵テーブル設計 + シリアライズを自前で実装
スナップショット設定1行で有効化自前実装 + 復元ロジックが必要
アップキャスティング内蔵Upcasterチェーン変換パイプラインを自前で実装
分散環境Axon Serverがクラスタリングをサポート別途インフラ(Kafkaなど)が必要
テストFixtureConfigurationを提供テストインフラを自前で構成
柔軟性フレームワーク規約に依存完全な自由度
チーム採用Axon経験者が限定的一般的なJava/Kotlin開発者を活用可能

Axon Frameworkは初期実装速度を大幅に向上させるが、内部動作を理解せずに使用すると、運用段階でのデバッグが非常に困難になる。フレームワークを選択する前に、チームがイベントソーシングの原理を十分に理解しているか確認すべきである。

EventStoreDBの活用

EventStoreDBとは

EventStoreDB(2024年にKurrentDBにリブランディング)は、イベントソーシングのために設計された専用データベースである。PostgreSQLやMongoDBのような汎用DBの上にイベントストアを実装することもできるが、EventStoreDBはイベントストリームの読み書き、プロジェクション、サブスクリプションをネイティブでサポートする。

gRPCベースのクライアントを提供し、.NET、Java、Node.js、Python、Rustなど多様な言語をサポートする。楽観的同時実行制御(Optimistic Concurrency Control)が内蔵されており、同じストリームへの同時書き込みが発生した際に競合を検出できる。

Event Store実装比較

項目EventStoreDB (KurrentDB)Axon ServerMarten (.NET)PostgreSQL自前実装
タイプ専用イベントDB専用イベントDB + メッセージルーター.NETライブラリ(PostgreSQLベース)汎用RDBMS上の自前実装
言語サポート多言語(gRPCクライアント)JVM中心.NET専用制限なし
プロジェクションサーバーサイドJSプロジェクション内蔵クライアントサイドプロジェクションインライン + 非同期プロジェクション自前実装
サブスクリプションCatch-up、PersistentサブスクリプションTracking Event Processorデーモンベースの非同期プロジェクションPollingまたはCDC
クラスタリングリーダー・フォロワークラスタEnterpriseエディションPostgreSQLクラスタリングを活用PostgreSQL HAを活用
ライセンスServer-Side Public Licenseコミュニティ / 商用MIT該当なし
運用難易度中程度低い(マネージドサービス利用可能)低い(PostgreSQL運用)高い(すべて自前)

PythonでEventStoreDBを使う

EventStoreDBのPython gRPCクライアント(esdbclient)を使用したイベント書き込み/読み込みの例である。

import json
import uuid
from datetime import datetime, timezone
from esdbclient import EventStoreDBClient, NewEvent, StreamState

# EventStoreDB接続
client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")

# イベント作成およびストリームに追加
stream_name = f"order-{uuid.uuid4()}"

events = [
    NewEvent(
        type="OrderCreated",
        data=json.dumps({
            "orderId": stream_name.split("-", 1)[1],
            "customerId": "customer-001",
            "items": [
                {"productId": "prod-a", "quantity": 2, "price": 15000},
                {"productId": "prod-b", "quantity": 1, "price": 20000},
            ],
            "totalAmount": 50000,
            "createdAt": datetime.now(timezone.utc).isoformat(),
        }).encode("utf-8"),
        metadata=json.dumps({"correlationId": str(uuid.uuid4())}).encode("utf-8"),
        content_type="application/json",
    ),
    NewEvent(
        type="PaymentConfirmed",
        data=json.dumps({
            "paymentMethod": "CARD",
            "transactionId": f"txn-{uuid.uuid4()}",
            "confirmedAt": datetime.now(timezone.utc).isoformat(),
        }).encode("utf-8"),
        content_type="application/json",
    ),
]

# ストリームがまだ存在しない場合のみ書き込み(楽観的同時実行制御)
commit_position = client.append_to_stream(
    stream_name=stream_name,
    current_version=StreamState.NO_STREAM,
    events=events,
)
print(f"イベント記録完了。Commit position: {commit_position}")

# ストリームからイベント読み込み
recorded_events = client.get_stream(stream_name)
for event in recorded_events:
    data = json.loads(event.data.decode("utf-8"))
    print(f"[{event.type}] stream_position={event.stream_position}, data={data}")

上記の例でStreamState.NO_STREAMは、該当ストリームが存在しない場合のみ書き込みを許可する。既にストリームが存在する場合はWrongExpectedVersionErrorが発生する。これがEventStoreDBの楽観的同時実行制御である。既存のストリームにイベントを追加する場合は、最後に読み込んだイベントのstream_positioncurrent_versionとして渡す必要がある。

TypeScriptでEventStoreDBを使う

Node.js/TypeScript環境では@eventstore/db-clientパッケージを使用する。

import { EventStoreDBClient, jsonEvent, NO_STREAM, JSONEventType } from '@eventstore/db-client'

// イベント型定義
interface OrderCreated extends JSONEventType {
  type: 'OrderCreated'
  data: {
    orderId: string
    customerId: string
    items: Array<{ productId: string; quantity: number; price: number }>
    totalAmount: number
  }
}

interface PaymentConfirmed extends JSONEventType {
  type: 'PaymentConfirmed'
  data: {
    paymentMethod: string
    transactionId: string
  }
}

// クライアント作成(シングルトンとして使用)
const client = EventStoreDBClient.connectionString('esdb://localhost:2113?tls=false')

async function appendAndReadEvents(): Promise<void> {
  const streamName = `order-${crypto.randomUUID()}`

  // イベント追加
  const orderCreated = jsonEvent<OrderCreated>({
    type: 'OrderCreated',
    data: {
      orderId: streamName.replace('order-', ''),
      customerId: 'customer-001',
      items: [{ productId: 'prod-a', quantity: 2, price: 15000 }],
      totalAmount: 30000,
    },
  })

  const paymentConfirmed = jsonEvent<PaymentConfirmed>({
    type: 'PaymentConfirmed',
    data: {
      paymentMethod: 'CARD',
      transactionId: `txn-${crypto.randomUUID()}`,
    },
  })

  // 楽観的同時実行制御:ストリームが存在しない場合のみ書き込み
  await client.appendToStream(streamName, [orderCreated, paymentConfirmed], {
    expectedRevision: NO_STREAM,
  })

  // ストリームからイベント読み込み
  const events = client.readStream(streamName)
  for await (const resolvedEvent of events) {
    console.log(`[${resolvedEvent.event?.type}]`, JSON.stringify(resolvedEvent.event?.data))
  }
}

appendAndReadEvents().catch(console.error)

プロジェクションと読み込みモデル

プロジェクションとは

イベントソーシングシステムでは、イベントストリームは書き込みに最適化されている。ユーザーが「自分の注文一覧を最新順に表示してほしい」とリクエストした場合、すべてのイベントをリプレイして現在の状態を作ることはできない。プロジェクション(Projection)は、イベントを消費して読み込みに最適化された別のビュー(Read Model)を構築するプロセスである。

Axon FrameworkではEvent Handlerがこの役割を担当する。@EventHandlerアノテーションが付いたメソッドがイベントを受け取り、読み込みモデル(通常はRDBMSのテーブル)を更新する。

// 読み込みモデルエンティティ
@Entity
@Table(name = "order_summary")
data class OrderSummaryEntity(
    @Id
    val orderId: String,
    val customerId: String,
    val totalAmount: Long,
    val status: String,
    val itemCount: Int,
    val createdAt: Instant,
    var updatedAt: Instant = Instant.now()
)

// プロジェクション(Event Handler)
@Component
@ProcessingGroup("order-summary")
class OrderSummaryProjection(
    private val repository: OrderSummaryRepository
) {

    @EventHandler
    fun on(event: OrderCreatedEvent) {
        val summary = OrderSummaryEntity(
            orderId = event.orderId,
            customerId = event.customerId,
            totalAmount = event.totalAmount,
            status = "CREATED",
            itemCount = event.items.size,
            createdAt = event.createdAt
        )
        repository.save(summary)
    }

    @EventHandler
    fun on(event: PaymentConfirmedEvent) {
        repository.findById(event.orderId).ifPresent { summary ->
            repository.save(summary.copy(
                status = "PAID",
                updatedAt = Instant.now()
            ))
        }
    }

    @EventHandler
    fun on(event: OrderCancelledEvent) {
        repository.findById(event.orderId).ifPresent { summary ->
            repository.save(summary.copy(
                status = "CANCELLED",
                updatedAt = Instant.now()
            ))
        }
    }

    // リプレイ時に既存データをクリアするリセットハンドラー
    @ResetHandler
    fun onReset() {
        repository.deleteAll()
    }
}

// クエリハンドラー
@Component
class OrderQueryHandler(
    private val repository: OrderSummaryRepository
) {

    @QueryHandler
    fun handle(query: FindOrderByIdQuery): OrderSummaryEntity? {
        return repository.findById(query.orderId).orElse(null)
    }

    @QueryHandler
    fun handle(query: FindOrdersByCustomerQuery): List<OrderSummaryEntity> {
        return repository.findByCustomerIdOrderByCreatedAtDesc(query.customerId)
    }
}

プロジェクションの再構築(Replay)

プロジェクションの最大の利点は、いつでも最初から再構築できることである。読み込みモデルのスキーマを変更したり、バグでデータが破損した場合、イベントを最初からリプレイすればよい。

Axon FrameworkではTrackingEventProcessorをリセットするとプロジェクションが再構築される。@ResetHandlerが最初に実行されて既存データをクリアし、その後すべてのイベントが順番にリプレイされる。

本番環境でプロジェクションを再構築する際は、以下を必ず考慮する必要がある。

  1. 再構築時間:イベントが数百万件あれば、数時間かかることがある。別のインスタンスで再構築し、完了後にトラフィックを切り替えるBlue-Green方式を推奨する。
  2. 同時読み込み:再構築中も既存の読み込みモデルでサービスを継続する必要がある。新しいプロセシンググループ名で並行再構築を進める。
  3. リソース使用量:大量リプレイ時にDB負荷が急増する。バッチサイズ調整と速度制限が必要である。

スナップショット戦略

なぜスナップショットが必要なのか

イベントソーシングでAggregateを読み込むには、該当ストリームのすべてのイベントを最初からリプレイする必要がある。イベントが10件なら問題ないが、10,000件を超えると読み込み時間が目に見えて遅くなる。スナップショットは特定時点のAggregate状態をキャプチャしておき、以降のイベントのみをリプレイする最適化手法である。

Axon Frameworkでのスナップショット設定

Axon Frameworkでは設定ファイルでスナップショットトリガーを簡単に構成できる。

# application.yml — Axonスナップショット設定
axon:
  aggregate:
    order:
      snapshot-trigger:
        type: event-count
        threshold: 100 # 100イベントごとにスナップショット作成

Java/Kotlin設定でも可能である。

@Configuration
class AxonSnapshotConfig {

    @Bean
    fun snapshotTriggerDefinition(snapshotter: Snapshotter): SnapshotTriggerDefinition {
        // 50イベントごとにスナップショット作成
        return EventCountSnapshotTriggerDefinition(snapshotter, 50)
    }
}

スナップショット運用時の注意事項

  • Aggregate構造変更時:Aggregateクラスのフィールドを追加/削除すると、既存のスナップショットのデシリアライズに失敗する可能性がある。この場合、既存スナップショットを削除してイベントのみで再構築する必要がある。
  • スナップショット周期のチューニング:頻繁に作成するとストレージコストが増加し、まれすぎると読み込み時間が長くなる。一般的に50〜500イベント間隔が適切である。
  • ビジネス境界ベースのスナップショット:イベント数ベースの代わりに、ビジネスイベントベースでスナップショットを作成するのも良い戦略である。例えば、月末決算完了時や特定の状態遷移時にスナップショットを取得する。

イベントバージョン管理(アップキャスティング)

イベントスキーマ進化の必然性

システムが生きている限り、イベントスキーマは必ず変化する。OrderCreatedEventcouponCodeフィールドを追加する必要があるかもしれないし、totalAmountの型をLongからBigDecimalに変更する必要があるかもしれない。従来のRDBMSならALTER TABLEで済むが、イベントソーシングでは既に保存されたイベントを変更できない。

アップキャスティング(Upcasting)は、以前のバージョンのイベントを読み込む際に、現在のコードが理解できる形式に変換する手法である。

Axon Frameworkでのアップキャスティング実装

まず、イベントクラスに@Revisionアノテーションでバージョンを表示する。

// V1: 初期バージョン
@Revision("1.0")
data class OrderCreatedEvent(
    val orderId: String,
    val customerId: String,
    val items: List<OrderItem>,
    val totalAmount: Long,
    val createdAt: Instant = Instant.now()
)

// V2: couponCodeフィールド追加、currencyフィールド追加
@Revision("2.0")
data class OrderCreatedEvent(
    val orderId: String,
    val customerId: String,
    val items: List<OrderItem>,
    val totalAmount: Long,
    val currency: String = "KRW",
    val couponCode: String? = null,
    val createdAt: Instant = Instant.now()
)

// Upcaster実装: V1 -> V2
class OrderCreatedEventUpcaster : SingleEventUpcaster() {

    override fun canUpcast(intermediateRepresentation: IntermediateEventRepresentation): Boolean {
        return intermediateRepresentation.type.name == OrderCreatedEvent::class.qualifiedName
            && intermediateRepresentation.type.revision == "1.0"
    }

    override fun doUpcast(intermediateRepresentation: IntermediateEventRepresentation): IntermediateEventRepresentation {
        return intermediateRepresentation.upcast(
            SimpleSerializedType(
                OrderCreatedEvent::class.qualifiedName,
                "2.0"  // ターゲットリビジョン
            ),
            JsonNode::class.java
        ) { jsonNode ->
            // V1イベントにないフィールドにデフォルト値を追加
            val objectNode = jsonNode as ObjectNode
            objectNode.put("currency", "KRW")
            objectNode.putNull("couponCode")
            objectNode
        }
    }
}

// Upcaster登録
@Configuration
class UpcasterConfig {

    @Bean
    fun orderUpcasterChain(): JpaEventStorageEngine {
        // Upcasterはチェーンで構成 — V1->V2->V3の順序で実行
        return JpaEventStorageEngine.builder()
            .upcasterChain(DefaultUpcasterChain(listOf(
                OrderCreatedEventUpcaster(),
                // 今後V2->V3 Upcasterを追加
            )))
            .build()
    }
}

イベントバージョン管理の原則

  1. 常に下位互換性を優先する:可能であれば新しいフィールドにデフォルト値を設定し、既存フィールドは削除しない。
  2. 意味の変更を禁止する:同じフィールド名の意味を変えない。amountが元々税前金額だった場合、税後金額に意味を変えず、amountAfterTaxフィールドを新規追加する。
  3. Upcasterチェーンのテスト:Upcasterが蓄積されるほど、V1から最新バージョンまでの変換が正常に動作するか統合テストで必ず検証する。
  4. リビジョン管理の文書化:各リビジョンで何が変わったか、なぜ変わったかをコードコメントやADR(Architecture Decision Record)に記録する。

トラブルシューティング

よくある問題と解決策

1. イベントハンドラーの順序問題

複数の@EventHandlerが同じイベントをサブスクライブする場合、実行順序が保証されず、プロジェクションデータの不整合が発生する可能性がある。Axon Frameworkでは@ProcessingGroupでプロセシンググループを分離し、各グループ内では@Orderアノテーションで順序を指定する。

2. プロジェクション再構築時のOOM(Out of Memory)

数百万件のイベントを一度にリプレイするとメモリが不足する。TrackingEventProcessorのバッチサイズ(batchSize)を減らし、リプレイ速度を調整する必要がある。

axon:
  eventhandling:
    processors:
      order-summary:
        mode: tracking
        batch-size: 256 # 一度に処理するイベント数
        thread-count: 2 # 並列処理スレッド数
        token-claim-interval: 5000 # トークン更新間隔(ms)

3. シリアライザーの不一致

Axonはデフォルトでは XStream XMLシリアライザーを使用するが、Jackson JSONに切り替えるチームが多い。既存のイベントがXMLで保存されている状態でシリアライザーをJSONに変更すると、デシリアライズに失敗する。切り替え時にはマルチシリアライザー設定が必要である。

4. Aggregate読み込みタイムアウト

イベントが極端に多いAggregate(数万件以上)は読み込みに数秒かかることがある。スナップショット周期を短くするか、Aggregateの責任を分離してイベント数を減らすのが根本的な解決策である。

5. イベントの重複処理

ネットワーク障害によりイベントが重複配信される可能性がある。イベントハンドラーは必ず冪等(idempotent)に実装する必要がある。プロジェクションではイベントIDやシーケンス番号で重複を検出するロジックを入れる。

失敗事例

事例1:すべてを一つのAggregateに入れた場合

あるECチームが注文・決済・配送・レビューをすべて一つのOrderAggregateに入れた。初期は実装が簡単だったが、レビューイベントまで含まれるようになり、注文あたりのイベントが数百件に達した。Aggregate読み込み時間が3秒を超え始め、スナップショットを適用してもAggregateのシリアライズサイズが肥大化し、根本的な解決にはならなかった。

解決策:注文、決済、配送、レビューをそれぞれ別のAggregateに分離した。Aggregate間の連携はSagaで処理した。分離後、各Aggregateのイベント数は10件以内に維持された。

事例2:プロジェクションなしでイベントのみでクエリした場合

イベントソーシングを初めて導入したチームが「すべてのデータがイベントにあるのでプロジェクションは不要」と判断し、毎回のクエリでイベントをリプレイして現在の状態を計算した。データが少ない時は動作したが、一覧照会APIのレスポンスタイムが数秒に伸び、ユーザーからの苦情が殺到した。

解決策:読み取り専用のプロジェクションテーブルを作成し、イベントハンドラーが非同期で更新するように変更した。一覧照会APIのレスポンスタイムが200msから5msに短縮された。

事例3:アップキャスティングなしでイベントスキーマを変更した場合

フィールド名をamountからtotalAmountに変更する際にUpcasterを実装しなかった。既存イベントのリプレイが失敗し、特定のAggregateが読み込み不可能な状態に陥った。本番DBのイベントを直接修正するという危険なパッチを適用する必要があった。

解決策:Upcasterを実装し、イベントスキーマ変更時には必ず以前のバージョンとの互換性テストをCIに含めるようにした。

運用チェックリスト

本番環境にCQRS + イベントソーシングをデプロイする前に、以下の項目を点検する。

設計段階

  • Aggregate境界が正しく定義されているか(一つのAggregateに過度な責任がないか)
  • イベント名が過去形動詞(Created、Updated、Cancelled)で明確に定義されているか
  • コマンドハンドラーにビジネス検証ロジックのみがあり、イベントソーシングハンドラーに状態更新のみがあるか
  • プロジェクションが最低1つ以上実装されており、読み込みAPIがプロジェクションテーブルを使用しているか

イベント管理

  • イベントクラスに@Revisionアノテーションが付いているか
  • イベントスキーマ変更時にUpcasterが実装されているか
  • イベントシリアライズフォーマットが統一されているか(JSON推奨)
  • イベントサイズ制限が設定されているか(1MB以下推奨)

パフォーマンス

  • スナップショット戦略が策定されているか(イベント数ベースまたはビジネスイベントベース)
  • Aggregateあたりの平均イベント数をモニタリングしているか
  • プロジェクション再構築時間を測定し、許容範囲内か
  • プロジェクション再構築時のBlue-Green切り替え手順があるか

障害対応

  • イベントハンドラーが冪等に実装されているか
  • Dead Letter Queueが設定され、モニタリングされているか
  • プロジェクション遅延(Lag)をリアルタイムでモニタリングしているか
  • Aggregate読み込み失敗時にアラートが発生するか
  • イベントストアのバックアップとリストア手順が検証済みか

チーム能力

  • チーム全体が結果整合性(Eventual Consistency)の概念を理解しているか
  • イベントストーミング(Event Storming)ワークショップを通じてドメインイベントを導出したか
  • イベントソーシングのデバッグ方法(イベントストリーム照会、リプレイ)を理解しているか

まとめ

CQRSとイベントソーシングは複雑なドメインを扱うシステムにおいて強力なツールだが、導入コストは小さくない。チームの能力、ドメインの複雑さ、監査要件などを総合的に考慮して導入の可否を判断すべきである。

核心原則を改めて整理すると以下の通りである。

  1. Aggregateを小さく保つ:一つのAggregateが一つの不変条件を保護するように設計する。
  2. イベントは不変である:保存されたイベントは絶対に変更しない。スキーマ変更は必ずUpcasterで処理する。
  3. プロジェクションは使い捨てである:いつでも最初から再構築できなければならない。プロジェクションに原本データを入れない。
  4. 冪等性は必須である:イベントハンドラーは同じイベントを複数回処理しても同じ結果を保証しなければならない。
  5. 段階的に導入する:システム全体ではなく、最も複雑なバウンデッドコンテキスト一つから始める。

Axon Frameworkはこのすべてを構造的にサポートする成熟したフレームワークであり、EventStoreDBはイベントソーシングに特化したストアとして強力な選択肢である。この2つを一緒に使うことも、それぞれ独立して使うこともできる。重要なのはツールではなく、イベントソーシングの原理をチーム全体が理解しているかどうかである。

参考資料