Skip to content
Published on

Sagaパターンで実装するマイクロサービス分散トランザクション: Choreography vs Orchestration

Authors
  • Name
    Twitter

はじめに

マイクロサービスアーキテクチャで最も難しい問題の一つが分散トランザクションです。モノリスではデータベースのACIDトランザクションで解決できましたが、各サービスが独自のデータベースを持つマイクロサービスでは、2PC(Two-Phase Commit)は現実的ではありません。

Sagaパターンは、長いトランザクションを複数のローカルトランザクションに分解し、失敗時に補償トランザクション(Compensating Transaction)で一貫性を維持するパターンです。

なぜ2PCではなくSagaなのか?

項目2PCSaga
整合性強い整合性結果整合性
可用性低い(同期ロック)高い(非同期)
レイテンシ高い低い
スケーラビリティ制限的優秀
障害分離全体ロールバック部分補償

注文シナリオ

オンラインショッピングモールの注文処理を例に説明します:

  1. 注文サービス: 注文作成
  2. 決済サービス: 決済処理
  3. 在庫サービス: 在庫引き当て
  4. 配送サービス: 配送作成

ステップ3(在庫引き当て)で失敗した場合、ステップ2(決済)を返金し、ステップ1(注文)をキャンセルする必要があります。

Choreography方式 — イベント駆動

各サービスがイベントを発行し、他のサービスがイベントを購読して自身のローカルトランザクションを実行します。

イベント定義

# events.py
from dataclasses import dataclass
from datetime import datetime

@dataclass
class OrderCreated:
    order_id: str
    customer_id: str
    items: list
    total_amount: float
    timestamp: datetime

@dataclass
class PaymentProcessed:
    order_id: str
    payment_id: str
    amount: float

@dataclass
class PaymentFailed:
    order_id: str
    reason: str

@dataclass
class InventoryReserved:
    order_id: str
    items: list

@dataclass
class InventoryInsufficient:
    order_id: str
    item_id: str
    requested: int
    available: int

@dataclass
class ShippingCreated:
    order_id: str
    tracking_number: str

# 補償イベント
@dataclass
class PaymentRefunded:
    order_id: str
    payment_id: str
    amount: float

@dataclass
class InventoryReleased:
    order_id: str
    items: list

@dataclass
class OrderCancelled:
    order_id: str
    reason: str

注文サービス

# order_service.py
import json
from kafka import KafkaProducer, KafkaConsumer

class OrderService:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers='kafka:9092',
            value_serializer=lambda v: json.dumps(v).encode()
        )
        self.db = OrderDB()

    def create_order(self, customer_id, items, total_amount):
        """注文作成 + OrderCreatedイベント発行"""
        order = self.db.create(
            customer_id=customer_id,
            items=items,
            total_amount=total_amount,
            status="PENDING"
        )

        self.producer.send('order-events', {
            'type': 'OrderCreated',
            'order_id': order.id,
            'customer_id': customer_id,
            'items': items,
            'total_amount': total_amount
        })

        return order

    def handle_payment_failed(self, event):
        """決済失敗時に注文キャンセル"""
        self.db.update_status(event['order_id'], "CANCELLED")
        self.producer.send('order-events', {
            'type': 'OrderCancelled',
            'order_id': event['order_id'],
            'reason': event['reason']
        })

    def handle_inventory_insufficient(self, event):
        """在庫不足時に返金要求"""
        self.db.update_status(event['order_id'], "CANCELLING")
        # 決済返金イベント発行
        self.producer.send('payment-events', {
            'type': 'RefundRequested',
            'order_id': event['order_id']
        })

    def handle_shipping_created(self, event):
        """配送作成完了時に注文確定"""
        self.db.update_status(event['order_id'], "CONFIRMED")

決済サービス

