Architecture: Event-Driven Governance in Practice 2026

Why Event-Driven Architecture Needs Governance

Event-Driven Architecture (EDA) is a powerful pattern that reduces coupling between services and increases scalability. However, EDA without governance rapidly descends into chaos.

Team A renames a field in the user.updated event, and the services of teams B, C, and D that subscribe to it all break at once. Nobody knows who consumes which event, schema changes ship without notice, and thousands of failed events pile up in the Dead Letter Queue (DLQ) with no clear owner.

This is not a problem with EDA itself but with missing governance: the system of rules for how events are defined, owned, and operated, plus the means to enforce those rules. This post covers the four pillars of governance that event-driven systems need in practice:

  1. Event Schema Governance: Rules for event format and evolution
  2. Ownership Governance: Responsibility boundaries for event producers/consumers
  3. Quality Governance: SLOs and monitoring for event processing
  4. Cost Governance: Cost visibility and control for event infrastructure

Event Schema Governance

Introducing a Schema Registry

If event schemas are kept implicitly within code, the impact of changes cannot be assessed. Schemas should be managed in a central registry, with compatibility enforced in CI.

You can use Confluent Schema Registry, AWS Glue Schema Registry, or a self-built Git-based registry. Here we demonstrate a lightweight Git-based approach.

schemas/
├── order/
│   ├── order.created.v1.json
│   ├── order.created.v2.json     # BACKWARD compatible with v1
│   ├── order.confirmed.v1.json
│   └── order.cancelled.v1.json
├── payment/
│   ├── payment.reserved.v1.json
│   └── payment.completed.v1.json
├── inventory/
│   └── inventory.reserved.v1.json
└── _meta/
    ├── compatibility-rules.yaml  # Compatibility policies
    └── ownership.yaml            # Event ownership mapping
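
To make the layout concrete, an individual schema file such as order/order.created.v1.json might look like the following. The field names here are illustrative, not taken from a real system:

```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "order.created",
  "type": "object",
  "required": ["order_id", "customer_id", "created_at"],
  "properties": {
    "order_id": { "type": "string" },
    "customer_id": { "type": "string" },
    "created_at": { "type": "string", "format": "date-time" },
    "total_amount": { "type": "number" }
  }
}
```

A BACKWARD-compatible v2 could add an optional coupon_code property; removing customer_id or changing the type of total_amount would have to fail the compatibility check.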

Automating Compatibility Checks

Automatically verify in CI that schema changes do not break existing consumers.

"""
Event schema compatibility checker.

Validates BACKWARD compatibility of JSON Schemas.
BACKWARD compatible = consumers upgraded to the new schema
can still process messages produced with the previous schema.

Rules:
  - Adding new optional fields: OK
  - Removing existing fields: FAIL
  - Changing existing field types: FAIL
  - Adding new required fields: FAIL (existing messages don't include them)
"""
import json
import sys
from pathlib import Path
from dataclasses import dataclass


@dataclass
class CompatibilityIssue:
    severity: str   # "ERROR" or "WARNING"
    path: str       # JSON path
    message: str


def check_backward_compatibility(
    old_schema: dict,
    new_schema: dict,
) -> list[CompatibilityIssue]:
    """Checks BACKWARD compatibility between old and new schemas."""
    issues = []

    old_props = old_schema.get("properties", {})
    new_props = new_schema.get("properties", {})
    old_required = set(old_schema.get("required", []))
    new_required = set(new_schema.get("required", []))

    # 1. Check if existing fields have been removed
    for field_name in old_props:
        if field_name not in new_props:
            issues.append(CompatibilityIssue(
                severity="ERROR",
                path=f"$.properties.{field_name}",
                message=f"Field '{field_name}' was removed. "
                        f"Existing consumers may depend on this field.",
            ))

    # 2. Check if existing field types have changed
    for field_name in old_props:
        if field_name in new_props:
            old_type = old_props[field_name].get("type")
            new_type = new_props[field_name].get("type")
            if old_type and new_type and old_type != new_type:
                issues.append(CompatibilityIssue(
                    severity="ERROR",
                    path=f"$.properties.{field_name}.type",
                    message=f"Type of '{field_name}' changed from "
                            f"'{old_type}' to '{new_type}'.",
                ))

    # 3. Check if new required fields have been added
    new_required_fields = new_required - old_required
    for field_name in new_required_fields:
        if field_name not in old_props:
            issues.append(CompatibilityIssue(
                severity="ERROR",
                path=f"$.required.{field_name}",
                message=f"New required field '{field_name}' added. "
                        f"Existing producers don't include this field.",
            ))

    # 4. Record new optional field additions as WARNING
    for field_name in new_props:
        if field_name not in old_props:
            if field_name not in new_required:
                issues.append(CompatibilityIssue(
                    severity="WARNING",
                    path=f"$.properties.{field_name}",
                    message=f"New optional field '{field_name}' added. "
                            f"Consumers should handle missing field gracefully.",
                ))

    # Recursively check nested object properties
    for field_name in old_props:
        if field_name in new_props:
            old_field = old_props[field_name]
            new_field = new_props[field_name]
            if old_field.get("type") == "object" and new_field.get("type") == "object":
                nested_issues = check_backward_compatibility(old_field, new_field)
                for issue in nested_issues:
                    issue.path = f"$.properties.{field_name}{issue.path[1:]}"
                    issues.append(issue)

    return issues


def validate_schema_evolution(schema_dir: str) -> bool:
    """Validates compatibility across versions for all schemas in the directory."""
    schema_path = Path(schema_dir)
    all_pass = True

    for domain_dir in sorted(schema_path.iterdir()):
        if not domain_dir.is_dir() or domain_dir.name.startswith("_"):
            continue

        # Group versions of the same event type
        event_versions: dict[str, list[Path]] = {}
        for schema_file in domain_dir.glob("*.json"):
            # event_name.v1.json -> event_name
            parts = schema_file.stem.rsplit(".v", 1)
            if len(parts) == 2 and parts[1].isdigit():
                event_versions.setdefault(parts[0], []).append(schema_file)
        # Sort numerically so .v10 sorts after .v2, not between .v1 and .v2
        for versions in event_versions.values():
            versions.sort(key=lambda p: int(p.stem.rsplit(".v", 1)[1]))

        # Check compatibility between consecutive versions
        for event_name, versions in event_versions.items():
            for i in range(len(versions) - 1):
                old = json.loads(versions[i].read_text())
                new = json.loads(versions[i + 1].read_text())

                issues = check_backward_compatibility(old, new)
                errors = [iss for iss in issues if iss.severity == "ERROR"]

                if errors:
                    all_pass = False
                    print(f"\nFAIL: {versions[i].name} -> {versions[i+1].name}")
                    for err in errors:
                        print(f"  [{err.severity}] {err.path}: {err.message}")

    return all_pass


if __name__ == "__main__":
    schema_dir = sys.argv[1] if len(sys.argv) > 1 else "schemas"
    success = validate_schema_evolution(schema_dir)
    sys.exit(0 if success else 1)
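
Wired into CI, the checker can block incompatible changes at PR time. A minimal GitHub Actions sketch, assuming the script above is committed as scripts/check_schema_compat.py (the path and workflow names are illustrative):

```yaml
# .github/workflows/schema-compat.yml
name: schema-compatibility
on:
  pull_request:
    paths:
      - 'schemas/**'
jobs:
  check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      # The checker exits non-zero on any ERROR, failing the PR
      - run: python scripts/check_schema_compat.py schemas
```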

Ownership Governance: Who Is Responsible for This Event?

If event producers and consumers are not clearly defined, responsibility gaps emerge during incidents. You must be able to immediately answer questions like "Who publishes this event?" and "Who should handle this DLQ?"

# schemas/_meta/ownership.yaml
# Event ownership mapping. This file defines ownership relationships for all events.

events:
  order.created:
    producer:
      team: order-team
      service: order-service
      slack_channel: '#order-alerts'
      oncall_rotation: 'order-oncall'
    consumers:
      - team: payment-team
        service: payment-service
        purpose: 'Trigger payment reservation'
        slo_processing_time_ms: 500
      - team: inventory-team
        service: inventory-service
        purpose: 'Trigger inventory reservation'
        slo_processing_time_ms: 1000
      - team: analytics-team
        service: analytics-pipeline
        purpose: 'Collect order analytics data'
        slo_processing_time_ms: 60000 # Near real-time, not real-time

  order.cancelled:
    producer:
      team: order-team
      service: order-service
      slack_channel: '#order-alerts'
      oncall_rotation: 'order-oncall'
    consumers:
      - team: payment-team
        service: payment-service
        purpose: 'Trigger payment refund'
        slo_processing_time_ms: 500
      - team: inventory-team
        service: inventory-service
        purpose: 'Release inventory'
        slo_processing_time_ms: 1000

  payment.completed:
    producer:
      team: payment-team
      service: payment-service
      slack_channel: '#payment-alerts'
      oncall_rotation: 'payment-oncall'
    consumers:
      - team: order-team
        service: order-service
        purpose: 'Update order status to confirmed'
        slo_processing_time_ms: 500
      - team: notification-team
        service: notification-service
        purpose: 'Send payment completion notification'
        slo_processing_time_ms: 3000

governance_rules:
  # Prior notice to all consumer teams is required for schema changes
  schema_change_notice_days: 7
  # Producer team approval is required for new consumer registration
  consumer_registration: requires_producer_approval
  # Alert when consumers fail to meet SLO
  slo_violation_alert: true
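
Beyond notifications, the ownership file itself can be linted in CI so entries don't rot. A minimal sketch, assuming the rule that every event needs a producer team and at least one consumer with an SLO; in CI you would load the real YAML with yaml.safe_load, but a dict literal keeps this example self-contained:

```python
def validate_ownership(ownership: dict) -> list[str]:
    """Returns a list of governance violations (empty list = OK)."""
    violations = []
    for event_type, info in ownership.get("events", {}).items():
        if not info.get("producer", {}).get("team"):
            violations.append(f"{event_type}: no producer team assigned")
        consumers = info.get("consumers", [])
        if not consumers:
            violations.append(f"{event_type}: no registered consumers")
        for c in consumers:
            if "slo_processing_time_ms" not in c:
                violations.append(
                    f"{event_type}: consumer {c.get('service', '?')} has no SLO"
                )
    return violations


# A dict literal standing in for the parsed ownership.yaml
ownership = {
    "events": {
        "order.created": {
            "producer": {"team": "order-team"},
            "consumers": [
                {"team": "payment-team", "service": "payment-service",
                 "slo_processing_time_ms": 500},
            ],
        },
        "order.shipped": {  # deliberately broken entry
            "producer": {},
            "consumers": [],
        },
    }
}

for v in validate_ownership(ownership):
    print(v)
```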

Based on this ownership file, you can build CI automation that automatically sends review requests to consumer teams when a schema change PR is submitted.

"""
Script that automatically notifies affected consumer teams on schema changes.

Detects changed schema files via git diff,
looks up consumer teams from ownership.yaml, and sends Slack notifications.
"""
import yaml
import subprocess
from pathlib import Path


def get_changed_schemas() -> list[str]:
    """Returns the list of changed schema files from git diff."""
    result = subprocess.run(
        ["git", "diff", "--name-only", "origin/main", "--", "schemas/"],
        capture_output=True, text=True,
    )
    return [
        line.strip() for line in result.stdout.splitlines()
        if line.strip().endswith(".json")
    ]


def load_ownership() -> dict:
    with open("schemas/_meta/ownership.yaml") as f:
        return yaml.safe_load(f)


def find_affected_consumers(
    changed_files: list[str],
    ownership: dict,
) -> dict[str, list[dict]]:
    """Returns the consumer teams for each changed schema."""
    affected: dict[str, list[dict]] = {}

    for filepath in changed_files:
        # schemas/order/order.created.v2.json -> "order.created"
        # (the file stem minus its ".vN" suffix is the event type)
        parts = Path(filepath).stem.rsplit(".v", 1)
        if len(parts) != 2:
            continue
        event_type = parts[0]

        event_info = ownership.get("events", {}).get(event_type, {})
        consumers = event_info.get("consumers", [])
        if consumers:
            affected[event_type] = consumers

    return affected


def notify_consumers(affected: dict[str, list[dict]]):
    """Sends Slack notifications to affected consumer teams."""
    for event_type, consumers in affected.items():
        teams = set(c["team"] for c in consumers)
        channels = set()
        for team_name in teams:
            # Look up team Slack channel (from ownership in practice)
            channels.add(f"#{team_name}-alerts")

        message = (
            f"Schema change detected for `{event_type}`.\n"
            f"Affected consumer teams: {', '.join(teams)}\n"
            f"Please review the schema change PR for backward compatibility."
        )
        print(f"[NOTIFY] {channels}: {message}")
        # In practice, call Slack API


if __name__ == "__main__":
    changed = get_changed_schemas()
    if not changed:
        print("No schema changes detected.")
    else:
        ownership = load_ownership()
        affected = find_affected_consumers(changed, ownership)
        if affected:
            notify_consumers(affected)
        else:
            print("Changed schemas have no registered consumers.")

Quality Governance: SLO-Based Event Processing Monitoring

To measure event processing quality, three SLIs (Service Level Indicators) are needed:

  1. Processing Latency: Time from event publication to consumption completion
  2. Success Rate: Percentage of successful event processing
  3. DLQ Dwell Time: Time from when an event enters the DLQ until it is resolved
The queries below assume a PostgreSQL-style warehouse where each consumer writes a row per processed event to event_processing_log, and dead-lettered events land in dead_letter_queue.

-- Event Processing SLO Dashboard Queries

-- 1. Processing latency SLO achievement rate by consumer
-- SLO: 99% of events must be processed within the defined time
SELECT
    epl.consumer_service,
    epl.event_type,
    COUNT(*) AS total_events,
    PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY processing_latency_ms) AS p50_ms,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY processing_latency_ms) AS p99_ms,
    -- Compare with SLO defined in ownership.yaml
    ROUND(
        100.0 * SUM(
            CASE WHEN processing_latency_ms <= slo_target_ms THEN 1 ELSE 0 END
        ) / COUNT(*),
        2
    ) AS slo_achievement_pct
FROM event_processing_log epl
JOIN event_slo_targets est
    ON epl.consumer_service = est.consumer_service
    AND epl.event_type = est.event_type
WHERE epl.processed_at >= NOW() - INTERVAL '24 hours'
GROUP BY epl.consumer_service, epl.event_type
ORDER BY slo_achievement_pct ASC;

-- 2. DLQ status: unresolved events by team
SELECT
    owner_team,
    event_type,
    COUNT(*) AS dlq_count,
    MIN(entered_dlq_at) AS oldest_entry,
    EXTRACT(EPOCH FROM (NOW() - MIN(entered_dlq_at))) / 3600 AS oldest_hours,
    -- Count of entries older than 24 hours (P1 alert target)
    SUM(CASE WHEN entered_dlq_at < NOW() - INTERVAL '24 hours' THEN 1 ELSE 0 END)
        AS stale_count
FROM dead_letter_queue dlq
JOIN event_ownership eo ON dlq.event_type = eo.event_type
WHERE dlq.resolved_at IS NULL
GROUP BY owner_team, event_type
ORDER BY oldest_hours DESC;

-- 3. Weekly event processing trend
SELECT
    date_trunc('day', processed_at) AS day,
    event_type,
    COUNT(*) AS total,
    SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS successes,
    SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failures,
    SUM(CASE WHEN status = 'dlq' THEN 1 ELSE 0 END) AS dlq_entries,
    ROUND(
        100.0 * SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) / COUNT(*),
        2
    ) AS success_rate_pct
FROM event_processing_log
WHERE processed_at >= NOW() - INTERVAL '7 days'
GROUP BY 1, 2
ORDER BY 1 DESC, success_rate_pct ASC;

Cost Governance: Visualizing Event Infrastructure Costs

Kafka cluster, event processing compute, and storage costs increase proportionally with event volume. Without tagging and visualizing costs by team, you cannot find the starting point for cost optimization.

"""
Event infrastructure cost tagging and report generator.

Maps event volume produced/consumed by each team
to corresponding infrastructure costs for the FinOps dashboard.
"""
from dataclasses import dataclass


@dataclass
class EventCostEntry:
    team: str
    event_type: str
    role: str               # "producer" or "consumer"
    daily_volume: int
    avg_payload_bytes: int
    estimated_daily_cost_usd: float


@dataclass
class CostBreakdown:
    # Kafka cost components
    broker_storage_usd: float    # Message retention storage
    broker_network_usd: float    # Network transfer
    consumer_compute_usd: float  # Consumer computing resources
    schema_registry_usd: float   # Schema registry operations

    @property
    def total_usd(self) -> float:
        return (
            self.broker_storage_usd
            + self.broker_network_usd
            + self.consumer_compute_usd
            + self.schema_registry_usd
        )


class EventCostCalculator:
    """Event-based infrastructure cost calculator.

    Estimates costs per team/event based on unit cost constants.
    Must be periodically calibrated against actual cloud costs.
    """

    # Unit cost constants (estimated based on AWS MSK)
    STORAGE_PER_GB_DAY_USD = 0.10
    NETWORK_PER_GB_USD = 0.05
    CONSUMER_PER_MILLION_EVENTS_USD = 0.50
    RETENTION_DAYS = 7

    def calculate_team_cost(
        self,
        entries: list[EventCostEntry],
    ) -> dict[str, CostBreakdown]:
        """Calculates daily cost per team."""
        team_costs: dict[str, CostBreakdown] = {}

        for entry in entries:
            daily_data_gb = (
                entry.daily_volume * entry.avg_payload_bytes
            ) / (1024 ** 3)

            cost = CostBreakdown(
                broker_storage_usd=daily_data_gb * self.STORAGE_PER_GB_DAY_USD * self.RETENTION_DAYS,
                broker_network_usd=daily_data_gb * self.NETWORK_PER_GB_USD,
                consumer_compute_usd=(
                    entry.daily_volume / 1_000_000
                ) * self.CONSUMER_PER_MILLION_EVENTS_USD if entry.role == "consumer" else 0,
                schema_registry_usd=0.01,  # Fixed split
            )

            if entry.team in team_costs:
                existing = team_costs[entry.team]
                team_costs[entry.team] = CostBreakdown(
                    broker_storage_usd=existing.broker_storage_usd + cost.broker_storage_usd,
                    broker_network_usd=existing.broker_network_usd + cost.broker_network_usd,
                    consumer_compute_usd=existing.consumer_compute_usd + cost.consumer_compute_usd,
                    schema_registry_usd=existing.schema_registry_usd + cost.schema_registry_usd,
                )
            else:
                team_costs[entry.team] = cost

        return team_costs

    def generate_report(self, team_costs: dict[str, CostBreakdown]) -> str:
        """Generates a text report of costs per team."""
        lines = ["=== Event Infrastructure Daily Cost Report ===\n"]

        total = 0.0
        for team, cost in sorted(team_costs.items(), key=lambda x: x[1].total_usd, reverse=True):
            lines.append(f"Team: {team}")
            lines.append(f"  Storage:  ${cost.broker_storage_usd:.2f}")
            lines.append(f"  Network:  ${cost.broker_network_usd:.2f}")
            lines.append(f"  Compute:  ${cost.consumer_compute_usd:.2f}")
            lines.append(f"  Registry: ${cost.schema_registry_usd:.2f}")
            lines.append(f"  TOTAL:    ${cost.total_usd:.2f}\n")
            total += cost.total_usd

        lines.append(f"=== Grand Total: ${total:.2f}/day (${total*30:.2f}/month est.) ===")
        return "\n".join(lines)
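
The unit-cost constants are rough estimates, so it helps to see the arithmetic once. For a stream at roughly the order.created volume in the catalog below (150,000 events/day at 1,200 bytes each):

```python
# Worked example of the calculator's unit-cost math (same rough constants).
STORAGE_PER_GB_DAY_USD = 0.10
NETWORK_PER_GB_USD = 0.05
CONSUMER_PER_MILLION_EVENTS_USD = 0.50
RETENTION_DAYS = 7

daily_volume = 150_000      # events/day
avg_payload_bytes = 1_200

daily_data_gb = daily_volume * avg_payload_bytes / (1024 ** 3)  # ~0.168 GB/day
storage_usd = daily_data_gb * STORAGE_PER_GB_DAY_USD * RETENTION_DAYS
network_usd = daily_data_gb * NETWORK_PER_GB_USD
compute_usd = (daily_volume / 1_000_000) * CONSUMER_PER_MILLION_EVENTS_USD

print(f"storage ${storage_usd:.3f}, network ${network_usd:.3f}, "
      f"compute ${compute_usd:.3f} per consumer, per day")
```

At this volume the stream costs well under a dollar a day under these constants; the cost explosion described later in this post comes from a stream a few hundred times larger.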

Event Catalog: Solving the Discovery Problem

"What events exist in our system?" "Who publishes this event and who consumes it?"

When the number of events exceeds 50, it becomes difficult to answer these questions immediately. An event catalog makes all event metadata searchable in one place.

# event-catalog.yaml
# Event Catalog: Centralized management of all event metadata

catalog:
  - event_type: order.created
    domain: order
    description: 'Published when a customer creates a new order.'
    schema_path: schemas/order/order.created.v2.json
    current_version: 2
    topic: order-events
    partition_key: order_id
    avg_daily_volume: 150000
    avg_payload_bytes: 1200
    retention_days: 7
    producer:
      service: order-service
      team: order-team
    consumers:
      - service: payment-service
        team: payment-team
      - service: inventory-service
        team: inventory-team
      - service: analytics-pipeline
        team: analytics-team
    tags: ['critical', 'transactional']
    created_at: '2024-06-15'
    last_schema_change: '2025-11-20'

  - event_type: order.cancelled
    domain: order
    description: 'Published when an order is cancelled. Includes reason field.'
    schema_path: schemas/order/order.cancelled.v1.json
    current_version: 1
    topic: order-events
    partition_key: order_id
    avg_daily_volume: 8000
    avg_payload_bytes: 800
    retention_days: 7
    producer:
      service: order-service
      team: order-team
    consumers:
      - service: payment-service
        team: payment-team
      - service: inventory-service
        team: inventory-team
    tags: ['critical', 'transactional']
    created_at: '2024-06-15'
    last_schema_change: '2024-06-15'

  - event_type: user.profile.updated
    domain: user
    description: 'Published when user profile information is changed.'
    schema_path: schemas/user/user.profile.updated.v1.json
    current_version: 1
    topic: user-events
    partition_key: user_id
    avg_daily_volume: 25000
    avg_payload_bytes: 500
    retention_days: 3
    producer:
      service: user-service
      team: user-team
    consumers:
      - service: recommendation-service
        team: ml-team
      - service: notification-service
        team: notification-team
    tags: ['non-critical']
    created_at: '2024-08-01'
    last_schema_change: '2025-03-10'
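
With the catalog in place, discovery questions become a lookup. A sketch of that lookup, inlining one trimmed catalog entry as a dict for self-containment (in practice you would parse event-catalog.yaml with yaml.safe_load):

```python
# A trimmed, inline stand-in for the parsed event-catalog.yaml
catalog = [
    {
        "event_type": "order.created",
        "producer": {"service": "order-service", "team": "order-team"},
        "consumers": [
            {"service": "payment-service", "team": "payment-team"},
            {"service": "inventory-service", "team": "inventory-team"},
        ],
    },
]


def who_handles(event_type: str) -> str:
    """Answers 'who produces and who consumes this event?' from the catalog."""
    for entry in catalog:
        if entry["event_type"] == event_type:
            producer = entry["producer"]["service"]
            consumers = ", ".join(c["service"] for c in entry["consumers"])
            return f"{event_type}: produced by {producer}; consumed by {consumers}"
    return f"{event_type}: not in catalog (governance gap!)"


print(who_handles("order.created"))
print(who_handles("order.refunded"))
```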

Governance Adoption Roadmap

Introducing all governance at once only invites team resistance. Start incrementally, beginning with the areas of greatest pain.

Phase 1: Establish Visibility (2-4 weeks)
├── Draft event catalog (inventory of existing events)
├── Build DLQ count dashboard
├── Start collecting event processing success/failure metrics
└── Result: "Current state" becomes visible through numbers

Phase 2: Establish Ownership (2-4 weeks)
├── Create ownership.yaml (producer/consumer mapping)
├── Assign DLQ processing responsible teams
├── Establish notification process for schema changes to consumer teams
└── Result: "Whose responsibility is it?" becomes answerable

Phase 3: Automated Rule Enforcement (4-8 weeks)
├── Build schema compatibility check CI pipeline
├── Build module boundary violation detection CI
├── Automate SLO violation alerts
└── Result: Rule violations are blocked before deployment

Phase 4: Cost Optimization (ongoing)
├── Tag and report event costs per team
├── Clean up unnecessary event consumption
├── Optimize retention policies
└── Result: Costs become visible and optimization begins

Practical Troubleshooting

Schema Changed Without Consumer Knowledge, Causing an Incident

Symptom: payment-service suddenly produces massive KeyError: 'customer_id' errors. The order-service team renamed customer_id to buyer_id without notifying consumer teams.

Post-incident response: (1) Immediately roll back order-service to the previous version. (2) In the postmortem, identify the absence of a schema change process as the root cause.

Prevention: (1) Introduce schema compatibility check CI to block BACKWARD-incompatible changes before deployment. (2) Based on ownership.yaml, automatically add consumer teams as reviewers on schema change PRs. (3) Major schema changes require a prior notice period of governance_rules.schema_change_notice_days (7 days).

Thousands of Items Piling Up in DLQ With Nobody Processing Them

Symptom: 3,000 items are accumulated in the DLQ dashboard, with the oldest being 2 weeks old.

Cause: No team was assigned DLQ processing responsibility, or alerts were not configured so nobody was aware.

Response: (1) Identify the consumer team for the event from ownership.yaml. (2) Set up DLQ count alerts: WARNING when exceeding 100 items, P1 when entries are older than 24 hours. (3) Include weekly DLQ review meetings in team routines. (4) Implement triage automation to classify events that can be automatically reprocessed (transient errors) vs those requiring manual intervention (schema mismatches, business logic errors).
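
The triage automation in step (4) can start as a simple classifier over the error metadata consumers attach when they dead-letter a message. A sketch with illustrative error categories (map them to whatever your consumers actually record):

```python
# Classify DLQ entries into auto-retry vs manual-intervention buckets.
TRANSIENT_ERRORS = {"timeout", "connection_reset", "throttled"}
MANUAL_ERRORS = {"schema_mismatch", "validation_failed", "business_rule"}


def triage(entries: list[dict]) -> dict[str, list[dict]]:
    buckets: dict[str, list[dict]] = {"auto_retry": [], "manual": [], "unknown": []}
    for e in entries:
        err = e.get("error_type", "")
        if err in TRANSIENT_ERRORS:
            buckets["auto_retry"].append(e)   # safe to replay automatically
        elif err in MANUAL_ERRORS:
            buckets["manual"].append(e)       # needs a human / a code fix
        else:
            buckets["unknown"].append(e)      # unclassified: route to a human
    return buckets


dlq = [
    {"event_id": "e1", "error_type": "timeout"},
    {"event_id": "e2", "error_type": "schema_mismatch"},
    {"event_id": "e3", "error_type": "oom"},
]
result = triage(dlq)
print({bucket: len(items) for bucket, items in result.items()})
```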

Event Volume Spike Causing Kafka Cluster Cost Explosion

Symptom: Monthly Kafka costs jumped from $3,000 to $12,000.

Root cause analysis: Without cost tagging, it was impossible to immediately determine which team's events were responsible. Investigation revealed the analytics team's newly added page.viewed event was generating 50 million events per day with payloads of 5KB each.

Response: (1) Build per-event volume monitoring. (2) Establish a process requiring expected volume and cost impact documentation in the catalog when registering new events. (3) Optimize the event payload (remove unnecessary fields, apply compression). (4) Convert analytics events that do not require real-time processing to batch processing to reduce Kafka load.

Quiz
  1. What is the first problem that occurs when governance is absent in EDA? Answer: Event schema changes propagate to consumers without prior notice, causing service outages, and the unclear producer/consumer responsibility boundaries delay incident resolution.

  2. If BACKWARD compatibility is guaranteed, what deployment order is safe? Answer: It is safe to deploy consumers first. A consumer running the new schema can still process messages produced with the old schema, so every consumer can upgrade before any producer does; producers then move to the new schema once all consumers are ready.

  3. Why define SLOs for consumers in the event ownership mapping? Answer: Each consumer has different processing time requirements. The payment service needs processing within 500ms while the analytics pipeline allows up to 60 seconds. Defining SLOs enables sending alerts to the appropriate team with the appropriate priority when violations occur.

  4. What are the three core elements of DLQ governance? Answer: (1) Assigning DLQ processing responsible teams (ownership), (2) setting up alerts for DLQ count and dwell time (monitoring), (3) automating classification of events that can be automatically reprocessed vs those requiring manual intervention (triage).

  5. What is the key benefit of enforcing schema compatibility checks in CI? Answer: Incompatible schema changes are blocked before reaching production. This prevents service outages caused by schema changes at the source and allows verification of change impact before deployment.

  6. What is the first step in introducing event cost governance? Answer: Tagging and visualizing event volume and costs per team on a dashboard. If it is not visible which team's events are consuming costs, optimization targets cannot be identified.

  7. Why does "establishing visibility" come first in the governance adoption roadmap? Answer: Without being able to see the current state in numbers, there is no evidence to persuade the team of the need for ownership establishment or rule enforcement. Having data like "3,000 items accumulated in DLQ and neglected for 2 weeks" is necessary to gain team buy-in for governance adoption.

  8. What are five pieces of information that must be included when registering a new event in the catalog? Answer: (1) Event type and description, (2) producer service/team, (3) schema path and version, (4) expected daily volume and payload size, (5) consumer list and each consumer's processing SLO.