Skip to content
Published on

イベント駆動アーキテクチャ設計パターン — Kafka、CQRS、Event Sourcing 実践ガイド

Authors
  • Name
    Twitter
イベント駆動アーキテクチャ

はじめに

マイクロサービスが増えると、サービス間の通信が複雑になります。同期(Sync)呼び出しで結合されたサービスは、1つが停止すると連鎖障害が発生します。イベント駆動アーキテクチャ(EDA)は、この問題を非同期イベントベースで解決します。本記事では、Kafkaを中心にEDAのコアパターンであるCQRSとEvent Sourcingを実践コードとともに解説します。


1. イベント駆動アーキテクチャ概要

同期 vs 非同期通信

graph LR
    subgraph "同期方式 (REST)"
        A[注文サービス] -->|HTTP呼び出し| B[決済サービス]
        B -->|HTTP呼び出し| C[在庫サービス]
        C -->|HTTP呼び出し| D[通知サービス]
    end

同期方式では、決済サービスが停止すると注文も失敗します。すべてのサービスが強く結合されています。

graph LR
    subgraph "非同期方式 (Event-Driven)"
        A2[注文サービス] -->|イベント発行| K[Kafka]
        K -->|購読| B2[決済サービス]
        K -->|購読| C2[在庫サービス]
        K -->|購読| D2[通知サービス]
    end

非同期方式では、注文サービスはイベントを発行するだけです。各サービスが独立してイベントを消費します。

EDAのコア構成要素

構成要素役割
Producerイベントを生成し発行する注文サービス
Event Brokerイベントを保存し配信するApache Kafka
Consumerイベントを購読し処理する決済、在庫、通知サービス
Event発生した事実を表すメッセージOrderCreated, PaymentCompleted

2. Apache Kafka 基礎

Kafka アーキテクチャ

graph TB
    P1[Producer 1] --> T[Topic: orders]
    P2[Producer 2] --> T
    T --> PA0[Partition 0]
    T --> PA1[Partition 1]
    T --> PA2[Partition 2]
    PA0 --> C1[Consumer Group A - Consumer 1]
    PA1 --> C2[Consumer Group A - Consumer 2]
    PA2 --> C3[Consumer Group A - Consumer 3]
    PA0 --> C4[Consumer Group B - Consumer 1]
    PA1 --> C4
    PA2 --> C4

主要概念

  • Topic: イベントが保存される論理チャネル(例:orderspayments
  • Partition: Topic内の並列処理単位。パーティションキーで順序を保証
  • Consumer Group: 同じグループのコンシューマーはパーティションを分配されて並列処理
  • Offset: 各パーティション内のメッセージの通し番号。コンシューマーがどこまで読んだかを追跡

Kafka Producer 実装 (Python)

from confluent_kafka import Producer
import json

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'client.id': 'order-service',
    'acks': 'all',  # すべてのレプリカへの書き込みを確認
    'retries': 3,
    'retry.backoff.ms': 1000,
}

producer = Producer(config)

def publish_order_event(order: dict):
    """注文イベントを発行"""
    event = {
        'event_type': 'OrderCreated',
        'event_id': str(uuid.uuid4()),
        'timestamp': datetime.utcnow().isoformat(),
        'data': {
            'order_id': order['id'],
            'user_id': order['user_id'],
            'items': order['items'],
            'total_amount': order['total'],
        }
    }

    # パーティションキー = user_id → 同じユーザーの注文は同じパーティションへ
    producer.produce(
        topic='orders',
        key=str(order['user_id']).encode('utf-8'),
        value=json.dumps(event).encode('utf-8'),
        callback=delivery_callback,
    )
    producer.flush()

def delivery_callback(err, msg):
    if err:
        print(f'送信失敗: {err}')
    else:
        print(f'送信成功: {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

Kafka Consumer 実装 (Python)

from confluent_kafka import Consumer

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'payment-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # 手動コミットでat-least-onceを保証
}

