Async Processing Patterns Mastery: CQRS, Saga, Event Sourcing — Senior Backend Developer Essential Guide


1. Why Async? The Survival Strategy for Large-Scale Systems

Modern large-scale systems handle traffic volumes that synchronous request-response models simply cannot manage.

Real-World Examples:

| Company  | Scale                                    | Async Processing Approach           |
|----------|------------------------------------------|-------------------------------------|
| Netflix  | 260M subscribers, millions of events/sec | Apache Kafka-based event streaming  |
| Uber     | Petabytes of daily real-time data        | Apache Kafka + Apache Flink         |
| LinkedIn | 7T+ messages/day via Kafka               | Self-developed Kafka infrastructure |
| Shopify  | 800K requests/sec on Black Friday        | Async order processing pipeline     |

In synchronous architecture, when Service A calls Service B, and B calls C, the total latency is the sum of each service call. If any single service slows down, the entire chain blocks.

In asynchronous architecture, a message broker decouples services, enabling each to scale independently.

Synchronous:  Client -> A(50ms) -> B(100ms) -> C(200ms) = 350ms total wait
Asynchronous: Client -> A(50ms) -> Message Broker -> (B, C process in parallel)
              Client gets response after 50ms, B and C process asynchronously
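The handoff above can be sketched in plain Java, with CompletableFuture standing in for the message broker; the class name and latency figures are illustrative, not a production design:

```java
import java.util.concurrent.CompletableFuture;

// Minimal sketch: service A answers immediately, while B and C consume
// the "event" in the background. CompletableFuture stands in for a broker.
public class AsyncHandoff {

    // Synchronous chain: total latency is the sum of every hop
    public static int callChainLatency(int a, int b, int c) {
        return a + b + c; // 50 + 100 + 200 = 350ms
    }

    // Asynchronous: A returns after its own work; B and C run in parallel
    public static String placeOrder(String orderId) {
        CompletableFuture.runAsync(() -> processPayment(orderId));   // B
        CompletableFuture.runAsync(() -> sendNotification(orderId)); // C
        return "ACCEPTED:" + orderId; // client only waits for A
    }

    static void processPayment(String orderId) { /* B's work */ }
    static void sendNotification(String orderId) { /* C's work */ }
}
```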

Key Benefits of Going Async:

  1. Temporal decoupling — Producers and consumers do not need to be running simultaneously
  2. Independent scaling — Read and write workloads can be scaled separately
  3. Fault isolation — A single service failure does not cascade to the entire system
  4. Peak traffic absorption — Message queues act as buffers

However, async is not free. Eventual Consistency, message ordering, duplicate handling, and distributed transactions are new challenges. The patterns covered in this guide are the answers to these problems.


2. Event-Driven Architecture Fundamentals

Event-Driven Architecture (EDA) is the foundation of async patterns. State changes in the system are expressed as "events," and interested services subscribe and react to them.

2-1. Domain Events vs Integration Events

| Aspect      | Domain Event                    | Integration Event                |
|-------------|---------------------------------|----------------------------------|
| Scope       | Within a single bounded context | Between bounded contexts         |
| Example     | OrderItemAdded                  | OrderPlaced                      |
| Delivery    | In-memory event bus             | Message broker (Kafka, RabbitMQ) |
| Schema mgmt | Freely changeable (internal)    | Contract management required     |

// Domain Event - within bounded context
public record OrderItemAdded(
    UUID orderId,
    UUID productId,
    int quantity,
    BigDecimal price,
    Instant occurredAt
) implements DomainEvent {}

// Integration Event - published to external services
public record OrderPlacedEvent(
    UUID orderId,
    UUID customerId,
    BigDecimal totalAmount,
    List<OrderLineItem> items,
    Instant occurredAt,
    int version  // schema version management
) implements IntegrationEvent {}

2-2. Event Bus vs Event Store

An Event Bus (e.g., Kafka, RabbitMQ) is a pipeline that delivers events. Messages are kept only transiently: a queue broker typically deletes them once consumed, and Kafka expires them after its configured retention period.

An Event Store permanently persists all events. Events can be replayed to reconstruct state at any point in time.

Event Bus (Kafka):
  Producer -> [Topic: orders] -> Consumer Group A (Order Service)
                               -> Consumer Group B (Notification Service)
                               -> Consumer Group C (Analytics Service)

Event Store:
  [Event #1: OrderCreated]
  [Event #2: ItemAdded]
  [Event #3: PaymentReceived]
  [Event #4: OrderShipped]
  -> Replay events #1-#4 to reconstruct current order state

2-3. Understanding Eventual Consistency

Maintaining strong consistency in distributed systems requires distributed locks, 2PC (2-Phase Commit), etc., which significantly degrade performance and availability.

According to the CAP theorem, during network partitions you must choose between Consistency (C) and Availability (A). Most large-scale systems choose Availability and accept Eventual Consistency.

Eventual Consistency means "data may be inconsistent right now, but given enough time, all nodes will converge to the same state."

Practical Implications:

  • Inventory might not decrease immediately after an order (milliseconds to seconds delay)
  • Show users "Order is being processed" first, then update after confirmation
  • Minimizing the consistency window is the key concern
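A toy model makes the convergence concrete. All names here are illustrative; this is not a real replication protocol, just the shape of the guarantee:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Writes land on the primary immediately; the replica catches up only
// when the pending updates are drained ("given enough time").
public class EventualConsistency {

    private final Map<String, Integer> primary = new HashMap<>();
    private final Map<String, Integer> replica = new HashMap<>();
    private final Deque<Map.Entry<String, Integer>> pending = new ArrayDeque<>();

    public void write(String key, int value) {
        primary.put(key, value);            // visible on the write side now
        pending.add(Map.entry(key, value)); // replica sees it later
    }

    public Integer readFromReplica(String key) {
        return replica.get(key);            // may be stale (or null)
    }

    // Drain the replication lag
    public void catchUp() {
        while (!pending.isEmpty()) {
            Map.Entry<String, Integer> e = pending.poll();
            replica.put(e.getKey(), e.getValue());
        }
    }

    public boolean isConsistent() {
        return primary.equals(replica);
    }
}
```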

3. CQRS Deep Dive

CQRS (Command Query Responsibility Segregation) completely separates the read and write models.

3-1. Core Concept

In traditional CRUD, a single data model handles both reads and writes. In reality, reads and writes have very different requirements.

Traditional CRUD:
  Client -> [Single Model] -> Database
            (both read/write)

CQRS:
  Client --Command--> [Write Model] -> Command DB (normalized)
                            | Events
  Client <--Query---- [Read Model]  <- Read DB (denormalized, optimized)

Write Model (Command Side):

  • Business rule validation
  • Domain invariant enforcement
  • Normalized schema
  • Strong consistency

Read Model (Query Side):

  • Denormalized views optimized for queries
  • Multiple storage options (Elasticsearch, Redis, MongoDB)
  • Eventual Consistency acceptable

3-2. When to Use CQRS

Good fit:

  • Extreme read/write ratio (reads 100:1 or more)
  • Complex domain logic + diverse query requirements
  • Different scaling needs for reads and writes
  • When used together with Event Sourcing

Not a good fit:

  • Simple CRUD applications
  • When strong consistency is absolutely required
  • Simple domain with predictable query patterns

3-3. Implementation with Spring Boot + Kafka

// === Command Side: Order Creation ===

@Service
@RequiredArgsConstructor
public class OrderCommandService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @Transactional
    public UUID createOrder(CreateOrderCommand command) {
        // 1. Business rule validation
        validateOrder(command);

        // 2. Process in domain model
        Order order = Order.create(
            command.customerId(),
            command.items(),
            command.shippingAddress()
        );

        // 3. Save to Write DB
        orderRepository.save(order);

        // 4. Publish event (for Read Model sync)
        OrderCreatedEvent event = new OrderCreatedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getTotalAmount(),
            order.getItems().stream()
                .map(this::toEventItem)
                .toList(),
            Instant.now()
        );
        kafkaTemplate.send("order-events", order.getId().toString(), event);

        return order.getId();
    }

    private void validateOrder(CreateOrderCommand command) {
        if (command.items().isEmpty()) {
            throw new InvalidOrderException("Order items are empty");
        }
        // Check inventory, customer status, etc.
    }
}
// === Query Side: Order Query Projection ===

@Service
@RequiredArgsConstructor
public class OrderQueryService {

    private final OrderReadRepository readRepository;

    public OrderDetailView getOrderDetail(UUID orderId) {
        return readRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));
    }

    public Page<OrderSummaryView> getCustomerOrders(
            UUID customerId, Pageable pageable) {
        return readRepository.findByCustomerId(customerId, pageable);
    }
}

// Read Model - denormalized view optimized for queries
@Document(collection = "order_views")
public record OrderDetailView(
    UUID orderId,
    UUID customerId,
    String customerName,      // Denormalized: includes customer name
    String customerEmail,
    BigDecimal totalAmount,
    String status,
    List<OrderItemView> items,
    AddressView shippingAddress,
    Instant createdAt,
    Instant lastUpdatedAt
) {}
// === Event Handler: Write -> Read Sync ===

@Component
@Slf4j  // Lombok logger used by log.warn below
@RequiredArgsConstructor
public class OrderProjectionHandler {

    private final OrderReadRepository readRepository;
    private final CustomerQueryClient customerClient;

    @KafkaListener(topics = "order-events", groupId = "order-projection")
    public void handleOrderEvent(OrderEvent event) {
        switch (event) {
            case OrderCreatedEvent e -> handleOrderCreated(e);
            case OrderStatusChangedEvent e -> handleStatusChanged(e);
            case OrderCancelledEvent e -> handleOrderCancelled(e);
            default -> log.warn("Unknown event type: {}", event.getClass());
        }
    }

    private void handleOrderCreated(OrderCreatedEvent event) {
        // Fetch customer info to create denormalized view
        CustomerInfo customer = customerClient.getCustomer(event.customerId());

        OrderDetailView view = new OrderDetailView(
            event.orderId(),
            event.customerId(),
            customer.name(),
            customer.email(),
            event.totalAmount(),
            "CREATED",
            event.items().stream().map(this::toItemView).toList(),
            null,
            event.occurredAt(),
            event.occurredAt()
        );
        readRepository.save(view);
    }
}

3-4. Read Model Synchronization Strategies

| Strategy              | Latency            | Consistency | Complexity | Best For                  |
|-----------------------|--------------------|-------------|------------|---------------------------|
| Event Handler (Kafka) | ms to seconds      | Eventual    | Medium     | Most cases                |
| CDC (Debezium)        | Seconds            | Eventual    | High       | Legacy system integration |
| Polling (Scheduled)   | Seconds to minutes | Eventual    | Low        | Simple cases              |
| Synchronous update    | Immediate          | Strong      | Low        | Modest performance needs  |
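The polling strategy from the table can be sketched with in-memory collections standing in for the write and read stores; a real version would run the tick under Spring's @Scheduled, and the class name is invented:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A poller periodically picks up unsynced write-side changes and applies
// them to a denormalized read model.
public class PollingProjector {

    record OrderChange(String orderId, String status) {}

    private final List<OrderChange> unsynced = new ArrayList<>();  // write side
    private final Map<String, String> readModel = new HashMap<>(); // read side

    public void recordChange(String orderId, String status) {
        unsynced.add(new OrderChange(orderId, status));
    }

    // One polling tick: apply all pending changes in order, then clear them
    public int pollOnce() {
        int applied = unsynced.size();
        for (OrderChange c : unsynced) {
            readModel.put(c.orderId(), c.status());
        }
        unsynced.clear();
        return applied;
    }

    public String statusOf(String orderId) {
        return readModel.get(orderId);
    }
}
```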

4. Saga Pattern Mastery

How do you manage a business transaction that spans multiple services in a microservices architecture? The single ACID transaction a monolith would rely on is no longer available. The Saga pattern decomposes such a distributed transaction into a sequence of local transactions, pairing each with a compensating action for failure.

4-1. Choreography

In the Choreography approach, there is no central coordinator. Each service publishes events, and the next service subscribes to those events to perform its local transaction.

E-commerce Order Flow (Choreography):

Order Service --OrderPlaced--> Payment Service
                                    |
                            PaymentCompleted
                                    |
                                    v
                              Inventory Service
                                    |
                            InventoryReserved
                                    |
                                    v
                              Shipping Service
                                    |
                            ShipmentCreated
                                    |
                                    v
                              Order Service (status update)

On Failure (Compensating Transactions):
Inventory Service --InventoryReservationFailed--> Payment Service (refund)
                                                        |
                                                 PaymentRefunded
                                                        |
                                                        v
                                                 Order Service (cancel order)
// === Choreography Saga: Order Service ===

@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Transactional
    public UUID placeOrder(PlaceOrderCommand command) {
        Order order = Order.create(command);
        order.setStatus(OrderStatus.PENDING);
        orderRepository.save(order);

        // Publish event - Payment Service subscribes
        kafkaTemplate.send("order-events", new OrderPlacedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getTotalAmount(),
            order.getItems()
        ));

        return order.getId();
    }

    // Payment result listener. A single listener handles the topic and
    // dispatches by event type: two @KafkaListener methods in the same
    // consumer group would split the partitions between two consumers,
    // so each would receive only part of the messages.
    @KafkaListener(topics = "payment-events", groupId = "order-service")
    public void onPaymentEvent(PaymentEvent event) {
        switch (event) {
            case PaymentCompletedEvent e -> {
                Order order = orderRepository.findById(e.orderId()).orElseThrow();
                order.setStatus(OrderStatus.PAID);
                orderRepository.save(order);
            }
            case PaymentFailedEvent e -> {
                Order order = orderRepository.findById(e.orderId()).orElseThrow();
                order.setStatus(OrderStatus.CANCELLED);
                order.setCancellationReason(e.reason());
                orderRepository.save(order);
            }
            default -> { /* ignore other payment events */ }
        }
    }
}
// === Choreography Saga: Payment Service ===

@Service
@RequiredArgsConstructor
public class PaymentService {

    private final PaymentRepository paymentRepository;
    private final PaymentGateway paymentGateway;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @KafkaListener(topics = "order-events", groupId = "payment-service")
    @Transactional
    public void onOrderPlaced(OrderPlacedEvent event) {
        try {
            // Process payment
            PaymentResult result = paymentGateway.charge(
                event.customerId(),
                event.totalAmount()
            );

            Payment payment = Payment.create(
                event.orderId(),
                event.customerId(),
                event.totalAmount(),
                result.transactionId()
            );
            paymentRepository.save(payment);

            // Publish success event - Inventory Service subscribes
            kafkaTemplate.send("payment-events", new PaymentCompletedEvent(
                event.orderId(),
                payment.getId(),
                result.transactionId()
            ));

        } catch (PaymentException e) {
            // Publish failure event - Order Service subscribes and cancels
            kafkaTemplate.send("payment-events", new PaymentFailedEvent(
                event.orderId(),
                e.getMessage()
            ));
        }
    }
}

Choreography Pros and Cons:

| Pros                                  | Cons                                          |
|---------------------------------------|-----------------------------------------------|
| Loose coupling between services       | Difficult to understand the full flow         |
| Each service independently deployable | Complex debugging                             |
| No Single Point of Failure (SPOF)     | Possible circular dependencies                |
| Relatively simple implementation      | Event tracking complexity grows with services |

4-2. Orchestration

In the Orchestration approach, a central coordinator (Orchestrator) manages the entire workflow. It sends commands to each service and decides the next step based on responses.

E-commerce Order Flow (Orchestration):

                    Order Saga Orchestrator
                   /          |          \
                  v           v           v
          Payment Service  Inventory  Shipping
          (charge)         (reserve)  (create)
                  v           v           v
          (result)        (result)    (result)
                   \          |          /
                    Order Saga Orchestrator
                         (final decision)
// === Orchestration Saga: Using Temporal Framework ===

@WorkflowInterface
public interface OrderSagaWorkflow {

    @WorkflowMethod
    OrderResult processOrder(PlaceOrderCommand command);
}

@WorkflowImpl(taskQueues = "order-saga-queue")  // register on a worker task queue
public class OrderSagaWorkflowImpl implements OrderSagaWorkflow {

    // Activity stubs for each service
    private final PaymentActivities paymentActivities =
        Workflow.newActivityStub(PaymentActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .setRetryOptions(RetryOptions.newBuilder()
                    .setMaximumAttempts(3)
                    .build())
                .build());

    private final InventoryActivities inventoryActivities =
        Workflow.newActivityStub(InventoryActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .build());

    private final ShippingActivities shippingActivities =
        Workflow.newActivityStub(ShippingActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(60))
                .build());

    @Override
    public OrderResult processOrder(PlaceOrderCommand command) {
        String paymentId = null;
        String reservationId = null;

        try {
            // Step 1: Payment
            paymentId = paymentActivities.processPayment(
                command.customerId(),
                command.totalAmount()
            );

            // Step 2: Inventory reservation
            reservationId = inventoryActivities.reserveInventory(
                command.items()
            );

            // Step 3: Create shipment
            String shipmentId = shippingActivities.createShipment(
                command.orderId(),
                command.shippingAddress()
            );

            return OrderResult.success(command.orderId(), shipmentId);

        } catch (Exception e) {
            // Execute compensating transactions
            compensate(paymentId, reservationId);
            return OrderResult.failed(command.orderId(), e.getMessage());
        }
    }

    private void compensate(String paymentId, String reservationId) {
        // Execute compensating transactions in reverse order
        if (reservationId != null) {
            inventoryActivities.cancelReservation(reservationId);
        }
        if (paymentId != null) {
            paymentActivities.refundPayment(paymentId);
        }
    }
}
// === Activity Implementation ===

@ActivityInterface
public interface PaymentActivities {
    String processPayment(UUID customerId, BigDecimal amount);
    void refundPayment(String paymentId);
}

@Component
@RequiredArgsConstructor
public class PaymentActivitiesImpl implements PaymentActivities {

    private final PaymentGateway paymentGateway;
    private final PaymentRepository paymentRepository;

    @Override
    public String processPayment(UUID customerId, BigDecimal amount) {
        PaymentResult result = paymentGateway.charge(customerId, amount);
        Payment payment = Payment.create(customerId, amount, result.transactionId());
        paymentRepository.save(payment);
        return payment.getId().toString();
    }

    @Override
    public void refundPayment(String paymentId) {
        Payment payment = paymentRepository.findById(UUID.fromString(paymentId))
            .orElseThrow();
        paymentGateway.refund(payment.getTransactionId());
        payment.setStatus(PaymentStatus.REFUNDED);
        paymentRepository.save(payment);
    }
}

Orchestration Pros and Cons:

| Pros                                 | Cons                                               |
|--------------------------------------|----------------------------------------------------|
| Entire workflow managed in one place | Orchestrator can become a SPOF                     |
| Easy debugging and monitoring        | Higher coupling than Choreography                  |
| Easier complex compensation logic    | Risk of logic concentrating in the orchestrator    |
| Framework support (Temporal/Cadence) | Additional infrastructure (workflow engine) needed |

4-3. Choreography vs Orchestration Selection Guide

| Criteria            | Choreography      | Orchestration                             |
|---------------------|-------------------|-------------------------------------------|
| Number of services  | 3-4 or fewer      | 5 or more                                 |
| Workflow complexity | Linear, simple    | Conditional branching, parallel processing |
| Observability needs | Low               | High                                      |
| Team structure      | Independent teams | Central platform team                     |
| Compensation logic  | Simple            | Complex (multiple paths)                  |

4-4. Compensating Transactions

Compensating transactions semantically undo already committed transactions. They do not physically roll back but perform reverse actions.

// Compensating transaction mapping: each forward action is paired
// with the action that semantically undoes it
public enum SagaAction {
    PAYMENT_CHARGE("PAYMENT_REFUND"),
    INVENTORY_RESERVE("INVENTORY_RELEASE"),
    SHIPPING_CREATE("SHIPPING_CANCEL"),
    COUPON_APPLY("COUPON_RESTORE"),
    POINTS_DEDUCT("POINTS_RESTORE");

    private final String compensatingAction;

    SagaAction(String compensatingAction) {
        this.compensatingAction = compensatingAction;
    }

    public String compensatingAction() {
        return compensatingAction;
    }
}

Semantic Compensation vs Exact Compensation:

  • Exact Compensation: Restores precisely to original state (e.g., payment cancel -> full refund)
  • Semantic Compensation: Provides a business-meaningful compensation (e.g., flight cancellation -> partial refund + credit)

In practice, Semantic Compensation is more common because exact restoration is often impossible after time passes.
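A minimal sketch of the difference, assuming a hypothetical policy of an 80% cash refund with the remainder returned as credit; the split is invented for illustration, not a standard:

```java
import java.math.BigDecimal;

// Exact compensation restores the original amount in full; semantic
// compensation applies a business rule instead.
public class Compensation {

    record Refund(BigDecimal cash, BigDecimal credit) {}

    public static Refund exact(BigDecimal paid) {
        return new Refund(paid, BigDecimal.ZERO); // full restore
    }

    public static Refund semantic(BigDecimal paid) {
        BigDecimal cash = paid.multiply(new BigDecimal("0.80"));
        return new Refund(cash, paid.subtract(cash)); // partial refund + credit
    }
}
```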


5. Event Sourcing

Event Sourcing stores all events that caused state changes in order, instead of storing only the current state of an entity.

5-1. Core Concept

Traditional CRUD:
  Account table: id=1, balance=750

Event Sourcing:
  Event #1: AccountCreated(id=1, initialBalance=1000)
  Event #2: MoneyWithdrawn(id=1, amount=200)
  Event #3: MoneyDeposited(id=1, amount=150)
  Event #4: MoneyWithdrawn(id=1, amount=200)
  -> Replay: 1000 - 200 + 150 - 200 = 750 (current balance)

All state changes are appended to an immutable, append-only event log. No deletions or updates.
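The replay above is just a left fold over the event log; a minimal sketch, with event types mirroring the example:

```java
import java.math.BigDecimal;
import java.util.List;

// Current balance is computed by folding every event over an initial state.
public class BalanceReplay {

    sealed interface AccountEvent permits AccountCreated, MoneyDeposited, MoneyWithdrawn {}
    record AccountCreated(BigDecimal initialBalance) implements AccountEvent {}
    record MoneyDeposited(BigDecimal amount) implements AccountEvent {}
    record MoneyWithdrawn(BigDecimal amount) implements AccountEvent {}

    public static BigDecimal replay(List<AccountEvent> log) {
        BigDecimal balance = BigDecimal.ZERO;
        for (AccountEvent event : log) {
            balance = switch (event) {
                case AccountCreated e -> e.initialBalance();
                case MoneyDeposited e -> balance.add(e.amount());
                case MoneyWithdrawn e -> balance.subtract(e.amount());
            };
        }
        return balance;
    }
}
```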

5-2. Event Store Implementation

// === Event Store Interface ===

public interface EventStore {
    void saveEvents(UUID aggregateId, List<DomainEvent> events, int expectedVersion);
    List<DomainEvent> getEvents(UUID aggregateId);
    List<DomainEvent> getEvents(UUID aggregateId, int fromVersion);
}

// === PostgreSQL-based Event Store Implementation ===

@Repository
@RequiredArgsConstructor
public class PostgresEventStore implements EventStore {

    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper;

    @Override
    @Transactional
    public void saveEvents(UUID aggregateId, List<DomainEvent> events,
                          int expectedVersion) {
        // Optimistic concurrency control
        Integer currentVersion = jdbcTemplate.queryForObject(
            "SELECT COALESCE(MAX(version), 0) FROM event_store WHERE aggregate_id = ?",
            Integer.class, aggregateId
        );

        if (currentVersion != expectedVersion) {
            throw new ConcurrencyException(
                "Concurrency conflict: expected version " + expectedVersion +
                ", actual version " + currentVersion
            );
        }

        int version = expectedVersion;
        for (DomainEvent event : events) {
            version++;
            String payload;
            try {
                payload = objectMapper.writeValueAsString(event);
            } catch (JsonProcessingException e) {
                throw new IllegalStateException("Event serialization failed", e);
            }
            jdbcTemplate.update(
                """
                INSERT INTO event_store
                (event_id, aggregate_id, aggregate_type, event_type,
                 event_data, version, created_at)
                VALUES (?, ?, ?, ?, ?::jsonb, ?, ?)
                """,
                UUID.randomUUID(),
                aggregateId,
                event.getAggregateType(),
                event.getClass().getSimpleName(),
                payload,
                version,
                Instant.now()
            );
        }
    }

    @Override
    public List<DomainEvent> getEvents(UUID aggregateId) {
        return jdbcTemplate.query(
            "SELECT * FROM event_store WHERE aggregate_id = ? ORDER BY version",
            this::mapRowToEvent,
            aggregateId
        );
    }
}
-- Event Store table schema
CREATE TABLE event_store (
    event_id       UUID PRIMARY KEY,
    aggregate_id   UUID NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    event_type     VARCHAR(255) NOT NULL,
    event_data     JSONB NOT NULL,
    version        INTEGER NOT NULL,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    UNIQUE (aggregate_id, version)  -- optimistic concurrency control
);

CREATE INDEX idx_event_store_aggregate ON event_store (aggregate_id, version);
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);

5-3. Applying Events on Aggregates

// === Event-Sourced Aggregate ===

public class OrderAggregate {

    private UUID id;
    private UUID customerId;
    private OrderStatus status;
    private BigDecimal totalAmount;
    private List<OrderItem> items = new ArrayList<>();
    private int version = 0;

    // Uncommitted events list
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    // Restore state from event history
    public static OrderAggregate fromHistory(List<DomainEvent> history) {
        OrderAggregate aggregate = new OrderAggregate();
        for (DomainEvent event : history) {
            aggregate.apply(event, false);
        }
        return aggregate;
    }

    // Handle new commands
    public void placeOrder(UUID customerId, List<OrderItem> items) {
        if (this.status != null) {
            throw new IllegalStateException("Order already created");
        }

        BigDecimal total = items.stream()
            .map(item -> item.price().multiply(BigDecimal.valueOf(item.quantity())))
            .reduce(BigDecimal.ZERO, BigDecimal::add);

        apply(new OrderPlacedEvent(this.id, customerId, total, items, Instant.now()), true);
    }

    public void cancelOrder(String reason) {
        if (this.status == OrderStatus.SHIPPED) {
            throw new IllegalStateException("Cannot cancel shipped orders");
        }
        apply(new OrderCancelledEvent(this.id, reason, Instant.now()), true);
    }

    // Apply event (state change); package-private so replay infrastructure
    // (e.g., the snapshot loader) can re-apply historical events
    void apply(DomainEvent event, boolean isNew) {
        when(event);  // State mutation
        this.version++;
        if (isNew) {
            uncommittedEvents.add(event);
        }
    }

    // State change logic per event type
    private void when(DomainEvent event) {
        switch (event) {
            case OrderPlacedEvent e -> {
                this.id = e.orderId();
                this.customerId = e.customerId();
                this.totalAmount = e.totalAmount();
                this.items = new ArrayList<>(e.items());
                this.status = OrderStatus.PLACED;
            }
            case OrderCancelledEvent e -> {
                this.status = OrderStatus.CANCELLED;
            }
            case OrderShippedEvent e -> {
                this.status = OrderStatus.SHIPPED;
            }
            default -> throw new IllegalArgumentException("Unknown event: " + event);
        }
    }

    public List<DomainEvent> getUncommittedEvents() {
        return Collections.unmodifiableList(uncommittedEvents);
    }

    public void clearUncommittedEvents() {
        uncommittedEvents.clear();
    }
}

5-4. Snapshot Pattern

When tens of thousands of events accumulate, replaying all of them every time is inefficient. Snapshots save the aggregate state at specific intervals, so only events after the snapshot need to be replayed.

// Snapshot save and restore
@Service
@RequiredArgsConstructor
public class SnapshotService {

    private final SnapshotRepository snapshotRepository;
    private final EventStore eventStore;

    private static final int SNAPSHOT_INTERVAL = 100; // every 100 events

    public OrderAggregate loadAggregate(UUID aggregateId) {
        // 1. Get latest snapshot
        Optional<Snapshot> snapshot = snapshotRepository
            .findLatest(aggregateId);

        if (snapshot.isPresent()) {
            // 2. Restore state from snapshot
            OrderAggregate aggregate = deserialize(snapshot.get().getData());
            int fromVersion = snapshot.get().getVersion();

            // 3. Replay only events after snapshot
            List<DomainEvent> newEvents = eventStore
                .getEvents(aggregateId, fromVersion + 1);
            for (DomainEvent event : newEvents) {
                aggregate.apply(event, false);
            }
            return aggregate;

        } else {
            // No snapshot, replay all
            List<DomainEvent> allEvents = eventStore.getEvents(aggregateId);
            return OrderAggregate.fromHistory(allEvents);
        }
    }

    public void saveIfNeeded(OrderAggregate aggregate) {
        if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
            snapshotRepository.save(new Snapshot(
                aggregate.getId(),
                aggregate.getVersion(),
                serialize(aggregate),
                Instant.now()
            ));
        }
    }
}

5-5. Event Sourcing vs CRUD Comparison

| Criteria         | CRUD                             | Event Sourcing                            |
|------------------|----------------------------------|-------------------------------------------|
| Storage          | Current state only               | All state change events                   |
| Audit trail      | Requires separate implementation | Built-in (events are the audit log)       |
| Time travel      | Not possible                     | Possible (reconstruct state at any point) |
| Storage volume   | Relatively small                 | Grows with event accumulation             |
| Complexity       | Low                              | High (event design, snapshots, etc.)      |
| Concurrency      | Pessimistic/optimistic locking   | Event version-based optimistic control    |
| Performance      | Easy read optimization           | Replay cost (mitigated by snapshots)      |
| Schema changes   | Migration required               | Event upcasting                           |
| CQRS integration | Optional                         | Natural combination                       |
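The "event upcasting" row deserves a concrete sketch: a v1 payload is upgraded in memory to the v2 shape before deserialization, so old events stay readable after a schema change. The field names and the v1-to-v2 change (splitting one name field into two) are invented for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Upcasts a raw event payload (as a map) from schema v1 to v2.
public class OrderEventUpcaster {

    public static Map<String, Object> upcast(Map<String, Object> event) {
        int version = (int) event.getOrDefault("version", 1);
        Map<String, Object> upgraded = new HashMap<>(event);
        if (version < 2) {
            // v2 split the single "customerName" field into first/last name
            String full = (String) upgraded.remove("customerName");
            int space = full.indexOf(' ');
            upgraded.put("firstName", space < 0 ? full : full.substring(0, space));
            upgraded.put("lastName", space < 0 ? "" : full.substring(space + 1));
            upgraded.put("version", 2);
        }
        return upgraded;
    }
}
```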

5-6. When NOT to Use Event Sourcing

  • Simple CRUD domains (blogs, configuration management)
  • Strong consistency is mandatory (ATMs requiring real-time balance checks)
  • Unstable event design in early stages (frequent changes mean high upcasting cost)
  • Team lacks experience (steep learning curve)
  • Regulatory requirements for data deletion (conflicts with GDPR right to be forgotten)

6. Outbox Pattern (The Core of Reliability)

6-1. The Problem: Dual Write

One of the most common mistakes in distributed systems is performing database saves and message publishing as separate operations.

// Dangerous code: Dual write problem
@Transactional
public void createOrder(Order order) {
    orderRepository.save(order);           // Step 1: DB save
    kafkaTemplate.send("orders", event);   // Step 2: Message publish
    // Step 1 success + Step 2 failure = In DB but event not published
    // Step 1 success + Step 2 success + DB commit failure = Event published but not in DB
}

DB transactions and message broker sends are different infrastructure, so they cannot be processed atomically.

6-2. Solution: Outbox Pattern

The Outbox Pattern stores events in an outbox table within the DB transaction, and a separate process delivers them to the message broker.

1. Business logic + outbox INSERT = single DB transaction (atomicity guaranteed)
2. Separate process (CDC/Poller) reads outbox table and publishes to Kafka
3. After successful publish, mark outbox record as completed
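Step 2 can be sketched with in-memory stand-ins for the outbox table and the broker; class and method names are invented, and a real poller would run on a schedule against the actual table:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Reads PENDING rows, hands them to a publisher (standing in for Kafka),
// and marks them PUBLISHED only after a successful publish. Failed
// messages stay PENDING and are retried on the next poll.
public class OutboxPoller {

    static class OutboxRow {
        final String payload;
        String status = "PENDING";
        OutboxRow(String payload) { this.payload = payload; }
    }

    private final List<OutboxRow> table = new ArrayList<>();

    public void insert(String payload) {
        table.add(new OutboxRow(payload)); // done inside the business transaction
    }

    // One polling pass; `publish` returns true on broker ack
    public int pollOnce(Predicate<String> publish) {
        int published = 0;
        for (OutboxRow row : table) {
            if ("PENDING".equals(row.status) && publish.test(row.payload)) {
                row.status = "PUBLISHED"; // step 3: mark as completed
                published++;
            }
        }
        return published;
    }

    public long pendingCount() {
        return table.stream().filter(r -> "PENDING".equals(r.status)).count();
    }
}
```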
-- Outbox table schema
CREATE TABLE outbox (
    id              UUID PRIMARY KEY,
    aggregate_type  VARCHAR(255) NOT NULL,
    aggregate_id    UUID NOT NULL,
    event_type      VARCHAR(255) NOT NULL,
    payload         JSONB NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMPTZ,
    status          VARCHAR(20) DEFAULT 'PENDING'
);

CREATE INDEX idx_outbox_pending ON outbox (status, created_at)
    WHERE status = 'PENDING';
// === Outbox Pattern Implementation ===

@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    @Transactional  // Guaranteed by single transaction
    public UUID createOrder(CreateOrderCommand command) {
        // 1. Execute business logic
        Order order = Order.create(command);
        orderRepository.save(order);

        // 2. Save event to outbox in same transaction
        String payload;
        try {
            payload = objectMapper.writeValueAsString(
                new OrderCreatedEvent(order.getId(), order.getTotalAmount()));
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Event serialization failed", e);
        }
        OutboxMessage message = OutboxMessage.builder()
            .aggregateType("Order")
            .aggregateId(order.getId())
            .eventType("OrderCreated")
            .payload(payload)
            .build();
        outboxRepository.save(message);

        return order.getId();
        // When transaction commits, both order and outbox are saved together
    }
}

6-3. CDC (Debezium) Approach

Debezium monitors the DB transaction log (WAL) to capture changes and deliver them to Kafka.

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.dbname": "orderdb",
    "database.server.name": "order-service",
    "table.include.list": "public.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "events.order"
  }
}

6-4. Polling vs CDC Comparison

| Criteria | Polling | CDC (Debezium) |
|---|---|---|
| Latency | Depends on polling interval (seconds-minutes) | Near real-time (ms) |
| DB load | Periodic queries cause load | Minimal (reads WAL) |
| Implementation complexity | Low (scheduler + query) | High (Debezium infrastructure) |
| Order guarantee | Outbox table order | WAL order preserved |
| Operational overhead | Low | Kafka Connect cluster needed |
| Scalability | Single poller bottleneck | Highly scalable |
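
The polling side of this comparison is simple enough to sketch end to end. Below is a minimal, self-contained illustration of the relay loop — in-memory lists stand in for the outbox table and the Kafka topic, and all names are hypothetical. It drains PENDING rows in insertion order, "publishes" them, and marks them PUBLISHED, the same steps a scheduled poller would perform with a repository and a KafkaTemplate.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one row of the outbox table.
class OutboxRow {
    final String id;
    final String payload;
    String status = "PENDING";
    OutboxRow(String id, String payload) { this.id = id; this.payload = payload; }
}

class PollingRelay {
    private final List<OutboxRow> outbox;  // stands in for the outbox table
    private final List<String> broker;     // stands in for the Kafka topic

    PollingRelay(List<OutboxRow> outbox, List<String> broker) {
        this.outbox = outbox;
        this.broker = broker;
    }

    // One polling tick: publish up to batchSize PENDING rows in insertion order.
    int pollOnce(int batchSize) {
        int published = 0;
        for (OutboxRow row : outbox) {
            if (published == batchSize) break;
            if (!"PENDING".equals(row.status)) continue;
            broker.add(row.payload);   // kafkaTemplate.send(...) in real code
            row.status = "PUBLISHED";  // UPDATE outbox SET status = 'PUBLISHED'
            published++;
        }
        return published;
    }
}
```

Because the relay may crash between sending and marking, the poller is itself an at-least-once publisher — which is one more reason the consumer-side idempotency in the next section matters.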

7. Idempotency Patterns

In distributed systems, messages are typically delivered at-least-once: network failures, broker restarts, and consumer redeployments all cause duplicate delivery. Idempotency guarantees that processing the same message multiple times produces the same result as processing it once.

7-1. Producer Idempotency (Kafka)

# Kafka Producer configuration
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.properties.acks=all
spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
spring.kafka.producer.properties.retries=2147483647

Kafka's idempotent producer assigns sequence numbers to each message, enabling the broker to detect duplicates.
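
The broker-side mechanism can be illustrated with a toy model. This is a simplification with hypothetical names: the real broker tracks sequences per producer *and partition*, and rejects sequence gaps with OutOfOrderSequenceException rather than only dropping replays.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of broker-side duplicate detection by sequence number
// (the idea behind enable.idempotence=true).
class DedupBroker {
    private final Map<String, Integer> lastSeq = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    // Returns true if appended, false if the record was a retried duplicate.
    boolean append(String producerId, int seq, String record) {
        if (seq <= lastSeq.getOrDefault(producerId, -1)) {
            return false;  // retry of a record whose ack was lost in transit
        }
        log.add(record);
        lastSeq.put(producerId, seq);
        return true;
    }

    List<String> log() { return log; }
}
```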

7-2. Consumer Idempotency

// === Idempotent Consumer Implementation ===

@Component
@Slf4j
@RequiredArgsConstructor
public class IdempotentOrderEventHandler {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderProjectionRepository orderProjectionRepository;

    @KafkaListener(topics = "order-events", groupId = "order-projection")
    @Transactional
    public void handleOrderEvent(
            @Payload OrderEvent event,
            @Header(KafkaHeaders.RECEIVED_KEY) String key,
            @Header("eventId") String eventId) {

        // 1. Check if event already processed
        if (processedEventRepository.existsById(eventId)) {
            log.info("Ignoring duplicate event: eventId={}", eventId);
            return;
        }

        // 2. Process business logic
        processEvent(event);

        // 3. Record as processed (same transaction)
        processedEventRepository.save(new ProcessedEvent(
            eventId,
            event.getClass().getSimpleName(),
            Instant.now()
        ));
    }

    private void processEvent(OrderEvent event) {
        switch (event) {
            case OrderCreatedEvent e -> createOrderProjection(e);
            case OrderStatusChangedEvent e -> updateOrderStatus(e);
            default -> log.warn("Unknown event type: {}", event.getClass().getSimpleName());
        }
    }
}

-- Processed events table
CREATE TABLE processed_events (
    event_id    VARCHAR(255) PRIMARY KEY,
    event_type  VARCHAR(255) NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

7-3. Unique Constraint vs Redis SET NX

| Approach | Pros | Cons |
|---|---|---|
| DB Unique Constraint | Atomic with transaction | Increases DB load |
| Redis SET NX | Very fast lookups | Not guaranteed if Redis fails |
| DB + Redis combo | Fast primary filter + reliable guarantee | Complex implementation |

// Redis SET NX for fast duplicate check
@Component
@RequiredArgsConstructor
public class RedisIdempotencyGuard {

    private final StringRedisTemplate redisTemplate;
    private static final Duration TTL = Duration.ofHours(24);

    public boolean tryAcquire(String idempotencyKey) {
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(
                "idempotency:" + idempotencyKey,
                "processed",
                TTL
            );
        return Boolean.TRUE.equals(result);
    }
}
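
The "DB + Redis combo" row above can also be sketched with plain collections standing in for both stores (all names here are hypothetical): a volatile fast filter screens most duplicates cheaply, while a durable set remains the source of truth when the cache is lost.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the two-tier duplicate check: a HashSet stands in for
// Redis SET NX, another for the DB unique constraint.
class TwoTierIdempotencyGuard {
    private final Set<String> cache = new HashSet<>();    // Redis stand-in (may be lost)
    private final Set<String> durable = new HashSet<>();  // DB primary key stand-in

    // Returns true only the first time a key is ever seen.
    boolean tryAcquire(String key) {
        if (!cache.add(key)) {
            return false;          // fast path: duplicate caught by the cache
        }
        return durable.add(key);   // durable check: survives cache loss
    }

    // Simulates a Redis restart or TTL expiry wiping the fast filter.
    void simulateCacheEviction() { cache.clear(); }
}
```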

8. Dead Letter Queue Strategy

When message processing fails, infinite retries can paralyze the system. DLQ (Dead Letter Queue) isolates unprocessable messages into a separate queue to ensure system stability.

8-1. Retry with Exponential Backoff

// === Spring Kafka Retry + DLQ Configuration ===

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public DefaultErrorHandler errorHandler(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // Recoverer that sends to DLQ
        DeadLetterPublishingRecoverer recoverer =
            new DeadLetterPublishingRecoverer(kafkaTemplate,
                (record, ex) -> new TopicPartition(
                    record.topic() + ".DLQ", record.partition()));

        // Exponential Backoff retry configuration
        ExponentialBackOff backOff = new ExponentialBackOff();
        backOff.setInitialInterval(1000L);    // 1 second
        backOff.setMultiplier(2.0);            // 2x increase
        backOff.setMaxInterval(60000L);        // max 60 seconds
        backOff.setMaxElapsedTime(300000L);    // max 5 minutes

        DefaultErrorHandler errorHandler =
            new DefaultErrorHandler(recoverer, backOff);

        // Non-retryable exceptions (business errors)
        errorHandler.addNotRetryableExceptions(
            InvalidOrderException.class,
            DuplicateEventException.class
        );

        return errorHandler;
    }
}
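
To see what this configuration means in practice, the implied retry schedule can be computed. This is an approximation of Spring's ExponentialBackOff semantics, not its actual implementation (the real class tracks elapsed wall-clock time rather than summing planned delays):

```java
import java.util.ArrayList;
import java.util.List;

// Approximate retry schedule: delays start at initialMs, multiply each
// attempt, cap at maxIntervalMs, and stop once total backoff would
// exceed maxElapsedMs.
class BackoffSchedule {
    static List<Long> compute(long initialMs, double multiplier,
                              long maxIntervalMs, long maxElapsedMs) {
        List<Long> delays = new ArrayList<>();
        long next = initialMs;
        long elapsed = 0;
        while (elapsed + next <= maxElapsedMs) {
            delays.add(next);
            elapsed += next;
            next = Math.min((long) (next * multiplier), maxIntervalMs);
        }
        return delays;
    }
}
```

With the values above this yields delays of 1s, 2s, 4s, 8s, 16s, 32s, then 60s three more times — roughly four minutes of total backoff before the recoverer publishes the message to the DLQ.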

8-2. DLQ Monitoring and Alerting

// === DLQ Monitoring Service ===

@Component
@RequiredArgsConstructor
public class DlqMonitoringService {

    private final MeterRegistry meterRegistry;
    private final AlertService alertService;

    @KafkaListener(topics = "order-events.DLQ", groupId = "dlq-monitor")
    public void monitorDlq(
            ConsumerRecord<String, Object> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String errorMessage,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic) {

        // Record metrics
        meterRegistry.counter("dlq.messages.received",
            "originalTopic", originalTopic,
            "errorType", classifyError(errorMessage)
        ).increment();

        // Send alert
        alertService.sendAlert(DlqAlert.builder()
            .originalTopic(originalTopic)
            .messageKey(record.key())
            .errorMessage(errorMessage)
            .timestamp(Instant.ofEpochMilli(record.timestamp()))
            .severity(classifySeverity(errorMessage))
            .build()
        );

        log.error("DLQ message received: topic={}, key={}, error={}",
            originalTopic, record.key(), errorMessage);
    }
}

8-3. DLQ Reprocessing Pipeline

// === DLQ Message Reprocessing ===

@RestController
@RequestMapping("/api/dlq")
@RequiredArgsConstructor
public class DlqReprocessController {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final DlqMessageRepository dlqRepository;

    // Single message reprocessing
    @PostMapping("/reprocess/single")
    public ResponseEntity<String> reprocessSingle(
            @RequestParam String messageId) {
        DlqMessage message = dlqRepository.findById(messageId).orElseThrow();

        // Republish to original topic
        kafkaTemplate.send(
            message.getOriginalTopic(),
            message.getKey(),
            message.getValue()
        );

        message.setStatus("REPROCESSED");
        message.setReprocessedAt(Instant.now());
        dlqRepository.save(message);

        return ResponseEntity.ok("Reprocessing requested: " + messageId);
    }

    // Batch reprocessing
    @PostMapping("/reprocess/batch")
    public ResponseEntity<String> reprocessBatch(
            @RequestParam String originalTopic,
            @RequestParam(defaultValue = "100") int batchSize) {
        List<DlqMessage> messages = dlqRepository
            .findByOriginalTopicAndStatus(originalTopic, "PENDING",
                PageRequest.of(0, batchSize));

        int count = 0;
        for (DlqMessage message : messages) {
            kafkaTemplate.send(message.getOriginalTopic(),
                message.getKey(), message.getValue());
            message.setStatus("REPROCESSED");
            count++;
        }
        dlqRepository.saveAll(messages);

        return ResponseEntity.ok("Reprocessing requested for " + count + " messages");
    }
}

9. Exactly-Once Semantics

There are three levels of message delivery guarantees:

| Guarantee Level | Description | Complexity |
|---|---|---|
| At-most-once | Delivered at most once (may be lost) | Low |
| At-least-once | Delivered at least once (may be duplicated) | Medium |
| Exactly-once | Delivered exactly once | High |

9-1. Kafka Exactly-Once Implementation

Kafka has supported Exactly-Once Semantics (EOS) since version 0.11.

// === Kafka Exactly-Once Configuration ===

@Configuration
public class KafkaExactlyOnceConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        DefaultKafkaProducerFactory<String, Object> factory =
            new DefaultKafkaProducerFactory<>(config);
        // Spring appends a unique suffix per producer instance to this prefix
        factory.setTransactionIdPrefix("order-tx-");
        return factory;
    }

    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(config);
    }
}

9-2. End-to-End Exactly-Once

True end-to-end Exactly-Once must hold across the entire chain: from the producer, through the broker and consumer, to the final state persisted in the database.

End-to-End Exactly-Once Chain:

Producer (idempotent + transactions)
    -> Kafka Broker (transaction log)
        -> Consumer (read_committed + manual offset)
            -> Database (idempotency key + business logic)

All links in the chain must work together for Exactly-Once guarantee
// === End-to-End Exactly-Once Processing ===

@Component
@RequiredArgsConstructor
public class ExactlyOnceOrderProcessor {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final OrderRepository orderRepository;
    private final ProcessedOffsetRepository offsetRepository;

    @KafkaListener(topics = "payment-completed")
    // Note: ChainedKafkaTransactionManager is deprecated in recent Spring Kafka;
    // it provides best-effort (not 2PC) coordination of the DB and Kafka transactions
    @Transactional("chainedKafkaTransactionManager")
    public void processPaymentCompleted(
            ConsumerRecord<String, PaymentCompletedEvent> record) {

        String offsetKey = record.topic() + "-" +
            record.partition() + "-" + record.offset();

        // Check if offset already processed
        if (offsetRepository.existsById(offsetKey)) {
            return;
        }

        // Business logic
        PaymentCompletedEvent event = record.value();
        Order order = orderRepository.findById(event.orderId()).orElseThrow();
        order.markAsPaid(event.paymentId());
        orderRepository.save(order);

        // Publish next step event (same Kafka transaction)
        kafkaTemplate.send("order-paid",
            new OrderPaidEvent(order.getId(), event.paymentId()));

        // Record processed offset (same DB transaction)
        offsetRepository.save(new ProcessedOffset(offsetKey, Instant.now()));
    }
}

10. Reactive Streams vs Virtual Threads (2025)

10-1. Project Reactor: Backpressure as Core Design

Reactive Streams treat non-blocking I/O and backpressure as core design principles.

// === Reactor-based Async Processing ===

@Service
@RequiredArgsConstructor
public class ReactiveOrderService {

    private final KafkaReceiver<String, OrderEvent> kafkaReceiver;

    public Flux<OrderEvent> processOrderStream() {
        return kafkaReceiver.receive()
            .flatMap(record -> {
                OrderEvent event = record.value();
                return processEvent(event)
                    .then(record.receiverOffset().commit())
                    .thenReturn(event);
            }, 16)  // Concurrency limit (backpressure)
            .onErrorResume(e -> {
                log.error("Processing failed", e);
                return Mono.empty();
            })
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
    }

    private Mono<Void> processEvent(OrderEvent event) {
        // Run blocking business logic on a worker pool, completing with no value
        return Mono.<Void>fromRunnable(() -> {
            // Business logic
        }).subscribeOn(Schedulers.boundedElastic());
    }
}

10-2. Java 21 Virtual Threads: Simpler Code

Virtual Threads achieve high concurrency while maintaining traditional synchronous code style.

// === Virtual Threads-based Async Processing ===

@Configuration
public class VirtualThreadConfig {

    @Bean
    public TomcatProtocolHandlerCustomizer<?> protocolHandlerCustomizer() {
        return protocolHandler -> {
            protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
        };
    }
}

@Service
public class VirtualThreadOrderService {

    // Synchronous code, but running on Virtual Threads:
    // blocking I/O parks only the virtual thread, not the carrier OS thread
    public OrderResult processOrder(UUID orderId)
            throws InterruptedException, ExecutionException {
        // Parallel processing with StructuredTaskScope (a preview API in Java 21)
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

            Subtask<PaymentResult> paymentTask = scope.fork(() ->
                paymentService.processPayment(orderId)
            );
            Subtask<InventoryResult> inventoryTask = scope.fork(() ->
                inventoryService.checkInventory(orderId)
            );

            scope.join();
            scope.throwIfFailed();

            return new OrderResult(
                paymentTask.get(),
                inventoryTask.get()
            );
        }
    }
}

10-3. Selection Guide: Decision Matrix

| Criteria | Reactive (Reactor/WebFlux) | Virtual Threads |
|---|---|---|
| Code complexity | High (reactive operator learning) | Low (existing sync code) |
| Backpressure control | Built-in (Flux/Mono) | Manual implementation needed |
| Debugging | Difficult (complex stack traces) | Easy (traditional stack traces) |
| Memory efficiency | Excellent | Good (better than OS threads) |
| I/O-intensive workloads | Optimal | Very good |
| CPU-intensive workloads | Not ideal | Not ideal (use platform threads) |
| Legacy code migration | Complete rewrite needed | Configuration change only |
| Ecosystem maturity | High (5+ years) | Growing (2023~) |

Recommendations:

  • New project + simple I/O: Virtual Threads
  • Streaming data processing: Reactive Streams
  • Existing Spring MVC project: Virtual Threads (minimal migration cost)
  • Real-time data pipeline: Reactive Streams

11. Production Architecture: Payment System Design

A complete e-commerce payment system architecture combining all patterns.

                     E-commerce Payment System Architecture
                     ======================================

[Client] -> [API Gateway]
                |
                v
        [Order Service] --- Write DB (PostgreSQL)
           |                     |
           | (Outbox Pattern)    | (CDC - Debezium)
           |                     v
           |              [Kafka: order-events]
           |                /        |        \
           v               v        v         v
    [Saga Orchestrator]  [Payment] [Inventory] [Notification]
    (Temporal)           Service   Service     Service
           |                |        |
           | commands       |        |
           v                v        v
    [payment-commands]  [payment-  [inventory-
                        events]    events]
                           |
                    [DLQ + Monitoring]
                           |
                    [Alerting (PagerDuty)]

Read Side:
    [Kafka: order-events] -> [Order Projection] -> Read DB (MongoDB)
                           -> [Analytics Service] -> Data Warehouse
                           -> [Search Service] -> Elasticsearch

Applied Patterns Summary

| Pattern | Applied At | Purpose |
|---|---|---|
| CQRS | Order Service | Separate order read/write |
| Saga (Orchestration) | Temporal Orchestrator | Payment-inventory-shipping transaction mgmt |
| Event Sourcing | Order Aggregate | Order state change history |
| Outbox Pattern | Order Service to Kafka | DB-message atomicity guarantee |
| Idempotency | All Consumers | Prevent duplicate message processing |
| DLQ | All Kafka Consumers | Isolate failed messages |
| Exactly-Once | Payment processing | Prevent double charging |

Quiz

Q1. When the Write Model and Read Model in CQRS use different databases, what is the most commonly used approach to ensure data consistency between the two models?

Answer: Event-based asynchronous synchronization (Eventual Consistency)

When the Write Model changes state, it publishes events. The Read Model's projection handler subscribes to these events and updates the Read DB. There may be temporary inconsistency (ms to seconds) between the two models, but consistency is eventually guaranteed.

If strong consistency is required, reconsider whether CQRS is the right fit.

Q2. In a Choreography Saga, when Service A publishes an event and Service B fails to process it, how does Service A become aware of the failure?

Answer: Service B publishes a failure event (e.g., PaymentFailed), and Service A subscribes to it to become aware. In Choreography, each service must publish both success and failure events, and related services subscribe to respond appropriately.

This is one of the downsides of Choreography. As the number of services grows, event flow tracking becomes increasingly complex, and timeout-based detection may be needed as an additional mechanism.

Q3. When an Aggregate in Event Sourcing has 1 million accumulated events, what is the key technique to solve the performance issue of loading it?

Answer: Snapshot Pattern

Periodically (e.g., every 100 events), save the current state of the Aggregate as a snapshot. When loading, start from the most recent snapshot and only replay events after that point.

Example: Out of 1 million events, if the last snapshot is at event 999,900, only 100 events need to be replayed.
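
The arithmetic above can be demonstrated with a toy event-sourced counter (hypothetical types; real snapshots would hold serialized aggregate state, not an int): loading starts from the snapshot state and replays only events recorded after the snapshot version.

```java
import java.util.ArrayList;
import java.util.List;

// Toy aggregate: state is the sum of increment events.
class SnapshotStore {
    int snapshotVersion = 0;  // number of events the snapshot covers
    int snapshotState = 0;    // aggregate state at that version
}

class EventSourcedCounter {
    // Rebuild state: start from the snapshot, replay only the tail.
    static int load(List<Integer> events, SnapshotStore store) {
        int state = store.snapshotState;
        for (int i = store.snapshotVersion; i < events.size(); i++) {
            state += events.get(i);
        }
        return state;
    }

    // Persist the current state and remember how many events it covers.
    static void snapshot(List<Integer> events, SnapshotStore store) {
        store.snapshotState = load(events, store);
        store.snapshotVersion = events.size();
    }
}
```

After snapshotting at event 3 and appending two more events, load() touches only the two tail events — the same ratio as replaying 100 of 1 million in the example above.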

Q4. In the Outbox Pattern, what is the main reason the CDC (Change Data Capture) approach is preferred over the Polling approach?

Answer: CDC reads the DB transaction log (WAL) directly, detecting changes in near real-time (milliseconds). Polling executes queries periodically, causing higher latency (seconds to minutes) and additional DB load from frequent queries.

Additionally, CDC delivers events in WAL order, naturally preserving ordering, and since it reads the transaction log, there is virtually no additional query overhead.

Q5. To achieve Exactly-Once Semantics in Kafka, what configuration is needed at the Producer, Broker, and Consumer levels respectively?

Answer:

  • Producer: enable.idempotence=true + transactional.id configuration (idempotent producer + transactions)
  • Broker: Transaction log maintenance (enabled by default)
  • Consumer: isolation.level=read_committed + manual offset commit + idempotent processing

For End-to-End Exactly-Once, the Consumer must also ensure idempotency when saving results to the DB. Kafka transactions only guarantee Exactly-Once within Kafka; to include external systems (DB), consumer-side idempotency is essential.


References

  1. Martin Fowler — "Event Sourcing" (martinfowler.com)
  2. Chris Richardson — "Microservices Patterns" (Manning, 2018)
  3. Vaughn Vernon — "Implementing Domain-Driven Design" (Addison-Wesley, 2013)
  4. Greg Young — "CQRS Documents" (cqrs.files.wordpress.com)
  5. Apache Kafka Documentation — "Exactly-Once Semantics" (kafka.apache.org)
  6. Debezium Documentation — "Outbox Event Router" (debezium.io)
  7. Temporal.io Documentation — "Saga Pattern with Temporal" (docs.temporal.io)
  8. Netflix Tech Blog — "Evolution of the Netflix Data Pipeline" (netflixtechblog.com)
  9. Uber Engineering Blog — "Building Reliable Reprocessing and Dead Letter Queues" (eng.uber.com)
  10. Confluent Blog — "Exactly-Once Semantics Are Possible" (confluent.io)
  11. AWS Architecture Blog — "Saga Orchestration Pattern" (aws.amazon.com)
  12. Microsoft Azure Docs — "CQRS Pattern" (learn.microsoft.com)
  13. Spring for Apache Kafka Reference — "Error Handling and DLQ" (docs.spring.io)
  14. JEP 444 — "Virtual Threads" (openjdk.org)
  15. Project Reactor Reference — "Backpressure" (projectreactor.io)
  16. Pat Helland — "Idempotence Is Not a Medical Condition" (ACM Queue, 2012)