1. なぜイベント駆動アーキテクチャなのか
モノリスからMSAへ移行すると、**分散トランザクション**の問題が発生します:
graph LR
A[注文サービス] -->|HTTP同期呼び出し| B[決済サービス]
B -->|HTTP同期呼び出し| C[在庫サービス]
C -->|HTTP同期呼び出し| D[配送サービス]
style A fill:#f96,stroke:#333
style D fill:#6f9,stroke:#333
**同期方式の問題点:**
- 一つでも失敗すると全体が失敗(カスケード障害)
- サービス間の強結合
- 呼び出しチェーンが長くなるほどレイテンシ増加
- 部分的な失敗時にロールバックが困難
graph LR
A[注文サービス] -->|イベント発行| K[Kafka]
K -->|イベント消費| B[決済サービス]
K -->|イベント消費| C[在庫サービス]
K -->|イベント消費| D[配送サービス]
style K fill:#ff9,stroke:#333
**イベント駆動のメリット:**
- サービス間の疎結合(loose coupling)
- 非同期処理によるレスポンスタイム短縮
- 独立したスケーリングが可能
- イベントリプレイによる状態復旧が可能
2. CQRS(Command Query Responsibility Segregation)
2.1 コアコンセプト
**コマンド(Command)**と**クエリ(Query)**を分離するパターン:
graph TB
Client -->|Command: 注文作成| CmdAPI[Command API]
Client -->|Query: 注文照会| QryAPI[Query API]
CmdAPI --> WriteDB[(Write DB<br/>PostgreSQL)]
WriteDB -->|イベント発行| EventBus[Kafka]
EventBus -->|イベント消費| Projector[Projector]
Projector --> ReadDB[(Read DB<br/>Elasticsearch)]
QryAPI --> ReadDB
style CmdAPI fill:#f96,stroke:#333
style QryAPI fill:#6f9,stroke:#333
style EventBus fill:#ff9,stroke:#333
2.2 なぜ分離するのか?
| 側面 | Command(書き込み) | Query(読み取り) |
| ------------ | ---------------------- | ------------------------------------ |
| モデル | 正規化、整合性重視 | 非正規化、パフォーマンス重視 |
| スケーリング | 垂直スケーリング | 水平スケーリング(レプリケーション) |
| DB | PostgreSQL、MySQL | Elasticsearch、Redis、MongoDB |
| 最適化 | トランザクション安全性 | 読み取りパフォーマンス |
| 比率 | 全体の10〜20% | 全体の80〜90% |
2.3 実装例
Command Side
from dataclasses import dataclass
from datetime import datetime
@dataclass
class CreateOrderCommand:
customer_id: str
items: list[dict]
total_amount: float
class OrderCommandHandler:
def __init__(self, repo, event_publisher):
self.repo = repo
self.event_publisher = event_publisher
def handle_create_order(self, cmd: CreateOrderCommand):
order = Order(
id=str(uuid.uuid4()),
customer_id=cmd.customer_id,
items=cmd.items,
total_amount=cmd.total_amount,
status="PENDING",
created_at=datetime.utcnow()
)
Write DBに保存
self.repo.save(order)
イベント発行
self.event_publisher.publish("order.created", {
"order_id": order.id,
"customer_id": order.customer_id,
"total_amount": order.total_amount,
"items": order.items,
})
return order.id
Query Side
class OrderQueryService:
def __init__(self, read_repo):
self.read_repo = read_repo # Elasticsearch
def get_order(self, order_id: str):
return self.read_repo.find_by_id(order_id)
def search_orders(self, customer_id: str, status: str = None):
return self.read_repo.search(customer_id=customer_id, status=status)
Projector(イベント → Read Model同期)
class OrderProjector:
def __init__(self, read_repo):
self.read_repo = read_repo
def on_order_created(self, event):
self.read_repo.upsert({
"id": event["order_id"],
"customer_id": event["customer_id"],
"total_amount": event["total_amount"],
"status": "PENDING",
"items": event["items"],
})
def on_order_paid(self, event):
self.read_repo.update(event["order_id"], {"status": "PAID"})
3. Sagaパターン: 分散トランザクション管理
3.1 Choreography vs Orchestration
graph TB
subgraph "Choreography(イベント駆動)"
O1[注文] -->|OrderCreated| P1[決済]
P1 -->|PaymentCompleted| S1[在庫]
S1 -->|StockReserved| D1[配送]
D1 -->|DeliveryStarted| O1
end
graph TB
subgraph "Orchestration(中央調整)"
Orch[Saga Orchestrator]
Orch -->|1. 決済リクエスト| P2[決済]
P2 -->|決済完了| Orch
Orch -->|2. 在庫予約| S2[在庫]
S2 -->|予約完了| Orch
Orch -->|3. 配送リクエスト| D2[配送]
D2 -->|配送開始| Orch
end
| 方式 | メリット | デメリット |
| ------------- | ---------------------- | ---------------------------------- |
| Choreography | シンプル、疎結合 | フロー把握が困難、循環依存のリスク |
| Orchestration | フローが明確、管理容易 | 中央集中、Orchestratorが単一障害点 |
3.2 Orchestration Sagaの実装
from enum import Enum
from kafka import KafkaProducer, KafkaConsumer
class SagaStep(Enum):
PAYMENT = "payment"
INVENTORY = "inventory"
DELIVERY = "delivery"
class OrderSagaOrchestrator:
STEPS = [SagaStep.PAYMENT, SagaStep.INVENTORY, SagaStep.DELIVERY]
def __init__(self, producer: KafkaProducer):
self.producer = producer
self.saga_state = {} # order_id -> {step, status}
def start_saga(self, order_id: str, order_data: dict):
self.saga_state[order_id] = {
"current_step": 0,
"data": order_data,
"completed_steps": [],
}
self._execute_step(order_id)
def _execute_step(self, order_id: str):
state = self.saga_state[order_id]
step_idx = state["current_step"]
if step_idx >= len(self.STEPS):
すべてのステップ完了
self._publish(f"order.completed", {"order_id": order_id})
return
step = self.STEPS[step_idx]
self._publish(f"{step.value}.request", {
"order_id": order_id,
"saga_id": order_id,
**state["data"]
})
def handle_step_success(self, order_id: str, step: SagaStep):
state = self.saga_state[order_id]
state["completed_steps"].append(step)
state["current_step"] += 1
self._execute_step(order_id)
def handle_step_failure(self, order_id: str, failed_step: SagaStep):
"""補償トランザクション実行(逆順)"""
state = self.saga_state[order_id]
for step in reversed(state["completed_steps"]):
self._publish(f"{step.value}.compensate", {
"order_id": order_id,
})
self._publish("order.failed", {
"order_id": order_id,
"failed_at": failed_step.value,
})
def _publish(self, topic: str, data: dict):
self.producer.send(topic, json.dumps(data).encode())
3.3 補償トランザクションの例
sequenceDiagram
participant O as Orchestrator
participant P as 決済サービス
participant I as 在庫サービス
participant D as 配送サービス
O->>P: 1. 決済リクエスト
P-->>O: 決済成功
O->>I: 2. 在庫予約
I-->>O: 在庫予約成功
O->>D: 3. 配送リクエスト
D-->>O: 配送失敗(住所エラー)
Note over O: 補償トランザクション開始(逆順)
O->>I: 在庫予約取消
I-->>O: 取消完了
O->>P: 決済返金
P-->>O: 返金完了
O->>O: 注文失敗処理
4. Event Sourcing
4.1 コンセプト
状態を直接保存する代わりに、**状態変更イベント**を保存する:
従来の方式:
ordersテーブル: {id: 1, status: "SHIPPED", amount: 50000}
Event Sourcing:
eventsテーブル:
1. OrderCreated {amount: 50000}
2. PaymentReceived {payment_id: "pay_123"}
3. StockReserved {warehouse: "seoul"}
4. OrderShipped {tracking: "KR12345"}
→ イベントを順番にリプレイすれば現在の状態を復元
4.2 実装
class EventStore:
def __init__(self, db):
self.db = db
def append(self, aggregate_id: str, event_type: str, data: dict, version: int):
self.db.execute("""
INSERT INTO events (aggregate_id, event_type, data, version, created_at)
VALUES (%s, %s, %s, %s, NOW())
""", (aggregate_id, event_type, json.dumps(data), version))
def get_events(self, aggregate_id: str, after_version: int = 0):
return self.db.query("""
SELECT event_type, data, version
FROM events
WHERE aggregate_id = %s AND version > %s
ORDER BY version
""", (aggregate_id, after_version))
class Order:
def __init__(self):
self.id = None
self.status = None
self.version = 0
self._pending_events = []
def apply_event(self, event_type: str, data: dict):
if event_type == "OrderCreated":
self.id = data["order_id"]
self.status = "PENDING"
elif event_type == "PaymentReceived":
self.status = "PAID"
elif event_type == "OrderShipped":
self.status = "SHIPPED"
self.version += 1
@classmethod
def from_events(cls, events):
order = cls()
for e in events:
order.apply_event(e["event_type"], e["data"])
return order
5. Kafkaベースの全体アーキテクチャ
graph TB
Client[クライアント] --> Gateway[API Gateway]
Gateway --> OrderCmd[注文 Command API]
Gateway --> OrderQry[注文 Query API]
OrderCmd --> WriteDB[(PostgreSQL)]
WriteDB --> CDC[Debezium CDC]
CDC --> Kafka[Apache Kafka]
Kafka --> Saga[Saga Orchestrator]
Kafka --> Projector[CQRS Projector]
Saga --> PaymentSvc[決済サービス]
Saga --> InventorySvc[在庫サービス]
Saga --> DeliverySvc[配送サービス]
Projector --> ReadDB[(Elasticsearch)]
OrderQry --> ReadDB
PaymentSvc --> Kafka
InventorySvc --> Kafka
DeliverySvc --> Kafka
style Kafka fill:#ff9,stroke:#333
style Saga fill:#f96,stroke:#333
6. 実践上の考慮事項
6.1 冪等性(Idempotency)
イベントの重複処理防止
class IdempotentConsumer:
def __init__(self, redis_client):
self.redis = redis_client
def process_event(self, event_id: str, handler):
key = f"processed:{event_id}"
if self.redis.setnx(key, "1"):
self.redis.expire(key, 86400) # 24時間TTL
handler()
else:
pass # すでに処理済み、スキップ
6.2 イベント順序の保証
Kafkaのパーティションキーで順序保証
producer.send(
"order.events",
key=order_id.encode(), # 同じ注文 → 同じパーティション → 順序保証
value=json.dumps(event).encode()
)
7. クイズ
読み取り/書き込みの**要件が根本的に異なるため**。書き込みは正規化+トランザクション、読み取りは非正規化+パフォーマンス最適化。それぞれに最適化されたストレージとモデルを使用可能。
**Choreography**: 各サービスがイベントを発行/購読して自律的に動作。**Orchestration**: 中央のOrchestratorが各サービスに明示的に指示。Orchestrationの方がフロー把握と管理が容易。
Sagaの一つのステップが失敗した際、すでに完了したステップを**逆順で取り消す**トランザクション。例: 配送失敗 → 在庫予約取消 → 決済返金。
(1) 完全な**監査証跡(Audit Trail)** (2) イベントリプレイによる**時点復旧**が可能 (3) **過去のイベントから新しいRead Modelを構築**可能。
ネットワーク障害、リトライ、リバランシングなどにより**同じイベントが複数回配信**される可能性がある。冪等でなければ、二重決済、二重在庫引き当てなど深刻な問題が発生する。
**同じパーティションキー**を使用する。同一キーのメッセージは同じパーティションに配信され、順序が保証される。例: order_idをパーティションキーとして使用。
**Eventually Consistent** — 結果整合性。クエリ時に最新データでない可能性がある。解決策: (1) 書き込み直後はWrite DBから直接読み取り (2) イベント処理完了を確認してから応答。
クイズ
Q1: 「イベント駆動アーキテクチャ + CQRS + Sagaパターン:
MSA実践設計ガイド」の主なトピックは何ですか?
MSA環境におけるイベント駆動アーキテクチャ、CQRS(Command Query Responsibility
Segregation)、Sagaパターンを組み合わせた実践設計。Kafkaベースの実装コード、Mermaidダイアグラム、注文・決済・配送システムのケーススタディを含みます。
モノリスからMSAへ移行すると、分散トランザクションの問題が発生します: 同期方式の問題点:
一つでも失敗すると全体が失敗(カスケード障害) サービス間の強結合
呼び出しチェーンが長くなるほどレイテンシ増加 部分的な失敗時にロールバックが困難
イベント駆動のメリット: サービス間の疎結合(loose coupling) 非同期処理によるレスポンスタイム短縮
独立したスケーリングが可能 イベントリプレイによる状態復旧が可能
Q3: CQRS(Command Query Responsibility Segregation)の核心的な概念を説明してください。
2.1 コアコンセプト コマンド(Command)とクエリ(Query)を分離するパターン: 2.2 なぜ分離するのか?
2.3 実装例
3.1 Choreography vs Orchestration 3.2 Orchestration Sagaの実装 3.3 補償トランザクションの例
4.1 コンセプト 状態を直接保存する代わりに、状態変更イベントを保存する: 4.2 実装
현재 단락 (1/277)
モノリスからMSAへ移行すると、**分散トランザクション**の問題が発生します: