Skip to content
Published on

아키텍처: 이벤트 기반 거버넌스 실전 2026

Authors
아키텍처: 이벤트 기반 거버넌스 실전 2026

이벤트 기반 아키텍처에 거버넌스가 필요한 이유

Event-Driven Architecture(EDA)는 서비스 간 결합도를 낮추고 확장성을 높이는 강력한 패턴이다. 하지만 거버넌스 없는 EDA는 빠르게 혼돈에 빠진다.

팀 A가 user.updated 이벤트의 필드명을 바꿨더니, 이 이벤트를 구독하던 팀 B, C, D의 서비스가 동시에 깨진다. 누가 이 이벤트를 소비하는지 아무도 모르고, 스키마 변경이 공지 없이 배포된다. Dead Letter Queue에 수천 건의 처리 실패 이벤트가 쌓여 있지만, 그것이 누구의 책임인지 불분명하다.

이런 상황은 EDA의 문제가 아니라 거버넌스의 부재 문제다. 거버넌스란 규칙을 만들고 강제하는 체계를 뜻한다. 이 글은 이벤트 기반 시스템에서 실무적으로 필요한 거버넌스의 네 가지 축을 다룬다.

  1. 이벤트 스키마 거버넌스: 이벤트의 형식과 진화 규칙
  2. 소유권 거버넌스: 이벤트의 생산자/소비자 책임 경계
  3. 품질 거버넌스: 이벤트 처리의 SLO와 모니터링
  4. 비용 거버넌스: 이벤트 인프라의 비용 가시성과 통제

이벤트 스키마 거버넌스

스키마 레지스트리 도입

이벤트 스키마를 코드 안에 암묵적으로 두면, 변경 영향도를 파악할 수 없다. 스키마를 중앙 레지스트리에서 관리하고, CI에서 호환성을 강제해야 한다.

Confluent Schema Registry, AWS Glue Schema Registry, 또는 자체 구축한 Git 기반 레지스트리를 사용할 수 있다. 여기서는 Git 기반의 가벼운 접근을 보여준다.

schemas/
├── order/
│   ├── order.created.v1.json
│   ├── order.created.v2.json     # v1과 BACKWARD 호환
│   ├── order.confirmed.v1.json
│   └── order.cancelled.v1.json
├── payment/
│   ├── payment.reserved.v1.json
│   └── payment.completed.v1.json
├── inventory/
│   └── inventory.reserved.v1.json
└── _meta/
    ├── compatibility-rules.yaml  # 호환성 정책
    └── ownership.yaml            # 이벤트 소유권 매핑

호환성 검사 자동화

스키마 변경이 기존 소비자를 깨뜨리지 않는지 CI에서 자동 검증한다.

"""
이벤트 스키마 호환성 검사기.

JSON Schema의 BACKWARD 호환성을 검증한다.
BACKWARD 호환 = 새 스키마로 생산된 메시지를 이전 스키마의 소비자가 처리할 수 있다.

규칙:
  - 새 필드 추가: OK (optional이어야 함)
  - 기존 required 필드 삭제: FAIL
  - 기존 필드 타입 변경: FAIL
  - required 필드 추가: FAIL (기존 소비자가 이 필드를 보내지 않음)
"""
import json
import sys
from pathlib import Path
from dataclasses import dataclass


@dataclass
class CompatibilityIssue:
    severity: str   # "ERROR" or "WARNING"
    path: str       # JSON path
    message: str