consumer = Consumer(config)
consumer.subscribe(['orders'])

def process_events():
    """イベント消費ループ"""
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Error: {msg.error()}')
            continue

        event = json.loads(msg.value().decode('utf-8'))

        try:
            if event['event_type'] == 'OrderCreated':
                handle_order_created(event['data'])

            # 処理成功後に手動コミット
            consumer.commit(msg)

        except Exception as e:
            print(f'処理失敗: {e}')
            # DLQ(Dead Letter Queue)へ送信
            send_to_dlq(event, str(e))

def handle_order_created(data: dict):
    """注文作成イベント処理 → 決済開始"""
    payment = create_payment(
        order_id=data['order_id'],
        amount=data['total_amount'],
    )
    # 決済完了イベントを発行
    publish_payment_event(payment)

3. CQRS (Command Query Responsibility Segregation)

CQRSは**書き込み(Command)読み取り(Query)**のモデルを分離するパターンです。

CQRSアーキテクチャフロー

graph TB
    Client[クライアント]

    Client -->|Command| CmdAPI[Command API]
    Client -->|Query| QryAPI[Query API]

    CmdAPI --> WriteDB[(Write DB - PostgreSQL)]
    CmdAPI -->|イベント発行| Kafka[Kafka]

    Kafka -->|イベント消費| Projector[Projector / Denormalizer]
    Projector --> ReadDB[(Read DB - Elasticsearch/Redis)]

    QryAPI --> ReadDB

なぜCQRSか?

従来の方式CQRS
1つのモデルで読み書き読み/書きモデルを分離
複雑なJOINで読み取り性能が低下読み取りに最適化された非正規化モデル
スケーリングが困難読み/書きを独立してスケーリング
シンプルなドメインに適している複雑なドメインに適している

CQRS 実装例

# === Command Side(書き込み)===
class OrderCommandHandler:
    def __init__(self, db, event_publisher):
        self.db = db
        self.publisher = event_publisher

    def create_order(self, cmd: CreateOrderCommand):
        """注文作成コマンドの処理"""
        # ビジネスロジックのバリデーション
        if not self._validate_stock(cmd.items):
            raise InsufficientStockError()

        # Write DBに保存
        order = Order(
            id=uuid.uuid4(),
            user_id=cmd.user_id,
            items=cmd.items,
            status='CREATED',
        )
        self.db.save(order)

        # イベント発行(非同期でRead Modelを更新)
        self.publisher.publish('orders', OrderCreatedEvent(
            order_id=order.id,
            user_id=order.user_id,
            items=order.items,
            total=order.total,
            created_at=order.created_at,
        ))
        return order.id


# === Projector(イベント → Read Model 変換)===
class OrderProjector:
    def __init__(self, read_db):
        self.read_db = read_db  # Elasticsearch or Redis

    def handle_order_created(self, event: OrderCreatedEvent):
        """注文作成イベント → 読み取りモデルに非正規化して保存"""
        user = self.get_user_info(event.user_id)

        # 非正規化された読み取りモデル(JOIN不要!)
        order_view = {
            'order_id': str(event.order_id),
            'user_name': user.name,
            'user_email': user.email,
            'items': [
                {'name': item.name, 'qty': item.qty, 'price': item.price}
                for item in event.items
            ],
            'total': event.total,
            'status': 'CREATED',
            'created_at': event.created_at.isoformat(),
        }
        self.read_db.index('orders', order_view)


# === Query Side(読み取り)===
class OrderQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db

    def get_user_orders(self, user_id: str, page: int = 1):
        """ユーザーの注文一覧を取得(Read Modelから直接返却)"""
        return self.read_db.search('orders', {
            'query': {'term': {'user_id': user_id}},
            'sort': [{'created_at': 'desc'}],
            'from': (page - 1) * 20,
            'size': 20,
        })

4. Event Sourcing — イベントがデータそのもの

