- Authors
- Name
- はじめに
- 1. イベント駆動アーキテクチャ概要
- 2. Apache Kafka 基礎
- 3. CQRS (Command Query Responsibility Segregation)
- 4. Event Sourcing — イベントがデータそのもの
- 5. Sagaパターン — 分散トランザクション管理
- 6. 実践的な考慮事項
- 7. EDA導入チェックリスト
- クイズ

はじめに
マイクロサービスが増えると、サービス間の通信が複雑になります。同期(Sync)呼び出しで結合されたサービスは、1つが停止すると連鎖障害が発生します。イベント駆動アーキテクチャ(EDA)は、この問題を非同期イベントベースで解決します。本記事では、Kafkaを中心にEDAのコアパターンであるCQRSとEvent Sourcingを実践コードとともに解説します。
1. イベント駆動アーキテクチャ概要
同期 vs 非同期通信
graph LR
subgraph "同期方式 (REST)"
A[注文サービス] -->|HTTP呼び出し| B[決済サービス]
B -->|HTTP呼び出し| C[在庫サービス]
C -->|HTTP呼び出し| D[通知サービス]
end
同期方式では、決済サービスが停止すると注文も失敗します。すべてのサービスが強く結合されています。
graph LR
subgraph "非同期方式 (Event-Driven)"
A2[注文サービス] -->|イベント発行| K[Kafka]
K -->|購読| B2[決済サービス]
K -->|購読| C2[在庫サービス]
K -->|購読| D2[通知サービス]
end
非同期方式では、注文サービスはイベントを発行するだけです。各サービスが独立してイベントを消費します。
EDAのコア構成要素
| 構成要素 | 役割 | 例 |
|---|---|---|
| Producer | イベントを生成し発行する | 注文サービス |
| Event Broker | イベントを保存し配信する | Apache Kafka |
| Consumer | イベントを購読し処理する | 決済、在庫、通知サービス |
| Event | 発生した事実を表すメッセージ | OrderCreated, PaymentCompleted |
2. Apache Kafka 基礎
Kafka アーキテクチャ
graph TB
P1[Producer 1] --> T[Topic: orders]
P2[Producer 2] --> T
T --> PA0[Partition 0]
T --> PA1[Partition 1]
T --> PA2[Partition 2]
PA0 --> C1[Consumer Group A - Consumer 1]
PA1 --> C2[Consumer Group A - Consumer 2]
PA2 --> C3[Consumer Group A - Consumer 3]
PA0 --> C4[Consumer Group B - Consumer 1]
PA1 --> C4
PA2 --> C4
主要概念
- Topic: イベントが保存される論理チャネル(例:
orders、payments) - Partition: Topic内の並列処理単位。パーティションキーで順序を保証
- Consumer Group: 同じグループのコンシューマーはパーティションを分配されて並列処理
- Offset: 各パーティション内のメッセージの通し番号。コンシューマーがどこまで読んだかを追跡
Kafka Producer 実装 (Python)
from confluent_kafka import Producer
import json
config = {
'bootstrap.servers': 'kafka-broker:9092',
'client.id': 'order-service',
'acks': 'all', # すべてのレプリカへの書き込みを確認
'retries': 3,
'retry.backoff.ms': 1000,
}
producer = Producer(config)
def publish_order_event(order: dict):
"""注文イベントを発行"""
event = {
'event_type': 'OrderCreated',
'event_id': str(uuid.uuid4()),
'timestamp': datetime.utcnow().isoformat(),
'data': {
'order_id': order['id'],
'user_id': order['user_id'],
'items': order['items'],
'total_amount': order['total'],
}
}
# パーティションキー = user_id → 同じユーザーの注文は同じパーティションへ
producer.produce(
topic='orders',
key=str(order['user_id']).encode('utf-8'),
value=json.dumps(event).encode('utf-8'),
callback=delivery_callback,
)
producer.flush()
def delivery_callback(err, msg):
if err:
print(f'送信失敗: {err}')
else:
print(f'送信成功: {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
Kafka Consumer 実装 (Python)
from confluent_kafka import Consumer
config = {
'bootstrap.servers': 'kafka-broker:9092',
'group.id': 'payment-service',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False, # 手動コミットでat-least-onceを保証
}
consumer = Consumer(config)
consumer.subscribe(['orders'])
def process_events():
"""イベント消費ループ"""
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
print(f'Error: {msg.error()}')
continue
event = json.loads(msg.value().decode('utf-8'))
try:
if event['event_type'] == 'OrderCreated':
handle_order_created(event['data'])
# 処理成功後に手動コミット
consumer.commit(msg)
except Exception as e:
print(f'処理失敗: {e}')
# DLQ(Dead Letter Queue)へ送信
send_to_dlq(event, str(e))
def handle_order_created(data: dict):
"""注文作成イベント処理 → 決済開始"""
payment = create_payment(
order_id=data['order_id'],
amount=data['total_amount'],
)
# 決済完了イベントを発行
publish_payment_event(payment)
3. CQRS (Command Query Responsibility Segregation)
CQRSは**書き込み(Command)と読み取り(Query)**のモデルを分離するパターンです。
CQRSアーキテクチャフロー
graph TB
Client[クライアント]
Client -->|Command| CmdAPI[Command API]
Client -->|Query| QryAPI[Query API]
CmdAPI --> WriteDB[(Write DB - PostgreSQL)]
CmdAPI -->|イベント発行| Kafka[Kafka]
Kafka -->|イベント消費| Projector[Projector / Denormalizer]
Projector --> ReadDB[(Read DB - Elasticsearch/Redis)]
QryAPI --> ReadDB
なぜCQRSか?
| 従来の方式 | CQRS |
|---|---|
| 1つのモデルで読み書き | 読み/書きモデルを分離 |
| 複雑なJOINで読み取り性能が低下 | 読み取りに最適化された非正規化モデル |
| スケーリングが困難 | 読み/書きを独立してスケーリング |
| シンプルなドメインに適している | 複雑なドメインに適している |
CQRS 実装例
# === Command Side(書き込み)===
class OrderCommandHandler:
def __init__(self, db, event_publisher):
self.db = db
self.publisher = event_publisher
def create_order(self, cmd: CreateOrderCommand):
"""注文作成コマンドの処理"""
# ビジネスロジックのバリデーション
if not self._validate_stock(cmd.items):
raise InsufficientStockError()
# Write DBに保存
order = Order(
id=uuid.uuid4(),
user_id=cmd.user_id,
items=cmd.items,
status='CREATED',
)
self.db.save(order)
# イベント発行(非同期でRead Modelを更新)
self.publisher.publish('orders', OrderCreatedEvent(
order_id=order.id,
user_id=order.user_id,
items=order.items,
total=order.total,
created_at=order.created_at,
))
return order.id
# === Projector(イベント → Read Model 変換)===
class OrderProjector:
def __init__(self, read_db):
self.read_db = read_db # Elasticsearch or Redis
def handle_order_created(self, event: OrderCreatedEvent):
"""注文作成イベント → 読み取りモデルに非正規化して保存"""
user = self.get_user_info(event.user_id)
# 非正規化された読み取りモデル(JOIN不要!)
order_view = {
'order_id': str(event.order_id),
'user_name': user.name,
'user_email': user.email,
'items': [
{'name': item.name, 'qty': item.qty, 'price': item.price}
for item in event.items
],
'total': event.total,
'status': 'CREATED',
'created_at': event.created_at.isoformat(),
}
self.read_db.index('orders', order_view)
# === Query Side(読み取り)===
class OrderQueryHandler:
def __init__(self, read_db):
self.read_db = read_db
def get_user_orders(self, user_id: str, page: int = 1):
"""ユーザーの注文一覧を取得(Read Modelから直接返却)"""
return self.read_db.search('orders', {
'query': {'term': {'user_id': user_id}},
'sort': [{'created_at': 'desc'}],
'from': (page - 1) * 20,
'size': 20,
})
4. Event Sourcing — イベントがデータそのもの
Event Sourcingは、現在の状態を保存する代わりに、状態変更イベントのシーケンスを保存するパターンです。
Event Sourcing フロー
sequenceDiagram
participant Client
participant OrderAggregate
participant EventStore
participant Projector
participant ReadDB
Client->>OrderAggregate: CreateOrder コマンド
OrderAggregate->>EventStore: OrderCreated イベント保存
EventStore->>Projector: イベント伝播
Projector->>ReadDB: 読み取りモデル更新
Client->>OrderAggregate: PayOrder コマンド
OrderAggregate->>EventStore: OrderPaid イベント保存
EventStore->>Projector: イベント伝播
Projector->>ReadDB: 読み取りモデル更新
Note over EventStore: イベントは絶対に削除・修正されない!
従来のCRUD vs Event Sourcing
# 従来のCRUD — 現在の状態のみ保存
orders テーブル:
| id | user_id | status | total |
|----|---------|-----------|--------|
| 1 | 100 | SHIPPED | 50000 |
→ 「いつ注文して、いつ決済されて、いつ配送されたか?」わからない!
# Event Sourcing — すべての変更履歴を保存
events テーブル:
| seq | aggregate_id | type | data | timestamp |
|-----|-------------|----------------|-------------------------|---------------------|
| 1 | order-1 | OrderCreated | {items: [...], total: 50000} | 2026-03-02 10:00 |
| 2 | order-1 | PaymentReceived| {method: "card"} | 2026-03-02 10:05 |
| 3 | order-1 | OrderShipped | {tracking: "KR123"} | 2026-03-02 14:30 |
→ 全履歴を再構成可能!
Event Sourcing 実装
from dataclasses import dataclass, field
from datetime import datetime
from typing import List
import json
# === イベント定義 ===
@dataclass
class DomainEvent:
event_id: str
aggregate_id: str
event_type: str
data: dict
timestamp: datetime
version: int
# === Aggregate(ドメインオブジェクト)===
class OrderAggregate:
def __init__(self, order_id: str):
self.id = order_id
self.status = None
self.items = []
self.total = 0
self.version = 0
self._pending_events: List[DomainEvent] = []
# --- Command Handler ---
def create(self, user_id: str, items: list):
"""注文作成コマンド"""
if self.status is not None:
raise Exception("既に作成済みの注文です")
# イベント作成(状態変更なし、イベントのみ記録)
self._apply_event(DomainEvent(
event_id=str(uuid.uuid4()),
aggregate_id=self.id,
event_type='OrderCreated',
data={'user_id': user_id, 'items': items,
'total': sum(i['price'] * i['qty'] for i in items)},
timestamp=datetime.utcnow(),
version=self.version + 1,
))
def pay(self, payment_method: str):
"""決済コマンド"""
if self.status != 'CREATED':
raise Exception(f"決済できない状態: {self.status}")
self._apply_event(DomainEvent(
event_id=str(uuid.uuid4()),
aggregate_id=self.id,
event_type='OrderPaid',
data={'payment_method': payment_method},
timestamp=datetime.utcnow(),
version=self.version + 1,
))
# --- Event Handler(状態変更はここだけ!)---
def _on_order_created(self, event: DomainEvent):
self.status = 'CREATED'
self.items = event.data['items']
self.total = event.data['total']
def _on_order_paid(self, event: DomainEvent):
self.status = 'PAID'
def _apply_event(self, event: DomainEvent):
"""イベント適用(状態変更 + イベント保存)"""
handler = {
'OrderCreated': self._on_order_created,
'OrderPaid': self._on_order_paid,
}.get(event.event_type)
if handler:
handler(event)
self.version = event.version
self._pending_events.append(event)
@classmethod
def from_events(cls, order_id: str, events: List[DomainEvent]):
"""イベントシーケンスから現在の状態を復元"""
aggregate = cls(order_id)
for event in events:
aggregate._apply_event(event)
aggregate._pending_events.clear() # 既に保存済みのイベント
return aggregate
# === Event Store ===
class EventStore:
def __init__(self, db):
self.db = db
def save(self, aggregate: OrderAggregate):
"""未保存のイベントをEvent Storeに記録"""
for event in aggregate._pending_events:
self.db.execute(
"""INSERT INTO events
(event_id, aggregate_id, event_type, data, timestamp, version)
VALUES (%s, %s, %s, %s, %s, %s)""",
(event.event_id, event.aggregate_id, event.event_type,
json.dumps(event.data), event.timestamp, event.version)
)
aggregate._pending_events.clear()
def load(self, aggregate_id: str) -> OrderAggregate:
"""Event Storeからイベントを読み込みAggregateを復元"""
rows = self.db.query(
"SELECT * FROM events WHERE aggregate_id = %s ORDER BY version",
(aggregate_id,)
)
events = [DomainEvent(**row) for row in rows]
return OrderAggregate.from_events(aggregate_id, events)
5. Sagaパターン — 分散トランザクション管理
マイクロサービスにおいて複数サービスにまたがるトランザクションを管理するパターンです。
Choreography Saga(イベントベース)
sequenceDiagram
participant Order as 注文サービス
participant Kafka
participant Payment as 決済サービス
participant Stock as 在庫サービス
participant Notify as 通知サービス
Order->>Kafka: OrderCreated
Kafka->>Payment: OrderCreated 受信
Payment->>Kafka: PaymentCompleted
Kafka->>Stock: PaymentCompleted 受信
Stock->>Kafka: StockReserved
Kafka->>Notify: StockReserved 受信
Notify->>Notify: ユーザーに通知送信
Note over Payment,Stock: 決済失敗の場合?
Payment->>Kafka: PaymentFailed
Kafka->>Order: PaymentFailed 受信
Order->>Order: 注文キャンセル(補償トランザクション)
Orchestration Saga(オーケストレーターベース)
graph TB
Orch[Saga Orchestrator]
Orch -->|1. 注文作成| Order[注文サービス]
Order -->|成功| Orch
Orch -->|2. 決済要求| Payment[決済サービス]
Payment -->|成功| Orch
Orch -->|3. 在庫引当| Stock[在庫サービス]
Stock -->|成功| Orch
Orch -->|4. 通知送信| Notify[通知サービス]
Stock -->|失敗| Orch
Orch -->|補償: 決済取消| Payment
Orch -->|補償: 注文取消| Order
2つの方式の比較
| 項目 | Choreography | Orchestration |
|---|---|---|
| 中央制御 | なし(各サービスが自律的) | オーケストレーターが制御 |
| 結合度 | 疎結合 | オーケストレーターに依存 |
| デバッグ | 難しい(イベント追跡が必要) | 比較的容易 |
| 複雑性 | サービス数増加に伴い複雑化 | オーケストレーターが複雑化 |
| 適した場面 | シンプルなフロー(3-4ステップ) | 複雑なフロー(5+ステップ) |
6. 実践的な考慮事項
冪等性 (Idempotency)
イベントが重複配信される可能性があるため、冪等性の保証が必須です:
def handle_payment_event(event: dict):
"""冪等性保証 — 同じイベントの重複処理を防止"""
event_id = event['event_id']
# 既に処理済みのイベントか確認
if is_already_processed(event_id):
logger.info(f'既に処理済みのイベント: {event_id}')
return
# ビジネスロジックの処理
process_payment(event['data'])
# 処理完了を記録
mark_as_processed(event_id)
Dead Letter Queue (DLQ)
処理に失敗したイベントを別のキューに保管:
MAX_RETRIES = 3
def consume_with_retry(event: dict, retry_count: int = 0):
try:
handle_event(event)
except Exception as e:
if retry_count < MAX_RETRIES:
# リトライ(exponential backoff)
time.sleep(2 ** retry_count)
consume_with_retry(event, retry_count + 1)
else:
# DLQへ送信
producer.produce(
topic='orders.dlq',
value=json.dumps({
'original_event': event,
'error': str(e),
'failed_at': datetime.utcnow().isoformat(),
}).encode('utf-8'),
)
イベントスキーマの進化
# v1 イベント
{"event_type": "OrderCreated", "version": 1,
"data": {"order_id": "123", "total": 50000}}
# v2 イベント(currencyフィールド追加)
{"event_type": "OrderCreated", "version": 2,
"data": {"order_id": "123", "total": 50000, "currency": "KRW"}}
# アップキャスターでv1 → v2変換
def upcast_order_created_v1_to_v2(event: dict) -> dict:
if event.get('version', 1) == 1:
event['data']['currency'] = 'KRW' # デフォルト値
event['version'] = 2
return event
7. EDA導入チェックリスト
- サービス間の同期呼び出しが障害伝播を引き起こしているか?
- 読み取り/書き込み比率が大きく異なるか? → CQRS検討
- 監査ログ/履歴追跡が必要か? → Event Sourcing検討
- イベントの冪等性処理戦略があるか?
- DLQとモニタリング/アラートが設定されているか?
- イベントのスキーマバージョン管理戦略があるか?
- Sagaパターンで分散トランザクションを管理しているか?
- イベント順序保証が必要な箇所にパーティションキーを設定したか?
クイズ
Q1: イベント駆動アーキテクチャの同期方式に対する最大の利点は?
サービス間の疎結合(loose coupling)です。あるサービスの障害が他のサービスに伝播せず、各 サービスが独立してスケーリングできます。
Q2: Kafkaにおいて同じConsumer Group内のコンシューマーはどのように動作するか?
同じConsumer Group内では、各パーティションは1つのコンシューマーにのみ割り当てられます。これにより、 パーティション単位で並列処理しながらもパーティション内の順序が保証されます。
Q3: CQRSで読み/書きモデルを分離する理由は?
読み取りと書き込みの要件が異なるためです。書き込みは正規化されたモデルでデータ整合性を、読み取りは 非正規化されたモデルでクエリ性能をそれぞれ最適化できます。
Q4: Event Sourcingで現在の状態をどのように復元するか?
Event Storeに保存されたイベントシーケンスを最初から順番にリプレイ(replay)して現在の状態を構成します。 Aggregateのfrom_events()メソッドがこの役割を担います。
Q5: Choreography SagaとOrchestration Sagaの主な違いは?
Choreographyは中央制御なしに各サービスがイベントをやり取りしながら自律的に進行し、Orchestrationは 中央のオーケストレーターが各サービスに順番にコマンドを発行します。
Q6: イベント処理において冪等性(idempotency)が重要な理由は?
ネットワークエラーやコンシューマーの再起動により、同じイベントが複数回配信される可能性があります。 冪等性を保証しなければ、重複決済や重複在庫引当などの問題が発生します。
Q7: Dead Letter Queue(DLQ)の役割は?
最大リトライ回数を超えて処理に失敗したイベントを別のキューに保管し、後で手動で確認して 再処理できるようにします。
Q8: Kafkaでパーティションキーをuser_idに設定する理由は?
同じuser_idを持つイベントが常に同じパーティションに入るため、そのユーザーのイベント順序が 保証されます。
Q9: Event Sourcingのイベントを修正または削除できるか?
原則としてできません。イベントは不変(immutable)であり、状態を変更するには新しい補償イベントを 追加する必要があります。これがEvent Sourcingの核心原則です。
Q10: CQRSの欠点は?
読み/書きモデル間のデータ整合性が即時ではない結果整合性(eventual consistency)モデルとなり、 システム複雑性が増加します。シンプルなCRUDアプリケーションには過度な設計になる場合があります。