def check_backward_compatibility(
    old_schema: dict,
    new_schema: dict,
) -> list[CompatibilityIssue]:
    """이전 스키마와 새 스키마의 BACKWARD 호환성을 검사한다."""
    issues = []

    old_props = old_schema.get("properties", {})
    new_props = new_schema.get("properties", {})
    old_required = set(old_schema.get("required", []))
    new_required = set(new_schema.get("required", []))

    # 1. 기존 필드가 삭제되었는지 확인
    for field_name in old_props:
        if field_name not in new_props:
            issues.append(CompatibilityIssue(
                severity="ERROR",
                path=f"$.properties.{field_name}",
                message=f"Field '{field_name}' was removed. "
                        f"Existing consumers may depend on this field.",
            ))

    # 2. 기존 필드의 타입이 변경되었는지 확인
    for field_name in old_props:
        if field_name in new_props:
            old_type = old_props[field_name].get("type")
            new_type = new_props[field_name].get("type")
            if old_type and new_type and old_type != new_type:
                issues.append(CompatibilityIssue(
                    severity="ERROR",
                    path=f"$.properties.{field_name}.type",
                    message=f"Type of '{field_name}' changed from "
                            f"'{old_type}' to '{new_type}'.",
                ))

    # 3. 새로 required가 된 필드가 있는지 확인
    new_required_fields = new_required - old_required
    for field_name in new_required_fields:
        if field_name not in old_props:
            issues.append(CompatibilityIssue(
                severity="ERROR",
                path=f"$.required.{field_name}",
                message=f"New required field '{field_name}' added. "
                        f"Existing producers don't include this field.",
            ))

    # 4. 새 optional 필드 추가는 WARNING으로 기록
    for field_name in new_props:
        if field_name not in old_props:
            if field_name not in new_required:
                issues.append(CompatibilityIssue(
                    severity="WARNING",
                    path=f"$.properties.{field_name}",
                    message=f"New optional field '{field_name}' added. "
                            f"Consumers should handle missing field gracefully.",
                ))

    # 중첩된 object의 properties도 재귀적으로 검사
    for field_name in old_props:
        if field_name in new_props:
            old_field = old_props[field_name]
            new_field = new_props[field_name]
            if old_field.get("type") == "object" and new_field.get("type") == "object":
                nested_issues = check_backward_compatibility(old_field, new_field)
                for issue in nested_issues:
                    issue.path = f"$.properties.{field_name}{issue.path[1:]}"
                    issues.append(issue)

    return issues


def validate_schema_evolution(schema_dir: str) -> bool:
    """디렉토리 내 모든 스키마의 버전 간 호환성을 검증한다."""
    schema_path = Path(schema_dir)
    all_pass = True

    for domain_dir in sorted(schema_path.iterdir()):
        if not domain_dir.is_dir() or domain_dir.name.startswith("_"):
            continue

        # 같은 이벤트 타입의 버전들을 그룹핑
        event_versions: dict[str, list[Path]] = {}
        for schema_file in sorted(domain_dir.glob("*.json")):
            # event_name.v1.json -> event_name
            parts = schema_file.stem.rsplit(".v", 1)
            if len(parts) == 2:
                event_name = parts[0]
                event_versions.setdefault(event_name, []).append(schema_file)

        # 연속 버전 간 호환성 검사
        for event_name, versions in event_versions.items():
            for i in range(len(versions) - 1):
                old = json.loads(versions[i].read_text())
                new = json.loads(versions[i + 1].read_text())

                issues = check_backward_compatibility(old, new)
                errors = [i for i in issues if i.severity == "ERROR"]

                if errors:
                    all_pass = False
                    print(f"\nFAIL: {versions[i].name} -> {versions[i+1].name}")
                    for err in errors:
                        print(f"  [{err.severity}] {err.path}: {err.message}")

    return all_pass


if __name__ == "__main__":
    schema_dir = sys.argv[1] if len(sys.argv) > 1 else "schemas"
    success = validate_schema_evolution(schema_dir)
    sys.exit(0 if success else 1)

소유권 거버넌스: 누가 이 이벤트를 책임지는가

이벤트의 생산자와 소비자가 명확하지 않으면 장애 시 책임 공백이 발생한다. "이 이벤트 누가 발행하는 거야?" "이 DLQ 처리 누가 해야 해?" 같은 질문에 즉시 답할 수 있어야 한다.

# schemas/_meta/ownership.yaml
# 이벤트 소유권 매핑. 이 파일은 모든 이벤트의 소유 관계를 정의한다.

