Event-Driven + Saga Architecture Tradeoffs

The Moment Distributed Transactions Become Necessary

In a monolith, you can handle order creation, inventory deduction, and payment processing all within a single DB transaction. ACID is guaranteed, so if something fails midway, a single rollback handles everything.

But the moment services are separated, this "obvious" capability becomes impossible. When the Order Service, Inventory Service, and Payment Service each have their own databases, you cannot wrap them in a single transaction. 2PC (Two-Phase Commit) is theoretically possible, but it has three practical problems: participants hold locks until the coordinator decides, so throughput drops sharply; the coordinator becomes a single point of failure; and a network partition can leave participants blocked while they wait for the decision.

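The Saga pattern is the usual alternative. It was first described by Hector Garcia-Molina and Kenneth Salem in 1987 and later popularized for microservices by Chris Richardson (microservices.io/patterns/data/saga). The core idea is simple: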

Decompose a distributed transaction into multiple local transactions, and if any intermediate step fails, execute compensating transactions to undo the already-completed transactions.

This article covers the real tradeoffs that arise when combining Saga with Event-Driven Architecture (EDA). It honestly analyzes the complexity, costs, and organizational requirements behind the slogan "separating enables scaling."

Two Coordination Approaches for Saga

Choreography: Event-Based Autonomous Coordination

Each service completes its local transaction and publishes a domain event, then the next service subscribes to that event and performs its own work. There is no central orchestrator.

OrderService              InventoryService          PaymentService
    |                        |                        |
    |-- OrderCreated ------->|                        |
    |                        |-- InventoryReserved --->|
    |<----------- PaymentProcessed --------------------|
    |-- OrderConfirmed (published by OrderService)

On failure, compensation events flow in reverse:

PaymentService: PaymentFailed
  --> InventoryService: InventoryReleased
    --> OrderService: OrderCancelled

Advantages: No direct dependencies between services. No need to modify existing services when adding new ones.

Disadvantages: The entire flow cannot be seen at a glance in code. When event chains become complex, tracking where failures occurred is difficult. Risk of circular dependencies.
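
The event chain above can be sketched with an in-memory event bus standing in for the broker. This is a minimal illustration, not a production design: `EventBus`, `wire_services`, and `place_order` are hypothetical names, delivery is synchronous, and each "service" is reduced to a handler function. The event names follow the diagrams above.

```python
from collections import defaultdict
from typing import Callable


class EventBus:
    """In-memory stand-in for a broker such as Kafka (illustration only)."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)
        self.log: list[str] = []  # event types in the order they were published

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event_type: str, data: dict) -> None:
        self.log.append(event_type)
        for handler in list(self._handlers[event_type]):
            handler(data)


def wire_services(bus: EventBus, payment_ok: bool) -> dict[str, str]:
    """Registers each service's reactions and returns the order 'database'."""
    orders: dict[str, str] = {}

    # InventoryService: reserve on OrderCreated, release on PaymentFailed.
    bus.subscribe("OrderCreated", lambda e: bus.publish("InventoryReserved", e))
    bus.subscribe("PaymentFailed", lambda e: bus.publish("InventoryReleased", e))

    # PaymentService: charge once inventory is reserved.
    def charge(event: dict) -> None:
        bus.publish("PaymentProcessed" if payment_ok else "PaymentFailed", event)

    bus.subscribe("InventoryReserved", charge)

    # OrderService: confirm on payment success, cancel after inventory release.
    def confirm(event: dict) -> None:
        orders[event["order_id"]] = "confirmed"
        bus.publish("OrderConfirmed", event)

    def cancel(event: dict) -> None:
        orders[event["order_id"]] = "cancelled"
        bus.publish("OrderCancelled", event)

    bus.subscribe("PaymentProcessed", confirm)
    bus.subscribe("InventoryReleased", cancel)
    return orders


def place_order(bus: EventBus, orders: dict[str, str], order_id: str) -> None:
    orders[order_id] = "pending"
    bus.publish("OrderCreated", {"order_id": order_id})
```

Note that no function in the sketch describes the whole flow. The sequence only becomes visible in `bus.log` at runtime, which is the debugging cost mentioned above.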

Orchestration: Central Orchestrator Approach

The Saga Orchestrator manages the entire flow. It determines the next step based on success/failure of each stage and controls the compensation order upon failure.

SagaOrchestrator
    |-- (1) reserve_credit --> PaymentService
    |-- (2) reserve_inventory --> InventoryService
    |-- (3) create_shipment --> ShippingService
    |
    |  On (3) failure:
    |-- compensate (2) release_inventory
    |-- compensate (1) release_credit

Advantages: The entire flow exists explicitly in the orchestrator code. Debugging and monitoring are easy.

Disadvantages: The orchestrator can become a single point of failure. As the number of services grows, the orchestrator's complexity increases.

Saga Orchestrator Implementation

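The following is a core implementation of a Saga Orchestrator. It keeps saga state in memory, so a production deployment would additionally need to persist each execution and step status so that a crashed orchestrator can resume or compensate after a restart. Step objects carry their own status and idempotency key, so build a fresh set of steps for every saga run.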

"""
Saga Orchestrator: Manages execution and compensation of distributed transactions.

Each step implements execute() and compensate(),
and the orchestrator executes sequentially, then compensates in reverse order on failure.
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import logging
import uuid
import time

logger = logging.getLogger(__name__)


class SagaStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"
    FAILED = "failed"  # When compensation itself fails


class StepStatus(Enum):
    PENDING = "pending"
    EXECUTED = "executed"
    COMPENSATED = "compensated"
    COMPENSATION_FAILED = "compensation_failed"


@dataclass
class SagaStep:
    """Individual step of a Saga.

    execute() and compensate() are implemented in subclasses.
    Retry safety is ensured through idempotency_key.
    """
    name: str
    status: StepStatus = StepStatus.PENDING
    idempotency_key: str = ""
    executed_at: Optional[float] = None
    compensated_at: Optional[float] = None
    error: Optional[str] = None

    def __post_init__(self):
        if not self.idempotency_key:
            self.idempotency_key = str(uuid.uuid4())

    def execute(self, context: dict) -> dict:
        raise NotImplementedError

    def compensate(self, context: dict) -> None:
        raise NotImplementedError


@dataclass
class SagaExecution:
    """Saga execution record."""
    saga_id: str
    status: SagaStatus = SagaStatus.PENDING
    steps: list[SagaStep] = field(default_factory=list)
    context: dict = field(default_factory=dict)
    started_at: Optional[float] = None
    completed_at: Optional[float] = None
    error: Optional[str] = None


class SagaOrchestrator:
    """Orchestrator that manages Saga execution.

    Execution flow:
    1. Execute each step sequentially
    2. If any step fails, compensate already-executed steps in reverse order
    3. If compensation also fails, record as FAILED and send alert
    """

    def __init__(self, steps: list[SagaStep], on_failure=None):
        # on_failure: optional callback, called as
        # on_failure(execution, compensation_failures) when compensation itself fails
        self.steps = steps
        self.on_failure = on_failure

    def run(self, initial_context: dict | None = None) -> SagaExecution:
        execution = SagaExecution(
            saga_id=str(uuid.uuid4()),
            steps=self.steps,
            context=initial_context or {},
            started_at=time.time(),
        )
        execution.status = SagaStatus.RUNNING
        completed_steps: list[SagaStep] = []

        logger.info(f"Saga {execution.saga_id} started with {len(self.steps)} steps")

        try:
            for step in self.steps:
                logger.info(f"Saga {execution.saga_id}: executing '{step.name}'")
                try:
                    result = step.execute(execution.context)
                    step.status = StepStatus.EXECUTED
                    step.executed_at = time.time()

                    # Merge step result into context
                    if isinstance(result, dict):
                        execution.context.update(result)

                    completed_steps.append(step)

                except Exception as e:
                    step.error = str(e)
                    logger.error(
                        f"Saga {execution.saga_id}: step '{step.name}' failed: {e}"
                    )
                    # Start compensation
                    self._compensate(execution, completed_steps)
                    return execution

            execution.status = SagaStatus.COMPLETED
            execution.completed_at = time.time()
            logger.info(f"Saga {execution.saga_id} completed successfully")

        except Exception as e:
            execution.status = SagaStatus.FAILED
            execution.error = str(e)
            logger.critical(f"Saga {execution.saga_id} unexpected error: {e}")

        return execution

    def _compensate(self, execution: SagaExecution, completed_steps: list[SagaStep]):
        """Compensate completed steps in reverse order."""
        execution.status = SagaStatus.COMPENSATING
        compensation_failures = []

        for step in reversed(completed_steps):
            logger.info(
                f"Saga {execution.saga_id}: compensating '{step.name}'"
            )
            try:
                step.compensate(execution.context)
                step.status = StepStatus.COMPENSATED
                step.compensated_at = time.time()
            except Exception as e:
                step.status = StepStatus.COMPENSATION_FAILED
                step.error = f"Compensation failed: {e}"
                compensation_failures.append((step.name, str(e)))
                logger.critical(
                    f"Saga {execution.saga_id}: compensation of '{step.name}' FAILED: {e}"
                )

        if compensation_failures:
            execution.status = SagaStatus.FAILED
            execution.error = f"Compensation failures: {compensation_failures}"
            if self.on_failure:
                self.on_failure(execution, compensation_failures)
        else:
            execution.status = SagaStatus.COMPENSATED
            execution.completed_at = time.time()

Here is an example of actual step implementations:

class ReserveCreditStep(SagaStep):
    """Reserves credit in the payment service."""

    def __init__(self, payment_client):
        super().__init__(name="reserve_credit")
        self.payment_client = payment_client

    def execute(self, context: dict) -> dict:
        result = self.payment_client.reserve(
            customer_id=context["customer_id"],
            amount=context["order_total"],
            idempotency_key=self.idempotency_key,
        )
        return {"reservation_id": result.reservation_id}

    def compensate(self, context: dict) -> None:
        self.payment_client.release(
            reservation_id=context["reservation_id"],
            idempotency_key=f"{self.idempotency_key}_compensate",
        )


class ReserveInventoryStep(SagaStep):
    """Reserves products in the inventory service."""

    def __init__(self, inventory_client):
        super().__init__(name="reserve_inventory")
        self.inventory_client = inventory_client

    def execute(self, context: dict) -> dict:
        result = self.inventory_client.reserve(
            items=context["order_items"],
            idempotency_key=self.idempotency_key,
        )
        return {"inventory_reservation_id": result.reservation_id}

    def compensate(self, context: dict) -> None:
        self.inventory_client.release(
            reservation_id=context["inventory_reservation_id"],
            idempotency_key=f"{self.idempotency_key}_compensate",
        )

Event Contracts and Schema Management

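In Event-Driven Architecture, events are contracts between services. Without managing these contracts, a change in one service's event format can silently break consumer services. The JSON Schema below defines version 2 of the OrderCreated event as an example of such a contract.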

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "https://events.example.com/order/created/v2",
  "title": "OrderCreated",
  "description": "Event published when an order is created",
  "type": "object",
  "required": ["event_id", "event_type", "event_version", "timestamp", "data"],
  "properties": {
    "event_id": {
      "type": "string",
      "format": "uuid",
      "description": "Unique event ID (used as idempotency key)"
    },
    "event_type": {
      "type": "string",
      "const": "order.created"
    },
    "event_version": {
      "type": "integer",
      "const": 2,
      "description": "Schema version. Used for compatibility determination"
    },
    "timestamp": {
      "type": "string",
      "format": "date-time"
    },
    "data": {
      "type": "object",
      "required": ["order_id", "customer_id", "items", "total_amount"],
      "properties": {
        "order_id": { "type": "string" },
        "customer_id": { "type": "string" },
        "items": {
          "type": "array",
          "items": {
            "type": "object",
            "required": ["sku", "quantity", "unit_price"],
            "properties": {
              "sku": { "type": "string" },
              "quantity": { "type": "integer", "minimum": 1 },
              "unit_price": { "type": "number", "minimum": 0 }
            }
          }
        },
        "total_amount": { "type": "number", "minimum": 0 },
        "currency": { "type": "string", "default": "KRW" },
        "shipping_address": {
          "type": "object",
          "description": "Field added in v2. v1 consumers ignore this field."
        }
      }
    }
  }
}

For schema compatibility policies, it is helpful to refer to the Confluent Schema Registry compatibility modes.

Compatibility Mode | Allowed Changes                        | When to Use
BACKWARD           | Add optional fields, delete fields     | When consumers are deployed first
FORWARD            | Add fields, delete fields with defaults | When producers are deployed first
FULL               | Satisfies both of the above            | When producer/consumer deployment order cannot be guaranteed
NONE               | All changes allowed (dangerous)        | Development environments only
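
On the consumer side, compatibility largely comes down to reading events tolerantly: accept the versions you know, ignore fields you do not recognize, and apply defaults for optional fields. The following is a minimal sketch under stated assumptions. `parse_order_created` and `UnsupportedEvent` are hypothetical names, and it assumes v1 and v2 share the same required `data` fields, which the schema above does not state explicitly.

```python
SUPPORTED_VERSIONS = {1, 2}
REQUIRED_DATA_FIELDS = ("order_id", "customer_id", "items", "total_amount")


class UnsupportedEvent(Exception):
    """Raised when the event type or version is one this consumer cannot read."""


def parse_order_created(event: dict) -> dict:
    """Extracts only the fields this consumer needs from an OrderCreated event."""
    if event.get("event_type") != "order.created":
        raise UnsupportedEvent(f"unexpected type: {event.get('event_type')!r}")

    version = event.get("event_version")
    if version not in SUPPORTED_VERSIONS:
        raise UnsupportedEvent(f"unsupported version: {version!r}")

    data = event.get("data", {})
    missing = [name for name in REQUIRED_DATA_FIELDS if name not in data]
    if missing:
        raise ValueError(f"missing required fields: {missing}")

    # Unknown fields in `data` are ignored rather than rejected, so a producer
    # can add optional fields without breaking this consumer.
    return {
        "order_id": data["order_id"],
        "customer_id": data["customer_id"],
        "items": data["items"],
        "total_amount": data["total_amount"],
        "currency": data.get("currency", "KRW"),            # schema default
        "shipping_address": data.get("shipping_address"),   # None for v1 events
    }
```

Rejecting unknown versions explicitly, instead of guessing, gives the consumer a clear point at which to route the event to a DLQ.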

Outbox Pattern: Ensuring Atomicity of Event Publishing

One of the most common mistakes is assuming that "save to DB" and "publish an event" happen atomically. They are two separate operations, and either one can fail independently of the other.

# Dangerous pattern
1. Save order to DB     -- Success
2. Publish event to Kafka -- Failure (network issue)
=> Order is created but no event, so inventory is not deducted and payment is not processed

The Outbox pattern writes the event to an outbox table in the same database transaction as the business data. A separate process then polls the outbox and delivers the messages to the message broker.

-- outbox table schema
CREATE TABLE outbox (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type  VARCHAR(100) NOT NULL,    -- 'Order', 'Payment', etc.
    aggregate_id    VARCHAR(100) NOT NULL,    -- Order ID, Payment ID
    event_type      VARCHAR(200) NOT NULL,    -- 'order.created'
    event_version   INT NOT NULL DEFAULT 1,
    payload         JSONB NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMPTZ,              -- NULL means not yet published
    retry_count     INT NOT NULL DEFAULT 0,
    max_retries     INT NOT NULL DEFAULT 5
);

-- Index for querying unpublished events
CREATE INDEX idx_outbox_unpublished
    ON outbox (created_at)
    WHERE published_at IS NULL AND retry_count < max_retries;

-- Process order creation and event in a single transaction
BEGIN;
    INSERT INTO orders (id, customer_id, total, status)
    VALUES ('ord-123', 'cust-456', 50000, 'created');

    INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
    VALUES (
        'Order',
        'ord-123',
        'order.created',
        '{"order_id": "ord-123", "customer_id": "cust-456", "total": 50000}'::jsonb
    );
COMMIT;
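
The same atomic write can be sketched from application code. The example below uses SQLite from the Python standard library purely so that it runs anywhere; the table layout is a simplified version of the schema above, and `create_schema`, `create_order`, and `count` are hypothetical helper names.

```python
import json
import sqlite3
import uuid


def create_schema(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE orders (id TEXT PRIMARY KEY, customer_id TEXT NOT NULL, "
        "total INTEGER NOT NULL, status TEXT NOT NULL)"
    )
    conn.execute(
        "CREATE TABLE outbox (id TEXT PRIMARY KEY, aggregate_type TEXT NOT NULL, "
        "aggregate_id TEXT NOT NULL, event_type TEXT NOT NULL, "
        "payload TEXT NOT NULL, published_at TEXT)"
    )


def create_order(conn, order_id, customer_id, total, event_type="order.created"):
    """Writes the order row and its outbox event in one transaction."""
    payload = json.dumps(
        {"order_id": order_id, "customer_id": customer_id, "total": total}
    )
    # `with conn:` commits if the block succeeds and rolls back if it raises,
    # so the two INSERTs are applied together or not at all.
    with conn:
        conn.execute(
            "INSERT INTO orders (id, customer_id, total, status) "
            "VALUES (?, ?, ?, 'created')",
            (order_id, customer_id, total),
        )
        conn.execute(
            "INSERT INTO outbox (id, aggregate_type, aggregate_id, event_type, payload) "
            "VALUES (?, 'Order', ?, ?, ?)",
            (str(uuid.uuid4()), order_id, event_type, payload),
        )


def count(conn: sqlite3.Connection, table: str) -> int:
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
```

If the outbox insert fails (for example, because of a constraint violation), the order row is rolled back as well, so there is never an order without its event.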

The Outbox publisher runs as a separate process, periodically polling for unpublished events.

"""
Outbox Publisher: Delivers unpublished events from the outbox table to the broker.

Guarantees at-least-once delivery, and consumers must
handle idempotency on their side.
"""
import time
import json
import logging

logger = logging.getLogger(__name__)


class OutboxPublisher:
    def __init__(self, db_pool, kafka_producer, poll_interval_seconds: float = 1.0):
        self.db_pool = db_pool
        self.producer = kafka_producer
        self.poll_interval = poll_interval_seconds

    def run(self):
        """Main loop. Continuously polls and publishes unpublished events."""
        logger.info("OutboxPublisher started")
        while True:
            try:
                published_count = self._poll_and_publish()
                if published_count == 0:
                    time.sleep(self.poll_interval)
            except Exception as e:
                logger.error(f"OutboxPublisher error: {e}")
                time.sleep(self.poll_interval * 5)

    def _poll_and_publish(self, batch_size: int = 100) -> int:
        """Queries a batch of unpublished events and publishes them."""
        with self.db_pool.connection() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT id, aggregate_type, aggregate_id,
                           event_type, event_version, payload, created_at
                    FROM outbox
                    WHERE published_at IS NULL
                      AND retry_count < max_retries
                    ORDER BY created_at
                    LIMIT %s
                    FOR UPDATE SKIP LOCKED
                """, (batch_size,))

                rows = cur.fetchall()
                if not rows:
                    return 0

                published_ids = []
                failed_ids = []

                for row in rows:
                    (event_id, agg_type, agg_id, evt_type,
                     evt_ver, payload, created_at) = row
                    topic = f"{agg_type.lower()}-events"

                    try:
                        # send() is asynchronous in most Kafka clients, so a
                        # successful call does not mean the broker has the message.
                        # Assuming a kafka-python style producer, block on the
                        # returned future before treating the event as published.
                        future = self.producer.send(
                            topic=topic,
                            key=agg_id.encode(),
                            value=json.dumps({
                                "event_id": str(event_id),
                                "event_type": evt_type,
                                "event_version": evt_ver,
                                # The event schema lists timestamp as required
                                "timestamp": created_at.isoformat(),
                                "data": payload,
                            }).encode(),
                        )
                        future.get(timeout=10)
                        published_ids.append(event_id)
                    except Exception as e:
                        logger.warning(f"Failed to publish {event_id}: {e}")
                        failed_ids.append(event_id)

                # Mark successful events
                if published_ids:
                    cur.execute("""
                        UPDATE outbox SET published_at = NOW()
                        WHERE id = ANY(%s)
                    """, (published_ids,))

                # Increment retry count for failed events
                if failed_ids:
                    cur.execute("""
                        UPDATE outbox SET retry_count = retry_count + 1
                        WHERE id = ANY(%s)
                    """, (failed_ids,))

                conn.commit()
                return len(published_ids)

Idempotency: Handling Duplicate Events

In distributed systems, events are delivered "at-least-once," meaning the same event may arrive multiple times. Consumers must be idempotent.

import logging

logger = logging.getLogger(__name__)


class IdempotentEventHandler:
    """Handler wrapper that safely processes duplicate events.

    Records processing history using event_id as key,
    and ignores already-processed events.
    """

    def __init__(self, db_pool, handler_fn):
        self.db_pool = db_pool
        self.handler_fn = handler_fn

    def handle(self, event: dict) -> bool:
        event_id = event["event_id"]

        with self.db_pool.connection() as conn:
            with conn.cursor() as cur:
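                try:
                    # Claim the event first. A check-then-insert sequence lets two
                    # concurrent consumers both pass the check; inserting with
                    # ON CONFLICT makes the second one wait and then see the conflict.
                    # Assumes processed_events.event_id is a PRIMARY KEY or UNIQUE.
                    cur.execute(
                        "INSERT INTO processed_events (event_id, processed_at) "
                        "VALUES (%s, NOW()) ON CONFLICT (event_id) DO NOTHING",
                        (event_id,),
                    )
                    if cur.rowcount == 0:
                        conn.rollback()
                        logger.info(f"Duplicate event {event_id}, skipping")
                        return True

                    # Business logic runs in the same transaction as the marker row,
                    # so both commit or both roll back
                    self.handler_fn(event, cur)
                    conn.commit()
                    return True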

                except Exception as e:
                    conn.rollback()
                    logger.error(f"Failed to process event {event_id}: {e}")
                    return False

Saga Operations Monitoring

When Sagas run in production, you need to track the status of each saga instance in real-time.

-- Saga status dashboard queries

-- 1. Saga status distribution by time period
SELECT
    date_trunc('hour', started_at) AS hour,
    status,
    COUNT(*) AS count,
    AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) AS avg_duration_seconds
FROM saga_executions
WHERE started_at >= NOW() - INTERVAL '24 hours'
GROUP BY 1, 2
ORDER BY 1 DESC, 2;

-- 2. Sagas with compensation failures (immediate response needed)
-- Columns are qualified because saga_id exists in both tables.
-- Status values are assumed to match the lowercase SagaStatus enum values.
SELECT
    e.saga_id,
    e.started_at,
    e.error,
    jsonb_agg(
        jsonb_build_object(
            'step', s.step_name,
            'status', s.step_status,
            'error', s.step_error
        ) ORDER BY s.step_order
    ) AS step_details
FROM saga_executions e
JOIN saga_steps s ON e.saga_id = s.saga_id
WHERE e.status = 'failed'
  AND e.started_at >= NOW() - INTERVAL '7 days'
GROUP BY e.saga_id, e.started_at, e.error
ORDER BY e.started_at DESC;

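-- 3. Failure rate by step (identifying bottlenecks)
-- Note: every status other than 'executed' counts as a failure here, including
-- steps that succeeded and were later compensated. Interpret the rate accordingly.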
SELECT
    step_name,
    COUNT(*) AS total_executions,
    SUM(CASE WHEN step_status = 'executed' THEN 1 ELSE 0 END) AS successes,
    SUM(CASE WHEN step_status != 'executed' THEN 1 ELSE 0 END) AS failures,
    ROUND(
        100.0 * SUM(CASE WHEN step_status != 'executed' THEN 1 ELSE 0 END) / COUNT(*),
        2
    ) AS failure_rate_pct
FROM saga_steps
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY step_name
ORDER BY failure_rate_pct DESC;

Deciding Whether to Adopt: Not Everything Needs a Saga

Saga + EDA is powerful but complex. Before adoption, you should honestly answer the following questions.

When Saga/EDA is appropriate:

  • When data consistency between services is essential, but eventual consistency is sufficient rather than strong consistency (ACID)
  • When independent deployment and scaling of each service is a business requirement
  • When teams are separated along service ownership boundaries and each team needs its own deployment cycle
  • When transaction volume is high enough that the locking costs of 2PC are unacceptable

When monolith transactions are better:

  • Early-stage products where service boundaries are unclear and change frequently
  • When team size is small (5 or fewer) and the operational burden of distributed systems outweighs the development speed benefits
  • When the business requires immediate (strong) consistency and cannot tolerate eventual consistency (e.g., bank account transfers)
  • When the team lacks the operational capability for message brokers (Kafka, RabbitMQ)

Practical Troubleshooting

What If Compensating Transactions Fail?

This is the most difficult problem with Sagas. When compensation fails, the system is left in a half-finished state: for example, the order is cancelled but the payment has not been refunded.

Countermeasures:

  1. Dead Letter Queue (DLQ): Send compensation failure events to a DLQ and have the operations team process them manually.
  2. Retry Policy: Retry 3-5 times with exponential backoff. Most transient network failures are resolved this way.
  3. Manual Recovery Runbook: Document step-by-step manual recovery procedures for cases that cannot be resolved even from the DLQ.
  4. Alerting: Set compensation failures as P1 alerts. If you think "I'll deal with it later," data inconsistencies accumulate.
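
The retry and DLQ countermeasures can be combined in a small wrapper around the compensation call. This is a sketch under assumptions: `compensate_with_retry` is a hypothetical helper, the DLQ is modeled as a plain list, and the delays (0.5 s base, doubling, capped at 30 s) are illustrative values rather than recommendations.

```python
import time
from typing import Callable


def compensate_with_retry(
    compensate: Callable[[], None],
    dead_letters: list,
    *,
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Retries a compensation with exponential backoff.

    Returns True once the compensation succeeds. After max_attempts failures
    it records the failure in dead_letters (standing in for a DLQ) and
    returns False so the caller can raise an alert.
    """
    last_error: Exception | None = None
    for attempt in range(1, max_attempts + 1):
        try:
            compensate()
            return True
        except Exception as exc:
            last_error = exc
            if attempt < max_attempts:
                # Delay doubles each attempt: base, 2*base, 4*base, ... up to max_delay
                sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))

    dead_letters.append({"error": str(last_error), "attempts": max_attempts})
    return False
```

The `sleep` parameter is injectable so the backoff schedule can be tested without waiting. In practice, adding random jitter to each delay helps avoid many sagas retrying at the same instant. Because the compensation may run more than once, it must be idempotent.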

When Event Ordering Is Not Guaranteed

Kafka guarantees ordering within a partition but not across partitions. Events for the same order going to different partitions can have their order reversed.

Countermeasures:

  1. Use the same partition key for events of the same aggregate (e.g., order_id).
  2. Check the sequence number of events on the consumer side, and if the order is wrong, wait briefly or send to a reprocessing queue.
  3. Include causation_id (the ID of the causing event) in events to track causal relationships.
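
The consumer-side sequence check in item 2 can be sketched as a small reordering buffer. This assumes each event carries an `aggregate_id` and a per-aggregate `sequence` number starting at 1; neither field appears in the schema shown earlier, and `SequenceBuffer` is a hypothetical name.

```python
class SequenceBuffer:
    """Holds early events back and releases them in sequence order per aggregate."""

    def __init__(self) -> None:
        self._next: dict[str, int] = {}                   # aggregate_id -> next expected sequence
        self._pending: dict[str, dict[int, dict]] = {}    # aggregate_id -> {sequence: event}

    def accept(self, event: dict) -> list[dict]:
        """Returns the events that are now safe to process, in order."""
        aggregate_id = event["aggregate_id"]
        sequence = event["sequence"]
        expected = self._next.get(aggregate_id, 1)

        if sequence < expected:
            return []  # already processed: duplicate delivery

        pending = self._pending.setdefault(aggregate_id, {})
        pending[sequence] = event

        # Release the longest contiguous run starting at the expected sequence.
        ready = []
        while expected in pending:
            ready.append(pending.pop(expected))
            expected += 1
        self._next[aggregate_id] = expected
        return ready
```

An event that arrives early is held until the gap before it is filled. A real consumer would also need a timeout for gaps that never fill, at which point the held events go to a reprocessing queue or DLQ.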

Dead Letter Queue Keeps Growing

Symptom: Thousands of events are piling up in the DLQ and nobody is processing them.

Cause: There is no DLQ processing process, or the processing responsibility is unclear.

Countermeasures:

  1. Set up alerts on DLQ count (warning at 100+ items, P1 at 1000+ items).
  2. Include a weekly DLQ review in team routines.
  3. Implement triage logic that classifies events into those that can be auto-reprocessed and those requiring manual intervention.
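
The alert thresholds and triage logic might look like the following sketch. The thresholds come from the text above; the `error_code` and `retry_count` fields, the set of transient error codes, and the retry limit of 5 are assumptions made for the illustration.

```python
WARNING_THRESHOLD = 100
P1_THRESHOLD = 1000
MAX_AUTO_RETRIES = 5  # assumed limit, mirroring max_retries in the outbox table

# Hypothetical error codes treated as transient and therefore safe to retry
TRANSIENT_ERRORS = {"timeout", "connection_reset", "broker_unavailable"}


def alert_level(dlq_depth: int) -> str:
    """Maps the number of events in the DLQ to an alert level."""
    if dlq_depth >= P1_THRESHOLD:
        return "P1"
    if dlq_depth >= WARNING_THRESHOLD:
        return "warning"
    return "ok"


def triage(events: list[dict]) -> dict[str, list[dict]]:
    """Splits DLQ events into auto-retryable and manual-intervention buckets."""
    buckets: dict[str, list[dict]] = {"auto_retry": [], "manual": []}
    for event in events:
        retryable = (
            event.get("error_code") in TRANSIENT_ERRORS
            and event.get("retry_count", 0) < MAX_AUTO_RETRIES
        )
        buckets["auto_retry" if retryable else "manual"].append(event)
    return buckets
```

Events with unknown error codes fall into the manual bucket by default, so a new failure mode is reviewed by a person before it is ever retried automatically.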

Quiz
  1. What is the main reason the Saga pattern is used instead of 2PC (Two-Phase Commit)? Answer: ||2PC makes the coordinator a single point of failure, is vulnerable to network partitions, and causes throughput to plummet due to extended lock duration. Saga processes each step as an independent local transaction and recovers through compensating transactions on failure, resulting in higher availability and throughput.||

  2. What is the biggest operational difference between Choreography and Orchestration? Answer: ||In Choreography, the entire saga flow is distributed across multiple services, making it hard to grasp at a glance and complex to debug. In Orchestration, the entire flow exists explicitly in the orchestrator code, making tracking and debugging easy, but the orchestrator can become a single point of failure.||

  3. What core problem does the Outbox pattern solve? Answer: ||The problem that DB save and event publishing are not atomic. If saved to DB but event publishing fails, data inconsistency occurs. The Outbox pattern saves events to an outbox table within the same DB transaction, and a separate process reads and publishes them, ensuring atomicity.||

  4. Why is consumer idempotency essential? Answer: ||In distributed systems, events are delivered at-least-once, so the same event may arrive multiple times. Without idempotency, the same order could be charged twice or inventory could be double-deducted.||

  5. What are the minimum three things to do when a Saga's compensating transaction fails? Answer: ||Store the failure event in a DLQ, send a P1 alert, and have the operations team intervene following a manual recovery runbook to restore data consistency.||

  6. How do you ensure event ordering for the same order in Kafka? Answer: ||Set the partition key to the same aggregate (e.g., order_id) so that events for the same aggregate go to the same partition. Kafka guarantees ordering within a partition, so events with the same key are processed in order.||

  7. In what situation is the FULL event schema compatibility mode recommended? Answer: ||When the deployment order of producers and consumers cannot be guaranteed. FULL compatibility satisfies both BACKWARD (consumer first) and FORWARD (producer first), making it safe regardless of deployment order.||

  8. What are two representative situations where Saga/EDA adoption should be avoided? Answer: ||(1) In early product stages where service boundaries change frequently, monoliths have lower change costs. (2) When team size is small and the operational burden of distributed systems (Kafka, monitoring, DLQ processing, etc.) outweighs the development speed benefits.||