Skip to content
Published on

Event-Driven + Sagaアーキテクチャのトレードオフ

Authors
Event-Driven + Sagaアーキテクチャのトレードオフ

分散トランザクションが必要になる瞬間

モノリスでは、1つのDBトランザクション内で注文作成、在庫引き落とし、決済処理をすべて行うことができる。ACIDが保証されるため、途中で失敗すればrollback一回で終わりだ。

しかしサービスが分離された瞬間、この「当たり前のこと」が不可能になる。注文サービス、在庫サービス、決済サービスがそれぞれ独自のDBを持つと、1つのトランザクションにまとめることができない。2PC(Two-Phase Commit)は理論的には可能だが、ネットワークパーティションに脆弱で、coordinatorが単一障害点となり、ロック時間が長くなってスループットが急減する。

Sagaパターンは Chris Richardson が体系化した代替手段だ(microservices.io/patterns/data/saga)。核心的なアイデアはシンプルだ。

分散トランザクションを複数のローカルトランザクションに分解し、途中で失敗した場合は既に完了したトランザクションを取り消す**補償トランザクション(compensating transaction)**を実行する。

この記事ではSagaとEvent-Driven Architecture(EDA)を組み合わせる際に発生する実際のトレードオフを扱う。「分離すればスケールする」というスローガンの裏にある複雑さ、コスト、組織要件を正直に分析する。

Sagaの2つの調整方式

Choreography:イベントベースの自律調整

各サービスがローカルトランザクションを完了した後にドメインイベントを発行し、次のサービスがそのイベントをサブスクライブして自身の作業を実行する。中央オーケストレーターは存在しない。

OrderService              InventoryService          PaymentService
    |                        |                        |
    |-- OrderCreated ------->|                        |
    |                        |-- InventoryReserved --->|
    |                        |                        |-- PaymentProcessed
    |<----------- OrderConfirmed ----------------------|

失敗時には逆方向に補償イベントが流れる。

PaymentService: PaymentFailed
  --> InventoryService: InventoryReleased
    --> OrderService: OrderCancelled

利点:サービス間の直接的な依存関係がない。新しいサービスを追加する際に既存サービスを修正する必要がない。 欠点:全体のフローをコードで一目で確認できない。イベントチェーンが複雑になると、どこで失敗したか追跡が困難。循環依存が発生するリスクがある。

Orchestration:中央オーケストレーター方式

Saga Orchestratorが全体のフローを管理する。各ステップの成功/失敗に応じて次のステップを決定し、失敗時に補償順序を制御する。

SagaOrchestrator
    |-- (1) reserve_credit --> PaymentService
    |-- (2) reserve_inventory --> InventoryService
    |-- (3) create_shipment --> ShippingService
    |
    |  (3) 失敗時:
    |-- compensate (2) release_inventory
    |-- compensate (1) refund_credit

利点:全体のフローがorchestratorコードに明示的に存在する。デバッグとモニタリングが容易。 欠点:orchestratorが単一障害点になる可能性がある。サービス数が増えるとorchestratorの複雑度が増大する。

Saga Orchestratorの実装

以下はプロダクションで使用可能なSaga Orchestratorのコア実装だ。