events:
  order.created:
    producer:
      team: order-team
      service: order-service
      slack_channel: '#order-alerts'
      oncall_rotation: 'order-oncall'
    consumers:
      - team: payment-team
        service: payment-service
        purpose: '결제 예약 트리거'
        slo_processing_time_ms: 500
      - team: inventory-team
        service: inventory-service
        purpose: '재고 예약 트리거'
        slo_processing_time_ms: 1000
      - team: analytics-team
        service: analytics-pipeline
        purpose: '주문 분석 데이터 수집'
        slo_processing_time_ms: 60000 # 실시간이 아닌 준실시간

  order.cancelled:
    producer:
      team: order-team
      service: order-service
      slack_channel: '#order-alerts'
      oncall_rotation: 'order-oncall'
    consumers:
      - team: payment-team
        service: payment-service
        purpose: '결제 환불 트리거'
        slo_processing_time_ms: 500
      - team: inventory-team
        service: inventory-service
        purpose: '재고 해제'
        slo_processing_time_ms: 1000

  payment.completed:
    producer:
      team: payment-team
      service: payment-service
      slack_channel: '#payment-alerts'
      oncall_rotation: 'payment-oncall'
    consumers:
      - team: order-team
        service: order-service
        purpose: '주문 상태를 confirmed로 업데이트'
        slo_processing_time_ms: 500
      - team: notification-team
        service: notification-service
        purpose: '결제 완료 알림 발송'
        slo_processing_time_ms: 3000

governance_rules:
  # 스키마 변경 시 모든 소비자 팀에 사전 공지 필수
  schema_change_notice_days: 7
  # 새 소비자 등록 시 생산자 팀 승인 필수
  consumer_registration: requires_producer_approval
  # 소비자가 SLO를 지키지 못하면 알림
  slo_violation_alert: true

이 소유권 파일을 기반으로, 스키마 변경 PR이 올라오면 자동으로 소비자 팀에 리뷰 요청을 보내는 CI 자동화를 구축할 수 있다.

"""
스키마 변경 시 영향받는 소비자 팀에 자동 알림을 보내는 스크립트.

git diff로 변경된 스키마 파일을 감지하고,
ownership.yaml에서 소비자 팀을 조회하여 Slack 알림을 보낸다.
"""
import yaml
import subprocess
from pathlib import Path


def get_changed_schemas() -> list[str]:
    """git diff에서 변경된 스키마 파일 목록을 반환한다."""
    result = subprocess.run(
        ["git", "diff", "--name-only", "origin/main", "--", "schemas/"],
        capture_output=True, text=True,
    )
    return [
        line.strip() for line in result.stdout.splitlines()
        if line.strip().endswith(".json")
    ]


def load_ownership() -> dict:
    with open("schemas/_meta/ownership.yaml") as f:
        return yaml.safe_load(f)


def find_affected_consumers(
    changed_files: list[str],
    ownership: dict,
) -> dict[str, list[dict]]:
    """변경된 스키마의 소비자 팀 목록을 반환한다."""
    affected: dict[str, list[dict]] = {}

    for filepath in changed_files:
        # schemas/order/order.created.v2.json -> order.created
        parts = Path(filepath).stem.rsplit(".v", 1)
        if len(parts) != 2:
            continue
        event_type = parts[0].replace("/", ".")
        # domain/event_name -> domain.event_name 형태로 변환
        path_parts = Path(filepath).parts
        if len(path_parts) >= 3:
            event_type = f"{path_parts[1]}.{parts[0].split('.')[-1] if '.' in parts[0] else parts[0]}"

        event_info = ownership.get("events", {}).get(event_type, {})
        consumers = event_info.get("consumers", [])
        if consumers:
            affected[event_type] = consumers

    return affected


def notify_consumers(affected: dict[str, list[dict]]):
    """영향받는 소비자 팀에 Slack 알림을 보낸다."""
    for event_type, consumers in affected.items():
        teams = set(c["team"] for c in consumers)
        channels = set()
        for team_name in teams:
            # 팀별 Slack 채널 조회 (실제로는 ownership에서)
            channels.add(f"#{team_name}-alerts")

        message = (
            f"Schema change detected for `{event_type}`.\n"
            f"Affected consumer teams: {', '.join(teams)}\n"
            f"Please review the schema change PR for backward compatibility."
        )
        print(f"[NOTIFY] {channels}: {message}")
        # 실제로는 Slack API 호출


if __name__ == "__main__":
    changed = get_changed_schemas()
    if not changed:
        print("No schema changes detected.")
    else:
        ownership = load_ownership()
        affected = find_affected_consumers(changed, ownership)
        if affected:
            notify_consumers(affected)
        else:
            print("Changed schemas have no registered consumers.")

