Skip to content

필사 모드: イベント駆動アーキテクチャ + CQRS + Sagaパターン: MSA実践設計ガイド

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

1. なぜイベント駆動アーキテクチャなのか

モノリスからMSAへ移行すると、**分散トランザクション**の問題が発生します:

graph LR

A[注文サービス] -->|HTTP同期呼び出し| B[決済サービス]

B -->|HTTP同期呼び出し| C[在庫サービス]

C -->|HTTP同期呼び出し| D[配送サービス]

style A fill:#f96,stroke:#333

style D fill:#6f9,stroke:#333

**同期方式の問題点:**

- 一つでも失敗すると全体が失敗(カスケード障害)

- サービス間の強結合

- 呼び出しチェーンが長くなるほどレイテンシ増加

- 部分的な失敗時にロールバックが困難

graph LR

A[注文サービス] -->|イベント発行| K[Kafka]

K -->|イベント消費| B[決済サービス]

K -->|イベント消費| C[在庫サービス]

K -->|イベント消費| D[配送サービス]

style K fill:#ff9,stroke:#333

**イベント駆動のメリット:**

- サービス間の疎結合(loose coupling)

- 非同期処理によるレスポンスタイム短縮

- 独立したスケーリングが可能

- イベントリプレイによる状態復旧が可能

2. CQRS(Command Query Responsibility Segregation)

2.1 コアコンセプト

**コマンド(Command)**と**クエリ(Query)**を分離するパターン:

graph TB

Client -->|Command: 注文作成| CmdAPI[Command API]

Client -->|Query: 注文照会| QryAPI[Query API]

CmdAPI --> WriteDB[(Write DB<br/>PostgreSQL)]

WriteDB -->|イベント発行| EventBus[Kafka]

EventBus -->|イベント消費| Projector[Projector]

Projector --> ReadDB[(Read DB<br/>Elasticsearch)]

QryAPI --> ReadDB

style CmdAPI fill:#f96,stroke:#333

style QryAPI fill:#6f9,stroke:#333

style EventBus fill:#ff9,stroke:#333

2.2 なぜ分離するのか?

| 側面 | Command(書き込み) | Query(読み取り) |

| ------------ | ---------------------- | ------------------------------------ |

| モデル | 正規化、整合性重視 | 非正規化、パフォーマンス重視 |

| スケーリング | 垂直スケーリング | 水平スケーリング(レプリケーション) |

| DB | PostgreSQL、MySQL | Elasticsearch、Redis、MongoDB |

| 最適化 | トランザクション安全性 | 読み取りパフォーマンス |

| 比率 | 全体の10〜20% | 全体の80〜90% |

2.3 実装例

Command Side

from dataclasses import dataclass

from datetime import datetime

@dataclass

class CreateOrderCommand:

customer_id: str

items: list[dict]

total_amount: float

class OrderCommandHandler:

def __init__(self, repo, event_publisher):

self.repo = repo

self.event_publisher = event_publisher

def handle_create_order(self, cmd: CreateOrderCommand):

order = Order(

id=str(uuid.uuid4()),

customer_id=cmd.customer_id,

items=cmd.items,

total_amount=cmd.total_amount,

status="PENDING",

created_at=datetime.utcnow()

)

Write DBに保存

self.repo.save(order)

イベント発行

self.event_publisher.publish("order.created", {

"order_id": order.id,

"customer_id": order.customer_id,

"total_amount": order.total_amount,

"items": order.items,

})

return order.id

Query Side

class OrderQueryService:

def __init__(self, read_repo):

self.read_repo = read_repo # Elasticsearch

def get_order(self, order_id: str):

return self.read_repo.find_by_id(order_id)

def search_orders(self, customer_id: str, status: str = None):

return self.read_repo.search(customer_id=customer_id, status=status)

Projector(イベント → Read Model同期)

class OrderProjector:

def __init__(self, read_repo):

self.read_repo = read_repo

def on_order_created(self, event):

self.read_repo.upsert({

"id": event["order_id"],

"customer_id": event["customer_id"],

"total_amount": event["total_amount"],

"status": "PENDING",

"items": event["items"],

})

def on_order_paid(self, event):

self.read_repo.update(event["order_id"], {"status": "PAID"})

3. Sagaパターン: 分散トランザクション管理

3.1 Choreography vs Orchestration

graph TB

subgraph "Choreography(イベント駆動)"

O1[注文] -->|OrderCreated| P1[決済]

P1 -->|PaymentCompleted| S1[在庫]

S1 -->|StockReserved| D1[配送]

D1 -->|DeliveryStarted| O1

end

graph TB

