Skip to content
Published on

アーキテクチャ:イベント駆動ガバナンス実践 2026

Authors
  • Name
    Twitter
アーキテクチャ:イベント駆動ガバナンス実践 2026

イベント駆動アーキテクチャにガバナンスが必要な理由

Event-Driven Architecture(EDA)はサービス間の結合度を低減し、スケーラビリティを向上させる強力なパターンである。しかし、ガバナンスのないEDAは急速に混沌に陥る。

チームAがuser.updatedイベントのフィールド名を変更したところ、このイベントをサブスクライブしていたチームB、C、Dのサービスが同時に壊れる。誰がこのイベントを消費しているか誰も知らず、スキーマ変更が予告なくデプロイされる。Dead Letter Queueに数千件の処理失敗イベントが溜まっているが、それが誰の責任なのか不明確である。

このような状況はEDAの問題ではなく、ガバナンスの不在の問題である。ガバナンスとは、ルールを作り強制する体系を意味する。この記事では、イベント駆動システムで実務的に必要なガバナンスの4つの軸を扱う。

  1. イベントスキーマガバナンス:イベントの形式と進化のルール
  2. オーナーシップガバナンス:イベントの生産者/消費者の責任境界
  3. 品質ガバナンス:イベント処理のSLOとモニタリング
  4. コストガバナンス:イベントインフラのコスト可視性と統制

イベントスキーマガバナンス

スキーマレジストリの導入

イベントスキーマをコード内に暗黙的に置くと、変更の影響度を把握できない。スキーマを中央レジストリで管理し、CIで互換性を強制すべきである。

Confluent Schema Registry、AWS Glue Schema Registry、または自前で構築したGitベースのレジストリを使用できる。ここではGitベースの軽量なアプローチを示す。

schemas/
├── order/
│   ├── order.created.v1.json
│   ├── order.created.v2.json     # v1とBACKWARD互換
│   ├── order.confirmed.v1.json
│   └── order.cancelled.v1.json
├── payment/
│   ├── payment.reserved.v1.json
│   └── payment.completed.v1.json
├── inventory/
│   └── inventory.reserved.v1.json
└── _meta/
    ├── compatibility-rules.yaml  # 互換性ポリシー
    └── ownership.yaml            # イベントオーナーシップマッピング

互換性チェックの自動化

スキーマ変更が既存の消費者を壊さないことをCIで自動検証する。

"""
イベントスキーマ互換性チェッカー。

JSON SchemaのBACKWARD互換性を検証する。
BACKWARD互換 = 新スキーマで生産されたメッセージを
以前のスキーマの消費者が処理できる。

ルール:
  - 新フィールド追加: OK(optionalであること)
  - 既存requiredフィールド削除: FAIL
  - 既存フィールドの型変更: FAIL
  - requiredフィールド追加: FAIL(既存の消費者がこのフィールドを送信しない)
"""
import json
import sys
from pathlib import Path
from dataclasses import dataclass


@dataclass
class CompatibilityIssue:
    severity: str   # "ERROR" or "WARNING"
    path: str       # JSON path
    message: str