품질 거버넌스: SLO 기반 이벤트 처리 모니터링

이벤트 처리의 품질을 측정하려면 세 가지 SLI(Service Level Indicator)가 필요하다.

  1. 처리 지연(Processing Latency): 이벤트 발행에서 소비 완료까지의 시간
  2. 처리 성공률(Success Rate): 이벤트 처리 성공 비율
  3. DLQ 체류 시간(DLQ Dwell Time): DLQ에 들어간 이벤트가 해결되기까지의 시간
-- 이벤트 처리 SLO 대시보드 쿼리

-- 1. 소비자별 처리 지연 SLO 달성률
-- SLO: 99%의 이벤트가 정의된 시간 내에 처리되어야 함
SELECT
    consumer_service,
    event_type,
    COUNT(*) AS total_events,
    PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY processing_latency_ms) AS p50_ms,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY processing_latency_ms) AS p99_ms,
    -- ownership.yaml에 정의된 SLO와 비교
    ROUND(
        100.0 * SUM(
            CASE WHEN processing_latency_ms <= slo_target_ms THEN 1 ELSE 0 END
        ) / COUNT(*),
        2
    ) AS slo_achievement_pct
FROM event_processing_log epl
JOIN event_slo_targets est
    ON epl.consumer_service = est.consumer_service
    AND epl.event_type = est.event_type
WHERE epl.processed_at >= NOW() - INTERVAL '24 hours'
GROUP BY consumer_service, event_type
ORDER BY slo_achievement_pct ASC;

-- 2. DLQ 현황: 팀별 미처리 이벤트
SELECT
    owner_team,
    event_type,
    COUNT(*) AS dlq_count,
    MIN(entered_dlq_at) AS oldest_entry,
    EXTRACT(EPOCH FROM (NOW() - MIN(entered_dlq_at))) / 3600 AS oldest_hours,
    -- 24시간 이상 된 건수 (P1 알림 대상)
    SUM(CASE WHEN entered_dlq_at < NOW() - INTERVAL '24 hours' THEN 1 ELSE 0 END)
        AS stale_count
FROM dead_letter_queue dlq
JOIN event_ownership eo ON dlq.event_type = eo.event_type
WHERE dlq.resolved_at IS NULL
GROUP BY owner_team, event_type
ORDER BY oldest_hours DESC;

-- 3. 주간 이벤트 처리 트렌드
SELECT
    date_trunc('day', processed_at) AS day,
    event_type,
    COUNT(*) AS total,
    SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS successes,
    SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failures,
    SUM(CASE WHEN status = 'dlq' THEN 1 ELSE 0 END) AS dlq_entries,
    ROUND(
        100.0 * SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) / COUNT(*),
        2
    ) AS success_rate_pct
FROM event_processing_log
WHERE processed_at >= NOW() - INTERVAL '7 days'
GROUP BY 1, 2
ORDER BY 1 DESC, success_rate_pct ASC;

비용 거버넌스: 이벤트 인프라 비용 가시화

Kafka 클러스터, 이벤트 처리 컴퓨팅, 스토리지 비용은 이벤트 볼륨에 비례해 증가한다. 팀별로 비용을 태깅하고 가시화하지 않으면, 비용 최적화의 시작점을 찾을 수 없다.

"""
이벤트 인프라 비용 태깅 및 리포트 생성기.

각 팀이 생산/소비하는 이벤트 볼륨과 해당 인프라 비용을
매핑하여 FinOps 대시보드에 제공한다.
"""
from dataclasses import dataclass
from typing import Optional


@dataclass
class EventCostEntry:
    team: str
    event_type: str
    role: str               # "producer" or "consumer"
    daily_volume: int
    avg_payload_bytes: int
    estimated_daily_cost_usd: float


@dataclass
class CostBreakdown:
    # Kafka 비용 구성 요소
    broker_storage_usd: float    # 메시지 보관 스토리지
    broker_network_usd: float    # 네트워크 전송
    consumer_compute_usd: float  # 소비자 컴퓨팅 리소스
    schema_registry_usd: float   # 스키마 레지스트리 운영

    @property
    def total_usd(self) -> float:
        return (
            self.broker_storage_usd
            + self.broker_network_usd
            + self.consumer_compute_usd
            + self.schema_registry_usd
        )


