Skip to content

필사 모드: Sagaパターンで実装するマイクロサービス分散トランザクション: Choreography vs Orchestration

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

はじめに

マイクロサービスアーキテクチャで最も難しい問題の一つが**分散トランザクション**です。モノリスではデータベースのACIDトランザクションで解決できましたが、各サービスが独自のデータベースを持つマイクロサービスでは、2PC(Two-Phase Commit)は現実的ではありません。

**Sagaパターン**は、長いトランザクションを複数のローカルトランザクションに分解し、失敗時に補償トランザクション(Compensating Transaction)で一貫性を維持するパターンです。

なぜ2PCではなくSagaなのか?

| 項目 | 2PC | Saga |

| ---------------- | ------------------ | -------------- |

| 整合性 | 強い整合性 | 結果整合性 |

| 可用性 | 低い(同期ロック) | 高い(非同期) |

| レイテンシ | 高い | 低い |

| スケーラビリティ | 制限的 | 優秀 |

| 障害分離 | 全体ロールバック | 部分補償 |

注文シナリオ

オンラインショッピングモールの注文処理を例に説明します:

1. **注文サービス**: 注文作成

2. **決済サービス**: 決済処理

3. **在庫サービス**: 在庫引き当て

4. **配送サービス**: 配送作成

ステップ3(在庫引き当て)で失敗した場合、ステップ2(決済)を返金し、ステップ1(注文)をキャンセルする必要があります。

Choreography方式 — イベント駆動

各サービスがイベントを発行し、他のサービスがイベントを購読して自身のローカルトランザクションを実行します。

イベント定義

events.py

from dataclasses import dataclass

from datetime import datetime

@dataclass

class OrderCreated:

order_id: str

customer_id: str

items: list

total_amount: float

timestamp: datetime

@dataclass

class PaymentProcessed:

order_id: str

payment_id: str

amount: float

@dataclass

class PaymentFailed:

order_id: str

reason: str

@dataclass

class InventoryReserved:

order_id: str

items: list

@dataclass

class InventoryInsufficient:

order_id: str

item_id: str

requested: int

available: int

@dataclass

class ShippingCreated:

order_id: str

tracking_number: str

補償イベント

@dataclass

class PaymentRefunded:

order_id: str

payment_id: str

amount: float

@dataclass

class InventoryReleased:

order_id: str

items: list

@dataclass

class OrderCancelled:

order_id: str

reason: str

注文サービス

order_service.py

from kafka import KafkaProducer, KafkaConsumer

class OrderService:

def __init__(self):

self.producer = KafkaProducer(

bootstrap_servers='kafka:9092',

value_serializer=lambda v: json.dumps(v).encode()

)

self.db = OrderDB()

def create_order(self, customer_id, items, total_amount):

"""注文作成 + OrderCreatedイベント発行"""

order = self.db.create(

customer_id=customer_id,

items=items,

total_amount=total_amount,

status="PENDING"

)

self.producer.send('order-events', {

'type': 'OrderCreated',

'order_id': order.id,

'customer_id': customer_id,

'items': items,

'total_amount': total_amount

})

return order

def handle_payment_failed(self, event):

"""決済失敗時に注文キャンセル"""

self.db.update_status(event['order_id'], "CANCELLED")

self.producer.send('order-events', {

'type': 'OrderCancelled',

'order_id': event['order_id'],

'reason': event['reason']

})

def handle_inventory_insufficient(self, event):

"""在庫不足時に返金要求"""

self.db.update_status(event['order_id'], "CANCELLING")

決済返金イベント発行

self.producer.send('payment-events', {

'type': 'RefundRequested',

'order_id': event['order_id']

})

def handle_shipping_created(self, event):

"""配送作成完了時に注文確定"""

self.db.update_status(event['order_id'], "CONFIRMED")

決済サービス

payment_service.py

class PaymentService:

def __init__(self):

self.producer = KafkaProducer(

bootstrap_servers='kafka:9092',

value_serializer=lambda v: json.dumps(v).encode()

)

self.db = PaymentDB()

def handle_order_created(self, event):

"""注文作成イベント受信 -> 決済処理"""

try:

payment = self.db.process_payment(

order_id=event['order_id'],

customer_id=event['customer_id'],

amount=event['total_amount']

)

self.producer.send('payment-events', {

'type': 'PaymentProcessed',

'order_id': event['order_id'],

'payment_id': payment.id,

'amount': event['total_amount']

})

except InsufficientFundsError as e:

self.producer.send('payment-events', {

'type': 'PaymentFailed',

'order_id': event['order_id'],

'reason': str(e)

})

def handle_refund_requested(self, event):

"""補償トランザクション: 返金処理"""

payment = self.db.get_by_order(event['order_id'])

self.db.refund(payment.id)

self.producer.send('payment-events', {

'type': 'PaymentRefunded',

'order_id': event['order_id'],

'payment_id': payment.id,

'amount': payment.amount

})