def check_backward_compatibility(
    old_schema: dict,
    new_schema: dict,
) -> list[CompatibilityIssue]:
    """以前のスキーマと新スキーマのBACKWARD互換性を検査する。"""
    issues = []

    old_props = old_schema.get("properties", {})
    new_props = new_schema.get("properties", {})
    old_required = set(old_schema.get("required", []))
    new_required = set(new_schema.get("required", []))

    # 1. 既存フィールドが削除されたか確認
    for field_name in old_props:
        if field_name not in new_props:
            issues.append(CompatibilityIssue(
                severity="ERROR",
                path=f"$.properties.{field_name}",
                message=f"Field '{field_name}' was removed. "
                        f"Existing consumers may depend on this field.",
            ))

    # 2. 既存フィールドの型が変更されたか確認
    for field_name in old_props:
        if field_name in new_props:
            old_type = old_props[field_name].get("type")
            new_type = new_props[field_name].get("type")
            if old_type and new_type and old_type != new_type:
                issues.append(CompatibilityIssue(
                    severity="ERROR",
                    path=f"$.properties.{field_name}.type",
                    message=f"Type of '{field_name}' changed from "
                            f"'{old_type}' to '{new_type}'.",
                ))

    # 3. 新たにrequiredになったフィールドがあるか確認
    new_required_fields = new_required - old_required
    for field_name in new_required_fields:
        if field_name not in old_props:
            issues.append(CompatibilityIssue(
                severity="ERROR",
                path=f"$.required.{field_name}",
                message=f"New required field '{field_name}' added. "
                        f"Existing producers don't include this field.",
            ))

    # 4. 新しいoptionalフィールドの追加はWARNINGとして記録
    for field_name in new_props:
        if field_name not in old_props:
            if field_name not in new_required:
                issues.append(CompatibilityIssue(
                    severity="WARNING",
                    path=f"$.properties.{field_name}",
                    message=f"New optional field '{field_name}' added. "
                            f"Consumers should handle missing field gracefully.",
                ))

    # ネストされたobjectのpropertiesも再帰的に検査
    for field_name in old_props:
        if field_name in new_props:
            old_field = old_props[field_name]
            new_field = new_props[field_name]
            if old_field.get("type") == "object" and new_field.get("type") == "object":
                nested_issues = check_backward_compatibility(old_field, new_field)
                for issue in nested_issues:
                    issue.path = f"$.properties.{field_name}{issue.path[1:]}"
                    issues.append(issue)

    return issues


def validate_schema_evolution(schema_dir: str) -> bool:
    """ディレクトリ内のすべてのスキーマのバージョン間互換性を検証する。"""
    schema_path = Path(schema_dir)
    all_pass = True

    for domain_dir in sorted(schema_path.iterdir()):
        if not domain_dir.is_dir() or domain_dir.name.startswith("_"):
            continue

        # 同じイベントタイプのバージョンをグルーピング
        event_versions: dict[str, list[Path]] = {}
        for schema_file in sorted(domain_dir.glob("*.json")):
            # event_name.v1.json -> event_name
            parts = schema_file.stem.rsplit(".v", 1)
            if len(parts) == 2:
                event_name = parts[0]
                event_versions.setdefault(event_name, []).append(schema_file)

        # 連続バージョン間の互換性検査
        for event_name, versions in event_versions.items():
            for i in range(len(versions) - 1):
                old = json.loads(versions[i].read_text())
                new = json.loads(versions[i + 1].read_text())

                issues = check_backward_compatibility(old, new)
                errors = [i for i in issues if i.severity == "ERROR"]

                if errors:
                    all_pass = False
                    print(f"\nFAIL: {versions[i].name} -> {versions[i+1].name}")
                    for err in errors:
                        print(f"  [{err.severity}] {err.path}: {err.message}")

    return all_pass


if __name__ == "__main__":
    schema_dir = sys.argv[1] if len(sys.argv) > 1 else "schemas"
    success = validate_schema_evolution(schema_dir)
    sys.exit(0 if success else 1)

オーナーシップガバナンス:このイベントは誰が責任を持つのか

イベントの生産者と消費者が明確でないと、障害時に責任の空白が発生する。「このイベント誰が発行しているの?」「このDLQの処理は誰がやるべき?」といった質問に即座に答えられなければならない。

# schemas/_meta/ownership.yaml
# イベントオーナーシップマッピング。このファイルはすべてのイベントのオーナーシップ関係を定義する。