class EventCostCalculator:
    """이벤트 기반 인프라 비용 산출기.

    단위 비용 상수를 기반으로 팀별/이벤트별 비용을 추정한다.
    실제 클라우드 비용과 정기적으로 교정(calibration)해야 한다.
    """

    # 단위 비용 상수 (AWS MSK 기준 추정)
    STORAGE_PER_GB_DAY_USD = 0.10
    NETWORK_PER_GB_USD = 0.05
    CONSUMER_PER_MILLION_EVENTS_USD = 0.50
    RETENTION_DAYS = 7

    def calculate_team_cost(
        self,
        entries: list[EventCostEntry],
    ) -> dict[str, CostBreakdown]:
        """팀별 일간 비용을 산출한다."""
        team_costs: dict[str, CostBreakdown] = {}

        for entry in entries:
            daily_data_gb = (
                entry.daily_volume * entry.avg_payload_bytes
            ) / (1024 ** 3)

            cost = CostBreakdown(
                broker_storage_usd=daily_data_gb * self.STORAGE_PER_GB_DAY_USD * self.RETENTION_DAYS,
                broker_network_usd=daily_data_gb * self.NETWORK_PER_GB_USD,
                consumer_compute_usd=(
                    entry.daily_volume / 1_000_000
                ) * self.CONSUMER_PER_MILLION_EVENTS_USD if entry.role == "consumer" else 0,
                schema_registry_usd=0.01,  # 고정 분할
            )

            if entry.team in team_costs:
                existing = team_costs[entry.team]
                team_costs[entry.team] = CostBreakdown(
                    broker_storage_usd=existing.broker_storage_usd + cost.broker_storage_usd,
                    broker_network_usd=existing.broker_network_usd + cost.broker_network_usd,
                    consumer_compute_usd=existing.consumer_compute_usd + cost.consumer_compute_usd,
                    schema_registry_usd=existing.schema_registry_usd + cost.schema_registry_usd,
                )
            else:
                team_costs[entry.team] = cost

        return team_costs

    def generate_report(self, team_costs: dict[str, CostBreakdown]) -> str:
        """팀별 비용 리포트를 텍스트로 생성한다."""
        lines = ["=== Event Infrastructure Daily Cost Report ===\n"]

        total = 0.0
        for team, cost in sorted(team_costs.items(), key=lambda x: x[1].total_usd, reverse=True):
            lines.append(f"Team: {team}")
            lines.append(f"  Storage:  ${cost.broker_storage_usd:.2f}")
            lines.append(f"  Network:  ${cost.broker_network_usd:.2f}")
            lines.append(f"  Compute:  ${cost.consumer_compute_usd:.2f}")
            lines.append(f"  Registry: ${cost.schema_registry_usd:.2f}")
            lines.append(f"  TOTAL:    ${cost.total_usd:.2f}\n")
            total += cost.total_usd

        lines.append(f"=== Grand Total: ${total:.2f}/day (${total*30:.2f}/month est.) ===")
        return "\n".join(lines)

이벤트 카탈로그: 디스커버리 문제 해결

"우리 시스템에 어떤 이벤트가 있지?" "이 이벤트를 누가 발행하고 누가 소비하지?"

이벤트 수가 50개를 넘으면 이 질문에 즉시 답하기 어렵다. 이벤트 카탈로그는 모든 이벤트의 메타데이터를 한곳에서 검색 가능하게 만든다.

# event-catalog.yaml
# 이벤트 카탈로그: 모든 이벤트의 메타데이터 중앙 관리