Event Sourcingは、現在の状態を保存する代わりに、状態変更イベントのシーケンスを保存するパターンです。

Event Sourcing フロー

sequenceDiagram
    participant Client
    participant OrderAggregate
    participant EventStore
    participant Projector
    participant ReadDB

    Client->>OrderAggregate: CreateOrder コマンド
    OrderAggregate->>EventStore: OrderCreated イベント保存
    EventStore->>Projector: イベント伝播
    Projector->>ReadDB: 読み取りモデル更新

    Client->>OrderAggregate: PayOrder コマンド
    OrderAggregate->>EventStore: OrderPaid イベント保存
    EventStore->>Projector: イベント伝播
    Projector->>ReadDB: 読み取りモデル更新

    Note over EventStore: イベントは絶対に削除・修正されない!

従来のCRUD vs Event Sourcing

# 従来のCRUD — 現在の状態のみ保存
orders テーブル:
| id | user_id | status    | total  |
|----|---------|-----------|--------|
| 1  | 100     | SHIPPED   | 50000  |

→ 「いつ注文して、いつ決済されて、いつ配送されたか?」わからない!

# Event Sourcing — すべての変更履歴を保存
events テーブル:
| seq | aggregate_id | type           | data                    | timestamp           |
|-----|-------------|----------------|-------------------------|---------------------|
| 1   | order-1     | OrderCreated   | {items: [...], total: 50000} | 2026-03-02 10:00 |
| 2   | order-1     | PaymentReceived| {method: "card"}        | 2026-03-02 10:05    |
| 3   | order-1     | OrderShipped   | {tracking: "KR123"}     | 2026-03-02 14:30    |

→ 全履歴を再構成可能!

Event Sourcing 実装

from dataclasses import dataclass, field
from datetime import datetime
from typing import List
import json

# === イベント定義 ===
@dataclass
class DomainEvent:
    event_id: str
    aggregate_id: str
    event_type: str
    data: dict
    timestamp: datetime
    version: int

# === Aggregate(ドメインオブジェクト)===
class OrderAggregate:
    def __init__(self, order_id: str):
        self.id = order_id
        self.status = None
        self.items = []
        self.total = 0
        self.version = 0
        self._pending_events: List[DomainEvent] = []

    # --- Command Handler ---
    def create(self, user_id: str, items: list):
        """注文作成コマンド"""
        if self.status is not None:
            raise Exception("既に作成済みの注文です")

        # イベント作成(状態変更なし、イベントのみ記録)
        self._apply_event(DomainEvent(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.id,
            event_type='OrderCreated',
            data={'user_id': user_id, 'items': items,
                  'total': sum(i['price'] * i['qty'] for i in items)},
            timestamp=datetime.utcnow(),
            version=self.version + 1,
        ))

    def pay(self, payment_method: str):
        """決済コマンド"""
        if self.status != 'CREATED':
            raise Exception(f"決済できない状態: {self.status}")

        self._apply_event(DomainEvent(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.id,
            event_type='OrderPaid',
            data={'payment_method': payment_method},
            timestamp=datetime.utcnow(),
            version=self.version + 1,
        ))

    # --- Event Handler(状態変更はここだけ!)---
    def _on_order_created(self, event: DomainEvent):
        self.status = 'CREATED'
        self.items = event.data['items']
        self.total = event.data['total']

    def _on_order_paid(self, event: DomainEvent):
        self.status = 'PAID'

    def _apply_event(self, event: DomainEvent):
        """イベント適用(状態変更 + イベント保存)"""
        handler = {
            'OrderCreated': self._on_order_created,
            'OrderPaid': self._on_order_paid,
        }.get(event.event_type)

        if handler:
            handler(event)
        self.version = event.version
        self._pending_events.append(event)

    @classmethod
    def from_events(cls, order_id: str, events: List[DomainEvent]):
        """イベントシーケンスから現在の状態を復元"""
        aggregate = cls(order_id)
        for event in events:
            aggregate._apply_event(event)
        aggregate._pending_events.clear()  # 既に保存済みのイベント
        return aggregate