events:
  order.created:
    producer:
      team: order-team
      service: order-service
      slack_channel: '#order-alerts'
      oncall_rotation: 'order-oncall'
    consumers:
      - team: payment-team
        service: payment-service
        purpose: '決済予約トリガー'
        slo_processing_time_ms: 500
      - team: inventory-team
        service: inventory-service
        purpose: '在庫予約トリガー'
        slo_processing_time_ms: 1000
      - team: analytics-team
        service: analytics-pipeline
        purpose: '注文分析データ収集'
        slo_processing_time_ms: 60000 # リアルタイムではなく準リアルタイム

  order.cancelled:
    producer:
      team: order-team
      service: order-service
      slack_channel: '#order-alerts'
      oncall_rotation: 'order-oncall'
    consumers:
      - team: payment-team
        service: payment-service
        purpose: '決済返金トリガー'
        slo_processing_time_ms: 500
      - team: inventory-team
        service: inventory-service
        purpose: '在庫解放'
        slo_processing_time_ms: 1000

  payment.completed:
    producer:
      team: payment-team
      service: payment-service
      slack_channel: '#payment-alerts'
      oncall_rotation: 'payment-oncall'
    consumers:
      - team: order-team
        service: order-service
        purpose: '注文ステータスをconfirmedに更新'
        slo_processing_time_ms: 500
      - team: notification-team
        service: notification-service
        purpose: '決済完了通知送信'
        slo_processing_time_ms: 3000

governance_rules:
  # スキーマ変更時はすべての消費者チームに事前通知必須
  schema_change_notice_days: 7
  # 新しい消費者登録時は生産者チームの承認必須
  consumer_registration: requires_producer_approval
  # 消費者がSLOを守れない場合にアラート
  slo_violation_alert: true

このオーナーシップファイルを基に、スキーマ変更PRが上がった際に自動的に消費者チームにレビューリクエストを送るCI自動化を構築できる。

"""
スキーマ変更時に影響を受ける消費者チームに自動通知するスクリプト。

git diffで変更されたスキーマファイルを検出し、
ownership.yamlから消費者チームを照会してSlack通知を送信する。
"""
import yaml
import subprocess
from pathlib import Path


def get_changed_schemas() -> list[str]:
    """git diffから変更されたスキーマファイル一覧を返す。"""
    result = subprocess.run(
        ["git", "diff", "--name-only", "origin/main", "--", "schemas/"],
        capture_output=True, text=True,
    )
    return [
        line.strip() for line in result.stdout.splitlines()
        if line.strip().endswith(".json")
    ]


def load_ownership() -> dict:
    with open("schemas/_meta/ownership.yaml") as f:
        return yaml.safe_load(f)


def find_affected_consumers(
    changed_files: list[str],
    ownership: dict,
) -> dict[str, list[dict]]:
    """変更されたスキーマの消費者チーム一覧を返す。"""
    affected: dict[str, list[dict]] = {}

    for filepath in changed_files:
        # schemas/order/order.created.v2.json -> order.created
        parts = Path(filepath).stem.rsplit(".v", 1)
        if len(parts) != 2:
            continue
        event_type = parts[0].replace("/", ".")
        # domain/event_name -> domain.event_name形式に変換
        path_parts = Path(filepath).parts
        if len(path_parts) >= 3:
            event_type = f"{path_parts[1]}.{parts[0].split('.')[-1] if '.' in parts[0] else parts[0]}"

        event_info = ownership.get("events", {}).get(event_type, {})
        consumers = event_info.get("consumers", [])
        if consumers:
            affected[event_type] = consumers

    return affected


def notify_consumers(affected: dict[str, list[dict]]):
    """影響を受ける消費者チームにSlack通知を送信する。"""
    for event_type, consumers in affected.items():
        teams = set(c["team"] for c in consumers)
        channels = set()
        for team_name in teams:
            # チーム別Slackチャンネル照会(実際にはownershipから)
            channels.add(f"#{team_name}-alerts")

        message = (
            f"Schema change detected for `{event_type}`.\n"
            f"Affected consumer teams: {', '.join(teams)}\n"
            f"Please review the schema change PR for backward compatibility."
        )
        print(f"[NOTIFY] {channels}: {message}")
        # 実際にはSlack API呼び出し


