Skip to content
Published on

イベント駆動アーキテクチャ + CQRS + Sagaパターン: MSA実践設計ガイド

Authors
  • Name
    Twitter

1. なぜイベント駆動アーキテクチャなのか

モノリスからMSAへ移行すると、分散トランザクションの問題が発生します:

graph LR
    A[注文サービス] -->|HTTP同期呼び出し| B[決済サービス]
    B -->|HTTP同期呼び出し| C[在庫サービス]
    C -->|HTTP同期呼び出し| D[配送サービス]
    style A fill:#f96,stroke:#333
    style D fill:#6f9,stroke:#333

同期方式の問題点:

  • 一つでも失敗すると全体が失敗(カスケード障害)
  • サービス間の強結合
  • 呼び出しチェーンが長くなるほどレイテンシ増加
  • 部分的な失敗時にロールバックが困難
graph LR
    A[注文サービス] -->|イベント発行| K[Kafka]
    K -->|イベント消費| B[決済サービス]
    K -->|イベント消費| C[在庫サービス]
    K -->|イベント消費| D[配送サービス]
    style K fill:#ff9,stroke:#333

イベント駆動のメリット:

  • サービス間の疎結合(loose coupling)
  • 非同期処理によるレスポンスタイム短縮
  • 独立したスケーリングが可能
  • イベントリプレイによる状態復旧が可能

2. CQRS(Command Query Responsibility Segregation)

2.1 コアコンセプト

**コマンド(Command)クエリ(Query)**を分離するパターン:

graph TB
    Client -->|Command: 注文作成| CmdAPI[Command API]
    Client -->|Query: 注文照会| QryAPI[Query API]

    CmdAPI --> WriteDB[(Write DB<br/>PostgreSQL)]
    WriteDB -->|イベント発行| EventBus[Kafka]
    EventBus -->|イベント消費| Projector[Projector]
    Projector --> ReadDB[(Read DB<br/>Elasticsearch)]
    QryAPI --> ReadDB

    style CmdAPI fill:#f96,stroke:#333
    style QryAPI fill:#6f9,stroke:#333
    style EventBus fill:#ff9,stroke:#333

2.2 なぜ分離するのか?

側面Command(書き込み)Query(読み取り)
モデル正規化、整合性重視非正規化、パフォーマンス重視
スケーリング垂直スケーリング水平スケーリング(レプリケーション)
DBPostgreSQL、MySQLElasticsearch、Redis、MongoDB
最適化トランザクション安全性読み取りパフォーマンス
比率全体の10〜20%全体の80〜90%

2.3 実装例

# Command Side
from dataclasses import dataclass
from datetime import datetime
import uuid

@dataclass
class CreateOrderCommand:
    customer_id: str
    items: list[dict]
    total_amount: float

class OrderCommandHandler:
    def __init__(self, repo, event_publisher):
        self.repo = repo
        self.event_publisher = event_publisher

    def handle_create_order(self, cmd: CreateOrderCommand):
        order = Order(
            id=str(uuid.uuid4()),
            customer_id=cmd.customer_id,
            items=cmd.items,
            total_amount=cmd.total_amount,
            status="PENDING",
            created_at=datetime.utcnow()
        )

        # Write DBに保存
        self.repo.save(order)

        # イベント発行
        self.event_publisher.publish("order.created", {
            "order_id": order.id,
            "customer_id": order.customer_id,
            "total_amount": order.total_amount,
            "items": order.items,
        })
        return order.id

# Query Side
class OrderQueryService:
    def __init__(self, read_repo):
        self.read_repo = read_repo  # Elasticsearch

    def get_order(self, order_id: str):
        return self.read_repo.find_by_id(order_id)

    def search_orders(self, customer_id: str, status: str = None):
        return self.read_repo.search(customer_id=customer_id, status=status)

# Projector(イベント → Read Model同期)
class OrderProjector:
    def __init__(self, read_repo):
        self.read_repo = read_repo

    def on_order_created(self, event):
        self.read_repo.upsert({
            "id": event["order_id"],
            "customer_id": event["customer_id"],
            "total_amount": event["total_amount"],
            "status": "PENDING",
            "items": event["items"],
        })

    def on_order_paid(self, event):
        self.read_repo.update(event["order_id"], {"status": "PAID"})

3. Sagaパターン: 分散トランザクション管理

