Skip to content
Published on

イベント駆動アーキテクチャパターンガイド2025:CQRS、Event Sourcing、Sagaパターン

Authors

目次

1. イベント駆動アーキテクチャの基本概念

イベント駆動アーキテクチャ(EDA)は、システムのコンポーネントがイベントを通じて通信するソフトウェアアーキテクチャパターンです。サービス間の直接呼び出しの代わりにイベントを発行・購読する方式で、疎結合と高いスケーラビリティを実現します。

1.1 コア概念

イベント(Event): システムで発生した意味のある事実(fact)です。過去形で命名します。例:OrderPlaced、PaymentCompleted、InventoryReserved。

イベントプロデューサー(Producer): イベントを生成・発行するコンポーネントです。

イベントコンシューマー(Consumer): イベントを購読・処理するコンポーネントです。

イベントブローカー(Broker): プロデューサーとコンシューマーの間でイベントを伝達するインフラです。Kafka、RabbitMQ、AWS EventBridgeなどがあります。

1.2 イベントの種類

イベントの種類
├── Domain Event(ドメインイベント)
│   └── ビジネスドメインで発生した事実
│       例:OrderPlaced、PaymentFailed
├── Integration Event(統合イベント)
│   └── 境界づけられたコンテキスト間通信のためのイベント
│       例:OrderConfirmedIntegrationEvent
├── Event Notification(イベント通知)
│   └── 最小限のデータのみ含む。消費者が必要時に問合せ
│       例:OrderStatusChanged(orderIdのみ含む)
└── Event-Carried State Transfer(状態転送イベント)
    └── 全状態を含む。消費者の別途問合せ不要
        例:OrderDetails(全注文情報を含む)

1.3 Pub/Subパターン

// TypeScript - イベントバスインターフェース
interface EventBus {
  publish<T extends DomainEvent>(event: T): Promise<void>;
  subscribe<T extends DomainEvent>(
    eventType: string,
    handler: EventHandler<T>
  ): void;
  unsubscribe(eventType: string, handler: EventHandler<unknown>): void;
}

interface DomainEvent {
  eventId: string;
  eventType: string;
  aggregateId: string;
  occurredAt: Date;
  version: number;
  metadata: Record<string, string>;
}

interface EventHandler<T extends DomainEvent> {
  handle(event: T): Promise<void>;
}

2. ドメインイベント

2.1 ドメインイベント設計原則

優れたドメインイベントは以下の原則に従います。

不変性(Immutable): イベントは既に発生した事実なので変更できません。

過去形命名: OrderPlaced(O)、PlaceOrder(X)。イベントは既に起こったことです。

自己記述的(Self-describing): イベントだけで何が起きたか理解できる必要があります。

ビジネス意味: 技術的実装ではなくビジネスの観点から命名します。

2.2 ドメインイベント実装

// 基本ドメインイベント
abstract class BaseDomainEvent implements DomainEvent {
  readonly eventId: string;
  readonly occurredAt: Date;
  readonly version: number;
  abstract readonly eventType: string;
  abstract readonly aggregateId: string;
  metadata: Record<string, string> = {};

  constructor() {
    this.eventId = crypto.randomUUID();
    this.occurredAt = new Date();
    this.version = 1;
  }
}

// 注文作成イベント
class OrderPlaced extends BaseDomainEvent {
  readonly eventType = 'order.placed';
  readonly aggregateId: string;

  constructor(
    public readonly orderId: string,
    public readonly customerId: string,
    public readonly items: OrderItem[],
    public readonly totalAmount: Money,
    public readonly shippingAddress: Address
  ) {
    super();
    this.aggregateId = orderId;
  }
}

// 決済完了イベント
class PaymentCompleted extends BaseDomainEvent {
  readonly eventType = 'payment.completed';
  readonly aggregateId: string;

  constructor(
    public readonly paymentId: string,
    public readonly orderId: string,
    public readonly amount: Money,
    public readonly paymentMethod: string
  ) {
    super();
    this.aggregateId = paymentId;
  }
}

// 在庫予約イベント
class InventoryReserved extends BaseDomainEvent {
  readonly eventType = 'inventory.reserved';
  readonly aggregateId: string;

  constructor(
    public readonly reservationId: string,
    public readonly orderId: string,
    public readonly items: ReservedItem[]
  ) {
    super();
    this.aggregateId = reservationId;
  }
}

2.3 イベントスキーマ設計