"""
Saga Orchestrator: 分散トランザクションの実行と補償を管理する。

各stepはexecute()とcompensate()を実装し、
orchestratorは順次実行後、失敗時に逆順で補償する。
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import logging
import uuid
import time

logger = logging.getLogger(__name__)


class SagaStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"
    FAILED = "failed"  # 補償自体が失敗した場合


class StepStatus(Enum):
    PENDING = "pending"
    EXECUTED = "executed"
    COMPENSATED = "compensated"
    COMPENSATION_FAILED = "compensation_failed"


@dataclass
class SagaStep:
    """Sagaの個別ステップ。

    execute()とcompensate()はサブクラスで実装する。
    idempotency_keyによりリトライの安全性を保証する。
    """
    name: str
    status: StepStatus = StepStatus.PENDING
    idempotency_key: str = ""
    executed_at: Optional[float] = None
    compensated_at: Optional[float] = None
    error: Optional[str] = None

    def __post_init__(self):
        if not self.idempotency_key:
            self.idempotency_key = str(uuid.uuid4())

    def execute(self, context: dict) -> dict:
        raise NotImplementedError

    def compensate(self, context: dict) -> None:
        raise NotImplementedError


@dataclass
class SagaExecution:
    """Saga実行記録。"""
    saga_id: str
    status: SagaStatus = SagaStatus.PENDING
    steps: list[SagaStep] = field(default_factory=list)
    context: dict = field(default_factory=dict)
    started_at: Optional[float] = None
    completed_at: Optional[float] = None
    error: Optional[str] = None


class SagaOrchestrator:
    """Saga実行を管理するorchestrator。

    実行フロー:
    1. 各stepを順次execute()
    2. いずれかのstepが失敗したら、既に実行されたstepを逆順でcompensate()
    3. 補償も失敗したらFAILED状態で記録しアラート送信
    """

    def __init__(self, steps: list[SagaStep], on_failure: Optional[callable] = None):
        self.steps = steps
        self.on_failure = on_failure  # 補償失敗時に呼び出すコールバック

    def run(self, initial_context: dict | None = None) -> SagaExecution:
        execution = SagaExecution(
            saga_id=str(uuid.uuid4()),
            steps=self.steps,
            context=initial_context or {},
            started_at=time.time(),
        )
        execution.status = SagaStatus.RUNNING
        completed_steps: list[SagaStep] = []

        logger.info(f"Saga {execution.saga_id} started with {len(self.steps)} steps")

        try:
            for step in self.steps:
                logger.info(f"Saga {execution.saga_id}: executing '{step.name}'")
                try:
                    result = step.execute(execution.context)
                    step.status = StepStatus.EXECUTED
                    step.executed_at = time.time()

                    # stepの結果をcontextにマージ
                    if isinstance(result, dict):
                        execution.context.update(result)

                    completed_steps.append(step)

                except Exception as e:
                    step.error = str(e)
                    logger.error(
                        f"Saga {execution.saga_id}: step '{step.name}' failed: {e}"
                    )
                    # 補償開始
                    self._compensate(execution, completed_steps)
                    return execution

            execution.status = SagaStatus.COMPLETED
            execution.completed_at = time.time()
            logger.info(f"Saga {execution.saga_id} completed successfully")

        except Exception as e:
            execution.status = SagaStatus.FAILED
            execution.error = str(e)
            logger.critical(f"Saga {execution.saga_id} unexpected error: {e}")

        return execution

    def _compensate(self, execution: SagaExecution, completed_steps: list[SagaStep]):
        """完了したstepを逆順で補償する。"""
        execution.status = SagaStatus.COMPENSATING
        compensation_failures = []

        for step in reversed(completed_steps):
            logger.info(
                f"Saga {execution.saga_id}: compensating '{step.name}'"
            )
            try:
                step.compensate(execution.context)
                step.status = StepStatus.COMPENSATED
                step.compensated_at = time.time()
            except Exception as e:
                step.status = StepStatus.COMPENSATION_FAILED
                step.error = f"Compensation failed: {e}"
                compensation_failures.append((step.name, str(e)))
                logger.critical(
                    f"Saga {execution.saga_id}: compensation of '{step.name}' FAILED: {e}"
                )

        if compensation_failures:
            execution.status = SagaStatus.FAILED
            execution.error = f"Compensation failures: {compensation_failures}"
            if self.on_failure:
                self.on_failure(execution, compensation_failures)
        else:
            execution.status = SagaStatus.COMPENSATED
            execution.completed_at = time.time()

実際のstep実装例は以下の通りだ。

class ReserveCreditStep(SagaStep):
    """決済サービスにクレジットを予約する。"""

    def __init__(self, payment_client):
        super().__init__(name="reserve_credit")
        self.payment_client = payment_client

    def execute(self, context: dict) -> dict:
        result = self.payment_client.reserve(
            customer_id=context["customer_id"],
            amount=context["order_total"],
            idempotency_key=self.idempotency_key,
        )
        return {"reservation_id": result.reservation_id}

    def compensate(self, context: dict) -> None:
        self.payment_client.release(
            reservation_id=context["reservation_id"],
            idempotency_key=f"{self.idempotency_key}_compensate",
        )


class ReserveInventoryStep(SagaStep):
    """在庫サービスに商品を予約する。"""

    def __init__(self, inventory_client):
        super().__init__(name="reserve_inventory")
        self.inventory_client = inventory_client

    def execute(self, context: dict) -> dict:
        result = self.inventory_client.reserve(
            items=context["order_items"],
            idempotency_key=self.idempotency_key,
        )
        return {"inventory_reservation_id": result.reservation_id}

    def compensate(self, context: dict) -> None:
        self.inventory_client.release(
            reservation_id=context["inventory_reservation_id"],
            idempotency_key=f"{self.idempotency_key}_compensate",
        )

イベント契約とスキーマ管理

Event-Driven Architectureにおいて、イベントはサービス間の**契約(contract)**だ。この契約を管理しなければ、あるサービスのイベント形式変更が消費者サービスを予告なく壊してしまう。

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "https://events.example.com/order/created/v2",
  "title": "OrderCreated",
  "description": "注文が作成された時に発行されるイベント",
  "type": "object",
  "required": ["event_id", "event_type", "event_version", "timestamp", "data"],
  "properties": {
    "event_id": {
      "type": "string",
      "format": "uuid",
      "description": "イベント固有ID(idempotency keyとして使用)"
    },
    "event_type": {
      "type": "string",
      "const": "order.created"
    },
    "event_version": {
      "type": "integer",
      "const": 2,
      "description": "スキーマバージョン。互換性判断に使用"
    },
    "timestamp": {
      "type": "string",
      "format": "date-time"
    },
    "data": {
      "type": "object",
      "required": ["order_id", "customer_id", "items", "total_amount"],
      "properties": {
        "order_id": { "type": "string" },
        "customer_id": { "type": "string" },
        "items": {
          "type": "array",
          "items": {
            "type": "object",
            "required": ["sku", "quantity", "unit_price"],
            "properties": {
              "sku": { "type": "string" },
              "quantity": { "type": "integer", "minimum": 1 },
              "unit_price": { "type": "number", "minimum": 0 }
            }
          }
        },
        "total_amount": { "type": "number", "minimum": 0 },
        "currency": { "type": "string", "default": "KRW" },
        "shipping_address": {
          "type": "object",
          "description": "v2で追加されたフィールド。v1コンシューマーはこのフィールドを無視する。"
        }
      }
    }
  }
}

スキーマ互換性ポリシーは、Confluent Schema Registryの互換性モードを参考にするとよい。

互換性モード許可される変更使用タイミング
BACKWARDフィールド追加(optional)、フィールド削除コンシューマーが先にデプロイされる場合
FORWARDフィールド追加、デフォルト値付きフィールド削除プロデューサーが先にデプロイされる場合
FULL上記2つを両方満たすプロデューサー/コンシューマーのデプロイ順序が保証できない場合
NONEすべての変更を許可(危険)開発環境のみ

Outboxパターン:イベント発行の原子性保証

最もよくある間違いの1つは、「DBに保存してイベントを発行する」という2つの操作がアトミックでないことだ。

# 危険なパターン
1. DBに注文保存     -- 成功
2. Kafkaにイベント発行 -- 失敗(ネットワーク問題)
=> 注文は作成されたがイベントがないため、在庫も引かれず決済もされない

Outboxパターンは、イベントを別のoutboxテーブルに一緒に保存し、別のプロセスがoutboxをポーリングしてメッセージブローカーに配信する。

-- outboxテーブルスキーマ
CREATE TABLE outbox (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type  VARCHAR(100) NOT NULL,    -- 'Order', 'Payment' 等
    aggregate_id    VARCHAR(100) NOT NULL,    -- 注文ID、決済ID
    event_type      VARCHAR(200) NOT NULL,    -- 'order.created'
    event_version   INT NOT NULL DEFAULT 1,
    payload         JSONB NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMPTZ,              -- NULLなら未発行
    retry_count     INT NOT NULL DEFAULT 0,
    max_retries     INT NOT NULL DEFAULT 5
);

-- 未発行イベント検索インデックス
CREATE INDEX idx_outbox_unpublished
    ON outbox (created_at)
    WHERE published_at IS NULL AND retry_count < max_retries;

-- 注文作成とイベントを1つのトランザクションで処理
BEGIN;
    INSERT INTO orders (id, customer_id, total, status)
    VALUES ('ord-123', 'cust-456', 50000, 'created');

    INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
    VALUES (
        'Order',
        'ord-123',
        'order.created',
        '{"order_id": "ord-123", "customer_id": "cust-456", "total": 50000}'::jsonb
    );
COMMIT;

Outbox publisherは別プロセスとして動作し、未発行イベントを定期的にポーリングする。

"""
Outbox Publisher: outboxテーブルの未発行イベントをbrokerに配信する。

