- Authors
- Name
- はじめに
- なぜ2PCではなくSagaなのか?
- 注文シナリオ
- Choreography方式 — イベント駆動
- Orchestration方式 — 中央コーディネーター
- Choreography vs Orchestration
- Semantic Lockの対策
- 冪等性の保証
- まとめ
はじめに
マイクロサービスアーキテクチャで最も難しい問題の一つが分散トランザクションです。モノリスではデータベースのACIDトランザクションで解決できましたが、各サービスが独自のデータベースを持つマイクロサービスでは、2PC(Two-Phase Commit)は現実的ではありません。
Sagaパターンは、長いトランザクションを複数のローカルトランザクションに分解し、失敗時に補償トランザクション(Compensating Transaction)で一貫性を維持するパターンです。
なぜ2PCではなくSagaなのか?
| 項目 | 2PC | Saga |
|---|---|---|
| 整合性 | 強い整合性 | 結果整合性 |
| 可用性 | 低い(同期ロック) | 高い(非同期) |
| レイテンシ | 高い | 低い |
| スケーラビリティ | 制限的 | 優秀 |
| 障害分離 | 全体ロールバック | 部分補償 |
注文シナリオ
オンラインショッピングモールの注文処理を例に説明します:
- 注文サービス: 注文作成
- 決済サービス: 決済処理
- 在庫サービス: 在庫引き当て
- 配送サービス: 配送作成
ステップ3(在庫引き当て)で失敗した場合、ステップ2(決済)を返金し、ステップ1(注文)をキャンセルする必要があります。
Choreography方式 — イベント駆動
各サービスがイベントを発行し、他のサービスがイベントを購読して自身のローカルトランザクションを実行します。
イベント定義
# events.py
from dataclasses import dataclass
from datetime import datetime
@dataclass
class OrderCreated:
order_id: str
customer_id: str
items: list
total_amount: float
timestamp: datetime
@dataclass
class PaymentProcessed:
order_id: str
payment_id: str
amount: float
@dataclass
class PaymentFailed:
order_id: str
reason: str
@dataclass
class InventoryReserved:
order_id: str
items: list
@dataclass
class InventoryInsufficient:
order_id: str
item_id: str
requested: int
available: int
@dataclass
class ShippingCreated:
order_id: str
tracking_number: str
# 補償イベント
@dataclass
class PaymentRefunded:
order_id: str
payment_id: str
amount: float
@dataclass
class InventoryReleased:
order_id: str
items: list
@dataclass
class OrderCancelled:
order_id: str
reason: str
注文サービス
# order_service.py
import json
from kafka import KafkaProducer, KafkaConsumer
class OrderService:
def __init__(self):
self.producer = KafkaProducer(
bootstrap_servers='kafka:9092',
value_serializer=lambda v: json.dumps(v).encode()
)
self.db = OrderDB()
def create_order(self, customer_id, items, total_amount):
"""注文作成 + OrderCreatedイベント発行"""
order = self.db.create(
customer_id=customer_id,
items=items,
total_amount=total_amount,
status="PENDING"
)
self.producer.send('order-events', {
'type': 'OrderCreated',
'order_id': order.id,
'customer_id': customer_id,
'items': items,
'total_amount': total_amount
})
return order
def handle_payment_failed(self, event):
"""決済失敗時に注文キャンセル"""
self.db.update_status(event['order_id'], "CANCELLED")
self.producer.send('order-events', {
'type': 'OrderCancelled',
'order_id': event['order_id'],
'reason': event['reason']
})
def handle_inventory_insufficient(self, event):
"""在庫不足時に返金要求"""
self.db.update_status(event['order_id'], "CANCELLING")
# 決済返金イベント発行
self.producer.send('payment-events', {
'type': 'RefundRequested',
'order_id': event['order_id']
})
def handle_shipping_created(self, event):
"""配送作成完了時に注文確定"""
self.db.update_status(event['order_id'], "CONFIRMED")
決済サービス
# payment_service.py
class PaymentService:
def __init__(self):
self.producer = KafkaProducer(
bootstrap_servers='kafka:9092',
value_serializer=lambda v: json.dumps(v).encode()
)
self.db = PaymentDB()
def handle_order_created(self, event):
"""注文作成イベント受信 -> 決済処理"""
try:
payment = self.db.process_payment(
order_id=event['order_id'],
customer_id=event['customer_id'],
amount=event['total_amount']
)
self.producer.send('payment-events', {
'type': 'PaymentProcessed',
'order_id': event['order_id'],
'payment_id': payment.id,
'amount': event['total_amount']
})
except InsufficientFundsError as e:
self.producer.send('payment-events', {
'type': 'PaymentFailed',
'order_id': event['order_id'],
'reason': str(e)
})
def handle_refund_requested(self, event):
"""補償トランザクション: 返金処理"""
payment = self.db.get_by_order(event['order_id'])
self.db.refund(payment.id)
self.producer.send('payment-events', {
'type': 'PaymentRefunded',
'order_id': event['order_id'],
'payment_id': payment.id,
'amount': payment.amount
})
Orchestration方式 — 中央コーディネーター
一つのOrchestrator(Saga Coordinator)が全体のトランザクションフローを管理します。
TemporalによるSaga Orchestratorの実装
# workflows.py
from temporalio import workflow
from temporalio.common import RetryPolicy
from datetime import timedelta
@workflow.defn
class OrderSagaWorkflow:
"""注文Saga - Orchestration方式"""
@workflow.run
async def run(self, order_request: dict) -> dict:
retry_policy = RetryPolicy(
maximum_attempts=3,
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(seconds=10),
)
order_id = None
payment_id = None
reservation_id = None
try:
# Step 1: 注文作成
order_id = await workflow.execute_activity(
create_order,
order_request,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=retry_policy,
)
workflow.logger.info(f"Order created: {order_id}")
# Step 2: 決済処理
payment_id = await workflow.execute_activity(
process_payment,
{"order_id": order_id, "amount": order_request["total_amount"]},
start_to_close_timeout=timedelta(seconds=60),
retry_policy=retry_policy,
)
workflow.logger.info(f"Payment processed: {payment_id}")
# Step 3: 在庫予約
reservation_id = await workflow.execute_activity(
reserve_inventory,
{"order_id": order_id, "items": order_request["items"]},
start_to_close_timeout=timedelta(seconds=30),
retry_policy=retry_policy,
)
workflow.logger.info(f"Inventory reserved: {reservation_id}")
# Step 4: 配送作成
tracking_number = await workflow.execute_activity(
create_shipment,
{"order_id": order_id, "address": order_request["address"]},
start_to_close_timeout=timedelta(seconds=30),
retry_policy=retry_policy,
)
workflow.logger.info(f"Shipment created: {tracking_number}")
# Step 5: 注文確定
await workflow.execute_activity(
confirm_order,
{"order_id": order_id},
start_to_close_timeout=timedelta(seconds=10),
)
return {
"status": "SUCCESS",
"order_id": order_id,
"tracking_number": tracking_number
}
except Exception as e:
workflow.logger.error(f"Saga failed: {e}")
# 補償トランザクション実行(逆順)
if reservation_id:
await workflow.execute_activity(
release_inventory,
{"reservation_id": reservation_id},
start_to_close_timeout=timedelta(seconds=30),
)
if payment_id:
await workflow.execute_activity(
refund_payment,
{"payment_id": payment_id},
start_to_close_timeout=timedelta(seconds=60),
)
if order_id:
await workflow.execute_activity(
cancel_order,
{"order_id": order_id, "reason": str(e)},
start_to_close_timeout=timedelta(seconds=10),
)
return {"status": "FAILED", "reason": str(e)}
Activityの実装
# activities.py
from temporalio import activity
@activity.defn
async def create_order(request: dict) -> str:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://order-service:8080/orders",
json=request
)
response.raise_for_status()
return response.json()["order_id"]
@activity.defn
async def process_payment(request: dict) -> str:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://payment-service:8080/payments",
json=request
)
response.raise_for_status()
return response.json()["payment_id"]
@activity.defn
async def reserve_inventory(request: dict) -> str:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://inventory-service:8080/reservations",
json=request
)
if response.status_code == 409:
raise InsufficientInventoryError(response.json()["message"])
response.raise_for_status()
return response.json()["reservation_id"]
# 補償アクティビティ
@activity.defn
async def refund_payment(request: dict) -> None:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://payment-service:8080/refunds",
json=request
)
response.raise_for_status()
@activity.defn
async def release_inventory(request: dict) -> None:
async with httpx.AsyncClient() as client:
response = await client.delete(
f"http://inventory-service:8080/reservations/{request['reservation_id']}"
)
response.raise_for_status()
@activity.defn
async def cancel_order(request: dict) -> None:
async with httpx.AsyncClient() as client:
response = await client.patch(
f"http://order-service:8080/orders/{request['order_id']}/cancel",
json={"reason": request["reason"]}
)
response.raise_for_status()
Temporal Workerの起動
# worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
async def main():
client = await Client.connect("temporal:7233")
worker = Worker(
client,
task_queue="order-saga",
workflows=[OrderSagaWorkflow],
activities=[
create_order,
process_payment,
reserve_inventory,
create_shipment,
confirm_order,
refund_payment,
release_inventory,
cancel_order,
],
)
await worker.run()
if __name__ == "__main__":
asyncio.run(main())
Choreography vs Orchestration
| 項目 | Choreography | Orchestration |
|---|---|---|
| 結合度 | 疎結合 | Orchestratorに依存 |
| 複雑度 | サービス数増加時に急増 | 中央で管理 |
| 可視性 | イベント追跡が困難 | ワークフロー状態が明確 |
| 単一障害点 | なし | Orchestrator |
| 適したケース | シンプルなフロー(3-4段階) | 複雑なフロー(5段階以上) |
| テスト | 困難 | 比較的容易 |
Semantic Lockの対策
Sagaには分離性がないため、同時実行時にデータ異常が発生する可能性があります。
# Semantic Lockパターン
class OrderDB:
def create_with_lock(self, order_data):
"""注文作成時にステータスをPENDINGに設定して他のSagaの干渉を防止"""
order = Order(
**order_data,
status="PENDING", # Semantic Lock
saga_id=generate_saga_id()
)
self.session.add(order)
self.session.commit()
return order
def confirm_order(self, order_id, saga_id):
"""Saga完了時にロック解除"""
order = self.session.query(Order).filter_by(
id=order_id,
saga_id=saga_id,
status="PENDING"
).first()
if not order:
raise ConcurrencyError("Order already processed by another saga")
order.status = "CONFIRMED"
self.session.commit()
冪等性の保証
補償トランザクションが複数回実行される可能性があるため、冪等性は必須です。
# 冪等性キーを活用した決済処理
class PaymentService:
def process_payment(self, order_id, amount, idempotency_key):
# すでに処理済みのリクエストか確認
existing = self.db.find_by_idempotency_key(idempotency_key)
if existing:
return existing # 同一結果を返却
payment = self.db.create_payment(
order_id=order_id,
amount=amount,
idempotency_key=idempotency_key
)
return payment
まとめ
Sagaパターンは、マイクロサービスにおける分散トランザクション処理の核心パターンです:
- Choreography: シンプルなフローに適合、イベント駆動による疎結合
- Orchestration: 複雑なフローに適合、Temporalなどで中央管理
- 補償トランザクション: 失敗時に逆順で復旧
- 冪等性: リトライの安全性のために必須
- Semantic Lock: 同時実行Saga間の干渉防止
クイズ: Sagaパターン理解度チェック(7問)
Q1. 2PCの代わりにSagaを使う主な理由は?
2PCは同期ロックにより可用性とスケーラビリティが制限されますが、Sagaは非同期により高い可用性とスケーラビリティを提供します。
Q2. 補償トランザクション(Compensating Transaction)とは?
Sagaの特定のステップが失敗した際に、以前に成功したステップを元に戻す逆順のトランザクションです。
Q3. Choreography方式の欠点は?
サービス数が増加するとイベントフローが複雑になり、Saga全体の進行状況を把握することが困難になります。
Q4. TemporalがSaga Orchestrationに適している理由は?
ワークフロー状態を自動的に永続化し、失敗時のリトライと補償を宣言的に管理できます。
Q5. Semantic Lockパターンの目的は?
PENDINGステータスにより、他のSagaが同じデータを同時に変更することを防止します。
Q6. 補償トランザクションで冪等性が重要な理由は?
ネットワークエラーなどにより補償が複数回実行される可能性があるため、同一の結果を保証する必要があります。
Q7. ChoreographyとOrchestrationのうち、どのような状況でOrchestrationを選択すべきか?
5段階以上の複雑なフロー、条件分岐が多い場合、全体の進行状況の可視性が必要な場合です。