Orchestration方式 — 中央コーディネーター

一つのOrchestrator(Saga Coordinator)が全体のトランザクションフローを管理します。

TemporalによるSaga Orchestratorの実装

workflows.py

from temporalio import workflow

from temporalio.common import RetryPolicy

from datetime import timedelta

@workflow.defn

class OrderSagaWorkflow:

"""注文Saga - Orchestration方式"""

@workflow.run

async def run(self, order_request: dict) -> dict:

retry_policy = RetryPolicy(

maximum_attempts=3,

initial_interval=timedelta(seconds=1),

maximum_interval=timedelta(seconds=10),

)

order_id = None

payment_id = None

reservation_id = None

try:

Step 1: 注文作成

order_id = await workflow.execute_activity(

create_order,

order_request,

start_to_close_timeout=timedelta(seconds=30),

retry_policy=retry_policy,

)

workflow.logger.info(f"Order created: {order_id}")

Step 2: 決済処理

payment_id = await workflow.execute_activity(

process_payment,

{"order_id": order_id, "amount": order_request["total_amount"]},

start_to_close_timeout=timedelta(seconds=60),

retry_policy=retry_policy,

)

workflow.logger.info(f"Payment processed: {payment_id}")

Step 3: 在庫予約

reservation_id = await workflow.execute_activity(

reserve_inventory,

{"order_id": order_id, "items": order_request["items"]},

start_to_close_timeout=timedelta(seconds=30),

retry_policy=retry_policy,

)

workflow.logger.info(f"Inventory reserved: {reservation_id}")

Step 4: 配送作成

tracking_number = await workflow.execute_activity(

create_shipment,

{"order_id": order_id, "address": order_request["address"]},

start_to_close_timeout=timedelta(seconds=30),

retry_policy=retry_policy,

)

workflow.logger.info(f"Shipment created: {tracking_number}")

Step 5: 注文確定

await workflow.execute_activity(

confirm_order,

{"order_id": order_id},

start_to_close_timeout=timedelta(seconds=10),

)

return {

"status": "SUCCESS",

"order_id": order_id,

"tracking_number": tracking_number

}

except Exception as e:

workflow.logger.error(f"Saga failed: {e}")

補償トランザクション実行(逆順)

if reservation_id:

await workflow.execute_activity(

release_inventory,

{"reservation_id": reservation_id},

start_to_close_timeout=timedelta(seconds=30),

)

if payment_id:

await workflow.execute_activity(

refund_payment,

{"payment_id": payment_id},

start_to_close_timeout=timedelta(seconds=60),

)

if order_id:

await workflow.execute_activity(

cancel_order,

{"order_id": order_id, "reason": str(e)},

start_to_close_timeout=timedelta(seconds=10),

)

return {"status": "FAILED", "reason": str(e)}

Activityの実装

activities.py

from temporalio import activity

@activity.defn

async def create_order(request: dict) -> str:

async with httpx.AsyncClient() as client:

response = await client.post(

"http://order-service:8080/orders",

json=request

)

response.raise_for_status()

return response.json()["order_id"]

@activity.defn

async def process_payment(request: dict) -> str:

async with httpx.AsyncClient() as client:

response = await client.post(

"http://payment-service:8080/payments",

json=request

)

response.raise_for_status()

return response.json()["payment_id"]

@activity.defn

async def reserve_inventory(request: dict) -> str:

async with httpx.AsyncClient() as client:

response = await client.post(

"http://inventory-service:8080/reservations",

json=request

)

if response.status_code == 409:

raise InsufficientInventoryError(response.json()["message"])

response.raise_for_status()

return response.json()["reservation_id"]

補償アクティビティ

@activity.defn

async def refund_payment(request: dict) -> None:

async with httpx.AsyncClient() as client:

response = await client.post(

"http://payment-service:8080/refunds",

json=request

)

response.raise_for_status()

@activity.defn

async def release_inventory(request: dict) -> None:

async with httpx.AsyncClient() as client:

response = await client.delete(

f"http://inventory-service:8080/reservations/{request['reservation_id']}"

)

response.raise_for_status()

@activity.defn

async def cancel_order(request: dict) -> None:

async with httpx.AsyncClient() as client:

response = await client.patch(

f"http://order-service:8080/orders/{request['order_id']}/cancel",

json={"reason": request["reason"]}

)

response.raise_for_status()

Temporal Workerの起動

worker.py

from temporalio.client import Client

from temporalio.worker import Worker

async def main():

client = await Client.connect("temporal:7233")

worker = Worker(

client,

task_queue="order-saga",

workflows=[OrderSagaWorkflow],

activities=[

create_order,

process_payment,

reserve_inventory,

create_shipment,

confirm_order,

refund_payment,

release_inventory,

cancel_order,

],

)

await worker.run()

if __name__ == "__main__":

asyncio.run(main())

Choreography vs Orchestration