at-least-once配信を保証し、コンシューマー側で
idempotencyを処理する必要がある。
"""
import time
import json
import logging

logger = logging.getLogger(__name__)


class OutboxPublisher:
    def __init__(self, db_pool, kafka_producer, poll_interval_seconds: float = 1.0):
        self.db_pool = db_pool
        self.producer = kafka_producer
        self.poll_interval = poll_interval_seconds

    def run(self):
        """メインループ。未発行イベントを継続的にポーリングして発行する。"""
        logger.info("OutboxPublisher started")
        while True:
            try:
                published_count = self._poll_and_publish()
                if published_count == 0:
                    time.sleep(self.poll_interval)
            except Exception as e:
                logger.error(f"OutboxPublisher error: {e}")
                time.sleep(self.poll_interval * 5)

    def _poll_and_publish(self, batch_size: int = 100) -> int:
        """未発行イベントのバッチを検索し発行する。"""
        with self.db_pool.connection() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT id, aggregate_type, aggregate_id,
                           event_type, event_version, payload
                    FROM outbox
                    WHERE published_at IS NULL
                      AND retry_count < max_retries
                    ORDER BY created_at
                    LIMIT %s
                    FOR UPDATE SKIP LOCKED
                """, (batch_size,))

                rows = cur.fetchall()
                if not rows:
                    return 0

                published_ids = []
                failed_ids = []

                for row in rows:
                    event_id, agg_type, agg_id, evt_type, evt_ver, payload = row
                    topic = f"{agg_type.lower()}-events"

                    try:
                        self.producer.send(
                            topic=topic,
                            key=agg_id.encode(),
                            value=json.dumps({
                                "event_id": str(event_id),
                                "event_type": evt_type,
                                "event_version": evt_ver,
                                "data": payload,
                            }).encode(),
                        )
                        published_ids.append(event_id)
                    except Exception as e:
                        logger.warning(f"Failed to publish {event_id}: {e}")
                        failed_ids.append(event_id)

                # 成功したイベントをマーク
                if published_ids:
                    cur.execute("""
                        UPDATE outbox SET published_at = NOW()
                        WHERE id = ANY(%s)
                    """, (published_ids,))

                # 失敗したイベントのリトライカウントを増加
                if failed_ids:
                    cur.execute("""
                        UPDATE outbox SET retry_count = retry_count + 1
                        WHERE id = ANY(%s)
                    """, (failed_ids,))

                conn.commit()
                return len(published_ids)