subgraph "Orchestration(中央調整)"

Orch[Saga Orchestrator]

Orch -->|1. 決済リクエスト| P2[決済]

P2 -->|決済完了| Orch

Orch -->|2. 在庫予約| S2[在庫]

S2 -->|予約完了| Orch

Orch -->|3. 配送リクエスト| D2[配送]

D2 -->|配送開始| Orch

end

| 方式 | メリット | デメリット |

| ------------- | ---------------------- | ---------------------------------- |

| Choreography | シンプル、疎結合 | フロー把握が困難、循環依存のリスク |

| Orchestration | フローが明確、管理容易 | 中央集中、Orchestratorが単一障害点 |

3.2 Orchestration Sagaの実装

from enum import Enum

from kafka import KafkaProducer, KafkaConsumer

class SagaStep(Enum):

PAYMENT = "payment"

INVENTORY = "inventory"

DELIVERY = "delivery"

class OrderSagaOrchestrator:

STEPS = [SagaStep.PAYMENT, SagaStep.INVENTORY, SagaStep.DELIVERY]

def __init__(self, producer: KafkaProducer):

self.producer = producer

self.saga_state = {} # order_id -> {step, status}

def start_saga(self, order_id: str, order_data: dict):

self.saga_state[order_id] = {

"current_step": 0,

"data": order_data,

"completed_steps": [],

}

self._execute_step(order_id)

def _execute_step(self, order_id: str):

state = self.saga_state[order_id]

step_idx = state["current_step"]

if step_idx >= len(self.STEPS):

すべてのステップ完了

self._publish(f"order.completed", {"order_id": order_id})

return

step = self.STEPS[step_idx]

self._publish(f"{step.value}.request", {

"order_id": order_id,

"saga_id": order_id,

**state["data"]

})

def handle_step_success(self, order_id: str, step: SagaStep):

state = self.saga_state[order_id]

state["completed_steps"].append(step)

state["current_step"] += 1

self._execute_step(order_id)

def handle_step_failure(self, order_id: str, failed_step: SagaStep):

"""補償トランザクション実行(逆順)"""

state = self.saga_state[order_id]

for step in reversed(state["completed_steps"]):

self._publish(f"{step.value}.compensate", {

"order_id": order_id,

})

self._publish("order.failed", {

"order_id": order_id,

"failed_at": failed_step.value,

})

def _publish(self, topic: str, data: dict):

self.producer.send(topic, json.dumps(data).encode())

3.3 補償トランザクションの例

sequenceDiagram

participant O as Orchestrator

participant P as 決済サービス

participant I as 在庫サービス

participant D as 配送サービス

O->>P: 1. 決済リクエスト

P-->>O: 決済成功

O->>I: 2. 在庫予約

I-->>O: 在庫予約成功

O->>D: 3. 配送リクエスト

D-->>O: 配送失敗(住所エラー)

Note over O: 補償トランザクション開始(逆順)

O->>I: 在庫予約取消

I-->>O: 取消完了

O->>P: 決済返金

P-->>O: 返金完了

O->>O: 注文失敗処理

4. Event Sourcing

4.1 コンセプト

状態を直接保存する代わりに、**状態変更イベント**を保存する:

従来の方式:

ordersテーブル: {id: 1, status: "SHIPPED", amount: 50000}

Event Sourcing:

eventsテーブル:

1. OrderCreated {amount: 50000}

2. PaymentReceived {payment_id: "pay_123"}

3. StockReserved {warehouse: "seoul"}

4. OrderShipped {tracking: "KR12345"}

→ イベントを順番にリプレイすれば現在の状態を復元

4.2 実装

class EventStore:

def __init__(self, db):

self.db = db

def append(self, aggregate_id: str, event_type: str, data: dict, version: int):

self.db.execute("""

INSERT INTO events (aggregate_id, event_type, data, version, created_at)

VALUES (%s, %s, %s, %s, NOW())

""", (aggregate_id, event_type, json.dumps(data), version))

def get_events(self, aggregate_id: str, after_version: int = 0):

return self.db.query("""

SELECT event_type, data, version

FROM events

WHERE aggregate_id = %s AND version > %s

ORDER BY version

""", (aggregate_id, after_version))

class Order:

def __init__(self):

self.id = None

self.status = None

self.version = 0

self._pending_events = []

def apply_event(self, event_type: str, data: dict):

if event_type == "OrderCreated":

self.id = data["order_id"]

self.status = "PENDING"

elif event_type == "PaymentReceived":

self.status = "PAID"

elif event_type == "OrderShipped":

self.status = "SHIPPED"

self.version += 1

@classmethod