| 項目 | Choreography | Orchestration |

| ------------ | --------------------------- | ------------------------- |

| 結合度 | 疎結合 | Orchestratorに依存 |

| 複雑度 | サービス数増加時に急増 | 中央で管理 |

| 可視性 | イベント追跡が困難 | ワークフロー状態が明確 |

| 単一障害点 | なし | Orchestrator |

| 適したケース | シンプルなフロー(3-4段階) | 複雑なフロー(5段階以上) |

| テスト | 困難 | 比較的容易 |

Semantic Lockの対策

Sagaには分離性がないため、同時実行時にデータ異常が発生する可能性があります。

Semantic Lockパターン

class OrderDB:

def create_with_lock(self, order_data):

"""注文作成時にステータスをPENDINGに設定して他のSagaの干渉を防止"""

order = Order(

**order_data,

status="PENDING", # Semantic Lock

saga_id=generate_saga_id()

)

self.session.add(order)

self.session.commit()

return order

def confirm_order(self, order_id, saga_id):

"""Saga完了時にロック解除"""

order = self.session.query(Order).filter_by(

id=order_id,

saga_id=saga_id,

status="PENDING"

).first()

if not order:

raise ConcurrencyError("Order already processed by another saga")

order.status = "CONFIRMED"

self.session.commit()

冪等性の保証

補償トランザクションが複数回実行される可能性があるため、冪等性は必須です。

冪等性キーを活用した決済処理

class PaymentService:

def process_payment(self, order_id, amount, idempotency_key):

すでに処理済みのリクエストか確認

existing = self.db.find_by_idempotency_key(idempotency_key)

if existing:

return existing # 同一結果を返却

payment = self.db.create_payment(

order_id=order_id,

amount=amount,

idempotency_key=idempotency_key

)

return payment

まとめ

Sagaパターンは、マイクロサービスにおける分散トランザクション処理の核心パターンです:

- **Choreography**: シンプルなフローに適合、イベント駆動による疎結合

- **Orchestration**: 複雑なフローに適合、Temporalなどで中央管理

- **補償トランザクション**: 失敗時に逆順で復旧

- **冪等性**: リトライの安全性のために必須

- **Semantic Lock**: 同時実行Saga間の干渉防止

**Q1. 2PCの代わりにSagaを使う主な理由は?**

2PCは同期ロックにより可用性とスケーラビリティが制限されますが、Sagaは非同期により高い可用性とスケーラビリティを提供します。

**Q2. 補償トランザクション(Compensating Transaction)とは?**

Sagaの特定のステップが失敗した際に、以前に成功したステップを元に戻す逆順のトランザクションです。

**Q3. Choreography方式の欠点は?**

サービス数が増加するとイベントフローが複雑になり、Saga全体の進行状況を把握することが困難になります。

**Q4. TemporalがSaga Orchestrationに適している理由は?**

ワークフロー状態を自動的に永続化し、失敗時のリトライと補償を宣言的に管理できます。

**Q5. Semantic Lockパターンの目的は?**

PENDINGステータスにより、他のSagaが同じデータを同時に変更することを防止します。

**Q6. 補償トランザクションで冪等性が重要な理由は?**

ネットワークエラーなどにより補償が複数回実行される可能性があるため、同一の結果を保証する必要があります。

**Q7. ChoreographyとOrchestrationのうち、どのような状況でOrchestrationを選択すべきか?**

5段階以上の複雑なフロー、条件分岐が多い場合、全体の進行状況の可視性が必要な場合です。

クイズ

Q1: 「Sagaパターンで実装するマイクロサービス分散トランザクション: Choreography vs

Orchestration」の主なトピックは何ですか?

Sagaパターンでマイクロサービスの分散トランザクションを実装します。ChoreographyとOrchestrationの違い、補償トランザクション、Temporalを活用した実践的な実装までコードとともに解説します。

オンラインショッピングモールの注文処理を例に説明します: 注文サービス: 注文作成 決済サービス:

決済処理 在庫サービス: 在庫引き当て 配送サービス: 配送作成

ステップ3(在庫引き当て)で失敗した場合、ステップ2(決済)を返金し、ステップ1(注文)をキャンセルする必要があります。

各サービスがイベントを発行し、他のサービスがイベントを購読して自身のローカルトランザクションを実行します。

イベント定義 注文サービス 決済サービス

一つのOrchestrator(Saga Coordinator)が全体のトランザクションフローを管理します。

TemporalによるSaga Orchestratorの実装 Activityの実装 Temporal Workerの起動

Sagaには分離性がないため、同時実行時にデータ異常が発生する可能性があります。

현재 단락 (1/366)

マイクロサービスアーキテクチャで最も難しい問題の一つが**分散トランザクション**です。モノリスではデータベースのACIDトランザクションで解決できましたが、各サービスが独自のデータベースを持つマイク...

작성 글자: 0원문 글자: 10,248작성 단락: 0/366