catalog:
  - event_type: order.created
    domain: order
    description: '고객이 새 주문을 생성했을 때 발행된다.'
    schema_path: schemas/order/order.created.v2.json
    current_version: 2
    topic: order-events
    partition_key: order_id
    avg_daily_volume: 150000
    avg_payload_bytes: 1200
    retention_days: 7
    producer:
      service: order-service
      team: order-team
    consumers:
      - service: payment-service
        team: payment-team
      - service: inventory-service
        team: inventory-team
      - service: analytics-pipeline
        team: analytics-team
    tags: ['critical', 'transactional']
    created_at: '2024-06-15'
    last_schema_change: '2025-11-20'

  - event_type: order.cancelled
    domain: order
    description: '주문이 취소되었을 때 발행된다. 사유 필드 포함.'
    schema_path: schemas/order/order.cancelled.v1.json
    current_version: 1
    topic: order-events
    partition_key: order_id
    avg_daily_volume: 8000
    avg_payload_bytes: 800
    retention_days: 7
    producer:
      service: order-service
      team: order-team
    consumers:
      - service: payment-service
        team: payment-team
      - service: inventory-service
        team: inventory-team
    tags: ['critical', 'transactional']
    created_at: '2024-06-15'
    last_schema_change: '2024-06-15'

  - event_type: user.profile.updated
    domain: user
    description: '사용자 프로필 정보가 변경되었을 때 발행된다.'
    schema_path: schemas/user/user.profile.updated.v1.json
    current_version: 1
    topic: user-events
    partition_key: user_id
    avg_daily_volume: 25000
    avg_payload_bytes: 500
    retention_days: 3
    producer:
      service: user-service
      team: user-team
    consumers:
      - service: recommendation-service
        team: ml-team
      - service: notification-service
        team: notification-team
    tags: ['non-critical']
    created_at: '2024-08-01'
    last_schema_change: '2025-03-10'

거버넌스 도입 로드맵

모든 거버넌스를 한 번에 도입하면 팀의 반발만 산다. 점진적으로, 가장 고통이 큰 곳부터 시작한다.

Phase 1: 가시성 확보 (2-4주)
├── 이벤트 카탈로그 초안 작성 (기존 이벤트 목록화)
├── DLQ 건수 대시보드 구축
├── 이벤트 처리 성공/실패 메트릭 수집 시작
└── 결과: "현재 상태"를 숫자로 볼 수 있게 됨

Phase 2: 소유권 정립 (2-4주)
├── ownership.yaml 작성 (생산자/소비자 매핑)
├── DLQ 처리 담당 팀 지정
├── 스키마 변경 시 소비자 팀 알림 프로세스 수립
└── 결과: "누구 책임인지" 답할 수 있게 됨

Phase 3: 자동화된 규칙 강제 (4-8주)
├── 스키마 호환성 검사 CI 파이프라인 구축
├── 모듈 경계 위반 감지 CI 구축
├── SLO 위반 알림 자동화
└── 결과: 규칙 위반이 배포 전에 차단됨

Phase 4: 비용 최적화 (지속적)
├── 팀별 이벤트 비용 태깅 및 리포트
├── 불필요한 이벤트 소비 정리
├── retention 정책 최적화
└── 결과: 비용이 가시화되고 최적화 시작됨

실전 트러블슈팅

소비자가 모르게 스키마가 변경되어 장애 발생

증상: payment-service가 갑자기 KeyError: 'customer_id' 에러를 대량으로 발생시킨다. order-service 팀이 customer_idbuyer_id로 이름을 바꿨는데, 소비자 팀에 알리지 않았다.

사후 대응: (1) 즉시 order-service를 이전 버전으로 rollback한다. (2) 포스트모템에서 스키마 변경 프로세스의 부재를 근본 원인으로 식별한다.

재발 방지: (1) 스키마 호환성 검사 CI를 도입하여 BACKWARD 호환이 깨지는 변경을 배포 전에 차단한다. (2) ownership.yaml 기반으로 스키마 변경 PR에 소비자 팀을 자동으로 리뷰어에 추가한다. (3) 주요 스키마 변경은 governance_rules.schema_change_notice_days(7일)의 사전 공지 기간을 둔다.

DLQ에 수천 건이 쌓여 있지만 아무도 처리하지 않는다

증상: DLQ 대시보드에 3,000건이 쌓여 있고, 가장 오래된 건은 2주 전이다.

원인: DLQ 처리 담당 팀이 지정되지 않았거나, 알림이 설정되지 않아서 아무도 인지하지 못했다.