if __name__ == "__main__":
    changed = get_changed_schemas()
    if not changed:
        print("No schema changes detected.")
    else:
        ownership = load_ownership()
        affected = find_affected_consumers(changed, ownership)
        if affected:
            notify_consumers(affected)
        else:
            print("Changed schemas have no registered consumers.")

品質ガバナンス:SLOベースのイベント処理モニタリング

イベント処理の品質を測定するには、3つのSLI(Service Level Indicator)が必要である。

  1. 処理レイテンシー(Processing Latency):イベント発行から消費完了までの時間
  2. 処理成功率(Success Rate):イベント処理の成功率
  3. DLQ滞留時間(DLQ Dwell Time):DLQに入ったイベントが解決されるまでの時間
-- イベント処理SLOダッシュボードクエリ

-- 1. 消費者別処理レイテンシーSLO達成率
-- SLO: 99%のイベントが定義された時間内に処理されること
SELECT
    consumer_service,
    event_type,
    COUNT(*) AS total_events,
    PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY processing_latency_ms) AS p50_ms,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY processing_latency_ms) AS p99_ms,
    -- ownership.yamlに定義されたSLOと比較
    ROUND(
        100.0 * SUM(
            CASE WHEN processing_latency_ms <= slo_target_ms THEN 1 ELSE 0 END
        ) / COUNT(*),
        2
    ) AS slo_achievement_pct
FROM event_processing_log epl
JOIN event_slo_targets est
    ON epl.consumer_service = est.consumer_service
    AND epl.event_type = est.event_type
WHERE epl.processed_at >= NOW() - INTERVAL '24 hours'
GROUP BY consumer_service, event_type
ORDER BY slo_achievement_pct ASC;

-- 2. DLQ現況:チーム別未処理イベント
SELECT
    owner_team,
    event_type,
    COUNT(*) AS dlq_count,
    MIN(entered_dlq_at) AS oldest_entry,
    EXTRACT(EPOCH FROM (NOW() - MIN(entered_dlq_at))) / 3600 AS oldest_hours,
    -- 24時間以上経過した件数(P1アラート対象)
    SUM(CASE WHEN entered_dlq_at < NOW() - INTERVAL '24 hours' THEN 1 ELSE 0 END)
        AS stale_count
FROM dead_letter_queue dlq
JOIN event_ownership eo ON dlq.event_type = eo.event_type
WHERE dlq.resolved_at IS NULL
GROUP BY owner_team, event_type
ORDER BY oldest_hours DESC;

-- 3. 週次イベント処理トレンド
SELECT
    date_trunc('day', processed_at) AS day,
    event_type,
    COUNT(*) AS total,
    SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS successes,
    SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failures,
    SUM(CASE WHEN status = 'dlq' THEN 1 ELSE 0 END) AS dlq_entries,
    ROUND(
        100.0 * SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) / COUNT(*),
        2
    ) AS success_rate_pct
FROM event_processing_log
WHERE processed_at >= NOW() - INTERVAL '7 days'
GROUP BY 1, 2
ORDER BY 1 DESC, success_rate_pct ASC;

コストガバナンス:イベントインフラコストの可視化

Kafkaクラスター、イベント処理コンピューティング、ストレージコストはイベントボリュームに比例して増加する。チーム別にコストをタグ付けし可視化しなければ、コスト最適化の出発点を見つけることができない。

"""
イベントインフラコストタグ付けおよびレポート生成器。

各チームが生産/消費するイベントボリュームと
該当インフラコストをマッピングし、FinOpsダッシュボードに提供する。
"""
from dataclasses import dataclass
from typing import Optional


@dataclass
class EventCostEntry:
    team: str
    event_type: str
    role: str               # "producer" or "consumer"
    daily_volume: int
    avg_payload_bytes: int
    estimated_daily_cost_usd: float