# === Event Store ===
class EventStore:
    def __init__(self, db):
        self.db = db

    def save(self, aggregate: OrderAggregate):
        """未保存のイベントをEvent Storeに記録"""
        for event in aggregate._pending_events:
            self.db.execute(
                """INSERT INTO events
                   (event_id, aggregate_id, event_type, data, timestamp, version)
                   VALUES (%s, %s, %s, %s, %s, %s)""",
                (event.event_id, event.aggregate_id, event.event_type,
                 json.dumps(event.data), event.timestamp, event.version)
            )
        aggregate._pending_events.clear()

    def load(self, aggregate_id: str) -> OrderAggregate:
        """Event Storeからイベントを読み込みAggregateを復元"""
        rows = self.db.query(
            "SELECT * FROM events WHERE aggregate_id = %s ORDER BY version",
            (aggregate_id,)
        )
        events = [DomainEvent(**row) for row in rows]
        return OrderAggregate.from_events(aggregate_id, events)

5. Sagaパターン — 分散トランザクション管理

マイクロサービスにおいて複数サービスにまたがるトランザクションを管理するパターンです。

Choreography Saga(イベントベース)

sequenceDiagram
    participant Order as 注文サービス
    participant Kafka
    participant Payment as 決済サービス
    participant Stock as 在庫サービス
    participant Notify as 通知サービス

    Order->>Kafka: OrderCreated
    Kafka->>Payment: OrderCreated 受信
    Payment->>Kafka: PaymentCompleted
    Kafka->>Stock: PaymentCompleted 受信
    Stock->>Kafka: StockReserved
    Kafka->>Notify: StockReserved 受信
    Notify->>Notify: ユーザーに通知送信

    Note over Payment,Stock: 決済失敗の場合?
    Payment->>Kafka: PaymentFailed
    Kafka->>Order: PaymentFailed 受信
    Order->>Order: 注文キャンセル(補償トランザクション)

Orchestration Saga(オーケストレーターベース)

graph TB
    Orch[Saga Orchestrator]

    Orch -->|1. 注文作成| Order[注文サービス]
    Order -->|成功| Orch

    Orch -->|2. 決済要求| Payment[決済サービス]
    Payment -->|成功| Orch

    Orch -->|3. 在庫引当| Stock[在庫サービス]
    Stock -->|成功| Orch

    Orch -->|4. 通知送信| Notify[通知サービス]

    Stock -->|失敗| Orch
    Orch -->|補償: 決済取消| Payment
    Orch -->|補償: 注文取消| Order

2つの方式の比較

項目ChoreographyOrchestration
中央制御なし(各サービスが自律的)オーケストレーターが制御
結合度疎結合オーケストレーターに依存
デバッグ難しい(イベント追跡が必要)比較的容易
複雑性サービス数増加に伴い複雑化オーケストレーターが複雑化
適した場面シンプルなフロー(3-4ステップ)複雑なフロー(5+ステップ)

6. 実践的な考慮事項

冪等性 (Idempotency)

イベントが重複配信される可能性があるため、冪等性の保証が必須です:

def handle_payment_event(event: dict):
    """冪等性保証 — 同じイベントの重複処理を防止"""
    event_id = event['event_id']

    # 既に処理済みのイベントか確認
    if is_already_processed(event_id):
        logger.info(f'既に処理済みのイベント: {event_id}')
        return

    # ビジネスロジックの処理
    process_payment(event['data'])

    # 処理完了を記録
    mark_as_processed(event_id)

Dead Letter Queue (DLQ)

処理に失敗したイベントを別のキューに保管:

MAX_RETRIES = 3