Idempotency:重複イベント処理

分散システムではイベントは「少なくとも1回(at-least-once)」配信されるため、同一イベントが複数回到着する可能性がある。コンシューマーは必ずidempotentでなければならない。

class IdempotentEventHandler:
    """重複イベントを安全に処理するハンドラーラッパー。

    event_idをキーとして処理履歴を記録し、
    既に処理済みのイベントは無視する。
    """

    def __init__(self, db_pool, handler_fn):
        self.db_pool = db_pool
        self.handler_fn = handler_fn

    def handle(self, event: dict) -> bool:
        event_id = event["event_id"]

        with self.db_pool.connection() as conn:
            with conn.cursor() as cur:
                # 既に処理済みのイベントか確認
                cur.execute(
                    "SELECT 1 FROM processed_events WHERE event_id = %s",
                    (event_id,),
                )
                if cur.fetchone():
                    logger.info(f"Duplicate event {event_id}, skipping")
                    return True

                try:
                    # ビジネスロジック実行とイベント処理記録を1つのトランザクションで
                    self.handler_fn(event, cur)

                    cur.execute(
                        "INSERT INTO processed_events (event_id, processed_at) VALUES (%s, NOW())",
                        (event_id,),
                    )
                    conn.commit()
                    return True

                except Exception as e:
                    conn.rollback()
                    logger.error(f"Failed to process event {event_id}: {e}")
                    return False