@dataclass
class CostBreakdown:
    # Kafkaコスト構成要素
    broker_storage_usd: float    # メッセージ保管ストレージ
    broker_network_usd: float    # ネットワーク転送
    consumer_compute_usd: float  # 消費者コンピューティングリソース
    schema_registry_usd: float   # スキーマレジストリ運用

    @property
    def total_usd(self) -> float:
        return (
            self.broker_storage_usd
            + self.broker_network_usd
            + self.consumer_compute_usd
            + self.schema_registry_usd
        )


class EventCostCalculator:
    """イベントベースインフラコスト算出器。

    単位コスト定数に基づいてチーム別/イベント別コストを推定する。
    実際のクラウドコストと定期的にキャリブレーションが必要。
    """

    # 単位コスト定数(AWS MSK基準の推定)
    STORAGE_PER_GB_DAY_USD = 0.10
    NETWORK_PER_GB_USD = 0.05
    CONSUMER_PER_MILLION_EVENTS_USD = 0.50
    RETENTION_DAYS = 7

    def calculate_team_cost(
        self,
        entries: list[EventCostEntry],
    ) -> dict[str, CostBreakdown]:
        """チーム別日次コストを算出する。"""
        team_costs: dict[str, CostBreakdown] = {}

        for entry in entries:
            daily_data_gb = (
                entry.daily_volume * entry.avg_payload_bytes
            ) / (1024 ** 3)

            cost = CostBreakdown(
                broker_storage_usd=daily_data_gb * self.STORAGE_PER_GB_DAY_USD * self.RETENTION_DAYS,
                broker_network_usd=daily_data_gb * self.NETWORK_PER_GB_USD,
                consumer_compute_usd=(
                    entry.daily_volume / 1_000_000
                ) * self.CONSUMER_PER_MILLION_EVENTS_USD if entry.role == "consumer" else 0,
                schema_registry_usd=0.01,  # 固定分割
            )

            if entry.team in team_costs:
                existing = team_costs[entry.team]
                team_costs[entry.team] = CostBreakdown(
                    broker_storage_usd=existing.broker_storage_usd + cost.broker_storage_usd,
                    broker_network_usd=existing.broker_network_usd + cost.broker_network_usd,
                    consumer_compute_usd=existing.consumer_compute_usd + cost.consumer_compute_usd,
                    schema_registry_usd=existing.schema_registry_usd + cost.schema_registry_usd,
                )
            else:
                team_costs[entry.team] = cost

        return team_costs

    def generate_report(self, team_costs: dict[str, CostBreakdown]) -> str:
        """チーム別コストレポートをテキストで生成する。"""
        lines = ["=== Event Infrastructure Daily Cost Report ===\n"]

        total = 0.0
        for team, cost in sorted(team_costs.items(), key=lambda x: x[1].total_usd, reverse=True):
            lines.append(f"Team: {team}")
            lines.append(f"  Storage:  ${cost.broker_storage_usd:.2f}")
            lines.append(f"  Network:  ${cost.broker_network_usd:.2f}")
            lines.append(f"  Compute:  ${cost.consumer_compute_usd:.2f}")
            lines.append(f"  Registry: ${cost.schema_registry_usd:.2f}")
            lines.append(f"  TOTAL:    ${cost.total_usd:.2f}\n")
            total += cost.total_usd

        lines.append(f"=== Grand Total: ${total:.2f}/day (${total*30:.2f}/month est.) ===")
        return "\n".join(lines)

イベントカタログ:ディスカバリー問題の解決

「私たちのシステムにどんなイベントがあるの?」「このイベントを誰が発行して誰が消費しているの?」

イベント数が50を超えると、これらの質問に即座に答えるのが難しくなる。イベントカタログはすべてのイベントのメタデータを一箇所で検索可能にする。

# event-catalog.yaml
# イベントカタログ:すべてのイベントのメタデータ中央管理