3.1 Choreography vs Orchestration

graph TB
    subgraph "Choreography(イベント駆動)"
        O1[注文] -->|OrderCreated| P1[決済]
        P1 -->|PaymentCompleted| S1[在庫]
        S1 -->|StockReserved| D1[配送]
        D1 -->|DeliveryStarted| O1
    end
graph TB
    subgraph "Orchestration(中央調整)"
        Orch[Saga Orchestrator]
        Orch -->|1. 決済リクエスト| P2[決済]
        P2 -->|決済完了| Orch
        Orch -->|2. 在庫予約| S2[在庫]
        S2 -->|予約完了| Orch
        Orch -->|3. 配送リクエスト| D2[配送]
        D2 -->|配送開始| Orch
    end
方式メリットデメリット
Choreographyシンプル、疎結合フロー把握が困難、循環依存のリスク
Orchestrationフローが明確、管理容易中央集中、Orchestratorが単一障害点

3.2 Orchestration Sagaの実装

from enum import Enum
from kafka import KafkaProducer, KafkaConsumer
import json

class SagaStep(Enum):
    PAYMENT = "payment"
    INVENTORY = "inventory"
    DELIVERY = "delivery"

class OrderSagaOrchestrator:
    STEPS = [SagaStep.PAYMENT, SagaStep.INVENTORY, SagaStep.DELIVERY]

    def __init__(self, producer: KafkaProducer):
        self.producer = producer
        self.saga_state = {}  # order_id -> {step, status}

    def start_saga(self, order_id: str, order_data: dict):
        self.saga_state[order_id] = {
            "current_step": 0,
            "data": order_data,
            "completed_steps": [],
        }
        self._execute_step(order_id)

    def _execute_step(self, order_id: str):
        state = self.saga_state[order_id]
        step_idx = state["current_step"]

        if step_idx >= len(self.STEPS):
            # すべてのステップ完了
            self._publish(f"order.completed", {"order_id": order_id})
            return

        step = self.STEPS[step_idx]
        self._publish(f"{step.value}.request", {
            "order_id": order_id,
            "saga_id": order_id,
            **state["data"]
        })

    def handle_step_success(self, order_id: str, step: SagaStep):
        state = self.saga_state[order_id]
        state["completed_steps"].append(step)
        state["current_step"] += 1
        self._execute_step(order_id)

    def handle_step_failure(self, order_id: str, failed_step: SagaStep):
        """補償トランザクション実行(逆順)"""
        state = self.saga_state[order_id]

        for step in reversed(state["completed_steps"]):
            self._publish(f"{step.value}.compensate", {
                "order_id": order_id,
            })

        self._publish("order.failed", {
            "order_id": order_id,
            "failed_at": failed_step.value,
        })

    def _publish(self, topic: str, data: dict):
        self.producer.send(topic, json.dumps(data).encode())

3.3 補償トランザクションの例

sequenceDiagram
    participant O as Orchestrator
    participant P as 決済サービス
    participant I as 在庫サービス
    participant D as 配送サービス

    O->>P: 1. 決済リクエスト
    P-->>O: 決済成功
    O->>I: 2. 在庫予約
    I-->>O: 在庫予約成功
    O->>D: 3. 配送リクエスト
    D-->>O: 配送失敗(住所エラー)

    Note over O: 補償トランザクション開始(逆順)
    O->>I: 在庫予約取消
    I-->>O: 取消完了
    O->>P: 決済返金
    P-->>O: 返金完了
    O->>O: 注文失敗処理

4. Event Sourcing

4.1 コンセプト

状態を直接保存する代わりに、状態変更イベントを保存する:

従来の方式:
  ordersテーブル: {id: 1, status: "SHIPPED", amount: 50000}

Event Sourcing:
  eventsテーブル:
    1. OrderCreated {amount: 50000}
    2. PaymentReceived {payment_id: "pay_123"}
    3. StockReserved {warehouse: "seoul"}
    4. OrderShipped {tracking: "KR12345"}

  → イベントを順番にリプレイすれば現在の状態を復元

4.2 実装