Saga運用モニタリング

Sagaがプロダクションで稼働すると、各sagaインスタンスの状態をリアルタイムで追跡する必要がある。

-- Sagaステータスダッシュボードクエリ

-- 1. 時間帯別saga状態分布
SELECT
    date_trunc('hour', started_at) AS hour,
    status,
    COUNT(*) AS count,
    AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) AS avg_duration_seconds
FROM saga_executions
WHERE started_at >= NOW() - INTERVAL '24 hours'
GROUP BY 1, 2
ORDER BY 1 DESC, 2;

-- 2. 補償失敗が発生したsaga(即時対応必要)
SELECT
    saga_id,
    started_at,
    error,
    jsonb_agg(
        jsonb_build_object(
            'step', step_name,
            'status', step_status,
            'error', step_error
        ) ORDER BY step_order
    ) AS step_details
FROM saga_executions e
JOIN saga_steps s ON e.saga_id = s.saga_id
WHERE e.status = 'FAILED'
  AND e.started_at >= NOW() - INTERVAL '7 days'
GROUP BY e.saga_id, e.started_at, e.error
ORDER BY e.started_at DESC;

-- 3. step別失敗率(ボトルネック特定)
SELECT
    step_name,
    COUNT(*) AS total_executions,
    SUM(CASE WHEN step_status = 'executed' THEN 1 ELSE 0 END) AS successes,
    SUM(CASE WHEN step_status != 'executed' THEN 1 ELSE 0 END) AS failures,
    ROUND(
        100.0 * SUM(CASE WHEN step_status != 'executed' THEN 1 ELSE 0 END) / COUNT(*),
        2
    ) AS failure_rate_pct
FROM saga_steps
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY step_name
ORDER BY failure_rate_pct DESC;

導入判断:すべてにSagaが必要なわけではない

Saga + EDAは強力だが複雑だ。導入前に以下に正直に答える必要がある。

Saga/EDAが適切な場合:

  • サービス間のデータ整合性が必須だが、強い整合性(ACID)ではなく結果整合性(eventual consistency)で十分な場合
  • 各サービスの独立したデプロイとスケーリングがビジネス要件の場合
  • チームがサービスオーナーシップ境界に沿って分離されており、各チームが独自のデプロイサイクルを持つ必要がある場合
  • トランザクション量が多く、2PCのロックコストを許容できない場合

モノリストランザクションの方が良い場合:

  • サービス境界が明確でなく頻繁に変更される初期段階の製品
  • チーム規模が小さく(5名以下)、分散システム運用の負担が開発速度の利点を上回る場合
  • データ整合性要件がリアルタイム(即時的)で、eventual consistencyがビジネス的に不可能な場合(例:銀行口座間の送金)
  • メッセージブローカー(Kafka、RabbitMQ)の運用能力がチームにない場合

実践トラブルシューティング

補償トランザクションが失敗した場合は?

これがSagaの最も困難な問題だ。補償が失敗すると、システムが「半端な状態」に置かれる。注文はキャンセルされたが決済は返金されていない状態。

