- Authors
- Name

- イベント駆動アーキテクチャにガバナンスが必要な理由
- イベントスキーマガバナンス
- オーナーシップガバナンス:このイベントは誰が責任を持つのか
- 品質ガバナンス:SLOベースのイベント処理モニタリング
- コストガバナンス:イベントインフラコストの可視化
- イベントカタログ:ディスカバリー問題の解決
- ガバナンス導入ロードマップ
- 実践トラブルシューティング
- 参考資料
イベント駆動アーキテクチャにガバナンスが必要な理由
Event-Driven Architecture(EDA)はサービス間の結合度を低減し、スケーラビリティを向上させる強力なパターンである。しかし、ガバナンスのないEDAは急速に混沌に陥る。
チームAがuser.updatedイベントのフィールド名を変更したところ、このイベントをサブスクライブしていたチームB、C、Dのサービスが同時に壊れる。誰がこのイベントを消費しているか誰も知らず、スキーマ変更が予告なくデプロイされる。Dead Letter Queueに数千件の処理失敗イベントが溜まっているが、それが誰の責任なのか不明確である。
このような状況はEDAの問題ではなく、ガバナンスの不在の問題である。ガバナンスとは、ルールを作り強制する体系を意味する。この記事では、イベント駆動システムで実務的に必要なガバナンスの4つの軸を扱う。
- イベントスキーマガバナンス:イベントの形式と進化のルール
- オーナーシップガバナンス:イベントの生産者/消費者の責任境界
- 品質ガバナンス:イベント処理のSLOとモニタリング
- コストガバナンス:イベントインフラのコスト可視性と統制
イベントスキーマガバナンス
スキーマレジストリの導入
イベントスキーマをコード内に暗黙的に置くと、変更の影響度を把握できない。スキーマを中央レジストリで管理し、CIで互換性を強制すべきである。
Confluent Schema Registry、AWS Glue Schema Registry、または自前で構築したGitベースのレジストリを使用できる。ここではGitベースの軽量なアプローチを示す。
schemas/
├── order/
│ ├── order.created.v1.json
│ ├── order.created.v2.json # v1とBACKWARD互換
│ ├── order.confirmed.v1.json
│ └── order.cancelled.v1.json
├── payment/
│ ├── payment.reserved.v1.json
│ └── payment.completed.v1.json
├── inventory/
│ └── inventory.reserved.v1.json
└── _meta/
├── compatibility-rules.yaml # 互換性ポリシー
└── ownership.yaml # イベントオーナーシップマッピング
互換性チェックの自動化
スキーマ変更が既存の消費者を壊さないことをCIで自動検証する。
"""
イベントスキーマ互換性チェッカー。
JSON SchemaのBACKWARD互換性を検証する。
BACKWARD互換 = 新スキーマで生産されたメッセージを
以前のスキーマの消費者が処理できる。
ルール:
- 新フィールド追加: OK(optionalであること)
- 既存requiredフィールド削除: FAIL
- 既存フィールドの型変更: FAIL
- requiredフィールド追加: FAIL(既存の消費者がこのフィールドを送信しない)
"""
import json
import sys
from pathlib import Path
from dataclasses import dataclass
@dataclass
class CompatibilityIssue:
severity: str # "ERROR" or "WARNING"
path: str # JSON path
message: str
def check_backward_compatibility(
old_schema: dict,
new_schema: dict,
) -> list[CompatibilityIssue]:
"""以前のスキーマと新スキーマのBACKWARD互換性を検査する。"""
issues = []
old_props = old_schema.get("properties", {})
new_props = new_schema.get("properties", {})
old_required = set(old_schema.get("required", []))
new_required = set(new_schema.get("required", []))
# 1. 既存フィールドが削除されたか確認
for field_name in old_props:
if field_name not in new_props:
issues.append(CompatibilityIssue(
severity="ERROR",
path=f"$.properties.{field_name}",
message=f"Field '{field_name}' was removed. "
f"Existing consumers may depend on this field.",
))
# 2. 既存フィールドの型が変更されたか確認
for field_name in old_props:
if field_name in new_props:
old_type = old_props[field_name].get("type")
new_type = new_props[field_name].get("type")
if old_type and new_type and old_type != new_type:
issues.append(CompatibilityIssue(
severity="ERROR",
path=f"$.properties.{field_name}.type",
message=f"Type of '{field_name}' changed from "
f"'{old_type}' to '{new_type}'.",
))
# 3. 新たにrequiredになったフィールドがあるか確認
new_required_fields = new_required - old_required
for field_name in new_required_fields:
if field_name not in old_props:
issues.append(CompatibilityIssue(
severity="ERROR",
path=f"$.required.{field_name}",
message=f"New required field '{field_name}' added. "
f"Existing producers don't include this field.",
))
# 4. 新しいoptionalフィールドの追加はWARNINGとして記録
for field_name in new_props:
if field_name not in old_props:
if field_name not in new_required:
issues.append(CompatibilityIssue(
severity="WARNING",
path=f"$.properties.{field_name}",
message=f"New optional field '{field_name}' added. "
f"Consumers should handle missing field gracefully.",
))
# ネストされたobjectのpropertiesも再帰的に検査
for field_name in old_props:
if field_name in new_props:
old_field = old_props[field_name]
new_field = new_props[field_name]
if old_field.get("type") == "object" and new_field.get("type") == "object":
nested_issues = check_backward_compatibility(old_field, new_field)
for issue in nested_issues:
issue.path = f"$.properties.{field_name}{issue.path[1:]}"
issues.append(issue)
return issues
def validate_schema_evolution(schema_dir: str) -> bool:
"""ディレクトリ内のすべてのスキーマのバージョン間互換性を検証する。"""
schema_path = Path(schema_dir)
all_pass = True
for domain_dir in sorted(schema_path.iterdir()):
if not domain_dir.is_dir() or domain_dir.name.startswith("_"):
continue
# 同じイベントタイプのバージョンをグルーピング
event_versions: dict[str, list[Path]] = {}
for schema_file in sorted(domain_dir.glob("*.json")):
# event_name.v1.json -> event_name
parts = schema_file.stem.rsplit(".v", 1)
if len(parts) == 2:
event_name = parts[0]
event_versions.setdefault(event_name, []).append(schema_file)
# 連続バージョン間の互換性検査
for event_name, versions in event_versions.items():
for i in range(len(versions) - 1):
old = json.loads(versions[i].read_text())
new = json.loads(versions[i + 1].read_text())
issues = check_backward_compatibility(old, new)
errors = [i for i in issues if i.severity == "ERROR"]
if errors:
all_pass = False
print(f"\nFAIL: {versions[i].name} -> {versions[i+1].name}")
for err in errors:
print(f" [{err.severity}] {err.path}: {err.message}")
return all_pass
if __name__ == "__main__":
schema_dir = sys.argv[1] if len(sys.argv) > 1 else "schemas"
success = validate_schema_evolution(schema_dir)
sys.exit(0 if success else 1)
オーナーシップガバナンス:このイベントは誰が責任を持つのか
イベントの生産者と消費者が明確でないと、障害時に責任の空白が発生する。「このイベント誰が発行しているの?」「このDLQの処理は誰がやるべき?」といった質問に即座に答えられなければならない。
# schemas/_meta/ownership.yaml
# イベントオーナーシップマッピング。このファイルはすべてのイベントのオーナーシップ関係を定義する。
events:
order.created:
producer:
team: order-team
service: order-service
slack_channel: '#order-alerts'
oncall_rotation: 'order-oncall'
consumers:
- team: payment-team
service: payment-service
purpose: '決済予約トリガー'
slo_processing_time_ms: 500
- team: inventory-team
service: inventory-service
purpose: '在庫予約トリガー'
slo_processing_time_ms: 1000
- team: analytics-team
service: analytics-pipeline
purpose: '注文分析データ収集'
slo_processing_time_ms: 60000 # リアルタイムではなく準リアルタイム
order.cancelled:
producer:
team: order-team
service: order-service
slack_channel: '#order-alerts'
oncall_rotation: 'order-oncall'
consumers:
- team: payment-team
service: payment-service
purpose: '決済返金トリガー'
slo_processing_time_ms: 500
- team: inventory-team
service: inventory-service
purpose: '在庫解放'
slo_processing_time_ms: 1000
payment.completed:
producer:
team: payment-team
service: payment-service
slack_channel: '#payment-alerts'
oncall_rotation: 'payment-oncall'
consumers:
- team: order-team
service: order-service
purpose: '注文ステータスをconfirmedに更新'
slo_processing_time_ms: 500
- team: notification-team
service: notification-service
purpose: '決済完了通知送信'
slo_processing_time_ms: 3000
governance_rules:
# スキーマ変更時はすべての消費者チームに事前通知必須
schema_change_notice_days: 7
# 新しい消費者登録時は生産者チームの承認必須
consumer_registration: requires_producer_approval
# 消費者がSLOを守れない場合にアラート
slo_violation_alert: true
このオーナーシップファイルを基に、スキーマ変更PRが上がった際に自動的に消費者チームにレビューリクエストを送るCI自動化を構築できる。
"""
スキーマ変更時に影響を受ける消費者チームに自動通知するスクリプト。
git diffで変更されたスキーマファイルを検出し、
ownership.yamlから消費者チームを照会してSlack通知を送信する。
"""
import yaml
import subprocess
from pathlib import Path
def get_changed_schemas() -> list[str]:
"""git diffから変更されたスキーマファイル一覧を返す。"""
result = subprocess.run(
["git", "diff", "--name-only", "origin/main", "--", "schemas/"],
capture_output=True, text=True,
)
return [
line.strip() for line in result.stdout.splitlines()
if line.strip().endswith(".json")
]
def load_ownership() -> dict:
with open("schemas/_meta/ownership.yaml") as f:
return yaml.safe_load(f)
def find_affected_consumers(
changed_files: list[str],
ownership: dict,
) -> dict[str, list[dict]]:
"""変更されたスキーマの消費者チーム一覧を返す。"""
affected: dict[str, list[dict]] = {}
for filepath in changed_files:
# schemas/order/order.created.v2.json -> order.created
parts = Path(filepath).stem.rsplit(".v", 1)
if len(parts) != 2:
continue
event_type = parts[0].replace("/", ".")
# domain/event_name -> domain.event_name形式に変換
path_parts = Path(filepath).parts
if len(path_parts) >= 3:
event_type = f"{path_parts[1]}.{parts[0].split('.')[-1] if '.' in parts[0] else parts[0]}"
event_info = ownership.get("events", {}).get(event_type, {})
consumers = event_info.get("consumers", [])
if consumers:
affected[event_type] = consumers
return affected
def notify_consumers(affected: dict[str, list[dict]]):
"""影響を受ける消費者チームにSlack通知を送信する。"""
for event_type, consumers in affected.items():
teams = set(c["team"] for c in consumers)
channels = set()
for team_name in teams:
# チーム別Slackチャンネル照会(実際にはownershipから)
channels.add(f"#{team_name}-alerts")
message = (
f"Schema change detected for `{event_type}`.\n"
f"Affected consumer teams: {', '.join(teams)}\n"
f"Please review the schema change PR for backward compatibility."
)
print(f"[NOTIFY] {channels}: {message}")
# 実際にはSlack API呼び出し
if __name__ == "__main__":
changed = get_changed_schemas()
if not changed:
print("No schema changes detected.")
else:
ownership = load_ownership()
affected = find_affected_consumers(changed, ownership)
if affected:
notify_consumers(affected)
else:
print("Changed schemas have no registered consumers.")
品質ガバナンス:SLOベースのイベント処理モニタリング
イベント処理の品質を測定するには、3つのSLI(Service Level Indicator)が必要である。
- 処理レイテンシー(Processing Latency):イベント発行から消費完了までの時間
- 処理成功率(Success Rate):イベント処理の成功率
- DLQ滞留時間(DLQ Dwell Time):DLQに入ったイベントが解決されるまでの時間
-- イベント処理SLOダッシュボードクエリ
-- 1. 消費者別処理レイテンシーSLO達成率
-- SLO: 99%のイベントが定義された時間内に処理されること
SELECT
consumer_service,
event_type,
COUNT(*) AS total_events,
PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY processing_latency_ms) AS p50_ms,
PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY processing_latency_ms) AS p99_ms,
-- ownership.yamlに定義されたSLOと比較
ROUND(
100.0 * SUM(
CASE WHEN processing_latency_ms <= slo_target_ms THEN 1 ELSE 0 END
) / COUNT(*),
2
) AS slo_achievement_pct
FROM event_processing_log epl
JOIN event_slo_targets est
ON epl.consumer_service = est.consumer_service
AND epl.event_type = est.event_type
WHERE epl.processed_at >= NOW() - INTERVAL '24 hours'
GROUP BY consumer_service, event_type
ORDER BY slo_achievement_pct ASC;
-- 2. DLQ現況:チーム別未処理イベント
SELECT
owner_team,
event_type,
COUNT(*) AS dlq_count,
MIN(entered_dlq_at) AS oldest_entry,
EXTRACT(EPOCH FROM (NOW() - MIN(entered_dlq_at))) / 3600 AS oldest_hours,
-- 24時間以上経過した件数(P1アラート対象)
SUM(CASE WHEN entered_dlq_at < NOW() - INTERVAL '24 hours' THEN 1 ELSE 0 END)
AS stale_count
FROM dead_letter_queue dlq
JOIN event_ownership eo ON dlq.event_type = eo.event_type
WHERE dlq.resolved_at IS NULL
GROUP BY owner_team, event_type
ORDER BY oldest_hours DESC;
-- 3. 週次イベント処理トレンド
SELECT
date_trunc('day', processed_at) AS day,
event_type,
COUNT(*) AS total,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS successes,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failures,
SUM(CASE WHEN status = 'dlq' THEN 1 ELSE 0 END) AS dlq_entries,
ROUND(
100.0 * SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) / COUNT(*),
2
) AS success_rate_pct
FROM event_processing_log
WHERE processed_at >= NOW() - INTERVAL '7 days'
GROUP BY 1, 2
ORDER BY 1 DESC, success_rate_pct ASC;
コストガバナンス:イベントインフラコストの可視化
Kafkaクラスター、イベント処理コンピューティング、ストレージコストはイベントボリュームに比例して増加する。チーム別にコストをタグ付けし可視化しなければ、コスト最適化の出発点を見つけることができない。
"""
イベントインフラコストタグ付けおよびレポート生成器。
各チームが生産/消費するイベントボリュームと
該当インフラコストをマッピングし、FinOpsダッシュボードに提供する。
"""
from dataclasses import dataclass
from typing import Optional
@dataclass
class EventCostEntry:
team: str
event_type: str
role: str # "producer" or "consumer"
daily_volume: int
avg_payload_bytes: int
estimated_daily_cost_usd: float
@dataclass
class CostBreakdown:
# Kafkaコスト構成要素
broker_storage_usd: float # メッセージ保管ストレージ
broker_network_usd: float # ネットワーク転送
consumer_compute_usd: float # 消費者コンピューティングリソース
schema_registry_usd: float # スキーマレジストリ運用
@property
def total_usd(self) -> float:
return (
self.broker_storage_usd
+ self.broker_network_usd
+ self.consumer_compute_usd
+ self.schema_registry_usd
)
class EventCostCalculator:
"""イベントベースインフラコスト算出器。
単位コスト定数に基づいてチーム別/イベント別コストを推定する。
実際のクラウドコストと定期的にキャリブレーションが必要。
"""
# 単位コスト定数(AWS MSK基準の推定)
STORAGE_PER_GB_DAY_USD = 0.10
NETWORK_PER_GB_USD = 0.05
CONSUMER_PER_MILLION_EVENTS_USD = 0.50
RETENTION_DAYS = 7
def calculate_team_cost(
self,
entries: list[EventCostEntry],
) -> dict[str, CostBreakdown]:
"""チーム別日次コストを算出する。"""
team_costs: dict[str, CostBreakdown] = {}
for entry in entries:
daily_data_gb = (
entry.daily_volume * entry.avg_payload_bytes
) / (1024 ** 3)
cost = CostBreakdown(
broker_storage_usd=daily_data_gb * self.STORAGE_PER_GB_DAY_USD * self.RETENTION_DAYS,
broker_network_usd=daily_data_gb * self.NETWORK_PER_GB_USD,
consumer_compute_usd=(
entry.daily_volume / 1_000_000
) * self.CONSUMER_PER_MILLION_EVENTS_USD if entry.role == "consumer" else 0,
schema_registry_usd=0.01, # 固定分割
)
if entry.team in team_costs:
existing = team_costs[entry.team]
team_costs[entry.team] = CostBreakdown(
broker_storage_usd=existing.broker_storage_usd + cost.broker_storage_usd,
broker_network_usd=existing.broker_network_usd + cost.broker_network_usd,
consumer_compute_usd=existing.consumer_compute_usd + cost.consumer_compute_usd,
schema_registry_usd=existing.schema_registry_usd + cost.schema_registry_usd,
)
else:
team_costs[entry.team] = cost
return team_costs
def generate_report(self, team_costs: dict[str, CostBreakdown]) -> str:
"""チーム別コストレポートをテキストで生成する。"""
lines = ["=== Event Infrastructure Daily Cost Report ===\n"]
total = 0.0
for team, cost in sorted(team_costs.items(), key=lambda x: x[1].total_usd, reverse=True):
lines.append(f"Team: {team}")
lines.append(f" Storage: ${cost.broker_storage_usd:.2f}")
lines.append(f" Network: ${cost.broker_network_usd:.2f}")
lines.append(f" Compute: ${cost.consumer_compute_usd:.2f}")
lines.append(f" Registry: ${cost.schema_registry_usd:.2f}")
lines.append(f" TOTAL: ${cost.total_usd:.2f}\n")
total += cost.total_usd
lines.append(f"=== Grand Total: ${total:.2f}/day (${total*30:.2f}/month est.) ===")
return "\n".join(lines)
イベントカタログ:ディスカバリー問題の解決
「私たちのシステムにどんなイベントがあるの?」「このイベントを誰が発行して誰が消費しているの?」
イベント数が50を超えると、これらの質問に即座に答えるのが難しくなる。イベントカタログはすべてのイベントのメタデータを一箇所で検索可能にする。
# event-catalog.yaml
# イベントカタログ:すべてのイベントのメタデータ中央管理
catalog:
- event_type: order.created
domain: order
description: '顧客が新しい注文を作成した時に発行される。'
schema_path: schemas/order/order.created.v2.json
current_version: 2
topic: order-events
partition_key: order_id
avg_daily_volume: 150000
avg_payload_bytes: 1200
retention_days: 7
producer:
service: order-service
team: order-team
consumers:
- service: payment-service
team: payment-team
- service: inventory-service
team: inventory-team
- service: analytics-pipeline
team: analytics-team
tags: ['critical', 'transactional']
created_at: '2024-06-15'
last_schema_change: '2025-11-20'
- event_type: order.cancelled
domain: order
description: '注文がキャンセルされた時に発行される。理由フィールドを含む。'
schema_path: schemas/order/order.cancelled.v1.json
current_version: 1
topic: order-events
partition_key: order_id
avg_daily_volume: 8000
avg_payload_bytes: 800
retention_days: 7
producer:
service: order-service
team: order-team
consumers:
- service: payment-service
team: payment-team
- service: inventory-service
team: inventory-team
tags: ['critical', 'transactional']
created_at: '2024-06-15'
last_schema_change: '2024-06-15'
- event_type: user.profile.updated
domain: user
description: 'ユーザープロファイル情報が変更された時に発行される。'
schema_path: schemas/user/user.profile.updated.v1.json
current_version: 1
topic: user-events
partition_key: user_id
avg_daily_volume: 25000
avg_payload_bytes: 500
retention_days: 3
producer:
service: user-service
team: user-team
consumers:
- service: recommendation-service
team: ml-team
- service: notification-service
team: notification-team
tags: ['non-critical']
created_at: '2024-08-01'
last_schema_change: '2025-03-10'
ガバナンス導入ロードマップ
すべてのガバナンスを一度に導入すると、チームの反発を招くだけである。段階的に、最も痛みの大きいところから始める。
Phase 1: 可視性の確保(2-4週間)
├── イベントカタログ初案作成(既存イベントの棚卸し)
├── DLQ件数ダッシュボード構築
├── イベント処理成功/失敗メトリクス収集開始
└── 結果:「現在の状態」を数字で見られるようになる
Phase 2: オーナーシップの確立(2-4週間)
├── ownership.yaml作成(生産者/消費者マッピング)
├── DLQ処理担当チーム指定
├── スキーマ変更時の消費者チーム通知プロセス策定
└── 結果:「誰の責任か」答えられるようになる
Phase 3: 自動化されたルール強制(4-8週間)
├── スキーマ互換性チェックCIパイプライン構築
├── モジュール境界違反検出CI構築
├── SLO違反アラート自動化
└── 結果:ルール違反がデプロイ前にブロックされる
Phase 4: コスト最適化(継続的)
├── チーム別イベントコストタグ付けおよびレポート
├── 不要なイベント消費の整理
├── リテンションポリシーの最適化
└── 結果:コストが可視化され最適化が開始される
実践トラブルシューティング
消費者に知らせずにスキーマが変更され障害発生
症状:payment-serviceが突然KeyError: 'customer_id'エラーを大量に発生させる。order-serviceチームがcustomer_idをbuyer_idに名前を変えたが、消費者チームに知らせなかった。
事後対応:(1)即座にorder-serviceを以前のバージョンにロールバックする。(2)ポストモーテムでスキーマ変更プロセスの不在を根本原因として特定する。
再発防止:(1)スキーマ互換性チェックCIを導入し、BACKWARD互換が壊れる変更をデプロイ前にブロックする。(2)ownership.yamlを基にスキーマ変更PRに消費者チームを自動的にレビュアーに追加する。(3)重要なスキーマ変更はgovernance_rules.schema_change_notice_days(7日)の事前告知期間を設ける。
DLQに数千件が溜まっているが誰も処理しない
症状:DLQダッシュボードに3,000件が溜まっており、最も古いものは2週間前である。
原因:DLQ処理担当チームが指定されていないか、アラートが設定されていないため誰も認知していなかった。
対応:(1)ownership.yamlから該当イベントの消費者チームを特定する。(2)DLQ件数アラートを設定する:100件超過でWARNING、24時間以上経過した件があればP1。(3)週次DLQレビューミーティングをチームルーティンに含める。(4)DLQから自動再処理可能な件(一時的エラー)と手動介入が必要な件(スキーマ不一致、ビジネスロジックエラー)を分類するトリアージ自動化を実装する。
イベントボリュームが急増しKafkaクラスターコストが爆発
症状:月間Kafkaコストが12,000に急増した。
原因分析:コストタグ付けがないため、どのチームのどのイベントが原因か即座に把握できない。調査の結果、analyticsチームが新たに追加したpage.viewedイベントが日5,000万件で、ペイロードが5KBに達していた。
対応:(1)イベント別ボリュームモニタリングを構築する。(2)新イベント登録時に予想ボリュームとコスト影響度をカタログに記録するプロセスを策定する。(3)該当イベントのペイロードを最適化する(不要なフィールド削除、圧縮適用)。(4)リアルタイム性が不要なanalyticsイベントはバッチ処理に切り替え、Kafka負荷を軽減する。
参考資料
- Martin Fowler, "What do you mean by Event-Driven?" -- martinfowler.com/articles/201701-event-driven
- Confluent Schema Registry Documentation -- docs.confluent.io/platform/current/schema-registry
- Chris Richardson, "Microservices Patterns", Manning, 2018 -- Chapter 3: Interprocess communication
- Event Catalog (Open Source) -- eventcatalog.dev
- "Event-Driven Architecture: Pros and Cons" -- solace.com/blog/event-driven-architecture-pros-and-cons
- CloudEvents Specification -- cloudevents.io
- OpenTelemetry Documentation -- opentelemetry.io/docs
クイズ
EDAでガバナンスが不在の場合、最初に発生する問題は何か? 正解:イベントスキーマ変更が消費者に予告なく伝播してサービス障害が発生し、該当イベントの生産者/消費者の責任境界が不明確なため障害解決が遅延する。
BACKWARD互換性が保証されている場合、どのデプロイ順序が安全か? 正解:消費者を先にデプロイしても安全である。新スキーマで生産されたメッセージに追加されたoptionalフィールドを既存の消費者が無視でき、既存のrequiredフィールドが維持されるため、既存の消費者コードが壊れない。
イベントオーナーシップマッピングで消費者のSLOを定義する理由は? 正解:消費者ごとに処理時間の要件が異なる。決済サービスは500ms以内の処理が必要だが、analyticsパイプラインは60秒まで許容される。SLOを定義しておかないと、違反時に適切なチームに適切な優先度でアラートを送ることができない。
DLQガバナンスの核心的な3つの要素は? 正解:(1)DLQ処理担当チームの指定(ownership)、(2)DLQ件数と滞留時間に対するアラート設定(モニタリング)、(3)自動再処理可能な件と手動介入が必要な件の分類自動化(triage)。
スキーマ互換性チェックをCIで強制することの核心的なメリットは? 正解:互換性のないスキーマ変更がプロダクションにデプロイされる前にブロックされる。スキーマ変更によるサービス障害を根本的に防止し、変更の影響度をデプロイ前に確認できる。
イベントコストガバナンスを導入する最初のステップは? 正解:チーム別のイベントボリュームとコストをタグ付けしダッシュボードで可視化すること。どのチームのどのイベントがコストを消費しているか見えなければ、最適化対象を特定できない。
ガバナンス導入ロードマップで「可視性の確保」を最初に行う理由は? 正解:現在の状態を数字で見ることができなければ、オーナーシップ確立やルール強制の必要性をチームに説得する根拠がない。「DLQに3,000件が溜まっていて2週間放置されている」というデータがあってこそ、ガバナンス導入に対するチームの合意を得ることができる。
新イベントをカタログに登録する際に必ず含めるべき5つの情報は? 正解:(1)イベントタイプと説明、(2)生産者サービス/チーム、(3)スキーマパスとバージョン、(4)予想日次ボリュームとペイロードサイズ、(5)消費者リストと各消費者の処理SLO。