def from_events(cls, events):

order = cls()

for e in events:

order.apply_event(e["event_type"], e["data"])

return order

5. Kafkaベースの全体アーキテクチャ

graph TB

Client[クライアント] --> Gateway[API Gateway]

Gateway --> OrderCmd[注文 Command API]

Gateway --> OrderQry[注文 Query API]

OrderCmd --> WriteDB[(PostgreSQL)]

WriteDB --> CDC[Debezium CDC]

CDC --> Kafka[Apache Kafka]

Kafka --> Saga[Saga Orchestrator]

Kafka --> Projector[CQRS Projector]

Saga --> PaymentSvc[決済サービス]

Saga --> InventorySvc[在庫サービス]

Saga --> DeliverySvc[配送サービス]

Projector --> ReadDB[(Elasticsearch)]

OrderQry --> ReadDB

PaymentSvc --> Kafka

InventorySvc --> Kafka

DeliverySvc --> Kafka

style Kafka fill:#ff9,stroke:#333

style Saga fill:#f96,stroke:#333

6. 実践上の考慮事項

6.1 冪等性(Idempotency)

イベントの重複処理防止

class IdempotentConsumer:

def __init__(self, redis_client):

self.redis = redis_client

def process_event(self, event_id: str, handler):

key = f"processed:{event_id}"

if self.redis.setnx(key, "1"):

self.redis.expire(key, 86400) # 24時間TTL

handler()

else:

pass # すでに処理済み、スキップ

6.2 イベント順序の保証

Kafkaのパーティションキーで順序保証

producer.send(

"order.events",

key=order_id.encode(), # 同じ注文 → 同じパーティション → 順序保証

value=json.dumps(event).encode()

)

7. クイズ

読み取り/書き込みの**要件が根本的に異なるため**。書き込みは正規化+トランザクション、読み取りは非正規化+パフォーマンス最適化。それぞれに最適化されたストレージとモデルを使用可能。

**Choreography**: 各サービスがイベントを発行/購読して自律的に動作。**Orchestration**: 中央のOrchestratorが各サービスに明示的に指示。Orchestrationの方がフロー把握と管理が容易。

Sagaの一つのステップが失敗した際、すでに完了したステップを**逆順で取り消す**トランザクション。例: 配送失敗 → 在庫予約取消 → 決済返金。

(1) 完全な**監査証跡(Audit Trail)** (2) イベントリプレイによる**時点復旧**が可能 (3) **過去のイベントから新しいRead Modelを構築**可能。

ネットワーク障害、リトライ、リバランシングなどにより**同じイベントが複数回配信**される可能性がある。冪等でなければ、二重決済、二重在庫引き当てなど深刻な問題が発生する。

**同じパーティションキー**を使用する。同一キーのメッセージは同じパーティションに配信され、順序が保証される。例: order_idをパーティションキーとして使用。

**Eventually Consistent** — 結果整合性。クエリ時に最新データでない可能性がある。解決策: (1) 書き込み直後はWrite DBから直接読み取り (2) イベント処理完了を確認してから応答。

クイズ

Q1: 「イベント駆動アーキテクチャ + CQRS + Sagaパターン:

MSA実践設計ガイド」の主なトピックは何ですか?

MSA環境におけるイベント駆動アーキテクチャ、CQRS(Command Query Responsibility

Segregation)、Sagaパターンを組み合わせた実践設計。Kafkaベースの実装コード、Mermaidダイアグラム、注文・決済・配送システムのケーススタディを含みます。

モノリスからMSAへ移行すると、分散トランザクションの問題が発生します: 同期方式の問題点:

一つでも失敗すると全体が失敗(カスケード障害) サービス間の強結合

呼び出しチェーンが長くなるほどレイテンシ増加 部分的な失敗時にロールバックが困難

イベント駆動のメリット: サービス間の疎結合(loose coupling) 非同期処理によるレスポンスタイム短縮

独立したスケーリングが可能 イベントリプレイによる状態復旧が可能

Q3: CQRS(Command Query Responsibility Segregation)の核心的な概念を説明してください。

2.1 コアコンセプト コマンド(Command)とクエリ(Query)を分離するパターン: 2.2 なぜ分離するのか?

2.3 実装例

3.1 Choreography vs Orchestration 3.2 Orchestration Sagaの実装 3.3 補償トランザクションの例

4.1 コンセプト 状態を直接保存する代わりに、状態変更イベントを保存する: 4.2 実装

현재 단락 (1/277)

モノリスからMSAへ移行すると、**分散トランザクション**の問題が発生します:

작성 글자: 0원문 글자: 8,490작성 단락: 0/277