Event-Driven + Saga Architecture Tradeoffs

The Moment Distributed Transactions Become Necessary

In a monolith, you can handle order creation, inventory deduction, and payment processing all within a single DB transaction. ACID is guaranteed, so if something fails midway, a single rollback handles everything.

But the moment services are separated, this "obvious" capability becomes impossible. When the Order Service, Inventory Service, and Payment Service each have their own database, you cannot wrap them in a single transaction. 2PC (Two-Phase Commit) is theoretically possible, but it is vulnerable to network partitions, its coordinator becomes a single point of failure, and its long lock durations cause throughput to plummet.

The Saga pattern is an alternative systematized by Chris Richardson (microservices.io/patterns/data/saga). The core idea is simple:

Decompose a distributed transaction into multiple local transactions, and if any intermediate step fails, execute compensating transactions to undo the already-completed transactions.

This article covers the real tradeoffs that arise when combining Saga with Event-Driven Architecture (EDA). It honestly analyzes the complexity, costs, and organizational requirements behind the slogan "separating enables scaling."

Two Coordination Approaches for Saga

Choreography: Event-Based Autonomous Coordination

Each service completes its local transaction and publishes a domain event, then the next service subscribes to that event and performs its own work. There is no central orchestrator.

OrderService              InventoryService          PaymentService
    |                        |                        |
    |-- OrderCreated ------->|                        |
    |                        |-- InventoryReserved --->|
    |                        |                        |-- PaymentProcessed
    |<----------- OrderConfirmed ----------------------|

On failure, compensation events flow in reverse:

PaymentService: PaymentFailed
  --> InventoryService: InventoryReleased
    --> OrderService: OrderCancelled

Advantages: no direct dependencies between services, and new services can be added without modifying existing ones.

Disadvantages: the entire flow cannot be seen at a glance in the code; when event chains grow complex, it is hard to trace where a failure occurred; and there is a risk of circular dependencies.
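
The event chain above can be sketched with a toy in-memory bus (the bus, the `subscribe`/`publish` helpers, and the handler functions are illustrative stand-ins for a real broker and real services):

```python
from collections import defaultdict

# Toy in-memory event bus; a real system would use Kafka, RabbitMQ, etc.
subscribers = defaultdict(list)
trace = []  # records the order in which events were published

def publish(event_type, payload):
    trace.append(event_type)
    for handler in subscribers[event_type]:
        handler(payload)

def subscribe(event_type):
    def register(handler):
        subscribers[event_type].append(handler)
        return handler
    return register

# Each "service" commits its local transaction (elided here) and
# publishes the next domain event. No central coordinator is involved.
@subscribe("OrderCreated")
def inventory_service(payload):
    publish("InventoryReserved", payload)

@subscribe("InventoryReserved")
def payment_service(payload):
    publish("PaymentProcessed", payload)

@subscribe("PaymentProcessed")
def order_service(payload):
    publish("OrderConfirmed", payload)

publish("OrderCreated", {"order_id": "ord-123"})
print(trace)
# ['OrderCreated', 'InventoryReserved', 'PaymentProcessed', 'OrderConfirmed']
```

Note that the full flow only emerges from reading all four handlers, which is exactly the observability drawback described above.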

Orchestration: Central Orchestrator Approach

The Saga Orchestrator manages the entire flow. It determines the next step based on success/failure of each stage and controls the compensation order upon failure.

SagaOrchestrator
    |-- (1) reserve_credit --> PaymentService
    |-- (2) reserve_inventory --> InventoryService
    |-- (3) create_shipment --> ShippingService
    |
    |  On (3) failure:
    |-- compensate (2) release_inventory
    |-- compensate (1) refund_credit

Advantages: the entire flow exists explicitly in the orchestrator code, so debugging and monitoring are easy.

Disadvantages: the orchestrator can become a single point of failure, and its complexity grows as the number of services increases.

Saga Orchestrator Implementation

The following is the core implementation of a Saga orchestrator. (Durable persistence of execution state, which a production deployment needs in order to resume sagas after a process crash, is omitted for brevity.)

"""
Saga Orchestrator: Manages execution and compensation of distributed transactions.

Each step implements execute() and compensate(),
and the orchestrator executes sequentially, then compensates in reverse order on failure.
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import logging
import uuid
import time

logger = logging.getLogger(__name__)


class SagaStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"
    FAILED = "failed"  # When compensation itself fails


class StepStatus(Enum):
    PENDING = "pending"
    EXECUTED = "executed"
    COMPENSATED = "compensated"
    COMPENSATION_FAILED = "compensation_failed"


@dataclass
class SagaStep:
    """Individual step of a Saga.

    execute() and compensate() are implemented in subclasses.
    Retry safety is ensured through idempotency_key.
    """
    name: str
    status: StepStatus = StepStatus.PENDING
    idempotency_key: str = ""
    executed_at: Optional[float] = None
    compensated_at: Optional[float] = None
    error: Optional[str] = None

    def __post_init__(self):
        if not self.idempotency_key:
            self.idempotency_key = str(uuid.uuid4())

    def execute(self, context: dict) -> dict:
        raise NotImplementedError

    def compensate(self, context: dict) -> None:
        raise NotImplementedError


@dataclass
class SagaExecution:
    """Saga execution record."""
    saga_id: str
    status: SagaStatus = SagaStatus.PENDING
    steps: list[SagaStep] = field(default_factory=list)
    context: dict = field(default_factory=dict)
    started_at: Optional[float] = None
    completed_at: Optional[float] = None
    error: Optional[str] = None


class SagaOrchestrator:
    """Orchestrator that manages Saga execution.

    Execution flow:
    1. Execute each step sequentially
    2. If any step fails, compensate already-executed steps in reverse order
    3. If compensation also fails, record as FAILED and send alert
    """

    def __init__(self, steps: list[SagaStep], on_failure: Optional[callable] = None):
        self.steps = steps
        self.on_failure = on_failure  # Callback invoked on compensation failure

    def run(self, initial_context: dict | None = None) -> SagaExecution:
        execution = SagaExecution(
            saga_id=str(uuid.uuid4()),
            steps=self.steps,
            context=initial_context or {},
            started_at=time.time(),
        )
        execution.status = SagaStatus.RUNNING
        completed_steps: list[SagaStep] = []

        logger.info(f"Saga {execution.saga_id} started with {len(self.steps)} steps")

        try:
            for step in self.steps:
                logger.info(f"Saga {execution.saga_id}: executing '{step.name}'")
                try:
                    result = step.execute(execution.context)
                    step.status = StepStatus.EXECUTED
                    step.executed_at = time.time()

                    # Merge step result into context
                    if isinstance(result, dict):
                        execution.context.update(result)

                    completed_steps.append(step)

                except Exception as e:
                    step.error = str(e)
                    logger.error(
                        f"Saga {execution.saga_id}: step '{step.name}' failed: {e}"
                    )
                    # Start compensation
                    self._compensate(execution, completed_steps)
                    return execution

            execution.status = SagaStatus.COMPLETED
            execution.completed_at = time.time()
            logger.info(f"Saga {execution.saga_id} completed successfully")

        except Exception as e:
            execution.status = SagaStatus.FAILED
            execution.error = str(e)
            logger.critical(f"Saga {execution.saga_id} unexpected error: {e}")

        return execution

    def _compensate(self, execution: SagaExecution, completed_steps: list[SagaStep]):
        """Compensate completed steps in reverse order."""
        execution.status = SagaStatus.COMPENSATING
        compensation_failures = []

        for step in reversed(completed_steps):
            logger.info(
                f"Saga {execution.saga_id}: compensating '{step.name}'"
            )
            try:
                step.compensate(execution.context)
                step.status = StepStatus.COMPENSATED
                step.compensated_at = time.time()
            except Exception as e:
                step.status = StepStatus.COMPENSATION_FAILED
                step.error = f"Compensation failed: {e}"
                compensation_failures.append((step.name, str(e)))
                logger.critical(
                    f"Saga {execution.saga_id}: compensation of '{step.name}' FAILED: {e}"
                )

        if compensation_failures:
            execution.status = SagaStatus.FAILED
            execution.error = f"Compensation failures: {compensation_failures}"
            if self.on_failure:
                self.on_failure(execution, compensation_failures)
        else:
            execution.status = SagaStatus.COMPENSATED
            execution.completed_at = time.time()

Here is an example of actual step implementations:

class ReserveCreditStep(SagaStep):
    """Reserves credit in the payment service."""

    def __init__(self, payment_client):
        super().__init__(name="reserve_credit")
        self.payment_client = payment_client

    def execute(self, context: dict) -> dict:
        result = self.payment_client.reserve(
            customer_id=context["customer_id"],
            amount=context["order_total"],
            idempotency_key=self.idempotency_key,
        )
        return {"reservation_id": result.reservation_id}

    def compensate(self, context: dict) -> None:
        self.payment_client.release(
            reservation_id=context["reservation_id"],
            idempotency_key=f"{self.idempotency_key}_compensate",
        )


class ReserveInventoryStep(SagaStep):
    """Reserves products in the inventory service."""

    def __init__(self, inventory_client):
        super().__init__(name="reserve_inventory")
        self.inventory_client = inventory_client

    def execute(self, context: dict) -> dict:
        result = self.inventory_client.reserve(
            items=context["order_items"],
            idempotency_key=self.idempotency_key,
        )
        return {"inventory_reservation_id": result.reservation_id}

    def compensate(self, context: dict) -> None:
        self.inventory_client.release(
            reservation_id=context["inventory_reservation_id"],
            idempotency_key=f"{self.idempotency_key}_compensate",
        )
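
Wired together, a failure in the second step compensates the first in reverse order. The following is a self-contained toy run of that flow (simplified: a step is a plain (name, execute, compensate) tuple, the loop condenses SagaOrchestrator.run/_compensate above, and the client calls are faked):

```python
# Toy end-to-end run of the saga above with faked service clients.
log = []

def reserve_credit(ctx):
    log.append("reserve_credit")
    ctx["reservation_id"] = "res-1"  # fake payment-service response

def refund_credit(ctx):
    log.append(f"refund_credit:{ctx['reservation_id']}")

def reserve_inventory(ctx):
    raise RuntimeError("out of stock")  # simulate the second step failing

def release_inventory(ctx):
    log.append("release_inventory")

steps = [
    ("reserve_credit", reserve_credit, refund_credit),
    ("reserve_inventory", reserve_inventory, release_inventory),
]

def run_saga(steps, ctx):
    completed = []
    for name, execute, compensate in steps:
        try:
            execute(ctx)
            completed.append(compensate)
        except Exception:
            for undo in reversed(completed):
                undo(ctx)  # compensate already-executed steps in reverse
            return "compensated"
    return "completed"

status = run_saga(steps, {})
print(status, log)  # compensated ['reserve_credit', 'refund_credit:res-1']
```

Only `reserve_credit` completed, so only `refund_credit` runs; `release_inventory` is never called, because a step that failed to execute has nothing to compensate.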

Event Contracts and Schema Management

In Event-Driven Architecture, events are contracts between services. Without managing these contracts, a change in one service's event format can silently break consumer services.

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "https://events.example.com/order/created/v2",
  "title": "OrderCreated",
  "description": "Event published when an order is created",
  "type": "object",
  "required": ["event_id", "event_type", "event_version", "timestamp", "data"],
  "properties": {
    "event_id": {
      "type": "string",
      "format": "uuid",
      "description": "Unique event ID (used as idempotency key)"
    },
    "event_type": {
      "type": "string",
      "const": "order.created"
    },
    "event_version": {
      "type": "integer",
      "const": 2,
      "description": "Schema version. Used for compatibility determination"
    },
    "timestamp": {
      "type": "string",
      "format": "date-time"
    },
    "data": {
      "type": "object",
      "required": ["order_id", "customer_id", "items", "total_amount"],
      "properties": {
        "order_id": { "type": "string" },
        "customer_id": { "type": "string" },
        "items": {
          "type": "array",
          "items": {
            "type": "object",
            "required": ["sku", "quantity", "unit_price"],
            "properties": {
              "sku": { "type": "string" },
              "quantity": { "type": "integer", "minimum": 1 },
              "unit_price": { "type": "number", "minimum": 0 }
            }
          }
        },
        "total_amount": { "type": "number", "minimum": 0 },
        "currency": { "type": "string", "default": "KRW" },
        "shipping_address": {
          "type": "object",
          "description": "Field added in v2. v1 consumers ignore this field."
        }
      }
    }
  }
}

For schema compatibility policies, it is helpful to refer to the Confluent Schema Registry compatibility modes.

| Compatibility Mode | Allowed Changes | When to Use |
|---|---|---|
| BACKWARD | Add optional fields, delete fields | When consumers are deployed first |
| FORWARD | Add fields, delete fields with defaults | When producers are deployed first |
| FULL | Only changes satisfying both of the above | When producer/consumer deployment order cannot be guaranteed |
| NONE | All changes allowed (dangerous) | Development environments only |
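To make BACKWARD compatibility concrete, here is a minimal consumer-side sketch that handles v1 and v2 events with a single code path. The field names follow the OrderCreated schema above; the function itself is illustrative, not a library API (a managed alternative is Avro with a Schema Registry).

```python
import json


def parse_order_created(raw: bytes) -> dict:
    """Normalize OrderCreated v1/v2 payloads into one internal shape.

    The v2-only shipping_address field is optional, so the same consumer
    code accepts events from producers on either schema version.
    """
    event = json.loads(raw)
    if event["event_type"] != "order.created":
        raise ValueError(f"unexpected event_type: {event['event_type']}")
    data = event["data"]
    return {
        "order_id": data["order_id"],
        "customer_id": data["customer_id"],
        "items": data["items"],
        "total_amount": data["total_amount"],
        "currency": data.get("currency", "KRW"),            # default per schema
        "shipping_address": data.get("shipping_address"),   # None for v1 events
    }
```

Because every v2 addition is read with `.get()` and a default, deploying this consumer before or after the v2 producer is equally safe.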

Outbox Pattern: Ensuring Atomicity of Event Publishing

One of the most common mistakes is assuming that "save to the DB" and "publish the event" happen atomically. They are two separate operations, and the second can fail after the first has already succeeded.

# Dangerous pattern
1. Save order to DB     -- Success
2. Publish event to Kafka -- Failure (network issue)
=> Order is created but no event, so inventory is not deducted and payment is not processed

The Outbox pattern stores events together in a separate outbox table, and a separate process polls the outbox to deliver messages to the message broker.

-- outbox table schema
CREATE TABLE outbox (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type  VARCHAR(100) NOT NULL,    -- 'Order', 'Payment', etc.
    aggregate_id    VARCHAR(100) NOT NULL,    -- Order ID, Payment ID
    event_type      VARCHAR(200) NOT NULL,    -- 'order.created'
    event_version   INT NOT NULL DEFAULT 1,
    payload         JSONB NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMPTZ,              -- NULL means not yet published
    retry_count     INT NOT NULL DEFAULT 0,
    max_retries     INT NOT NULL DEFAULT 5
);

-- Index for querying unpublished events
CREATE INDEX idx_outbox_unpublished
    ON outbox (created_at)
    WHERE published_at IS NULL AND retry_count < max_retries;

-- Process order creation and event in a single transaction
BEGIN;
    INSERT INTO orders (id, customer_id, total, status)
    VALUES ('ord-123', 'cust-456', 50000, 'created');

    INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
    VALUES (
        'Order',
        'ord-123',
        'order.created',
        '{"order_id": "ord-123", "customer_id": "cust-456", "total": 50000}'::jsonb
    );
COMMIT;

The Outbox publisher runs as a separate process, periodically polling for unpublished events.

"""
Outbox Publisher: Delivers unpublished events from the outbox table to the broker.

Guarantees at-least-once delivery, and consumers must
handle idempotency on their side.
"""
import time
import json
import logging

logger = logging.getLogger(__name__)


class OutboxPublisher:
    def __init__(self, db_pool, kafka_producer, poll_interval_seconds: float = 1.0):
        self.db_pool = db_pool
        self.producer = kafka_producer
        self.poll_interval = poll_interval_seconds

    def run(self):
        """Main loop. Continuously polls and publishes unpublished events."""
        logger.info("OutboxPublisher started")
        while True:
            try:
                published_count = self._poll_and_publish()
                if published_count == 0:
                    time.sleep(self.poll_interval)
            except Exception as e:
                logger.error(f"OutboxPublisher error: {e}")
                time.sleep(self.poll_interval * 5)

    def _poll_and_publish(self, batch_size: int = 100) -> int:
        """Queries a batch of unpublished events and publishes them."""
        with self.db_pool.connection() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT id, aggregate_type, aggregate_id,
                           event_type, event_version, payload
                    FROM outbox
                    WHERE published_at IS NULL
                      AND retry_count < max_retries
                    ORDER BY created_at
                    LIMIT %s
                    FOR UPDATE SKIP LOCKED
                """, (batch_size,))

                rows = cur.fetchall()
                if not rows:
                    return 0

                published_ids = []
                failed_ids = []

                for row in rows:
                    event_id, agg_type, agg_id, evt_type, evt_ver, payload = row
                    topic = f"{agg_type.lower()}-events"

                    try:
                        # send() is asynchronous in kafka-python-style clients;
                        # block on the returned future so a broker error surfaces
                        # here instead of after the row is marked published.
                        future = self.producer.send(
                            topic=topic,
                            key=agg_id.encode(),
                            value=json.dumps({
                                "event_id": str(event_id),
                                "event_type": evt_type,
                                "event_version": evt_ver,
                                "data": payload,
                            }).encode(),
                        )
                        future.get(timeout=10)
                        published_ids.append(event_id)
                    except Exception as e:
                        logger.warning(f"Failed to publish {event_id}: {e}")
                        failed_ids.append(event_id)

                # Mark successful events
                if published_ids:
                    cur.execute("""
                        UPDATE outbox SET published_at = NOW()
                        WHERE id = ANY(%s)
                    """, (published_ids,))

                # Increment retry count for failed events
                if failed_ids:
                    cur.execute("""
                        UPDATE outbox SET retry_count = retry_count + 1
                        WHERE id = ANY(%s)
                    """, (failed_ids,))

                conn.commit()
                return len(published_ids)

Idempotency: Handling Duplicate Events

In distributed systems, events are delivered "at-least-once," meaning the same event may arrive multiple times. Consumers must be idempotent.

import logging

logger = logging.getLogger(__name__)


class IdempotentEventHandler:
    """Handler wrapper that safely processes duplicate events.

    Records processing history keyed by event_id and skips events that
    were already processed. Assumes processed_events has a unique
    constraint (ideally a PRIMARY KEY) on event_id.
    """

    def __init__(self, db_pool, handler_fn):
        self.db_pool = db_pool
        self.handler_fn = handler_fn

    def handle(self, event: dict) -> bool:
        event_id = event["event_id"]

        with self.db_pool.connection() as conn:
            with conn.cursor() as cur:
                try:
                    # Claim the event first. ON CONFLICT makes the duplicate
                    # check race-free: a concurrent consumer handling the same
                    # event inserts nothing and sees rowcount == 0, whereas a
                    # separate SELECT-then-INSERT would let both proceed.
                    cur.execute(
                        """
                        INSERT INTO processed_events (event_id, processed_at)
                        VALUES (%s, NOW())
                        ON CONFLICT (event_id) DO NOTHING
                        """,
                        (event_id,),
                    )
                    if cur.rowcount == 0:
                        logger.info(f"Duplicate event {event_id}, skipping")
                        conn.rollback()
                        return True

                    # Business logic and the processed_events row commit in one
                    # transaction, so either both persist or neither does.
                    self.handler_fn(event, cur)
                    conn.commit()
                    return True

                except Exception as e:
                    conn.rollback()
                    logger.error(f"Failed to process event {event_id}: {e}")
                    return False

Saga Operations Monitoring

When Sagas run in production, you need to track the status of each saga instance in real-time.

-- Saga status dashboard queries

-- 1. Saga status distribution by time period
SELECT
    date_trunc('hour', started_at) AS hour,
    status,
    COUNT(*) AS count,
    AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) AS avg_duration_seconds
FROM saga_executions
WHERE started_at >= NOW() - INTERVAL '24 hours'
GROUP BY 1, 2
ORDER BY 1 DESC, 2;

-- 2. Sagas with compensation failures (immediate response needed)
SELECT
    e.saga_id,
    e.started_at,
    e.error,
    jsonb_agg(
        jsonb_build_object(
            'step', step_name,
            'status', step_status,
            'error', step_error
        ) ORDER BY step_order
    ) AS step_details
FROM saga_executions e
JOIN saga_steps s ON e.saga_id = s.saga_id
WHERE e.status = 'FAILED'
  AND e.started_at >= NOW() - INTERVAL '7 days'
GROUP BY e.saga_id, e.started_at, e.error
ORDER BY e.started_at DESC;

-- 3. Failure rate by step (identifying bottlenecks)
SELECT
    step_name,
    COUNT(*) AS total_executions,
    SUM(CASE WHEN step_status = 'executed' THEN 1 ELSE 0 END) AS successes,
    SUM(CASE WHEN step_status != 'executed' THEN 1 ELSE 0 END) AS failures,
    ROUND(
        100.0 * SUM(CASE WHEN step_status != 'executed' THEN 1 ELSE 0 END) / COUNT(*),
        2
    ) AS failure_rate_pct
FROM saga_steps
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY step_name
ORDER BY failure_rate_pct DESC;
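The output of query (3) can feed an automated alert. A minimal sketch, assuming rows arrive as tuples in the query's column order; the 5% threshold, the doubling rule for P1, and the payload shape are illustrative choices:

```python
def check_step_failure_rates(rows, threshold_pct=5.0):
    """Convert rows from the per-step failure-rate query into alert payloads.

    Each row is a (step_name, total_executions, successes, failures,
    failure_rate_pct) tuple, matching the query's column order. Steps at
    or above twice the threshold are escalated to P1.
    """
    return [
        {
            "severity": "P1" if rate >= threshold_pct * 2 else "P2",
            "step": step,
            "failure_rate_pct": rate,
        }
        for (step, total, successes, failures, rate) in rows
        if rate >= threshold_pct
    ]
```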

Deciding Whether to Adopt: Not Everything Needs a Saga

Saga + EDA is powerful but complex. Before adoption, you should honestly answer the following questions.

When Saga/EDA is appropriate:

  • When data consistency between services is essential, but eventual consistency is sufficient rather than strong consistency (ACID)
  • When independent deployment and scaling of each service is a business requirement
  • When teams are separated along service ownership boundaries and each team needs its own deployment cycle
  • When transaction volume is high enough that the locking costs of 2PC are unacceptable

When monolith transactions are better:

  • Early-stage products where service boundaries are unclear and change frequently
  • When team size is small (5 or fewer) and the operational burden of distributed systems outweighs the development speed benefits
  • When data consistency requirements are real-time, making eventual consistency business-unacceptable (e.g., bank account transfers)
  • When the team lacks the operational capability for message brokers (Kafka, RabbitMQ)

Practical Troubleshooting

What If Compensating Transactions Fail?

This is the hardest problem with Sagas. When the compensation itself fails, the system is left in a half-completed state: for example, the order is cancelled but the payment has not been refunded.

Countermeasures:

  1. Dead Letter Queue (DLQ): Send compensation failure events to a DLQ and have the operations team process them manually.
  2. Retry Policy: Retry 3-5 times with exponential backoff. Temporary network issues are resolved this way.
  3. Manual Recovery Runbook: Document step-by-step manual recovery procedures for cases that cannot be resolved even from the DLQ.
  4. Alerting: Set compensation failures as P1 alerts. If you think "I'll deal with it later," data inconsistencies accumulate.
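Countermeasures (1) and (2) can be combined in a single retry wrapper. A sketch under stated assumptions: the function name, parameters, and the DLQ callback are illustrative, not a library API.

```python
import logging
import random
import time

logger = logging.getLogger(__name__)


def retry_with_backoff(fn, max_attempts=5, base_delay=0.5,
                       max_delay=30.0, send_to_dlq=None):
    """Call fn(); on failure, retry with exponential backoff plus jitter.

    After max_attempts the failure is handed to a DLQ callback (if given)
    and re-raised, so the event is never silently dropped.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as e:
            if attempt == max_attempts:
                logger.error(f"Giving up after {attempt} attempts: {e}")
                if send_to_dlq is not None:
                    send_to_dlq(e)
                raise
            # Exponential backoff: 0.5s, 1s, 2s, ... capped at max_delay,
            # with jitter so concurrent retries do not synchronize.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            time.sleep(delay * (0.5 + random.random() / 2))
```

Wrapping each compensation call this way absorbs transient network failures, while permanent failures still reach the DLQ and trigger the P1 alert.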

When Event Ordering Is Not Guaranteed

Kafka guarantees ordering within a partition but not across partitions. Events for the same order going to different partitions can have their order reversed.

Countermeasures:

  1. Use the same partition key for events of the same aggregate (e.g., order_id).
  2. Check the sequence number of events on the consumer side, and if the order is wrong, wait briefly or send to a reprocessing queue.
  3. Include causation_id (the ID of the causing event) in events to track causal relationships.
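Countermeasure (2), consumer-side sequence checking, might look like the following in-memory sketch. It assumes each event carries an `aggregate_id` and a per-aggregate `sequence` number; production code would persist `last_seq` alongside the handler's transaction rather than keeping it in a dict.

```python
from collections import defaultdict


class SequencedConsumer:
    """Buffers out-of-order events per aggregate and replays them in order.

    Events arriving ahead of a gap are held until the gap fills; stale or
    duplicate events (sequence <= last applied) are dropped.
    """

    def __init__(self, handler):
        self.handler = handler
        self.last_seq = defaultdict(int)   # aggregate_id -> last applied seq
        self.pending = defaultdict(dict)   # aggregate_id -> {seq: event}

    def accept(self, event: dict) -> None:
        agg, seq = event["aggregate_id"], event["sequence"]
        if seq <= self.last_seq[agg]:
            return                         # duplicate or stale: drop
        self.pending[agg][seq] = event
        # Apply every contiguous event now available.
        while self.last_seq[agg] + 1 in self.pending[agg]:
            nxt = self.pending[agg].pop(self.last_seq[agg] + 1)
            self.handler(nxt)
            self.last_seq[agg] += 1
```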

Dead Letter Queue Keeps Growing

Symptom: Thousands of events are piling up in the DLQ and nobody is processing them.

Cause: There is no DLQ processing process, or the processing responsibility is unclear.

Countermeasure: (1) Set up alerts on DLQ count (warning at 100+ items, P1 at 1000+ items). (2) Include weekly DLQ review in team routines. (3) Implement triage logic that classifies events into those that can be auto-reprocessed and those requiring manual intervention.
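The triage logic from countermeasure (3) can start as a simple classification function. The error-type set, field names, and retry cutoff below are assumptions for illustration:

```python
# Transient infrastructure errors that are safe to re-enqueue automatically.
RETRYABLE_ERRORS = {"TimeoutError", "ConnectionError", "BrokerNotAvailable"}


def triage_dlq_event(event: dict) -> str:
    """Classify a DLQ entry as auto-reprocessable or needing a human."""
    if event.get("retry_count", 0) >= 5:
        return "manual"            # retries exhausted: runbook territory
    if event.get("error_type") in RETRYABLE_ERRORS:
        return "auto_reprocess"    # transient failure, likely to succeed now
    return "manual"                # schema/business errors never self-heal
```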


Quiz
  1. What is the main reason the Saga pattern is used instead of 2PC (Two-Phase Commit)? Answer: ||2PC makes the coordinator a single point of failure, is vulnerable to network partitions, and causes throughput to plummet due to extended lock duration. Saga processes each step as an independent local transaction and recovers through compensating transactions on failure, resulting in higher availability and throughput.||

  2. What is the biggest operational difference between Choreography and Orchestration? Answer: ||In Choreography, the entire saga flow is distributed across multiple services, making it hard to grasp at a glance and complex to debug. In Orchestration, the entire flow exists explicitly in the orchestrator code, making tracking and debugging easy, but the orchestrator can become a single point of failure.||

  3. What core problem does the Outbox pattern solve? Answer: ||The problem that DB save and event publishing are not atomic. If saved to DB but event publishing fails, data inconsistency occurs. The Outbox pattern saves events to an outbox table within the same DB transaction, and a separate process reads and publishes them, ensuring atomicity.||

  4. Why is consumer idempotency essential? Answer: ||In distributed systems, events are delivered at-least-once, so the same event may arrive multiple times. Without idempotency, the same order could be charged twice or inventory could be double-deducted.||

  5. What are the minimum three things to do when a Saga's compensating transaction fails? Answer: ||Store the failure event in a DLQ, send a P1 alert, and have the operations team intervene following a manual recovery runbook to restore data consistency.||

  6. How do you ensure event ordering for the same order in Kafka? Answer: ||Set the partition key to the same aggregate (e.g., order_id) so that events for the same aggregate go to the same partition. Kafka guarantees ordering within a partition, so events with the same key are processed in order.||

  7. In what situation is the FULL event schema compatibility mode recommended? Answer: ||When the deployment order of producers and consumers cannot be guaranteed. FULL compatibility satisfies both BACKWARD (consumer first) and FORWARD (producer first), making it safe regardless of deployment order.||

  8. What are two representative situations where Saga/EDA adoption should be avoided? Answer: ||(1) In early product stages where service boundaries change frequently, monoliths have lower change costs. (2) When team size is small and the operational burden of distributed systems (Kafka, monitoring, DLQ processing, etc.) outweighs the development speed benefits.||