- Why Event-Driven Architecture Needs Governance
- Event Schema Governance
- Ownership Governance: Who Is Responsible for This Event?
- Quality Governance: SLO-Based Event Processing Monitoring
- Cost Governance: Visualizing Event Infrastructure Costs
- Event Catalog: Solving the Discovery Problem
- Governance Adoption Roadmap
- Practical Troubleshooting
- References
Why Event-Driven Architecture Needs Governance
Event-Driven Architecture (EDA) is a powerful pattern that reduces coupling between services and increases scalability. However, EDA without governance rapidly descends into chaos.
Team A renames a field in the user.updated event, and the services of teams B, C, and D that subscribe to it all break simultaneously. Nobody knows who consumes which event, and schema changes ship without notice. Thousands of failed events pile up in the Dead Letter Queue, but it is unclear whose job it is to drain them.
This situation is not a problem with EDA itself; it is a symptom of missing governance: the system of rules about how events are defined, changed, and operated, plus the mechanisms that enforce those rules. This post covers the four pillars of governance that are practically necessary in event-driven systems:
- Event Schema Governance: Rules for event format and evolution
- Ownership Governance: Responsibility boundaries for event producers/consumers
- Quality Governance: SLOs and monitoring for event processing
- Cost Governance: Cost visibility and control for event infrastructure
Event Schema Governance
Introducing a Schema Registry
If event schemas are kept implicitly within code, the impact of changes cannot be assessed. Schemas should be managed in a central registry, with compatibility enforced in CI.
You can use Confluent Schema Registry, AWS Glue Schema Registry, or a self-built Git-based registry. Here we demonstrate a lightweight Git-based approach.
schemas/
├── order/
│   ├── order.created.v1.json
│   ├── order.created.v2.json     # BACKWARD compatible with v1
│   ├── order.confirmed.v1.json
│   └── order.cancelled.v1.json
├── payment/
│   ├── payment.reserved.v1.json
│   └── payment.completed.v1.json
├── inventory/
│   └── inventory.reserved.v1.json
└── _meta/
    ├── compatibility-rules.yaml  # Compatibility policies
    └── ownership.yaml            # Event ownership mapping
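The `_meta/compatibility-rules.yaml` file referenced above could look like the following sketch. The keys shown here (`default_mode`, `overrides`) are illustrative, not a fixed format; adapt them to whatever your CI tooling reads.

```yaml
# schemas/_meta/compatibility-rules.yaml (illustrative layout)
default_mode: BACKWARD         # applied to every event unless overridden
overrides:
  order.created: FULL          # a break in either direction blocks the PR
  analytics.page.viewed: NONE  # internal-only stream, consumers opt in to churn
```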
Automating Compatibility Checks
Automatically verify in CI that schema changes do not break existing consumers.
"""
Event schema compatibility checker.
Validates BACKWARD compatibility of JSON Schemas.
BACKWARD compatible = messages produced with the new schema
can be processed by consumers using the previous schema.
Rules:
- Adding new fields: OK (must be optional)
- Removing existing required fields: FAIL
- Changing existing field types: FAIL
- Adding required fields: FAIL (existing consumers don't send this field)
"""
import json
import sys
from pathlib import Path
from dataclasses import dataclass
@dataclass
class CompatibilityIssue:
severity: str # "ERROR" or "WARNING"
path: str # JSON path
message: str
def check_backward_compatibility(
old_schema: dict,
new_schema: dict,
) -> list[CompatibilityIssue]:
"""Checks BACKWARD compatibility between old and new schemas."""
issues = []
old_props = old_schema.get("properties", {})
new_props = new_schema.get("properties", {})
old_required = set(old_schema.get("required", []))
new_required = set(new_schema.get("required", []))
# 1. Check if existing fields have been removed
for field_name in old_props:
if field_name not in new_props:
issues.append(CompatibilityIssue(
severity="ERROR",
path=f"$.properties.{field_name}",
message=f"Field '{field_name}' was removed. "
f"Existing consumers may depend on this field.",
))
# 2. Check if existing field types have changed
for field_name in old_props:
if field_name in new_props:
old_type = old_props[field_name].get("type")
new_type = new_props[field_name].get("type")
if old_type and new_type and old_type != new_type:
issues.append(CompatibilityIssue(
severity="ERROR",
path=f"$.properties.{field_name}.type",
message=f"Type of '{field_name}' changed from "
f"'{old_type}' to '{new_type}'.",
))
# 3. Check if new required fields have been added
new_required_fields = new_required - old_required
for field_name in new_required_fields:
if field_name not in old_props:
issues.append(CompatibilityIssue(
severity="ERROR",
path=f"$.required.{field_name}",
message=f"New required field '{field_name}' added. "
f"Existing producers don't include this field.",
))
# 4. Record new optional field additions as WARNING
for field_name in new_props:
if field_name not in old_props:
if field_name not in new_required:
issues.append(CompatibilityIssue(
severity="WARNING",
path=f"$.properties.{field_name}",
message=f"New optional field '{field_name}' added. "
f"Consumers should handle missing field gracefully.",
))
# Recursively check nested object properties
for field_name in old_props:
if field_name in new_props:
old_field = old_props[field_name]
new_field = new_props[field_name]
if old_field.get("type") == "object" and new_field.get("type") == "object":
nested_issues = check_backward_compatibility(old_field, new_field)
for issue in nested_issues:
issue.path = f"$.properties.{field_name}{issue.path[1:]}"
issues.append(issue)
return issues
def validate_schema_evolution(schema_dir: str) -> bool:
"""Validates compatibility across versions for all schemas in the directory."""
schema_path = Path(schema_dir)
all_pass = True
for domain_dir in sorted(schema_path.iterdir()):
if not domain_dir.is_dir() or domain_dir.name.startswith("_"):
continue
# Group versions of the same event type
event_versions: dict[str, list[Path]] = {}
for schema_file in sorted(domain_dir.glob("*.json")):
# event_name.v1.json -> event_name
parts = schema_file.stem.rsplit(".v", 1)
if len(parts) == 2:
event_name = parts[0]
event_versions.setdefault(event_name, []).append(schema_file)
# Check compatibility between consecutive versions
for event_name, versions in event_versions.items():
for i in range(len(versions) - 1):
old = json.loads(versions[i].read_text())
new = json.loads(versions[i + 1].read_text())
issues = check_backward_compatibility(old, new)
errors = [i for i in issues if i.severity == "ERROR"]
if errors:
all_pass = False
print(f"\nFAIL: {versions[i].name} -> {versions[i+1].name}")
for err in errors:
print(f" [{err.severity}] {err.path}: {err.message}")
return all_pass
if __name__ == "__main__":
schema_dir = sys.argv[1] if len(sys.argv) > 1 else "schemas"
success = validate_schema_evolution(schema_dir)
sys.exit(0 if success else 1)
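To actually enforce the checker on every pull request, a minimal GitHub Actions workflow might look like the sketch below. The script path `scripts/check_schema_compat.py` and the workflow name are our assumptions; the checker's non-zero exit code is what fails the PR check.

```yaml
# .github/workflows/schema-compat.yml (sketch)
name: schema-compatibility
on:
  pull_request:
    paths:
      - 'schemas/**'
jobs:
  check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      # A non-zero exit from the checker marks the PR check as failed
      - run: python scripts/check_schema_compat.py schemas
```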
Ownership Governance: Who Is Responsible for This Event?
If event producers and consumers are not clearly defined, responsibility gaps emerge during incidents. You must be able to immediately answer questions like "Who publishes this event?" and "Who should handle this DLQ?"
# schemas/_meta/ownership.yaml
# Event ownership mapping. This file defines ownership relationships for all events.
events:
  order.created:
    producer:
      team: order-team
      service: order-service
      slack_channel: '#order-alerts'
      oncall_rotation: 'order-oncall'
    consumers:
      - team: payment-team
        service: payment-service
        purpose: 'Trigger payment reservation'
        slo_processing_time_ms: 500
      - team: inventory-team
        service: inventory-service
        purpose: 'Trigger inventory reservation'
        slo_processing_time_ms: 1000
      - team: analytics-team
        service: analytics-pipeline
        purpose: 'Collect order analytics data'
        slo_processing_time_ms: 60000  # Near real-time, not real-time

  order.cancelled:
    producer:
      team: order-team
      service: order-service
      slack_channel: '#order-alerts'
      oncall_rotation: 'order-oncall'
    consumers:
      - team: payment-team
        service: payment-service
        purpose: 'Trigger payment refund'
        slo_processing_time_ms: 500
      - team: inventory-team
        service: inventory-service
        purpose: 'Release inventory'
        slo_processing_time_ms: 1000

  payment.completed:
    producer:
      team: payment-team
      service: payment-service
      slack_channel: '#payment-alerts'
      oncall_rotation: 'payment-oncall'
    consumers:
      - team: order-team
        service: order-service
        purpose: 'Update order status to confirmed'
        slo_processing_time_ms: 500
      - team: notification-team
        service: notification-service
        purpose: 'Send payment completion notification'
        slo_processing_time_ms: 3000

governance_rules:
  # Prior notice to all consumer teams is required for schema changes
  schema_change_notice_days: 7
  # Producer team approval is required for new consumer registration
  consumer_registration: requires_producer_approval
  # Alert when consumers fail to meet SLO
  slo_violation_alert: true
Based on this ownership file, you can build CI automation that automatically sends review requests to consumer teams when a schema change PR is submitted.
"""
Script that automatically notifies affected consumer teams on schema changes.
Detects changed schema files via git diff,
looks up consumer teams from ownership.yaml, and sends Slack notifications.
"""
import yaml
import subprocess
from pathlib import Path
def get_changed_schemas() -> list[str]:
"""Returns the list of changed schema files from git diff."""
result = subprocess.run(
["git", "diff", "--name-only", "origin/main", "--", "schemas/"],
capture_output=True, text=True,
)
return [
line.strip() for line in result.stdout.splitlines()
if line.strip().endswith(".json")
]
def load_ownership() -> dict:
with open("schemas/_meta/ownership.yaml") as f:
return yaml.safe_load(f)
def find_affected_consumers(
changed_files: list[str],
ownership: dict,
) -> dict[str, list[dict]]:
"""Returns the list of consumer teams for changed schemas."""
affected: dict[str, list[dict]] = {}
for filepath in changed_files:
# schemas/order/order.created.v2.json -> order.created
parts = Path(filepath).stem.rsplit(".v", 1)
if len(parts) != 2:
continue
event_type = parts[0].replace("/", ".")
# domain/event_name -> domain.event_name format
path_parts = Path(filepath).parts
if len(path_parts) >= 3:
event_type = f"{path_parts[1]}.{parts[0].split('.')[-1] if '.' in parts[0] else parts[0]}"
event_info = ownership.get("events", {}).get(event_type, {})
consumers = event_info.get("consumers", [])
if consumers:
affected[event_type] = consumers
return affected
def notify_consumers(affected: dict[str, list[dict]]):
"""Sends Slack notifications to affected consumer teams."""
for event_type, consumers in affected.items():
teams = set(c["team"] for c in consumers)
channels = set()
for team_name in teams:
# Look up team Slack channel (from ownership in practice)
channels.add(f"#{team_name}-alerts")
message = (
f"Schema change detected for `{event_type}`.\n"
f"Affected consumer teams: {', '.join(teams)}\n"
f"Please review the schema change PR for backward compatibility."
)
print(f"[NOTIFY] {channels}: {message}")
# In practice, call Slack API
if __name__ == "__main__":
changed = get_changed_schemas()
if not changed:
print("No schema changes detected.")
else:
ownership = load_ownership()
affected = find_affected_consumers(changed, ownership)
if affected:
notify_consumers(affected)
else:
print("Changed schemas have no registered consumers.")
Quality Governance: SLO-Based Event Processing Monitoring
To measure event processing quality, three SLIs (Service Level Indicators) are needed:
- Processing Latency: Time from event publication to consumption completion
- Success Rate: Percentage of successful event processing
- DLQ Dwell Time: Time from when an event enters the DLQ until it is resolved
-- Event Processing SLO Dashboard Queries

-- 1. Processing latency SLO achievement rate by consumer
-- SLO: 99% of events must be processed within the defined time
SELECT
    epl.consumer_service,
    epl.event_type,
    COUNT(*) AS total_events,
    PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY epl.processing_latency_ms) AS p50_ms,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY epl.processing_latency_ms) AS p99_ms,
    -- Compare with SLO defined in ownership.yaml
    ROUND(
        100.0 * SUM(
            CASE WHEN epl.processing_latency_ms <= est.slo_target_ms THEN 1 ELSE 0 END
        ) / COUNT(*),
        2
    ) AS slo_achievement_pct
FROM event_processing_log epl
JOIN event_slo_targets est
    ON epl.consumer_service = est.consumer_service
    AND epl.event_type = est.event_type
WHERE epl.processed_at >= NOW() - INTERVAL '24 hours'
GROUP BY epl.consumer_service, epl.event_type
ORDER BY slo_achievement_pct ASC;

-- 2. DLQ status: unresolved events by team
SELECT
    eo.owner_team,
    dlq.event_type,
    COUNT(*) AS dlq_count,
    MIN(dlq.entered_dlq_at) AS oldest_entry,
    EXTRACT(EPOCH FROM (NOW() - MIN(dlq.entered_dlq_at))) / 3600 AS oldest_hours,
    -- Count of entries older than 24 hours (P1 alert target)
    SUM(CASE WHEN dlq.entered_dlq_at < NOW() - INTERVAL '24 hours' THEN 1 ELSE 0 END)
        AS stale_count
FROM dead_letter_queue dlq
JOIN event_ownership eo ON dlq.event_type = eo.event_type
WHERE dlq.resolved_at IS NULL
GROUP BY eo.owner_team, dlq.event_type
ORDER BY oldest_hours DESC;

-- 3. Weekly event processing trend
SELECT
    date_trunc('day', processed_at) AS day,
    event_type,
    COUNT(*) AS total,
    SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS successes,
    SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failures,
    SUM(CASE WHEN status = 'dlq' THEN 1 ELSE 0 END) AS dlq_entries,
    ROUND(
        100.0 * SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) / COUNT(*),
        2
    ) AS success_rate_pct
FROM event_processing_log
WHERE processed_at >= NOW() - INTERVAL '7 days'
GROUP BY 1, 2
ORDER BY 1 DESC, success_rate_pct ASC;
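The same SLO data can feed alerting. A hedged sketch of a Prometheus alerting rule; the metric name `event_processing_slo_achievement_ratio` is hypothetical, so substitute whatever your exporter actually emits:

```yaml
# prometheus/rules/event-slo.yml (sketch)
groups:
  - name: event-processing-slo
    rules:
      - alert: EventSLOViolation
        # Fires when a consumer stays below 99% SLO achievement for 15 minutes
        expr: event_processing_slo_achievement_ratio < 0.99
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "{{ $labels.consumer_service }} is missing its SLO for {{ $labels.event_type }}"
```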
Cost Governance: Visualizing Event Infrastructure Costs
Kafka cluster, event processing compute, and storage costs increase proportionally with event volume. Without tagging and visualizing costs by team, you cannot find the starting point for cost optimization.
"""
Event infrastructure cost tagging and report generator.
Maps event volume produced/consumed by each team
to corresponding infrastructure costs for the FinOps dashboard.
"""
from dataclasses import dataclass
from typing import Optional
@dataclass
class EventCostEntry:
team: str
event_type: str
role: str # "producer" or "consumer"
daily_volume: int
avg_payload_bytes: int
estimated_daily_cost_usd: float
@dataclass
class CostBreakdown:
# Kafka cost components
broker_storage_usd: float # Message retention storage
broker_network_usd: float # Network transfer
consumer_compute_usd: float # Consumer computing resources
schema_registry_usd: float # Schema registry operations
@property
def total_usd(self) -> float:
return (
self.broker_storage_usd
+ self.broker_network_usd
+ self.consumer_compute_usd
+ self.schema_registry_usd
)
class EventCostCalculator:
"""Event-based infrastructure cost calculator.
Estimates costs per team/event based on unit cost constants.
Must be periodically calibrated against actual cloud costs.
"""
# Unit cost constants (estimated based on AWS MSK)
STORAGE_PER_GB_DAY_USD = 0.10
NETWORK_PER_GB_USD = 0.05
CONSUMER_PER_MILLION_EVENTS_USD = 0.50
RETENTION_DAYS = 7
def calculate_team_cost(
self,
entries: list[EventCostEntry],
) -> dict[str, CostBreakdown]:
"""Calculates daily cost per team."""
team_costs: dict[str, CostBreakdown] = {}
for entry in entries:
daily_data_gb = (
entry.daily_volume * entry.avg_payload_bytes
) / (1024 ** 3)
cost = CostBreakdown(
broker_storage_usd=daily_data_gb * self.STORAGE_PER_GB_DAY_USD * self.RETENTION_DAYS,
broker_network_usd=daily_data_gb * self.NETWORK_PER_GB_USD,
consumer_compute_usd=(
entry.daily_volume / 1_000_000
) * self.CONSUMER_PER_MILLION_EVENTS_USD if entry.role == "consumer" else 0,
schema_registry_usd=0.01, # Fixed split
)
if entry.team in team_costs:
existing = team_costs[entry.team]
team_costs[entry.team] = CostBreakdown(
broker_storage_usd=existing.broker_storage_usd + cost.broker_storage_usd,
broker_network_usd=existing.broker_network_usd + cost.broker_network_usd,
consumer_compute_usd=existing.consumer_compute_usd + cost.consumer_compute_usd,
schema_registry_usd=existing.schema_registry_usd + cost.schema_registry_usd,
)
else:
team_costs[entry.team] = cost
return team_costs
def generate_report(self, team_costs: dict[str, CostBreakdown]) -> str:
"""Generates a text report of costs per team."""
lines = ["=== Event Infrastructure Daily Cost Report ===\n"]
total = 0.0
for team, cost in sorted(team_costs.items(), key=lambda x: x[1].total_usd, reverse=True):
lines.append(f"Team: {team}")
lines.append(f" Storage: ${cost.broker_storage_usd:.2f}")
lines.append(f" Network: ${cost.broker_network_usd:.2f}")
lines.append(f" Compute: ${cost.consumer_compute_usd:.2f}")
lines.append(f" Registry: ${cost.schema_registry_usd:.2f}")
lines.append(f" TOTAL: ${cost.total_usd:.2f}\n")
total += cost.total_usd
lines.append(f"=== Grand Total: ${total:.2f}/day (${total*30:.2f}/month est.) ===")
return "\n".join(lines)
Event Catalog: Solving the Discovery Problem
"What events exist in our system?" "Who publishes this event and who consumes it?"
When the number of events exceeds 50, it becomes difficult to answer these questions immediately. An event catalog makes all event metadata searchable in one place.
# event-catalog.yaml
# Event Catalog: Centralized management of all event metadata
catalog:
  - event_type: order.created
    domain: order
    description: 'Published when a customer creates a new order.'
    schema_path: schemas/order/order.created.v2.json
    current_version: 2
    topic: order-events
    partition_key: order_id
    avg_daily_volume: 150000
    avg_payload_bytes: 1200
    retention_days: 7
    producer:
      service: order-service
      team: order-team
    consumers:
      - service: payment-service
        team: payment-team
      - service: inventory-service
        team: inventory-team
      - service: analytics-pipeline
        team: analytics-team
    tags: ['critical', 'transactional']
    created_at: '2024-06-15'
    last_schema_change: '2025-11-20'

  - event_type: order.cancelled
    domain: order
    description: 'Published when an order is cancelled. Includes reason field.'
    schema_path: schemas/order/order.cancelled.v1.json
    current_version: 1
    topic: order-events
    partition_key: order_id
    avg_daily_volume: 8000
    avg_payload_bytes: 800
    retention_days: 7
    producer:
      service: order-service
      team: order-team
    consumers:
      - service: payment-service
        team: payment-team
      - service: inventory-service
        team: inventory-team
    tags: ['critical', 'transactional']
    created_at: '2024-06-15'
    last_schema_change: '2024-06-15'

  - event_type: user.profile.updated
    domain: user
    description: 'Published when user profile information is changed.'
    schema_path: schemas/user/user.profile.updated.v1.json
    current_version: 1
    topic: user-events
    partition_key: user_id
    avg_daily_volume: 25000
    avg_payload_bytes: 500
    retention_days: 3
    producer:
      service: user-service
      team: user-team
    consumers:
      - service: recommendation-service
        team: ml-team
      - service: notification-service
        team: notification-team
    tags: ['non-critical']
    created_at: '2024-08-01'
    last_schema_change: '2025-03-10'
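Once the catalog is machine-readable, discovery questions become one-liners. A sketch that answers "which events does a given team consume?" against an inline excerpt of the catalog; in practice you would `yaml.safe_load` the real file instead:

```python
# Inline excerpt of event-catalog.yaml; in practice, load the real file
catalog = [
    {"event_type": "order.created",
     "consumers": [{"service": "payment-service", "team": "payment-team"},
                   {"service": "inventory-service", "team": "inventory-team"}]},
    {"event_type": "user.profile.updated",
     "consumers": [{"service": "notification-service", "team": "notification-team"}]},
]

def events_consumed_by(team: str) -> list[str]:
    """All event types a team subscribes to, per the catalog."""
    return sorted(
        entry["event_type"]
        for entry in catalog
        if any(c["team"] == team for c in entry["consumers"])
    )

print(events_consumed_by("payment-team"))  # ['order.created']
```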
Governance Adoption Roadmap
Introducing all governance at once only invites team resistance. Start incrementally, beginning with the areas of greatest pain.
Phase 1: Establish Visibility (2-4 weeks)
├── Draft event catalog (inventory of existing events)
├── Build DLQ count dashboard
├── Start collecting event processing success/failure metrics
└── Result: "Current state" becomes visible through numbers
Phase 2: Establish Ownership (2-4 weeks)
├── Create ownership.yaml (producer/consumer mapping)
├── Assign DLQ processing responsible teams
├── Establish notification process for schema changes to consumer teams
└── Result: "Whose responsibility is it?" becomes answerable
Phase 3: Automated Rule Enforcement (4-8 weeks)
├── Build schema compatibility check CI pipeline
├── Build module boundary violation detection CI
├── Automate SLO violation alerts
└── Result: Rule violations are blocked before deployment
Phase 4: Cost Optimization (ongoing)
├── Tag and report event costs per team
├── Clean up unnecessary event consumption
├── Optimize retention policies
└── Result: Costs become visible and optimization begins
Practical Troubleshooting
Schema Changed Without Consumer Knowledge, Causing an Incident
Symptom: payment-service suddenly throws a flood of KeyError: 'customer_id' errors. The order-service team had renamed customer_id to buyer_id without notifying consumer teams.
Post-incident response: (1) Immediately roll back order-service to the previous version. (2) In the postmortem, identify the absence of a schema change process as the root cause.
Prevention: (1) Introduce schema compatibility check CI to block BACKWARD-incompatible changes before deployment. (2) Based on ownership.yaml, automatically add consumer teams as reviewers on schema change PRs. (3) Major schema changes require a prior notice period of governance_rules.schema_change_notice_days (7 days).
Thousands of Items Piling Up in DLQ With Nobody Processing Them
Symptom: The DLQ dashboard shows 3,000 accumulated items, the oldest of which is two weeks old.
Cause: No team was assigned DLQ processing responsibility, or alerts were not configured so nobody was aware.
Response: (1) Identify the consumer team for the event from ownership.yaml. (2) Set up DLQ count alerts: WARNING when exceeding 100 items, P1 when entries are older than 24 hours. (3) Include weekly DLQ review meetings in team routines. (4) Implement triage automation to classify events that can be automatically reprocessed (transient errors) vs those requiring manual intervention (schema mismatches, business logic errors).
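The triage automation in step (4) can start as a simple classifier over the failure reason recorded with each DLQ entry. A minimal sketch; the error-category strings are assumptions about what your consumers record alongside failed events:

```python
# Hypothetical failure reasons recorded alongside each DLQ entry
TRANSIENT = {"timeout", "connection_reset", "throttled", "broker_unavailable"}

def triage(failure_reason: str) -> str:
    """Returns 'auto_retry' for transient infrastructure errors and
    'manual' for schema or business-logic failures needing a human."""
    return "auto_retry" if failure_reason in TRANSIENT else "manual"

print(triage("timeout"))          # auto_retry
print(triage("schema_mismatch"))  # manual
```

Anything classified `auto_retry` can be republished to the original topic on a schedule; everything else lands in the owning team's queue per ownership.yaml.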
Event Volume Spike Causing Kafka Cluster Cost Explosion
Symptom: Monthly Kafka costs spiked to $12,000, a multiple of the previous month's bill.
Root cause analysis: Without cost tagging, it was impossible to immediately determine which team's events were responsible. Investigation revealed the analytics team's newly added page.viewed event was generating 50 million events per day with payloads of 5KB each.
Response: (1) Build per-event volume monitoring. (2) Establish a process requiring expected volume and cost impact documentation in the catalog when registering new events. (3) Optimize the event payload (remove unnecessary fields, apply compression). (4) Convert analytics events that do not require real-time processing to batch processing to reduce Kafka load.
References
- Martin Fowler, "What do you mean by Event-Driven?" -- martinfowler.com/articles/201701-event-driven
- Confluent Schema Registry Documentation -- docs.confluent.io/platform/current/schema-registry
- Chris Richardson, "Microservices Patterns", Manning, 2018 -- Chapter 3: Interprocess communication
- Event Catalog (Open Source) -- eventcatalog.dev
- "Event-Driven Architecture: Pros and Cons" -- solace.com/blog/event-driven-architecture-pros-and-cons
- CloudEvents Specification -- cloudevents.io
- OpenTelemetry Documentation -- opentelemetry.io/docs
Quiz
What is the first problem that occurs when governance is absent in EDA? Answer: Event schema changes propagate to consumers without prior notice, causing service outages, and the unclear producer/consumer responsibility boundaries delay incident resolution.
If BACKWARD compatibility is guaranteed, what deployment order is safe? Answer: It is safe to upgrade consumers first. A consumer running the new schema can still process messages produced with the old schema, so consumers can be rolled out ahead of producers; producers switch to the new schema only once every consumer understands it.
Why define SLOs for consumers in the event ownership mapping? Answer: Each consumer has different processing time requirements. The payment service needs processing within 500ms while the analytics pipeline allows up to 60 seconds. Defining SLOs enables sending alerts to the appropriate team with the appropriate priority when violations occur.
What are the three core elements of DLQ governance? Answer: (1) Assigning DLQ processing responsible teams (ownership), (2) setting up alerts for DLQ count and dwell time (monitoring), (3) automating classification of events that can be automatically reprocessed vs those requiring manual intervention (triage).
What is the key benefit of enforcing schema compatibility checks in CI? Answer: Incompatible schema changes are blocked before reaching production. This prevents service outages caused by schema changes at the source and allows verification of change impact before deployment.
What is the first step in introducing event cost governance? Answer: Tagging and visualizing event volume and costs per team on a dashboard. If it is not visible which team's events are consuming costs, optimization targets cannot be identified.
Why does "establishing visibility" come first in the governance adoption roadmap? Answer: Without being able to see the current state in numbers, there is no evidence to persuade the team of the need for ownership establishment or rule enforcement. Having data like "3,000 items accumulated in DLQ and neglected for 2 weeks" is necessary to gain team buy-in for governance adoption.
What are five pieces of information that must be included when registering a new event in the catalog? Answer: (1) Event type and description, (2) producer service/team, (3) schema path and version, (4) expected daily volume and payload size, (5) consumer list and each consumer's processing SLO.