catalog:
  - event_type: order.created
    domain: order
    description: '顧客が新しい注文を作成した時に発行される。'
    schema_path: schemas/order/order.created.v2.json
    current_version: 2
    topic: order-events
    partition_key: order_id
    avg_daily_volume: 150000
    avg_payload_bytes: 1200
    retention_days: 7
    producer:
      service: order-service
      team: order-team
    consumers:
      - service: payment-service
        team: payment-team
      - service: inventory-service
        team: inventory-team
      - service: analytics-pipeline
        team: analytics-team
    tags: ['critical', 'transactional']
    created_at: '2024-06-15'
    last_schema_change: '2025-11-20'

  - event_type: order.cancelled
    domain: order
    description: '注文がキャンセルされた時に発行される。理由フィールドを含む。'
    schema_path: schemas/order/order.cancelled.v1.json
    current_version: 1
    topic: order-events
    partition_key: order_id
    avg_daily_volume: 8000
    avg_payload_bytes: 800
    retention_days: 7
    producer:
      service: order-service
      team: order-team
    consumers:
      - service: payment-service
        team: payment-team
      - service: inventory-service
        team: inventory-team
    tags: ['critical', 'transactional']
    created_at: '2024-06-15'
    last_schema_change: '2024-06-15'

  - event_type: user.profile.updated
    domain: user
    description: 'ユーザープロファイル情報が変更された時に発行される。'
    schema_path: schemas/user/user.profile.updated.v1.json
    current_version: 1
    topic: user-events
    partition_key: user_id
    avg_daily_volume: 25000
    avg_payload_bytes: 500
    retention_days: 3
    producer:
      service: user-service
      team: user-team
    consumers:
      - service: recommendation-service
        team: ml-team
      - service: notification-service
        team: notification-team
    tags: ['non-critical']
    created_at: '2024-08-01'
    last_schema_change: '2025-03-10'

ガバナンス導入ロードマップ

すべてのガバナンスを一度に導入すると、チームの反発を招くだけである。段階的に、最も痛みの大きいところから始める。

Phase 1: 可視性の確保(2-4週間)
├── イベントカタログ初案作成(既存イベントの棚卸し)
├── DLQ件数ダッシュボード構築
├── イベント処理成功/失敗メトリクス収集開始
└── 結果:「現在の状態」を数字で見られるようになる

Phase 2: オーナーシップの確立(2-4週間)
├── ownership.yaml作成(生産者/消費者マッピング)
├── DLQ処理担当チーム指定
├── スキーマ変更時の消費者チーム通知プロセス策定
└── 結果:「誰の責任か」答えられるようになる

Phase 3: 自動化されたルール強制(4-8週間)
├── スキーマ互換性チェックCIパイプライン構築
├── モジュール境界違反検出CI構築
├── SLO違反アラート自動化
└── 結果:ルール違反がデプロイ前にブロックされる

Phase 4: コスト最適化(継続的)
├── チーム別イベントコストタグ付けおよびレポート
├── 不要なイベント消費の整理
├── リテンションポリシーの最適化
└── 結果:コストが可視化され最適化が開始される

実践トラブルシューティング

消費者に知らせずにスキーマが変更され障害発生

症状:payment-serviceが突然KeyError: 'customer_id'エラーを大量に発生させる。order-serviceチームがcustomer_idbuyer_idに名前を変えたが、消費者チームに知らせなかった。

事後対応:(1)即座にorder-serviceを以前のバージョンにロールバックする。(2)ポストモーテムでスキーマ変更プロセスの不在を根本原因として特定する。

再発防止:(1)スキーマ互換性チェックCIを導入し、BACKWARD互換が壊れる変更をデプロイ前にブロックする。(2)ownership.yamlを基にスキーマ変更PRに消費者チームを自動的にレビュアーに追加する。(3)重要なスキーマ変更はgovernance_rules.schema_change_notice_days(7日)の事前告知期間を設ける。