// イベントスキーマ(Avro-like)
const OrderPlacedSchema = {
  type: 'record',
  name: 'OrderPlaced',
  namespace: 'com.ecommerce.orders.v1',
  fields: [
    { name: 'eventId', type: 'string' },
    { name: 'eventType', type: 'string' },
    { name: 'aggregateId', type: 'string' },
    { name: 'occurredAt', type: 'string' },  // ISO 8601
    { name: 'version', type: 'int' },
    { name: 'orderId', type: 'string' },
    { name: 'customerId', type: 'string' },
    {
      name: 'items',
      type: {
        type: 'array',
        items: {
          type: 'record',
          name: 'OrderItem',
          fields: [
            { name: 'productId', type: 'string' },
            { name: 'quantity', type: 'int' },
            { name: 'unitPrice', type: 'long' },
          ],
        },
      },
    },
    { name: 'totalAmount', type: 'long' },
    { name: 'currency', type: 'string' },
  ],
};

3. Event Sourcing

Event Sourcingはアプリケーションの状態をイベントのシーケンスとして保存するパターンです。現在の状態を直接保存する代わりに、状態変更の履歴を全て記録します。

3.1 基本概念

従来の方式(CRUD):
┌─────────────────────────┐
│ orders テーブル           │
│ id: ORD-001             │
│ status: shipped         │ ← 現在の状態のみ保存
│ total: 50000            │
│ updated_at: 2025-03-24  │
└─────────────────────────┘

Event Sourcing方式:
┌─────────────────────────────────────────┐
│ event_store                             │
│ 1. OrderPlaced    (ORD-001, 2025-03-24) │
│ 2. PaymentReceived(ORD-001, 2025-03-24) │
│ 3. ItemPacked     (ORD-001, 2025-03-24) │
│ 4. OrderShipped   (ORD-001, 2025-03-24) │ ← 全履歴を保存
└─────────────────────────────────────────┘

3.2 イベントストア実装