def consume_with_retry(event: dict, retry_count: int = 0):
    try:
        handle_event(event)
    except Exception as e:
        if retry_count < MAX_RETRIES:
            # リトライ(exponential backoff)
            time.sleep(2 ** retry_count)
            consume_with_retry(event, retry_count + 1)
        else:
            # DLQへ送信
            producer.produce(
                topic='orders.dlq',
                value=json.dumps({
                    'original_event': event,
                    'error': str(e),
                    'failed_at': datetime.utcnow().isoformat(),
                }).encode('utf-8'),
            )

イベントスキーマの進化

# v1 イベント
{"event_type": "OrderCreated", "version": 1,
 "data": {"order_id": "123", "total": 50000}}

# v2 イベント(currencyフィールド追加)
{"event_type": "OrderCreated", "version": 2,
 "data": {"order_id": "123", "total": 50000, "currency": "KRW"}}

# アップキャスターでv1 → v2変換
def upcast_order_created_v1_to_v2(event: dict) -> dict:
    if event.get('version', 1) == 1:
        event['data']['currency'] = 'KRW'  # デフォルト値
        event['version'] = 2
    return event

7. EDA導入チェックリスト

  • サービス間の同期呼び出しが障害伝播を引き起こしているか?
  • 読み取り/書き込み比率が大きく異なるか? → CQRS検討
  • 監査ログ/履歴追跡が必要か? → Event Sourcing検討
  • イベントの冪等性処理戦略があるか?
  • DLQとモニタリング/アラートが設定されているか?
  • イベントのスキーマバージョン管理戦略があるか?
  • Sagaパターンで分散トランザクションを管理しているか?
  • イベント順序保証が必要な箇所にパーティションキーを設定したか?

クイズ

Q1: イベント駆動アーキテクチャの同期方式に対する最大の利点は? サービス間の疎結合(loose coupling)です。あるサービスの障害が他のサービスに伝播せず、各 サービスが独立してスケーリングできます。

Q2: Kafkaにおいて同じConsumer Group内のコンシューマーはどのように動作するか? 同じConsumer Group内では、各パーティションは1つのコンシューマーにのみ割り当てられます。これにより、 パーティション単位で並列処理しながらもパーティション内の順序が保証されます。

Q3: CQRSで読み/書きモデルを分離する理由は? 読み取りと書き込みの要件が異なるためです。書き込みは正規化されたモデルでデータ整合性を、読み取りは 非正規化されたモデルでクエリ性能をそれぞれ最適化できます。

Q4: Event Sourcingで現在の状態をどのように復元するか? Event Storeに保存されたイベントシーケンスを最初から順番にリプレイ(replay)して現在の状態を構成します。 Aggregateのfrom_events()メソッドがこの役割を担います。

Q5: Choreography SagaとOrchestration Sagaの主な違いは? Choreographyは中央制御なしに各サービスがイベントをやり取りしながら自律的に進行し、Orchestrationは 中央のオーケストレーターが各サービスに順番にコマンドを発行します。

Q6: イベント処理において冪等性(idempotency)が重要な理由は? ネットワークエラーやコンシューマーの再起動により、同じイベントが複数回配信される可能性があります。 冪等性を保証しなければ、重複決済や重複在庫引当などの問題が発生します。

Q7: Dead Letter Queue(DLQ)の役割は? 最大リトライ回数を超えて処理に失敗したイベントを別のキューに保管し、後で手動で確認して 再処理できるようにします。

Q8: Kafkaでパーティションキーをuser_idに設定する理由は? 同じuser_idを持つイベントが常に同じパーティションに入るため、そのユーザーのイベント順序が 保証されます。

Q9: Event Sourcingのイベントを修正または削除できるか? 原則としてできません。イベントは不変(immutable)であり、状態を変更するには新しい補償イベントを 追加する必要があります。これがEvent Sourcingの核心原則です。

Q10: CQRSの欠点は? 読み/書きモデル間のデータ整合性が即時ではない結果整合性(eventual consistency)モデルとなり、 システム複雑性が増加します。シンプルなCRUDアプリケーションには過度な設計になる場合があります。