대응: (1) ownership.yaml에서 해당 이벤트의 소비자 팀을 식별한다. (2) DLQ 건수 알림을 설정한다: 100건 초과 시 WARNING, 24시간 이상 된 건이 있으면 P1. (3) 주간 DLQ 리뷰 미팅을 팀 루틴에 포함한다. (4) DLQ에서 자동 재처리가 가능한 건(일시적 에러)과 수동 개입이 필요한 건(스키마 불일치, 비즈니스 로직 에러)을 분류하는 triage 자동화를 구현한다.

이벤트 볼륨이 급증하여 Kafka 클러스터 비용 폭발

증상: 월 Kafka 비용이 $3,000에서 $12,000으로 급증했다.

원인 분석: 비용 태깅이 없어서 어떤 팀의 어떤 이벤트가 원인인지 바로 파악할 수 없다. 조사 결과, analytics 팀이 새로 추가한 page.viewed 이벤트가 일 5,000만 건으로, payload가 5KB에 달했다.

대응: (1) 이벤트별 볼륨 모니터링을 구축한다. (2) 새 이벤트 등록 시 예상 볼륨과 비용 영향도를 카탈로그에 기록하도록 프로세스를 수립한다. (3) 해당 이벤트의 payload를 최적화(불필요한 필드 제거, 압축 적용)한다. (4) 실시간성이 불필요한 analytics 이벤트는 배치 처리로 전환하여 Kafka 부하를 줄인다.

참고 자료

퀴즈
  1. EDA에서 거버넌스가 부재하면 가장 먼저 발생하는 문제는? 정답: ||이벤트 스키마 변경이 소비자에게 예고 없이 전파되어 서비스 장애가 발생하고, 해당 이벤트의 생산자/소비자 책임 경계가 불분명하여 장애 해결이 지연된다.||

  2. BACKWARD 호환성이 보장되면 어떤 배포 순서가 안전한가? 정답: ||소비자를 먼저 배포해도 안전하다. 새 스키마로 생산된 메시지에 추가된 optional 필드를 기존 소비자가 무시할 수 있고, 기존 required 필드가 유지되므로 기존 소비자 코드가 깨지지 않는다.||

  3. 이벤트 소유권 매핑에서 소비자의 SLO를 정의하는 이유는? 정답: ||소비자마다 처리 시간 요구가 다르다. 결제 서비스는 500ms 내 처리가 필요하지만 analytics 파이프라인은 60초도 허용된다. SLO를 정의해야 위반 시 적절한 팀에 적절한 우선순위로 알림을 보낼 수 있다.||

  4. DLQ 거버넌스의 핵심 세 가지 요소는? 정답: ||(1) DLQ 처리 담당 팀 지정(ownership), (2) DLQ 건수와 체류 시간에 대한 알림 설정(모니터링), (3) 자동 재처리 가능 건과 수동 개입 필요 건의 분류 자동화(triage).||

  5. 스키마 호환성 검사를 CI에서 강제하는 것의 핵심 이점은? 정답: ||호환되지 않는 스키마 변경이 프로덕션에 배포되기 전에 차단된다. 스키마 변경으로 인한 서비스 장애를 원천 방지하고, 변경 영향도를 배포 전에 확인할 수 있다.||

  6. 이벤트 비용 거버넌스를 도입하는 첫 단계는? 정답: ||팀별 이벤트 볼륨과 비용을 태깅하고 대시보드로 가시화하는 것. 어떤 팀의 어떤 이벤트가 비용을 소모하는지 보이지 않으면 최적화 대상을 식별할 수 없다.||

  7. 거버넌스 도입 로드맵에서 "가시성 확보"를 가장 먼저 하는 이유는? 정답: ||현재 상태를 숫자로 볼 수 없으면, 소유권 정립이나 규칙 강제의 필요성을 팀에 설득할 근거가 없다. "DLQ에 3,000건이 쌓여 있고 2주째 방치"라는 데이터가 있어야 거버넌스 도입에 대한 팀의 동의를 얻을 수 있다.||

  8. 새 이벤트를 카탈로그에 등록할 때 반드시 포함해야 하는 정보 다섯 가지는? 정답: ||(1) 이벤트 타입과 설명, (2) 생산자 서비스/팀, (3) 스키마 경로와 버전, (4) 예상 일일 볼륨과 payload 크기, (5) 소비자 목록과 각 소비자의 처리 SLO.||