DLQに数千件が溜まっているが誰も処理しない

症状:DLQダッシュボードに3,000件が溜まっており、最も古いものは2週間前である。

原因:DLQ処理担当チームが指定されていないか、アラートが設定されていないため誰も認知していなかった。

対応:(1)ownership.yamlから該当イベントの消費者チームを特定する。(2)DLQ件数アラートを設定する:100件超過でWARNING、24時間以上経過した件があればP1。(3)週次DLQレビューミーティングをチームルーティンに含める。(4)DLQから自動再処理可能な件(一時的エラー)と手動介入が必要な件(スキーマ不一致、ビジネスロジックエラー)を分類するトリアージ自動化を実装する。

イベントボリュームが急増しKafkaクラスターコストが爆発

症状:月間Kafkaコストが3,000から3,000から12,000に急増した。

原因分析:コストタグ付けがないため、どのチームのどのイベントが原因か即座に把握できない。調査の結果、analyticsチームが新たに追加したpage.viewedイベントが日5,000万件で、ペイロードが5KBに達していた。

対応:(1)イベント別ボリュームモニタリングを構築する。(2)新イベント登録時に予想ボリュームとコスト影響度をカタログに記録するプロセスを策定する。(3)該当イベントのペイロードを最適化する(不要なフィールド削除、圧縮適用)。(4)リアルタイム性が不要なanalyticsイベントはバッチ処理に切り替え、Kafka負荷を軽減する。

参考資料

クイズ
  1. EDAでガバナンスが不在の場合、最初に発生する問題は何か? 正解:イベントスキーマ変更が消費者に予告なく伝播してサービス障害が発生し、該当イベントの生産者/消費者の責任境界が不明確なため障害解決が遅延する。

  2. BACKWARD互換性が保証されている場合、どのデプロイ順序が安全か? 正解:消費者を先にデプロイしても安全である。新スキーマで生産されたメッセージに追加されたoptionalフィールドを既存の消費者が無視でき、既存のrequiredフィールドが維持されるため、既存の消費者コードが壊れない。

  3. イベントオーナーシップマッピングで消費者のSLOを定義する理由は? 正解:消費者ごとに処理時間の要件が異なる。決済サービスは500ms以内の処理が必要だが、analyticsパイプラインは60秒まで許容される。SLOを定義しておかないと、違反時に適切なチームに適切な優先度でアラートを送ることができない。

  4. DLQガバナンスの核心的な3つの要素は? 正解:(1)DLQ処理担当チームの指定(ownership)、(2)DLQ件数と滞留時間に対するアラート設定(モニタリング)、(3)自動再処理可能な件と手動介入が必要な件の分類自動化(triage)。

  5. スキーマ互換性チェックをCIで強制することの核心的なメリットは? 正解:互換性のないスキーマ変更がプロダクションにデプロイされる前にブロックされる。スキーマ変更によるサービス障害を根本的に防止し、変更の影響度をデプロイ前に確認できる。

  6. イベントコストガバナンスを導入する最初のステップは? 正解:チーム別のイベントボリュームとコストをタグ付けしダッシュボードで可視化すること。どのチームのどのイベントがコストを消費しているか見えなければ、最適化対象を特定できない。

  7. ガバナンス導入ロードマップで「可視性の確保」を最初に行う理由は? 正解:現在の状態を数字で見ることができなければ、オーナーシップ確立やルール強制の必要性をチームに説得する根拠がない。「DLQに3,000件が溜まっていて2週間放置されている」というデータがあってこそ、ガバナンス導入に対するチームの合意を得ることができる。

  8. 新イベントをカタログに登録する際に必ず含めるべき5つの情報は? 正解:(1)イベントタイプと説明、(2)生産者サービス/チーム、(3)スキーマパスとバージョン、(4)予想日次ボリュームとペイロードサイズ、(5)消費者リストと各消費者の処理SLO。