// イベントストアインターフェース
interface EventStore {
  // イベント追加(楽観的同時実行制御)
  append(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void>;

  // ストリームからイベント読み取り
  readStream(
    streamId: string,
    fromVersion?: number
  ): Promise<DomainEvent[]>;

  // 全イベント購読
  subscribe(
    handler: (event: DomainEvent) => Promise<void>
  ): void;
}

// PostgreSQLベースのイベントストア
class PostgresEventStore implements EventStore {
  async append(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');

      // 楽観的同時実行チェック
      const result = await client.query(
        'SELECT MAX(version) as current_version FROM events WHERE stream_id = $1',
        [streamId]
      );
      const currentVersion = result.rows[0]?.current_version ?? 0;

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but current is ${currentVersion}`
        );
      }

      // イベント保存
      let version = expectedVersion;
      for (const event of events) {
        version++;
        await client.query(
          `INSERT INTO events (stream_id, version, event_type, data, metadata, created_at)
           VALUES ($1, $2, $3, $4, $5, NOW())`,
          [streamId, version, event.eventType, JSON.stringify(event), JSON.stringify(event.metadata)]
        );
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  async readStream(streamId: string, fromVersion: number = 0): Promise<DomainEvent[]> {
    const result = await this.pool.query(
      'SELECT data FROM events WHERE stream_id = $1 AND version > $2 ORDER BY version ASC',
      [streamId, fromVersion]
    );
    return result.rows.map(row => JSON.parse(row.data));
  }
}

3.3 Event Sourcing対応Aggregate

// Event Sourced Aggregate基底クラス
abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = [];
  version: number = 0;

  protected apply(event: DomainEvent): void {
    this.when(event);
    this.version++;
    this.uncommittedEvents.push(event);
  }

  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event);
      this.version++;
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents];
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = [];
  }

  protected abstract when(event: DomainEvent): void;
}

// 注文Aggregate
class Order extends EventSourcedAggregate {
  private id!: string;
  private customerId!: string;
  private items!: OrderItem[];
  private totalAmount!: Money;
  private status!: OrderStatus;

  // コマンド:注文作成
  static place(orderId: string, customerId: string, items: OrderItem[], total: Money): Order {
    const order = new Order();
    order.apply(new OrderPlaced(orderId, customerId, items, total, new Address()));
    return order;
  }

  // コマンド:決済確認
  confirmPayment(paymentId: string): void {
    if (this.status !== OrderStatus.PLACED) {
      throw new InvalidOperationError('PLACEDステータスの注文のみ決済確認可能');
    }
    this.apply(new OrderPaymentConfirmed(this.id, paymentId));
  }

  // コマンド:注文キャンセル
  cancel(reason: string): void {
    if (this.status === OrderStatus.CANCELLED) {
      throw new InvalidOperationError('注文は既にキャンセル済み');
    }
    this.apply(new OrderCancelled(this.id, reason));
  }

  // イベントを状態に適用
  protected when(event: DomainEvent): void {
    if (event instanceof OrderPlaced) {
      this.id = event.orderId;
      this.customerId = event.customerId;
      this.items = event.items;
      this.totalAmount = event.totalAmount;
      this.status = OrderStatus.PLACED;
    } else if (event instanceof OrderPaymentConfirmed) {
      this.status = OrderStatus.CONFIRMED;
    } else if (event instanceof OrderCancelled) {
      this.status = OrderStatus.CANCELLED;
    }
  }
}

3.4 スナップショット

イベントが多く蓄積されるとAggregate復元が遅くなります。スナップショットでこれを最適化します。

class SnapshotStore {
  async saveSnapshot(
    streamId: string,
    snapshot: AggregateSnapshot
  ): Promise<void> {
    await this.pool.query(
      `INSERT INTO snapshots (stream_id, version, data, created_at)
       VALUES ($1, $2, $3, NOW())
       ON CONFLICT (stream_id) DO UPDATE SET version = $2, data = $3, created_at = NOW()`,
      [streamId, snapshot.version, JSON.stringify(snapshot.state)]
    );
  }

  async getLatestSnapshot(streamId: string): Promise<AggregateSnapshot | null> {
    const result = await this.pool.query(
      'SELECT version, data FROM snapshots WHERE stream_id = $1',
      [streamId]
    );
    if (result.rows.length === 0) return null;
    return {
      version: result.rows[0].version,
      state: JSON.parse(result.rows[0].data),
    };
  }
}

// スナップショットを活用したAggregate読み込み
class OrderRepository {
  async getById(orderId: string): Promise<Order> {
    const order = new Order();

    // 1. スナップショットロード
    const snapshot = await this.snapshotStore.getLatestSnapshot(orderId);
    let fromVersion = 0;

    if (snapshot) {
      order.restoreFromSnapshot(snapshot.state);
      fromVersion = snapshot.version;
    }

    // 2. スナップショット以降のイベントのみロード
    const events = await this.eventStore.readStream(orderId, fromVersion);
    order.loadFromHistory(events);

    // 3. 一定イベント数後にスナップショット作成
    if (events.length > 50) {
      await this.snapshotStore.saveSnapshot(orderId, {
        version: order.version,
        state: order.toSnapshot(),
      });
    }

    return order;
  }
}

3.5 プロジェクション(リードモデル)

// イベントからリードモデルを構築するプロジェクション
class OrderProjection {
  constructor(private readDb: Pool) {}

  async handle(event: DomainEvent): Promise<void> {
    if (event instanceof OrderPlaced) {
      await this.readDb.query(
        `INSERT INTO order_read_model (id, customer_id, status, total_amount, created_at, version)
         VALUES ($1, $2, 'PLACED', $3, $4, $5)`,
        [event.orderId, event.customerId, event.totalAmount.amount, event.occurredAt, event.version]
      );

      for (const item of event.items) {
        await this.readDb.query(
          `INSERT INTO order_items_read_model (order_id, product_id, quantity, unit_price)
           VALUES ($1, $2, $3, $4)`,
          [event.orderId, item.productId, item.quantity, item.unitPrice]
        );
      }
    } else if (event instanceof OrderPaymentConfirmed) {
      await this.readDb.query(
        `UPDATE order_read_model SET status = 'CONFIRMED', version = $2 WHERE id = $1`,
        [event.orderId, event.version]
      );
    } else if (event instanceof OrderCancelled) {
      await this.readDb.query(
        `UPDATE order_read_model SET status = 'CANCELLED', version = $2 WHERE id = $1`,
        [event.orderId, event.version]
      );
    }
  }
}

4. CQRS(コマンドクエリ責務分離)

CQRSは書き込みモデル(Command)と読み取りモデル(Query)を分離して、それぞれを独立に最適化するパターンです。

4.1 CQRSアーキテクチャ

┌─────────────────────────────────────────────────┐
│                   クライアント                     │
│         ┌──────────┐  ┌──────────┐              │
│         │  書き込み  │  │  読み取り  │              │
│         │  リクエスト │  │  リクエスト │              │
│         └────┬─────┘  └─────┬────┘              │
└──────────────┼──────────────┼───────────────────┘
               │              │
    ┌──────────▼──────┐  ┌────▼──────────┐
    │  Command Side   │  │  Query Side   │
    │                 │  │               │
    │  Command Handler│  │  Query Handler│
    │       │         │  │       │       │
    │  ┌────▼──────┐  │  │  ┌────▼────┐  │
    │  │ ドメイン    │  │  │  │ Read DB │  │
    │  │ モデル     │  │  │  │ (非正規化)│  │
    │  └────┬──────┘  │  │  └─────────┘  │
    │  ┌────▼──────┐  │  │       ▲       │
    │  │ Write DB  │  │  │       │       │
    │  └────┬──────┘  │  │  プロジェクション │
    └───────┼─────────┘  └───────┼───────┘
            │                    │
            └─── イベント ─────────┘

4.2 Command Side実装

// コマンド定義
class PlaceOrderCommand {
  constructor(
    public readonly customerId: string,
    public readonly items: OrderItemDto[],
    public readonly shippingAddress: AddressDto
  ) {}
}

// コマンドハンドラー
class PlaceOrderHandler {
  constructor(
    private orderRepo: OrderRepository,
    private eventBus: EventBus
  ) {}

  async handle(command: PlaceOrderCommand): Promise<string> {
    // ビジネスロジック
    const orderId = generateOrderId();
    const items = command.items.map(i => new OrderItem(i.productId, i.quantity, i.unitPrice));
    const total = items.reduce((sum, i) => sum + i.quantity * i.unitPrice, 0);

    // Aggregate作成とイベント適用
    const order = Order.place(orderId, command.customerId, items, new Money(total, 'USD'));

    // イベントストアに保存
    await this.orderRepo.save(order);

    // イベント発行
    for (const event of order.getUncommittedEvents()) {
      await this.eventBus.publish(event);
    }

    return orderId;
  }
}

4.3 Query Side実装

// クエリ定義
class GetOrderDetailsQuery {
  constructor(public readonly orderId: string) {}
}

class ListCustomerOrdersQuery {
  constructor(
    public readonly customerId: string,
    public readonly page: number = 1,
    public readonly pageSize: number = 20
  ) {}
}

// クエリハンドラー
class GetOrderDetailsHandler {
  constructor(private readDb: Pool) {}

  async handle(query: GetOrderDetailsQuery): Promise<OrderDetailsDto> {
    const result = await this.readDb.query(
      `SELECT o.*, json_agg(i.*) as items
       FROM order_read_model o
       JOIN order_items_read_model i ON o.id = i.order_id
       WHERE o.id = $1
       GROUP BY o.id`,
      [query.orderId]
    );

    if (result.rows.length === 0) {
      throw new NotFoundError(`Order ${query.orderId} not found`);
    }

    return this.mapToDto(result.rows[0]);
  }
}

// 最終的一貫性(Eventual Consistency)
// リードモデルはイベント処理後に更新されるため、若干の遅延があり得る
class OrderQueryService {
  async getOrderWithConsistencyCheck(
    orderId: string,
    expectedVersion?: number
  ): Promise<OrderDetailsDto> {
    const order = await this.getOrderDetails(orderId);

    // 特定バージョンが必要な場合確認
    if (expectedVersion && order.version < expectedVersion) {
      // プロジェクションがまだ追いついていない場合
      // 少し待機して再試行、またはCommand側から直接クエリ
      await this.waitForProjection(orderId, expectedVersion);
      return this.getOrderDetails(orderId);
    }

    return order;
  }
}

5. Sagaパターン

Sagaパターンは分散トランザクションをローカルトランザクションのシーケンスで管理します。

5.1 Choreography vs Orchestration

Choreography(振り付け方式):
サービスがイベントを通じて自律的に協力

Order Service ──OrderPlaced──> Payment Service
                               ──PaymentCompleted──> Inventory Service
                                                     ──InventoryReserved──> Shipping Service

利点:低結合、単一障害点なし
欠点:複雑なワークフローの追跡が困難

─────────────────────────────────────────────────

Orchestration(指揮方式):
中央のオーケストレーターがワークフローを制御

                   ┌──────────────────┐
                   │  Order Saga      │
                   │  Orchestrator    │
                   └───────┬──────────┘
          ┌────────────────┼────────────────┐
          │                │                │
   ┌──────▼──────┐  ┌─────▼───────┐  ┌─────▼───────┐
   │  Payment    │  │  Inventory  │  │  Shipping   │
   │  Service    │  │  Service    │  │  Service    │
   └─────────────┘  └─────────────┘  └─────────────┘

利点:フロー追跡が容易、テスト可能
欠点:オーケストレーターが単一障害点

5.2 Orchestration Saga実装

// Saga状態
enum SagaState {
  STARTED = 'STARTED',
  PAYMENT_PENDING = 'PAYMENT_PENDING',
  INVENTORY_PENDING = 'INVENTORY_PENDING',
  SHIPPING_PENDING = 'SHIPPING_PENDING',
  COMPLETED = 'COMPLETED',
  COMPENSATING = 'COMPENSATING',
  COMPENSATED = 'COMPENSATED',
  FAILED = 'FAILED',
}

// 注文処理Saga
class OrderProcessingSaga {
  private state: SagaState = SagaState.STARTED;
  private orderId!: string;
  private paymentId?: string;
  private reservationId?: string;

  constructor(
    private sagaStore: SagaStore,
    private commandBus: CommandBus
  ) {}

  // 開始:注文が作成された時
  async start(event: OrderPlaced): Promise<void> {
    this.orderId = event.orderId;
    this.state = SagaState.PAYMENT_PENDING;
    await this.sagaStore.save(this);

    // Step 1: 決済リクエスト
    await this.commandBus.dispatch(new ProcessPaymentCommand(
      event.orderId, event.totalAmount
    ));
  }

  async handlePaymentCompleted(event: PaymentCompleted): Promise<void> {
    this.paymentId = event.paymentId;
    this.state = SagaState.INVENTORY_PENDING;
    await this.sagaStore.save(this);

    // Step 2: 在庫予約リクエスト
    await this.commandBus.dispatch(new ReserveInventoryCommand(
      this.orderId, event.paymentId
    ));
  }

  async handlePaymentFailed(event: PaymentFailed): Promise<void> {
    this.state = SagaState.COMPENSATING;
    await this.sagaStore.save(this);

    // 補償:注文キャンセル
    await this.commandBus.dispatch(new CancelOrderCommand(
      this.orderId, 'Payment failed'
    ));

    this.state = SagaState.COMPENSATED;
    await this.sagaStore.save(this);
  }

  async handleInventoryReserved(event: InventoryReserved): Promise<void> {
    this.reservationId = event.reservationId;
    this.state = SagaState.SHIPPING_PENDING;
    await this.sagaStore.save(this);

    // Step 3: 配送リクエスト
    await this.commandBus.dispatch(new CreateShipmentCommand(
      this.orderId, this.reservationId
    ));
  }

  async handleInventoryFailed(event: InventoryReservationFailed): Promise<void> {
    this.state = SagaState.COMPENSATING;
    await this.sagaStore.save(this);

    // 補償:決済返金後、注文キャンセル
    await this.commandBus.dispatch(new RefundPaymentCommand(
      this.orderId, this.paymentId!
    ));
    await this.commandBus.dispatch(new CancelOrderCommand(
      this.orderId, 'Inventory unavailable'
    ));

    this.state = SagaState.COMPENSATED;
    await this.sagaStore.save(this);
  }

  async handleShipmentCreated(event: ShipmentCreated): Promise<void> {
    this.state = SagaState.COMPLETED;
    await this.sagaStore.save(this);

    // 注文確定
    await this.commandBus.dispatch(new ConfirmOrderCommand(this.orderId));
  }
}

6. Outboxパターン

6.1 二重書き込み問題

データベース更新とメッセージ発行を原子的に実行する必要があります。Outboxパターンはこの問題を解決します。

問題シナリオ:
1. DBに注文保存 ✓
2. Kafkaにイベント発行 ✗  ← 失敗するとデータ不整合!

Outboxパターン:
1. DBトランザクション内で:
   - 注文保存 ✓
   - outboxテーブルにイベント保存 ✓
2. 別プロセスがoutboxから読み取りKafkaに発行

6.2 Outboxテーブルスキーマ

CREATE TABLE outbox (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  aggregate_type VARCHAR(255) NOT NULL,
  aggregate_id VARCHAR(255) NOT NULL,
  event_type VARCHAR(255) NOT NULL,
  payload JSONB NOT NULL,
  created_at TIMESTAMP NOT NULL DEFAULT NOW(),
  published_at TIMESTAMP,
  retry_count INT DEFAULT 0,
  last_error TEXT
);

CREATE INDEX idx_outbox_unpublished ON outbox (created_at)
  WHERE published_at IS NULL;

6.3 Outbox実装

class OutboxService {
  async saveWithOutbox(
    order: Order,
    events: DomainEvent[],
    client: PoolClient
  ): Promise<void> {
    // 同一トランザクションで注文とイベントを保存
    await client.query(
      'INSERT INTO orders (id, customer_id, status, total) VALUES ($1, $2, $3, $4)',
      [order.id, order.customerId, order.status, order.total]
    );

    for (const event of events) {
      await client.query(
        `INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
         VALUES ($1, $2, $3, $4)`,
        ['Order', order.id, event.eventType, JSON.stringify(event)]
      );
    }
  }
}

// Outboxリレー:定期的にoutboxを読み取り発行
class OutboxRelay {
  async processOutbox(): Promise<void> {
    const client = await this.pool.connect();
    try {
      // 未発行イベントを読み取り
      const result = await client.query(
        `SELECT * FROM outbox
         WHERE published_at IS NULL AND retry_count < 5
         ORDER BY created_at ASC
         LIMIT 100
         FOR UPDATE SKIP LOCKED`
      );

      for (const row of result.rows) {
        try {
          await this.kafka.produce(
            `events.${row.aggregate_type.toLowerCase()}`,
            row.aggregate_id,
            row.payload
          );

          // 発行完了マーク
          await client.query(
            'UPDATE outbox SET published_at = NOW() WHERE id = $1',
            [row.id]
          );
        } catch (error) {
          // 失敗カウント増加
          await client.query(
            'UPDATE outbox SET retry_count = retry_count + 1, last_error = $1 WHERE id = $2',
            [error.message, row.id]
          );
        }
      }
    } finally {
      client.release();
    }
  }
}

6.4 CDC(Change Data Capture)とDebezium

# Debeziumコネクタ設定
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "orderdb",
    "table.include.list": "public.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.topic.replacement": "events.orders"
  }
}

7. 冪等性(Idempotency)

7.1 なぜ冪等性が重要か

イベント駆動システムではメッセージが重複配信される可能性があります(at-least-once delivery)。同じイベントを複数回処理しても結果が同じになるよう冪等性を保証する必要があります。

7.2 冪等性キー実装

class IdempotencyStore {
  async isProcessed(idempotencyKey: string): Promise<boolean> {
    const result = await this.pool.query(
      'SELECT 1 FROM processed_events WHERE idempotency_key = $1',
      [idempotencyKey]
    );
    return result.rows.length > 0;
  }

  async markProcessed(
    idempotencyKey: string,
    result: unknown,
    client: PoolClient
  ): Promise<void> {
    await client.query(
      `INSERT INTO processed_events (idempotency_key, result, processed_at)
       VALUES ($1, $2, NOW())
       ON CONFLICT (idempotency_key) DO NOTHING`,
      [idempotencyKey, JSON.stringify(result)]
    );
  }
}

// 冪等性保証イベントハンドラー
class IdempotentEventHandler<T extends DomainEvent> {
  constructor(
    private idempotencyStore: IdempotencyStore,
    private innerHandler: EventHandler<T>
  ) {}

  async handle(event: T): Promise<void> {
    const key = `${event.eventType}:${event.eventId}`;

    // 既に処理済みかチェック
    if (await this.idempotencyStore.isProcessed(key)) {
      console.log(`Event ${key} already processed, skipping`);
      return;
    }

    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');

      // イベント処理
      await this.innerHandler.handle(event);

      // 処理済みマーク
      await this.idempotencyStore.markProcessed(key, null, client);

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }
}

7.3 冪等性戦略

戦略説明
冪等性キー処理済みイベントIDを保存イベントIDベースの重複排除
条件付き更新条件が満たされた場合のみ更新WHERE version = expected
UpsertINSERT ON CONFLICT DO UPDATE自然な冪等性
重複排除ログ処理済みメッセージを記録コンシューマー側の重複排除テーブル

8. 最終的一貫性(Eventual Consistency)

8.1 一貫性モデル

強い一貫性:
Writer ──Write──> DB ──Read──> Reader
                  └── 常に最新を参照 ──┘

最終的一貫性:
Writer ──Write──> Write DB ──Event──> Read DB ──Read──> Reader
                                      └── 一時的に古いデータが見える可能性 ──┘

8.2 最終的一貫性の対処法

// 戦略1:楽観的UIアップデート
class OrderController {
  async placeOrder(req: Request, res: Response): Promise<void> {
    const orderId = await this.commandBus.dispatch(
      new PlaceOrderCommand(req.body)
    );

    // orderIdを即座に返却
    res.json({
      orderId,
      status: 'PROCESSING',
      message: '注文を処理中です'
    });
  }
}

// 戦略2:バージョンベースの一貫性チェック
class OrderApiController {
  async getOrder(req: Request, res: Response): Promise<void> {
    const minVersion = req.headers['x-min-version']
      ? parseInt(req.headers['x-min-version'] as string)
      : undefined;

    const order = await this.queryService.getOrderWithConsistencyCheck(
      req.params.orderId,
      minVersion
    );

    res.json(order);
  }
}

// 戦略3:サブスクリプションベースの更新
class OrderEventsWebSocket {
  handleConnection(client: WebSocket, orderId: string): void {
    this.eventBus.subscribe('order.*', async (event: DomainEvent) => {
      if (event.aggregateId === orderId) {
        client.send(JSON.stringify({
          type: event.eventType,
          data: event,
        }));
      }
    });
  }
}

9. スキーマ進化と互換性

9.1 イベントバージョニング

// スキーマ進化戦略
interface OrderPlacedV1 {
  orderId: string;
  customerId: string;
  totalAmount: number;
}

interface OrderPlacedV2 extends OrderPlacedV1 {
  currency: string;        // 新フィールド(デフォルト: 'USD')
  shippingMethod: string;  // 新フィールド(デフォルト: 'STANDARD')
}

// イベントアップキャスター:旧バージョンを新バージョンに変換
class OrderPlacedUpcaster {
  upcast(event: OrderPlacedV1): OrderPlacedV2 {
    return {
      ...event,
      currency: 'USD',
      shippingMethod: 'STANDARD',
    };
  }
}

9.2 スキーマ互換性タイプ

タイプ説明アップグレード戦略
BACKWARD新スキーマで旧データを読み取りコンシューマーを先にアップグレード
FORWARD旧スキーマで新データを読み取りプロデューサーを先にアップグレード
FULL双方向互換最も安全、推奨

10. Kafka + TypeScript実践実装

10.1 Kafka Producer

import { Kafka, Partitioners } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  retry: { retries: 5 },
});

const producer = kafka.producer({
  createPartitioner: Partitioners.DefaultPartitioner,
  idempotent: true,
  maxInFlightRequests: 5,
  transactionalId: 'order-producer-1',
});

async function publishOrderEvent(event: OrderPlaced): Promise<void> {
  await producer.send({
    topic: 'events.orders',
    messages: [
      {
        key: event.orderId,
        value: JSON.stringify(event),
        headers: {
          'event-type': event.eventType,
          'event-id': event.eventId,
          'correlation-id': event.metadata.correlationId || '',
        },
      },
    ],
  });
}

10.2 Kafka Consumer

const consumer = kafka.consumer({
  groupId: 'payment-service-group',
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
});

async function startConsumer(): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({
    topics: ['events.orders'],
    fromBeginning: false,
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value!.toString());
      const eventType = message.headers?.['event-type']?.toString();

      console.log(`Received: ${eventType} from ${topic}[${partition}]`);

      try {
        switch (eventType) {
          case 'order.placed':
            await paymentHandler.onOrderPlaced(event);
            break;
          case 'order.cancelled':
            await paymentHandler.onOrderCancelled(event);
            break;
          default:
            console.log(`Unknown event type: ${eventType}`);
        }
      } catch (error) {
        console.error(`Failed to process event: ${error}`);
        // DLQ処理またはリトライ
      }
    },
  });
}

10.3 Dead Letter Queue(DLQ)パターン

class DeadLetterQueueHandler {
  private dlqProducer: Producer;

  async handleFailedMessage(
    originalTopic: string,
    message: KafkaMessage,
    error: Error
  ): Promise<void> {
    await this.dlqProducer.send({
      topic: `${originalTopic}.dlq`,
      messages: [
        {
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
            'dlq-reason': error.message,
            'dlq-timestamp': new Date().toISOString(),
            'original-topic': originalTopic,
            'retry-count': String(
              parseInt(message.headers?.['retry-count']?.toString() || '0') + 1
            ),
          },
        },
      ],
    });
  }
}

11. RabbitMQ実装

11.1 Exchange種類とルーティング

Exchange種類:
├── Direct Exchange
│   └── 完全一致するルーティングキーでルーティング
│       用途:特定サービスへのターゲティング
├── Topic Exchange
│   └── ルーティングキーパターン(*.order.#)でルーティング
│       用途:柔軟なイベントルーティング
├── Fanout Exchange
│   └── バインドされた全キューにブロードキャスト
│       用途:全コンシューマーへのイベント通知
└── Headers Exchange
    └── メッセージヘッダーマッチングでルーティング
        用途:複雑なルーティングロジック

11.2 RabbitMQ + Node.js

import amqp from 'amqplib';

class RabbitMQEventBus {
  private connection!: amqp.Connection;
  private channel!: amqp.Channel;

  async connect(): Promise<void> {
    this.connection = await amqp.connect('amqp://localhost');
    this.channel = await this.connection.createChannel();

    // Exchange宣言
    await this.channel.assertExchange('domain-events', 'topic', {
      durable: true,
    });
  }

  async publish(event: DomainEvent): Promise<void> {
    const routingKey = `events.${event.eventType}`;
    this.channel.publish(
      'domain-events',
      routingKey,
      Buffer.from(JSON.stringify(event)),
      {
        persistent: true,
        messageId: event.eventId,
        contentType: 'application/json',
        headers: {
          'event-type': event.eventType,
          'aggregate-id': event.aggregateId,
        },
      }
    );
  }

  async subscribe(pattern: string, handler: (event: DomainEvent) => Promise<void>): Promise<void> {
    const queue = await this.channel.assertQueue('', { exclusive: true });
    await this.channel.bindQueue(queue.queue, 'domain-events', pattern);

    await this.channel.consume(queue.queue, async (msg) => {
      if (!msg) return;
      try {
        const event = JSON.parse(msg.content.toString());
        await handler(event);
        this.channel.ack(msg);
      } catch (error) {
        // 拒否して再キューまたはDLQへ
        this.channel.nack(msg, false, false);
      }
    });
  }
}

12. よくあるアンチパターン

12.1 過度に細分化されたイベント

過度に細粒度のイベントはシステムの複雑性を増加させます。ビジネス意味の単位でイベントを設計しましょう。

12.2 イベントによる強結合

イベントに過度なデータを含めると、コンシューマーがプロデューサーの内部構造に依存します。イベントは公開契約なので慎重に設計する必要があります。

12.3 同期的イベント処理

イベントハンドラーで他のサービスを同期的に呼び出すと、EDAの利点が失われます。非同期処理を維持しましょう。

12.4 冪等性の無視

「at-least-once」配信保証を考慮しないと、重複処理によるデータ不整合が発生します。

12.5 Correlation IDの欠如

Correlation IDがないと、複数サービスにまたがるリクエストの追跡がほぼ不可能になります。常にイベントを通じてCorrelation IDを伝播させましょう。

13. EDAパターン選択ガイド

シナリオ推奨パターン
監査証跡が必要Event Sourcing
読み/書きの比率が偏っているCQRS
複数サービスにまたがるトランザクションSagaパターン
DBとメッセージブローカーの原子性Outboxパターン
高スループットのイベントストリーミングKafka
複雑なルーティングロジックRabbitMQ
シンプルなイベント通知Event Notification + SNS/SQS

14. クイズ

Q1: Event Sourcingでスナップショットが必要な理由は何ですか?

正解:イベント数が増えるほどAggregate復元時間が長くなるため

Event SourcingでAggregateの現在の状態を得るには、全てのイベントを最初から再生する必要があります。イベントが数千、数万になると復元時間が急激に増加します。スナップショットは特定時点のAggregate状態を保存し、その時点以降のイベントのみ再生すれば良いように最適化します。

Q2: Choreography SagaとOrchestration Sagaの長所・短所は何ですか?

正解:

Choreography: 長所 - シンプルなワークフローに適合、サービス間の結合度が低い、単一障害点がない。短所 - 複雑なワークフローで追跡が困難、循環依存の発生可能性、テストが困難。

Orchestration: 長所 - 複雑なワークフローに適合、全体フローを一箇所で把握可能、テスト容易。短所 - オーケストレーターが単一障害点、オーケストレーターにロジック集中の危険性。

Q3: Outboxパターンが解決する「二重書き込み問題」とは何ですか?

正解:データベースとメッセージブローカーに同時に書き込む際に原子性を保証できない問題

DBにデータを保存しKafkaにイベントを発行する二つの操作は異なるシステムなので、一つのトランザクションにまとめることができません。DB保存は成功したがKafka発行が失敗するとデータ不整合が発生します。Outboxパターンはイベントを同じDBトランザクション内のoutboxテーブルに保存し、別プロセスがこれを読み取りブローカーに発行することで原子性を保証します。

Q4: CQRSにおける「最終的一貫性(Eventual Consistency)」の意味と対処法は?

正解:書き込みモデルの変更が読み取りモデルに即座に反映されないこと

CQRSではCommand側の変更事項がイベントを通じてQuery側に非同期的に伝播されます。したがって、書き込み直後に読み取りをすると以前の状態が見える可能性があります。対処法としては、UIで楽観的アップデートを適用するか、特定操作後にCommand側から直接クエリするか、バージョンベースの条件付きクエリを使用する方法があります。

Q5: 冪等性がイベント駆動システムで重要な理由は?

正解:「at-least-once」配信保証により同一イベントが重複配信される可能性があるため

大部分のメッセージブローカーは「at-least-once」配信を保証します。ネットワーク問題やコンシューマーの再起動などで同じイベントが複数回配信される可能性があります。冪等性が保証されないと、同じ決済が二重処理されたり、在庫が重複差引されるなどの問題が発生します。冪等性キー、重複排除、条件付き更新などでこれを防止します。

15. 参考資料

  1. Martin Fowler - Event Sourcing - イベントソーシング概念
  2. Microsoft - CQRS Pattern - CQRSパターンガイド
  3. Microservices.io - Saga Pattern - Sagaパターン詳細
  4. Debezium Documentation - CDCツール公式ドキュメント
  5. KafkaJS Documentation - Node.js Kafkaクライアント
  6. Apache Kafka Documentation - Kafka公式ドキュメント
  7. Confluent Schema Registry - スキーマレジストリ
  8. EventStoreDB - 専用イベントストアデータベース
  9. Axon Framework - Java CQRS/ESフレームワーク
  10. NestJS CQRS - NestJS CQRSレシピ
  11. Domain-Driven Design Reference - DDDリファレンス
  12. Microservices Patterns by Chris Richardson - マイクロサービスパターンカタログ
  13. RabbitMQ Documentation - RabbitMQ公式ドキュメント