class EventStore:
    def __init__(self, db):
        self.db = db

    def append(self, aggregate_id: str, event_type: str, data: dict, version: int):
        self.db.execute("""
            INSERT INTO events (aggregate_id, event_type, data, version, created_at)
            VALUES (%s, %s, %s, %s, NOW())
        """, (aggregate_id, event_type, json.dumps(data), version))

    def get_events(self, aggregate_id: str, after_version: int = 0):
        return self.db.query("""
            SELECT event_type, data, version
            FROM events
            WHERE aggregate_id = %s AND version > %s
            ORDER BY version
        """, (aggregate_id, after_version))

class Order:
    def __init__(self):
        self.id = None
        self.status = None
        self.version = 0
        self._pending_events = []

    def apply_event(self, event_type: str, data: dict):
        if event_type == "OrderCreated":
            self.id = data["order_id"]
            self.status = "PENDING"
        elif event_type == "PaymentReceived":
            self.status = "PAID"
        elif event_type == "OrderShipped":
            self.status = "SHIPPED"
        self.version += 1

    @classmethod
    def from_events(cls, events):
        order = cls()
        for e in events:
            order.apply_event(e["event_type"], e["data"])
        return order

5. Kafkaベースの全体アーキテクチャ

graph TB
    Client[クライアント] --> Gateway[API Gateway]

    Gateway --> OrderCmd[注文 Command API]
    Gateway --> OrderQry[注文 Query API]

    OrderCmd --> WriteDB[(PostgreSQL)]
    WriteDB --> CDC[Debezium CDC]
    CDC --> Kafka[Apache Kafka]

    Kafka --> Saga[Saga Orchestrator]
    Kafka --> Projector[CQRS Projector]

    Saga --> PaymentSvc[決済サービス]
    Saga --> InventorySvc[在庫サービス]
    Saga --> DeliverySvc[配送サービス]

    Projector --> ReadDB[(Elasticsearch)]
    OrderQry --> ReadDB

    PaymentSvc --> Kafka
    InventorySvc --> Kafka
    DeliverySvc --> Kafka

    style Kafka fill:#ff9,stroke:#333
    style Saga fill:#f96,stroke:#333

6. 実践上の考慮事項

6.1 冪等性(Idempotency)

# イベントの重複処理防止
class IdempotentConsumer:
    def __init__(self, redis_client):
        self.redis = redis_client

    def process_event(self, event_id: str, handler):
        key = f"processed:{event_id}"
        if self.redis.setnx(key, "1"):
            self.redis.expire(key, 86400)  # 24時間TTL
            handler()
        else:
            pass  # すでに処理済み、スキップ

6.2 イベント順序の保証

# Kafkaのパーティションキーで順序保証
producer.send(
    "order.events",
    key=order_id.encode(),  # 同じ注文 → 同じパーティション → 順序保証
    value=json.dumps(event).encode()
)

7. クイズ

Q1. CQRSでCommandとQueryを分離する最大の理由は?

読み取り/書き込みの要件が根本的に異なるため。書き込みは正規化+トランザクション、読み取りは非正規化+パフォーマンス最適化。それぞれに最適化されたストレージとモデルを使用可能。

Q2. Saga ChoreographyとOrchestrationの違いは?

Choreography: 各サービスがイベントを発行/購読して自律的に動作。Orchestration: 中央のOrchestratorが各サービスに明示的に指示。Orchestrationの方がフロー把握と管理が容易。

Q3. 補償トランザクションとは?

Sagaの一つのステップが失敗した際、すでに完了したステップを逆順で取り消すトランザクション。例: 配送失敗 → 在庫予約取消 → 決済返金。

Q4. Event Sourcingの最大のメリットは?

(1) 完全な監査証跡(Audit Trail) (2) イベントリプレイによる時点復旧が可能 (3) 過去のイベントから新しいRead Modelを構築可能。

Q5. イベント処理で冪等性が重要な理由は?

ネットワーク障害、リトライ、リバランシングなどにより同じイベントが複数回配信される可能性がある。冪等でなければ、二重決済、二重在庫引き当てなど深刻な問題が発生する。

Q6. Kafkaでイベント順序を保証する方法は?

同じパーティションキーを使用する。同一キーのメッセージは同じパーティションに配信され、順序が保証される。例: order_idをパーティションキーとして使用。

Q7. CQRSでRead DBがWrite DBより遅れて更新される問題は?

Eventually Consistent — 結果整合性。クエリ時に最新データでない可能性がある。解決策: (1) 書き込み直後はWrite DBから直接読み取り (2) イベント処理完了を確認してから応答。