# payment_service.py
class PaymentService:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers='kafka:9092',
            value_serializer=lambda v: json.dumps(v).encode()
        )
        self.db = PaymentDB()

    def handle_order_created(self, event):
        """注文作成イベント受信 -> 決済処理"""
        try:
            payment = self.db.process_payment(
                order_id=event['order_id'],
                customer_id=event['customer_id'],
                amount=event['total_amount']
            )

            self.producer.send('payment-events', {
                'type': 'PaymentProcessed',
                'order_id': event['order_id'],
                'payment_id': payment.id,
                'amount': event['total_amount']
            })
        except InsufficientFundsError as e:
            self.producer.send('payment-events', {
                'type': 'PaymentFailed',
                'order_id': event['order_id'],
                'reason': str(e)
            })

    def handle_refund_requested(self, event):
        """補償トランザクション: 返金処理"""
        payment = self.db.get_by_order(event['order_id'])
        self.db.refund(payment.id)

        self.producer.send('payment-events', {
            'type': 'PaymentRefunded',
            'order_id': event['order_id'],
            'payment_id': payment.id,
            'amount': payment.amount
        })

Orchestration方式 — 中央コーディネーター

一つのOrchestrator(Saga Coordinator)が全体のトランザクションフローを管理します。

TemporalによるSaga Orchestratorの実装

# workflows.py
from temporalio import workflow
from temporalio.common import RetryPolicy
from datetime import timedelta

@workflow.defn
class OrderSagaWorkflow:
    """注文Saga - Orchestration方式"""

    @workflow.run
    async def run(self, order_request: dict) -> dict:
        retry_policy = RetryPolicy(
            maximum_attempts=3,
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(seconds=10),
        )

        order_id = None
        payment_id = None
        reservation_id = None

        try:
            # Step 1: 注文作成
            order_id = await workflow.execute_activity(
                create_order,
                order_request,
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=retry_policy,
            )
            workflow.logger.info(f"Order created: {order_id}")

            # Step 2: 決済処理
            payment_id = await workflow.execute_activity(
                process_payment,
                {"order_id": order_id, "amount": order_request["total_amount"]},
                start_to_close_timeout=timedelta(seconds=60),
                retry_policy=retry_policy,
            )
            workflow.logger.info(f"Payment processed: {payment_id}")

            # Step 3: 在庫予約
            reservation_id = await workflow.execute_activity(
                reserve_inventory,
                {"order_id": order_id, "items": order_request["items"]},
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=retry_policy,
            )
            workflow.logger.info(f"Inventory reserved: {reservation_id}")

            # Step 4: 配送作成
            tracking_number = await workflow.execute_activity(
                create_shipment,
                {"order_id": order_id, "address": order_request["address"]},
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=retry_policy,
            )
            workflow.logger.info(f"Shipment created: {tracking_number}")

            # Step 5: 注文確定
            await workflow.execute_activity(
                confirm_order,
                {"order_id": order_id},
                start_to_close_timeout=timedelta(seconds=10),
            )

            return {
                "status": "SUCCESS",
                "order_id": order_id,
                "tracking_number": tracking_number
            }

        except Exception as e:
            workflow.logger.error(f"Saga failed: {e}")

            # 補償トランザクション実行(逆順)
            if reservation_id:
                await workflow.execute_activity(
                    release_inventory,
                    {"reservation_id": reservation_id},
                    start_to_close_timeout=timedelta(seconds=30),
                )

            if payment_id:
                await workflow.execute_activity(
                    refund_payment,
                    {"payment_id": payment_id},
                    start_to_close_timeout=timedelta(seconds=60),
                )

            if order_id:
                await workflow.execute_activity(
                    cancel_order,
                    {"order_id": order_id, "reason": str(e)},
                    start_to_close_timeout=timedelta(seconds=10),
                )

            return {"status": "FAILED", "reason": str(e)}

Activityの実装

# activities.py
from temporalio import activity

@activity.defn
async def create_order(request: dict) -> str:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://order-service:8080/orders",
            json=request
        )
        response.raise_for_status()
        return response.json()["order_id"]

@activity.defn
async def process_payment(request: dict) -> str:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://payment-service:8080/payments",
            json=request
        )
        response.raise_for_status()
        return response.json()["payment_id"]

@activity.defn
async def reserve_inventory(request: dict) -> str:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://inventory-service:8080/reservations",
            json=request
        )
        if response.status_code == 409:
            raise InsufficientInventoryError(response.json()["message"])
        response.raise_for_status()
        return response.json()["reservation_id"]

# 補償アクティビティ
@activity.defn
async def refund_payment(request: dict) -> None:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://payment-service:8080/refunds",
            json=request
        )
        response.raise_for_status()