対応策:

  1. Dead Letter Queue(DLQ):補償失敗イベントをDLQに送り、運用チームが手動で処理する。
  2. リトライポリシー:exponential backoffで3-5回リトライする。一時的なネットワーク問題はこれで解決される。
  3. 手動復旧ランブック:DLQでも解決できない場合のためのステップバイステップ手動復旧手順を文書化する。
  4. アラート:補償失敗はP1アラートに設定する。「後で処理しよう」とするとデータ不整合が蓄積される。

イベント順序が保証されない場合

Kafkaはパーティション内では順序を保証するが、パーティション間では保証しない。同じ注文のイベントが異なるパーティションに送られると順序が逆転する可能性がある。

対応策:

  1. 同一aggregateのイベントは同じパーティションキーを使用する(例:order_id)。
  2. コンシューマー側でイベントのシーケンス番号を確認し、順序が合わない場合は一時待機するか再処理キューに送る。
  3. イベントにcausation_id(原因イベントID)を含めて因果関係を追跡する。

Dead Letter Queueが溜まり続ける

症状:DLQに数千件のイベントが溜まっているが誰も処理していない。

原因:DLQ処理プロセスがないか、処理責任が明確でない。

対応:(1) DLQ件数に対するアラートを設定する(100件超過で警告、1000件超過でP1)。(2) 週次DLQレビューをチームルーティンに含める。(3) 自動再処理可能な件と手動介入が必要な件を分類するトリアージロジックを実装する。

参考資料

クイズ
  1. SagaパターンがTwo-Phase Commit(2PC)の代わりに使用される主な理由は? 正解:||2PCはcoordinatorが単一障害点となり、ネットワークパーティションに脆弱で、ロック時間が長くなりスループットが急減する。Sagaは各ステップを独立したローカルトランザクションとして処理し、失敗時に補償トランザクションで復旧するため、可用性とスループットが高い。||

  2. ChoreographyとOrchestrationの最大の運用上の違いは? 正解:||Choreographyは全体のsagaフローが複数のサービスに分散しているため一目で把握しにくくデバッグが複雑。Orchestrationは全体のフローがorchestratorコードに明示的に存在し追跡とデバッグが容易だが、orchestratorが単一障害点になる可能性がある。||

  3. Outboxパターンが解決する核心的な問題は? 正解:||DB保存とイベント発行がアトミックでない問題。DBには保存されたがイベント発行が失敗するとデータ不整合が発生する。Outboxパターンはイベントを同じDBトランザクション内でoutboxテーブルに保存し、別プロセスがそれを読んで発行することで原子性を保証する。||

  4. コンシューマーのidempotencyが必須である理由は? 正解:||分散システムではイベントはat-least-onceで配信されるため同一イベントが複数回到着する可能性がある。idempotentでなければ同一注文が二重決済されたり在庫が二重引き落としされる問題が発生する。||

  5. Sagaの補償トランザクションが失敗した場合、最低限やるべき3つのことは? 正解:||DLQに失敗イベントを保管し、P1アラートを送信し、手動復旧ランブックに従って運用チームが介入してデータ整合性を復旧する。||

  6. Kafkaで同じ注文のイベント順序を保証するには? 正解:||同一aggregate(例:order_id)のイベントが同じパーティションに行くようにパーティションキーをorder_idに設定する。Kafkaはパーティション内では順序を保証するため、同じキーのイベントは順番通りに処理される。||

  7. イベントスキーマ互換性モードの中でFULLが推奨される状況は? 正解:||プロデューサーとコンシューマーのデプロイ順序が保証できない場合。FULL互換性はBACKWARD(コンシューマー先)とFORWARD(プロデューサー先)の両方を満たすため、デプロイ順序に関係なく安全。||

  8. Saga/EDA導入を避けるべき代表的な状況2つは? 正解:||(1) サービス境界が頻繁に変更される初期段階の製品ではモノリスの方が変更コストが低い。(2) チーム規模が小さく、分散システム運用(Kafka、モニタリング、DLQ処理等)の負担が開発速度の利点を上回る場合。||