- Authors
- Name
- 1. なぜイベント駆動アーキテクチャなのか
- 2. CQRS(Command Query Responsibility Segregation)
- 3. Sagaパターン: 分散トランザクション管理
- 4. Event Sourcing
- 5. Kafkaベースの全体アーキテクチャ
- 6. 実践上の考慮事項
- 7. クイズ
1. なぜイベント駆動アーキテクチャなのか
モノリスからMSAへ移行すると、分散トランザクションの問題が発生します:
graph LR
A[注文サービス] -->|HTTP同期呼び出し| B[決済サービス]
B -->|HTTP同期呼び出し| C[在庫サービス]
C -->|HTTP同期呼び出し| D[配送サービス]
style A fill:#f96,stroke:#333
style D fill:#6f9,stroke:#333
同期方式の問題点:
- 一つでも失敗すると全体が失敗(カスケード障害)
- サービス間の強結合
- 呼び出しチェーンが長くなるほどレイテンシ増加
- 部分的な失敗時にロールバックが困難
graph LR
A[注文サービス] -->|イベント発行| K[Kafka]
K -->|イベント消費| B[決済サービス]
K -->|イベント消費| C[在庫サービス]
K -->|イベント消費| D[配送サービス]
style K fill:#ff9,stroke:#333
イベント駆動のメリット:
- サービス間の疎結合(loose coupling)
- 非同期処理によるレスポンスタイム短縮
- 独立したスケーリングが可能
- イベントリプレイによる状態復旧が可能
2. CQRS(Command Query Responsibility Segregation)
2.1 コアコンセプト
**コマンド(Command)とクエリ(Query)**を分離するパターン:
graph TB
Client -->|Command: 注文作成| CmdAPI[Command API]
Client -->|Query: 注文照会| QryAPI[Query API]
CmdAPI --> WriteDB[(Write DB<br/>PostgreSQL)]
WriteDB -->|イベント発行| EventBus[Kafka]
EventBus -->|イベント消費| Projector[Projector]
Projector --> ReadDB[(Read DB<br/>Elasticsearch)]
QryAPI --> ReadDB
style CmdAPI fill:#f96,stroke:#333
style QryAPI fill:#6f9,stroke:#333
style EventBus fill:#ff9,stroke:#333
2.2 なぜ分離するのか?
| 側面 | Command(書き込み) | Query(読み取り) |
|---|---|---|
| モデル | 正規化、整合性重視 | 非正規化、パフォーマンス重視 |
| スケーリング | 垂直スケーリング | 水平スケーリング(レプリケーション) |
| DB | PostgreSQL、MySQL | Elasticsearch、Redis、MongoDB |
| 最適化 | トランザクション安全性 | 読み取りパフォーマンス |
| 比率 | 全体の10〜20% | 全体の80〜90% |
2.3 実装例
# Command Side
from dataclasses import dataclass
from datetime import datetime
import uuid
@dataclass
class CreateOrderCommand:
customer_id: str
items: list[dict]
total_amount: float
class OrderCommandHandler:
def __init__(self, repo, event_publisher):
self.repo = repo
self.event_publisher = event_publisher
def handle_create_order(self, cmd: CreateOrderCommand):
order = Order(
id=str(uuid.uuid4()),
customer_id=cmd.customer_id,
items=cmd.items,
total_amount=cmd.total_amount,
status="PENDING",
created_at=datetime.utcnow()
)
# Write DBに保存
self.repo.save(order)
# イベント発行
self.event_publisher.publish("order.created", {
"order_id": order.id,
"customer_id": order.customer_id,
"total_amount": order.total_amount,
"items": order.items,
})
return order.id
# Query Side
class OrderQueryService:
def __init__(self, read_repo):
self.read_repo = read_repo # Elasticsearch
def get_order(self, order_id: str):
return self.read_repo.find_by_id(order_id)
def search_orders(self, customer_id: str, status: str = None):
return self.read_repo.search(customer_id=customer_id, status=status)
# Projector(イベント → Read Model同期)
class OrderProjector:
def __init__(self, read_repo):
self.read_repo = read_repo
def on_order_created(self, event):
self.read_repo.upsert({
"id": event["order_id"],
"customer_id": event["customer_id"],
"total_amount": event["total_amount"],
"status": "PENDING",
"items": event["items"],
})
def on_order_paid(self, event):
self.read_repo.update(event["order_id"], {"status": "PAID"})
3. Sagaパターン: 分散トランザクション管理
3.1 Choreography vs Orchestration
graph TB
subgraph "Choreography(イベント駆動)"
O1[注文] -->|OrderCreated| P1[決済]
P1 -->|PaymentCompleted| S1[在庫]
S1 -->|StockReserved| D1[配送]
D1 -->|DeliveryStarted| O1
end
graph TB
subgraph "Orchestration(中央調整)"
Orch[Saga Orchestrator]
Orch -->|1. 決済リクエスト| P2[決済]
P2 -->|決済完了| Orch
Orch -->|2. 在庫予約| S2[在庫]
S2 -->|予約完了| Orch
Orch -->|3. 配送リクエスト| D2[配送]
D2 -->|配送開始| Orch
end
| 方式 | メリット | デメリット |
|---|---|---|
| Choreography | シンプル、疎結合 | フロー把握が困難、循環依存のリスク |
| Orchestration | フローが明確、管理容易 | 中央集中、Orchestratorが単一障害点 |
3.2 Orchestration Sagaの実装
from enum import Enum
from kafka import KafkaProducer, KafkaConsumer
import json
class SagaStep(Enum):
PAYMENT = "payment"
INVENTORY = "inventory"
DELIVERY = "delivery"
class OrderSagaOrchestrator:
STEPS = [SagaStep.PAYMENT, SagaStep.INVENTORY, SagaStep.DELIVERY]
def __init__(self, producer: KafkaProducer):
self.producer = producer
self.saga_state = {} # order_id -> {step, status}
def start_saga(self, order_id: str, order_data: dict):
self.saga_state[order_id] = {
"current_step": 0,
"data": order_data,
"completed_steps": [],
}
self._execute_step(order_id)
def _execute_step(self, order_id: str):
state = self.saga_state[order_id]
step_idx = state["current_step"]
if step_idx >= len(self.STEPS):
# すべてのステップ完了
self._publish(f"order.completed", {"order_id": order_id})
return
step = self.STEPS[step_idx]
self._publish(f"{step.value}.request", {
"order_id": order_id,
"saga_id": order_id,
**state["data"]
})
def handle_step_success(self, order_id: str, step: SagaStep):
state = self.saga_state[order_id]
state["completed_steps"].append(step)
state["current_step"] += 1
self._execute_step(order_id)
def handle_step_failure(self, order_id: str, failed_step: SagaStep):
"""補償トランザクション実行(逆順)"""
state = self.saga_state[order_id]
for step in reversed(state["completed_steps"]):
self._publish(f"{step.value}.compensate", {
"order_id": order_id,
})
self._publish("order.failed", {
"order_id": order_id,
"failed_at": failed_step.value,
})
def _publish(self, topic: str, data: dict):
self.producer.send(topic, json.dumps(data).encode())
3.3 補償トランザクションの例
sequenceDiagram
participant O as Orchestrator
participant P as 決済サービス
participant I as 在庫サービス
participant D as 配送サービス
O->>P: 1. 決済リクエスト
P-->>O: 決済成功
O->>I: 2. 在庫予約
I-->>O: 在庫予約成功
O->>D: 3. 配送リクエスト
D-->>O: 配送失敗(住所エラー)
Note over O: 補償トランザクション開始(逆順)
O->>I: 在庫予約取消
I-->>O: 取消完了
O->>P: 決済返金
P-->>O: 返金完了
O->>O: 注文失敗処理
4. Event Sourcing
4.1 コンセプト
状態を直接保存する代わりに、状態変更イベントを保存する:
従来の方式:
ordersテーブル: {id: 1, status: "SHIPPED", amount: 50000}
Event Sourcing:
eventsテーブル:
1. OrderCreated {amount: 50000}
2. PaymentReceived {payment_id: "pay_123"}
3. StockReserved {warehouse: "seoul"}
4. OrderShipped {tracking: "KR12345"}
→ イベントを順番にリプレイすれば現在の状態を復元
4.2 実装
class EventStore:
def __init__(self, db):
self.db = db
def append(self, aggregate_id: str, event_type: str, data: dict, version: int):
self.db.execute("""
INSERT INTO events (aggregate_id, event_type, data, version, created_at)
VALUES (%s, %s, %s, %s, NOW())
""", (aggregate_id, event_type, json.dumps(data), version))
def get_events(self, aggregate_id: str, after_version: int = 0):
return self.db.query("""
SELECT event_type, data, version
FROM events
WHERE aggregate_id = %s AND version > %s
ORDER BY version
""", (aggregate_id, after_version))
class Order:
def __init__(self):
self.id = None
self.status = None
self.version = 0
self._pending_events = []
def apply_event(self, event_type: str, data: dict):
if event_type == "OrderCreated":
self.id = data["order_id"]
self.status = "PENDING"
elif event_type == "PaymentReceived":
self.status = "PAID"
elif event_type == "OrderShipped":
self.status = "SHIPPED"
self.version += 1
@classmethod
def from_events(cls, events):
order = cls()
for e in events:
order.apply_event(e["event_type"], e["data"])
return order
5. Kafkaベースの全体アーキテクチャ
graph TB
Client[クライアント] --> Gateway[API Gateway]
Gateway --> OrderCmd[注文 Command API]
Gateway --> OrderQry[注文 Query API]
OrderCmd --> WriteDB[(PostgreSQL)]
WriteDB --> CDC[Debezium CDC]
CDC --> Kafka[Apache Kafka]
Kafka --> Saga[Saga Orchestrator]
Kafka --> Projector[CQRS Projector]
Saga --> PaymentSvc[決済サービス]
Saga --> InventorySvc[在庫サービス]
Saga --> DeliverySvc[配送サービス]
Projector --> ReadDB[(Elasticsearch)]
OrderQry --> ReadDB
PaymentSvc --> Kafka
InventorySvc --> Kafka
DeliverySvc --> Kafka
style Kafka fill:#ff9,stroke:#333
style Saga fill:#f96,stroke:#333
6. 実践上の考慮事項
6.1 冪等性(Idempotency)
# イベントの重複処理防止
class IdempotentConsumer:
def __init__(self, redis_client):
self.redis = redis_client
def process_event(self, event_id: str, handler):
key = f"processed:{event_id}"
if self.redis.setnx(key, "1"):
self.redis.expire(key, 86400) # 24時間TTL
handler()
else:
pass # すでに処理済み、スキップ
6.2 イベント順序の保証
# Kafkaのパーティションキーで順序保証
producer.send(
"order.events",
key=order_id.encode(), # 同じ注文 → 同じパーティション → 順序保証
value=json.dumps(event).encode()
)
7. クイズ
Q1. CQRSでCommandとQueryを分離する最大の理由は?
読み取り/書き込みの要件が根本的に異なるため。書き込みは正規化+トランザクション、読み取りは非正規化+パフォーマンス最適化。それぞれに最適化されたストレージとモデルを使用可能。
Q2. Saga ChoreographyとOrchestrationの違いは?
Choreography: 各サービスがイベントを発行/購読して自律的に動作。Orchestration: 中央のOrchestratorが各サービスに明示的に指示。Orchestrationの方がフロー把握と管理が容易。
Q3. 補償トランザクションとは?
Sagaの一つのステップが失敗した際、すでに完了したステップを逆順で取り消すトランザクション。例: 配送失敗 → 在庫予約取消 → 決済返金。
Q4. Event Sourcingの最大のメリットは?
(1) 完全な監査証跡(Audit Trail) (2) イベントリプレイによる時点復旧が可能 (3) 過去のイベントから新しいRead Modelを構築可能。
Q5. イベント処理で冪等性が重要な理由は?
ネットワーク障害、リトライ、リバランシングなどにより同じイベントが複数回配信される可能性がある。冪等でなければ、二重決済、二重在庫引き当てなど深刻な問題が発生する。
Q6. Kafkaでイベント順序を保証する方法は?
同じパーティションキーを使用する。同一キーのメッセージは同じパーティションに配信され、順序が保証される。例: order_idをパーティションキーとして使用。
Q7. CQRSでRead DBがWrite DBより遅れて更新される問題は?
Eventually Consistent — 結果整合性。クエリ時に最新データでない可能性がある。解決策: (1) 書き込み直後はWrite DBから直接読み取り (2) イベント処理完了を確認してから応答。