@activity.defn
async def release_inventory(request: dict) -> None:
    async with httpx.AsyncClient() as client:
        response = await client.delete(
            f"http://inventory-service:8080/reservations/{request['reservation_id']}"
        )
        response.raise_for_status()

@activity.defn
async def cancel_order(request: dict) -> None:
    async with httpx.AsyncClient() as client:
        response = await client.patch(
            f"http://order-service:8080/orders/{request['order_id']}/cancel",
            json={"reason": request["reason"]}
        )
        response.raise_for_status()

Temporal Workerの起動

# worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker

async def main():
    client = await Client.connect("temporal:7233")

    worker = Worker(
        client,
        task_queue="order-saga",
        workflows=[OrderSagaWorkflow],
        activities=[
            create_order,
            process_payment,
            reserve_inventory,
            create_shipment,
            confirm_order,
            refund_payment,
            release_inventory,
            cancel_order,
        ],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

Choreography vs Orchestration

項目ChoreographyOrchestration
結合度疎結合Orchestratorに依存
複雑度サービス数増加時に急増中央で管理
可視性イベント追跡が困難ワークフロー状態が明確
単一障害点なしOrchestrator
適したケースシンプルなフロー(3-4段階)複雑なフロー(5段階以上)
テスト困難比較的容易

Semantic Lockの対策

Sagaには分離性がないため、同時実行時にデータ異常が発生する可能性があります。

# Semantic Lockパターン
class OrderDB:
    def create_with_lock(self, order_data):
        """注文作成時にステータスをPENDINGに設定して他のSagaの干渉を防止"""
        order = Order(
            **order_data,
            status="PENDING",  # Semantic Lock
            saga_id=generate_saga_id()
        )
        self.session.add(order)
        self.session.commit()
        return order

    def confirm_order(self, order_id, saga_id):
        """Saga完了時にロック解除"""
        order = self.session.query(Order).filter_by(
            id=order_id,
            saga_id=saga_id,
            status="PENDING"
        ).first()
        if not order:
            raise ConcurrencyError("Order already processed by another saga")
        order.status = "CONFIRMED"
        self.session.commit()

冪等性の保証

補償トランザクションが複数回実行される可能性があるため、冪等性は必須です。

# 冪等性キーを活用した決済処理
class PaymentService:
    def process_payment(self, order_id, amount, idempotency_key):
        # すでに処理済みのリクエストか確認
        existing = self.db.find_by_idempotency_key(idempotency_key)
        if existing:
            return existing  # 同一結果を返却

        payment = self.db.create_payment(
            order_id=order_id,
            amount=amount,
            idempotency_key=idempotency_key
        )
        return payment

まとめ

Sagaパターンは、マイクロサービスにおける分散トランザクション処理の核心パターンです:

  • Choreography: シンプルなフローに適合、イベント駆動による疎結合
  • Orchestration: 複雑なフローに適合、Temporalなどで中央管理
  • 補償トランザクション: 失敗時に逆順で復旧
  • 冪等性: リトライの安全性のために必須
  • Semantic Lock: 同時実行Saga間の干渉防止

クイズ: Sagaパターン理解度チェック(7問)

Q1. 2PCの代わりにSagaを使う主な理由は?

2PCは同期ロックにより可用性とスケーラビリティが制限されますが、Sagaは非同期により高い可用性とスケーラビリティを提供します。

Q2. 補償トランザクション(Compensating Transaction)とは?

Sagaの特定のステップが失敗した際に、以前に成功したステップを元に戻す逆順のトランザクションです。

Q3. Choreography方式の欠点は?

サービス数が増加するとイベントフローが複雑になり、Saga全体の進行状況を把握することが困難になります。

Q4. TemporalがSaga Orchestrationに適している理由は?

ワークフロー状態を自動的に永続化し、失敗時のリトライと補償を宣言的に管理できます。

Q5. Semantic Lockパターンの目的は?

PENDINGステータスにより、他のSagaが同じデータを同時に変更することを防止します。

Q6. 補償トランザクションで冪等性が重要な理由は?

ネットワークエラーなどにより補償が複数回実行される可能性があるため、同一の結果を保証する必要があります。

Q7. ChoreographyとOrchestrationのうち、どのような状況でOrchestrationを選択すべきか?

5段階以上の複雑なフロー、条件分岐が多い場合、全体の進行状況の可視性が必要な場合です。