Mastering Asynchronous Processing Patterns: CQRS, Saga, and Event Sourcing — Essential Weapons for the Senior Backend Developer

1. Why Asynchronous? A Survival Strategy for Large-Scale Systems

Modern large-scale systems handle traffic at a scale that a synchronous request-response model simply cannot sustain.

Real-world examples:

| Company | Scale | Async processing approach |
|---|---|---|
| Netflix | 260M subscribers, millions of events per second | Apache Kafka-based event streaming |
| Uber | Petabytes of real-time data per day | Apache Kafka + Apache Flink |
| LinkedIn | 7+ trillion messages/day through Kafka | In-house Kafka infrastructure |
| Shopify | 800K requests/sec on Black Friday | Asynchronous order-processing pipeline |

In a synchronous architecture, when service A calls service B and B calls C, the total latency is the sum of every call in the chain. If one service slows down, the entire chain blocks.

In an asynchronous architecture, a message broker decouples the services, and each service can scale independently.

Synchronous:  Client → A(50ms) → B(100ms) → C(200ms) = 350ms total wait
Asynchronous: Client → A(50ms) → Message Broker → (B and C process in parallel)
              Client gets a response after 50ms; B and C proceed asynchronously

Key benefits of going asynchronous:

  1. Temporal decoupling — producers and consumers need not be running at the same time
  2. Independent scaling — read and write workloads scale separately
  3. Failure isolation — one service's failure does not propagate to the whole system
  4. Peak traffic absorption — the message queue acts as a buffer

But asynchrony is not free. It brings new problems: eventual consistency, message-ordering guarantees, duplicate processing, and distributed transactions. The patterns covered in this article are the answers to exactly these problems.


2. Event-Driven Architecture Fundamentals

Event-driven architecture (EDA) is the foundation of every asynchronous pattern. State changes in the system are expressed as "events", and any service that cares subscribes to them and reacts.

2-1. Domain Events vs Integration Events

| | Domain Event | Integration Event |
|---|---|---|
| Scope | Within a single bounded context | Across bounded contexts |
| Example | OrderItemAdded | OrderPlaced |
| Delivery | In-memory event bus | Message broker (Kafka, RabbitMQ) |
| Schema management | Internal, free to change | Contract management required |

// Domain Event - internal to the bounded context
public record OrderItemAdded(
    UUID orderId,
    UUID productId,
    int quantity,
    BigDecimal price,
    Instant occurredAt
) implements DomainEvent {}

// Integration Event - published to external services
public record OrderPlacedEvent(
    UUID orderId,
    UUID customerId,
    BigDecimal totalAmount,
    List<OrderLineItem> items,
    Instant occurredAt,
    int version  // for schema versioning
) implements IntegrationEvent {}

2-2. Event Bus vs Event Store

An Event Bus (e.g., Kafka, RabbitMQ) is a pipeline for delivering events. Once consumers have received an event, the broker typically discards it (or it expires after a retention period/TTL).

An Event Store is a repository that retains every event permanently. By replaying events you can reconstruct the state at any point in time.

Event Bus (Kafka):
  Producer → [Topic: orders] → Consumer Group A (order service)
                             → Consumer Group B (notification service)
                             → Consumer Group C (analytics service)

Event Store:
  [Event #1: OrderCreated]
  [Event #2: ItemAdded]
  [Event #3: PaymentReceived]
  [Event #4: OrderShipped]
  → Replaying events #1-#4 reconstructs the current order state

2-3. Understanding Eventual Consistency

Maintaining strong consistency in a distributed system requires distributed locks, 2PC (two-phase commit), and the like, which cost dearly in performance and availability.

Per the CAP theorem, under a network partition you can keep only one of consistency (C) and availability (A). Most large-scale systems choose availability and accept eventual consistency.

Eventual consistency means: "the data may be inconsistent right now, but given enough time, every node converges to the same state."

What this means in practice:

  • Stock may not decrease immediately after an order (from a few milliseconds to a few seconds of lag)
  • Show the user an "order in progress" state first, then update once it is confirmed
  • The key is to minimize the consistency window
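To make the consistency window concrete, here is a minimal self-contained sketch (all class and method names are hypothetical): an in-memory queue stands in for the message broker, and the read model only catches up once queued events drain.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model of eventual consistency: the write model commits immediately,
// while the read model lags until pending events are drained.
class EventuallyConsistentStock {
    private final Map<String, Integer> writeModel = new HashMap<>();
    private final Map<String, Integer> readModel = new HashMap<>();
    private final Queue<Runnable> pending = new ArrayDeque<>(); // stands in for the broker

    void seed(String sku, int qty) {
        writeModel.put(sku, qty);
        readModel.put(sku, qty);
    }

    void decreaseStock(String sku, int qty) {
        writeModel.merge(sku, -qty, Integer::sum);                   // local transaction commits now
        pending.add(() -> readModel.merge(sku, -qty, Integer::sum)); // projection update is deferred
    }

    int writeSide(String sku) { return writeModel.getOrDefault(sku, 0); }
    int readSide(String sku)  { return readModel.getOrDefault(sku, 0); }

    // Draining the queue closes the consistency window: both models converge.
    void drain() { while (!pending.isEmpty()) pending.poll().run(); }
}
```

Until `drain()` runs, a query sees the stale value — that interval is the consistency window the bullets above describe.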

3. CQRS Deep Dive

CQRS (Command Query Responsibility Segregation) is a pattern that completely separates the read model from the write model.

3-1. Core Concepts

In the traditional CRUD model, a single data model handles both reads and writes. In practice, however, reads and writes have very different requirements.

Traditional CRUD:
  Client → [Single model] → Database
           (reads and writes)

CQRS:
  Client ──Command──→ [Write Model] → Command DB (normalized)
                           ↓ Events
  Client ←──Query──── [Read Model] ← Read DB (denormalized, query-optimized)

Write Model (command side):

  • Validates business rules
  • Enforces domain invariants
  • Normalized schema
  • Strong consistency

Read Model (query side):

  • Denormalized views optimized for queries
  • Free choice of storage (Elasticsearch, Redis, MongoDB)
  • Tolerates eventual consistency

3-2. When Should You Use CQRS?

Good fit:

  • When the read/write ratio is extreme (reads outnumber writes 100:1 or more)
  • Complex domain logic combined with diverse query requirements
  • When reads and writes have different scaling needs
  • When used together with Event Sourcing

Poor fit:

  • Simple CRUD applications
  • When strong consistency is an absolute requirement
  • When the domain is simple and query patterns are predictable

3-3. Spring Boot + Kafka Implementation

// === Command side: creating an order ===

@Service
@RequiredArgsConstructor
public class OrderCommandService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @Transactional
    public UUID createOrder(CreateOrderCommand command) {
        // 1. Validate business rules
        validateOrder(command);

        // 2. Handle it in the domain model
        Order order = Order.create(
            command.customerId(),
            command.items(),
            command.shippingAddress()
        );

        // 3. Persist to the write DB
        orderRepository.save(order);

        // 4. Publish an event (to synchronize the read model)
        OrderCreatedEvent event = new OrderCreatedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getTotalAmount(),
            order.getItems().stream()
                .map(this::toEventItem)
                .toList(),
            Instant.now()
        );
        kafkaTemplate.send("order-events", order.getId().toString(), event);

        return order.getId();
    }

    private void validateOrder(CreateOrderCommand command) {
        if (command.items().isEmpty()) {
            throw new InvalidOrderException("Order has no items");
        }
        // check stock, customer status, etc.
    }
}
// === Query side: order query projection ===

@Service
@RequiredArgsConstructor
public class OrderQueryService {

    private final OrderReadRepository readRepository;

    public OrderDetailView getOrderDetail(UUID orderId) {
        return readRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));
    }

    public Page<OrderSummaryView> getCustomerOrders(
            UUID customerId, Pageable pageable) {
        return readRepository.findByCustomerId(customerId, pageable);
    }
}

// Read model - a denormalized view optimized for queries
@Document(collection = "order_views")
public record OrderDetailView(
    UUID orderId,
    UUID customerId,
    String customerName,      // denormalized: customer name included
    String customerEmail,
    BigDecimal totalAmount,
    String status,
    List<OrderItemView> items,
    AddressView shippingAddress,
    Instant createdAt,
    Instant lastUpdatedAt
) {}
// === Event handler: Write → Read synchronization ===

@Component
@RequiredArgsConstructor
public class OrderProjectionHandler {

    private final OrderReadRepository readRepository;
    private final CustomerQueryClient customerClient;

    @KafkaListener(topics = "order-events", groupId = "order-projection")
    public void handleOrderEvent(OrderEvent event) {
        switch (event) {
            case OrderCreatedEvent e -> handleOrderCreated(e);
            case OrderStatusChangedEvent e -> handleStatusChanged(e);
            case OrderCancelledEvent e -> handleOrderCancelled(e);
            default -> log.warn("Unknown event type: {}", event.getClass());
        }
    }

    private void handleOrderCreated(OrderCreatedEvent event) {
        // Look up customer info to build the denormalized view
        CustomerInfo customer = customerClient.getCustomer(event.customerId());

        OrderDetailView view = new OrderDetailView(
            event.orderId(),
            event.customerId(),
            customer.name(),
            customer.email(),
            event.totalAmount(),
            "CREATED",
            event.items().stream().map(this::toItemView).toList(),
            null,
            event.occurredAt(),
            event.occurredAt()
        );
        readRepository.save(view);
    }
}

3-4. Read Model Synchronization Strategies

| Strategy | Lag | Consistency | Complexity | Best for |
|---|---|---|---|---|
| Event handler (Kafka) | ms to seconds | Eventual | Medium | Most cases |
| CDC (Debezium) | Seconds | Eventual | High | Legacy system integration |
| Polling (scheduled) | Seconds to minutes | Eventual | Low | Simple cases |
| Synchronous update | Immediate | Strong | Low | When performance demands are modest |

4. Mastering the Saga Pattern

How do you manage a business transaction that spans several services in a microservice architecture? The monolith's database transaction is no longer available. The Saga pattern decomposes a distributed transaction into a series of local transactions.

4-1. Choreography

In the choreography style there is no central coordinator. Each service publishes events; the next service subscribes to them and runs its own local transaction.

E-commerce order flow (choreography):

Order Service ──OrderPlaced──→ Payment Service
                                    │ PaymentCompleted
                                    ↓
                               Inventory Service
                                    │ InventoryReserved
                                    ↓
                               Shipping Service
                                    │ ShipmentCreated
                                    ↓
                               Order Service (status update)

On failure (compensating transactions):
Inventory Service ──InventoryReservationFailed──→ Payment Service (refund)
                                                       │ PaymentRefunded
                                                       ↓
                                                  Order Service (cancel order)
// === Choreography Saga: order service ===

@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Transactional
    public UUID placeOrder(PlaceOrderCommand command) {
        Order order = Order.create(command);
        order.setStatus(OrderStatus.PENDING);
        orderRepository.save(order);

        // Publish an event - the payment service subscribes
        kafkaTemplate.send("order-events", new OrderPlacedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getTotalAmount(),
            order.getItems()
        ));

        return order.getId();
    }

    // Handle the payment-completed event
    @KafkaListener(topics = "payment-events", groupId = "order-service")
    public void onPaymentCompleted(PaymentCompletedEvent event) {
        Order order = orderRepository.findById(event.orderId()).orElseThrow();
        order.setStatus(OrderStatus.PAID);
        orderRepository.save(order);
    }

    // Handle the payment-failed event
    @KafkaListener(topics = "payment-events", groupId = "order-service")
    public void onPaymentFailed(PaymentFailedEvent event) {
        Order order = orderRepository.findById(event.orderId()).orElseThrow();
        order.setStatus(OrderStatus.CANCELLED);
        order.setCancellationReason(event.reason());
        orderRepository.save(order);
    }
}
// === Choreography Saga: payment service ===

@Service
@RequiredArgsConstructor
public class PaymentService {

    private final PaymentRepository paymentRepository;
    private final PaymentGateway paymentGateway;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @KafkaListener(topics = "order-events", groupId = "payment-service")
    @Transactional
    public void onOrderPlaced(OrderPlacedEvent event) {
        try {
            // Process the payment
            PaymentResult result = paymentGateway.charge(
                event.customerId(),
                event.totalAmount()
            );

            Payment payment = Payment.create(
                event.orderId(),
                event.customerId(),
                event.totalAmount(),
                result.transactionId()
            );
            paymentRepository.save(payment);

            // Publish a success event - the inventory service subscribes
            kafkaTemplate.send("payment-events", new PaymentCompletedEvent(
                event.orderId(),
                payment.getId(),
                result.transactionId()
            ));

        } catch (PaymentException e) {
            // Publish a failure event - the order service subscribes and cancels the order
            kafkaTemplate.send("payment-events", new PaymentFailedEvent(
                event.orderId(),
                e.getMessage()
            ));
        }
    }
}

Choreography pros and cons:

| Pros | Cons |
|---|---|
| Loose coupling between services | The overall flow is hard to see |
| Each service deploys independently | Debugging is complicated |
| No single point of failure (SPOF) | Circular dependencies can creep in |
| Relatively simple to implement | Event tracing gets harder as services multiply |

4-2. Orchestration

In the orchestration style, a central coordinator (the orchestrator) manages the entire workflow. It sends commands to each service and decides the next step based on their responses.

E-commerce order flow (orchestration):

                    Order Saga Orchestrator
                   /          |          \
                  ↓           ↓           ↓
          Payment Service  Inventory  Shipping
          (charge)         (reserve)  (create)
                  ↓           ↓           ↓
          (result)        (result)    (result)
                   \          |          /
                    Order Saga Orchestrator
                         (final decision)
// === Orchestration Saga: using the Temporal framework ===

@WorkflowInterface
public interface OrderSagaWorkflow {

    @WorkflowMethod
    OrderResult processOrder(PlaceOrderCommand command);
}

@WorkflowImpl(workflows = OrderSagaWorkflow.class)
public class OrderSagaWorkflowImpl implements OrderSagaWorkflow {

    // Activity stubs for each service
    private final PaymentActivities paymentActivities =
        Workflow.newActivityStub(PaymentActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .setRetryOptions(RetryOptions.newBuilder()
                    .setMaximumAttempts(3)
                    .build())
                .build());

    private final InventoryActivities inventoryActivities =
        Workflow.newActivityStub(InventoryActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .build());

    private final ShippingActivities shippingActivities =
        Workflow.newActivityStub(ShippingActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(60))
                .build());

    @Override
    public OrderResult processOrder(PlaceOrderCommand command) {
        String paymentId = null;
        String reservationId = null;

        try {
            // Step 1: charge the payment
            paymentId = paymentActivities.processPayment(
                command.customerId(),
                command.totalAmount()
            );

            // Step 2: reserve inventory
            reservationId = inventoryActivities.reserveInventory(
                command.items()
            );

            // Step 3: create the shipment
            String shipmentId = shippingActivities.createShipment(
                command.orderId(),
                command.shippingAddress()
            );

            return OrderResult.success(command.orderId(), shipmentId);

        } catch (Exception e) {
            // Run the compensating transactions
            compensate(paymentId, reservationId);
            return OrderResult.failed(command.orderId(), e.getMessage());
        }
    }

    private void compensate(String paymentId, String reservationId) {
        // Run compensating transactions in reverse order
        if (reservationId != null) {
            inventoryActivities.cancelReservation(reservationId);
        }
        if (paymentId != null) {
            paymentActivities.refundPayment(paymentId);
        }
    }
}
// === Activity implementations ===

@ActivityInterface
public interface PaymentActivities {
    String processPayment(UUID customerId, BigDecimal amount);
    void refundPayment(String paymentId);
}

@Component
@RequiredArgsConstructor
public class PaymentActivitiesImpl implements PaymentActivities {

    private final PaymentGateway paymentGateway;
    private final PaymentRepository paymentRepository;

    @Override
    public String processPayment(UUID customerId, BigDecimal amount) {
        PaymentResult result = paymentGateway.charge(customerId, amount);
        Payment payment = Payment.create(customerId, amount, result.transactionId());
        paymentRepository.save(payment);
        return payment.getId().toString();
    }

    @Override
    public void refundPayment(String paymentId) {
        Payment payment = paymentRepository.findById(UUID.fromString(paymentId))
            .orElseThrow();
        paymentGateway.refund(payment.getTransactionId());
        payment.setStatus(PaymentStatus.REFUNDED);
        paymentRepository.save(payment);
    }
}

Orchestration pros and cons:

| Pros | Cons |
|---|---|
| The whole workflow is managed in one place | The orchestrator can become a SPOF |
| Debugging and monitoring are easy | Coupling is higher than with choreography |
| Complex compensation logic stays manageable | Risk of logic piling up in the orchestrator |
| Framework support (Temporal, Cadence) | Requires extra infrastructure (a workflow engine) |

4-3. Choosing Between Choreography and Orchestration

| Criterion | Choreography | Orchestration |
|---|---|---|
| Number of services | 3-4 or fewer | 5 or more |
| Workflow complexity | Linear, simple | Conditional branches, parallel steps |
| Observability needs | Low | High |
| Team structure | Independent teams | Central platform team |
| Compensation logic | Simple | Complex (multiple paths) |

4-4. Compensating Transactions

A compensating transaction semantically undoes a transaction that has already committed. It is not a physical rollback but a reverse action.

// Example compensating actions: each forward step pairs with a reverse action
public enum CompensationAction {
    PAYMENT_REFUND,     // compensates PAYMENT_CHARGE
    INVENTORY_RELEASE,  // compensates INVENTORY_RESERVE
    SHIPPING_CANCEL,    // compensates SHIPPING_CREATE
    COUPON_RESTORE,     // compensates COUPON_APPLY
    POINTS_RESTORE      // compensates POINTS_DEDUCT
}

Semantic Compensation vs Exact Compensation:

  • Exact compensation: restore the original state exactly (e.g., payment cancellation → full refund)
  • Semantic compensation: a business-meaningful remedy (e.g., flight cancellation → partial refund plus credit)

In practice, semantic compensation is more common, because as time passes an exact restoration is often no longer possible.


5. Event Sourcing

Instead of storing an entity's current state, Event Sourcing stores, in order, every event that ever changed that state.

5-1. Core Concepts

Traditional CRUD:
  Account table: id=1, balance=750

Event Sourcing:
  Event #1: AccountCreated(id=1, initialBalance=1000)
  Event #2: MoneyWithdrawn(id=1, amount=200)
  Event #3: MoneyDeposited(id=1, amount=150)
  Event #4: MoneyWithdrawn(id=1, amount=200)
  → Replay: 1000 - 200 + 150 - 200 = 750 (current balance)

Every state change is appended to an immutable, append-only event log. Nothing is ever deleted or modified.
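The replay arithmetic above can be written as a fold over the event list. The record types below are illustrative stand-ins, not a framework API:

```java
import java.math.BigDecimal;
import java.util.List;

// Hypothetical account events mirroring the example above.
interface AccountEvent {}
record AccountCreated(BigDecimal initialBalance) implements AccountEvent {}
record MoneyDeposited(BigDecimal amount) implements AccountEvent {}
record MoneyWithdrawn(BigDecimal amount) implements AccountEvent {}

class AccountReplay {
    // Rebuild the current balance by folding every event, in order.
    static BigDecimal replay(List<AccountEvent> history) {
        BigDecimal balance = BigDecimal.ZERO;
        for (AccountEvent e : history) {
            if (e instanceof AccountCreated c)      balance = c.initialBalance();
            else if (e instanceof MoneyDeposited d) balance = balance.add(d.amount());
            else if (e instanceof MoneyWithdrawn w) balance = balance.subtract(w.amount());
        }
        return balance;
    }
}
```

Because the log is append-only, replaying a prefix of the history gives you the state at any earlier point in time.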

5-2. Implementing an Event Store

// === Event store interface ===

public interface EventStore {
    void saveEvents(UUID aggregateId, List<DomainEvent> events, int expectedVersion);
    List<DomainEvent> getEvents(UUID aggregateId);
    List<DomainEvent> getEvents(UUID aggregateId, int fromVersion);
}

// === PostgreSQL-based event store implementation ===

@Repository
@RequiredArgsConstructor
public class PostgresEventStore implements EventStore {

    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper;

    @Override
    @Transactional
    public void saveEvents(UUID aggregateId, List<DomainEvent> events,
                          int expectedVersion) {
        // Optimistic concurrency control
        Integer currentVersion = jdbcTemplate.queryForObject(
            "SELECT COALESCE(MAX(version), 0) FROM event_store WHERE aggregate_id = ?",
            Integer.class, aggregateId
        );

        if (currentVersion != expectedVersion) {
            throw new ConcurrencyException(
                "Concurrency conflict: expected version " + expectedVersion +
                ", actual version " + currentVersion
            );
        }

        int version = expectedVersion;
        for (DomainEvent event : events) {
            version++;
            try {
                jdbcTemplate.update(
                    """
                    INSERT INTO event_store
                    (event_id, aggregate_id, aggregate_type, event_type,
                     event_data, version, created_at)
                    VALUES (?, ?, ?, ?, ?::jsonb, ?, ?)
                    """,
                    UUID.randomUUID(),
                    aggregateId,
                    event.getAggregateType(),
                    event.getClass().getSimpleName(),
                    objectMapper.writeValueAsString(event),
                    version,
                    Instant.now()
                );
            } catch (JsonProcessingException e) {
                // writeValueAsString throws a checked exception; rethrow unchecked
                throw new IllegalStateException("Failed to serialize event", e);
            }
        }
    }

    @Override
    public List<DomainEvent> getEvents(UUID aggregateId) {
        return jdbcTemplate.query(
            "SELECT * FROM event_store WHERE aggregate_id = ? ORDER BY version",
            this::mapRowToEvent,
            aggregateId
        );
    }
}
-- Event store table schema
CREATE TABLE event_store (
    event_id       UUID PRIMARY KEY,
    aggregate_id   UUID NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    event_type     VARCHAR(255) NOT NULL,
    event_data     JSONB NOT NULL,
    version        INTEGER NOT NULL,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    UNIQUE (aggregate_id, version)  -- optimistic concurrency control
);

CREATE INDEX idx_event_store_aggregate ON event_store (aggregate_id, version);
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);

5-3. Applying Events in the Aggregate

// === Event-Sourced Aggregate ===

public class OrderAggregate {

    private UUID id;
    private UUID customerId;
    private OrderStatus status;
    private BigDecimal totalAmount;
    private List<OrderItem> items = new ArrayList<>();
    private int version = 0;

    // Events not yet persisted
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    // Rebuild state by replaying history
    public static OrderAggregate fromHistory(List<DomainEvent> history) {
        OrderAggregate aggregate = new OrderAggregate();
        for (DomainEvent event : history) {
            aggregate.apply(event, false);
        }
        return aggregate;
    }

    // Handle a new command
    public void placeOrder(UUID customerId, List<OrderItem> items) {
        if (this.status != null) {
            throw new IllegalStateException("Order has already been created");
        }

        BigDecimal total = items.stream()
            .map(item -> item.price().multiply(BigDecimal.valueOf(item.quantity())))
            .reduce(BigDecimal.ZERO, BigDecimal::add);

        apply(new OrderPlacedEvent(this.id, customerId, total, items, Instant.now()), true);
    }

    public void cancelOrder(String reason) {
        if (this.status == OrderStatus.SHIPPED) {
            throw new IllegalStateException("A shipped order cannot be cancelled");
        }
        apply(new OrderCancelledEvent(this.id, reason, Instant.now()), true);
    }

    // Apply an event (state transition)
    private void apply(DomainEvent event, boolean isNew) {
        when(event);  // mutate state
        this.version++;
        if (isNew) {
            uncommittedEvents.add(event);
        }
    }

    // Per-event state transition logic
    private void when(DomainEvent event) {
        switch (event) {
            case OrderPlacedEvent e -> {
                this.id = e.orderId();
                this.customerId = e.customerId();
                this.totalAmount = e.totalAmount();
                this.items = new ArrayList<>(e.items());
                this.status = OrderStatus.PLACED;
            }
            case OrderCancelledEvent e -> {
                this.status = OrderStatus.CANCELLED;
            }
            case OrderShippedEvent e -> {
                this.status = OrderStatus.SHIPPED;
            }
            default -> throw new IllegalArgumentException("Unknown event: " + event);
        }
    }

    public List<DomainEvent> getUncommittedEvents() {
        return Collections.unmodifiableList(uncommittedEvents);
    }

    public void clearUncommittedEvents() {
        uncommittedEvents.clear();
    }
}

5-4. The Snapshot Pattern

Once events number in the tens of thousands, replaying the full history every time becomes inefficient. With snapshots you persist the state as of a given point, and only the events after that point need replaying.

// Saving and restoring snapshots
@Service
@RequiredArgsConstructor
public class SnapshotService {

    private final SnapshotRepository snapshotRepository;
    private final EventStore eventStore;

    private static final int SNAPSHOT_INTERVAL = 100; // snapshot every 100 events

    public OrderAggregate loadAggregate(UUID aggregateId) {
        // 1. Load the latest snapshot
        Optional<Snapshot> snapshot = snapshotRepository
            .findLatest(aggregateId);

        if (snapshot.isPresent()) {
            // 2. Restore state from the snapshot
            OrderAggregate aggregate = deserialize(snapshot.get().getData());
            int fromVersion = snapshot.get().getVersion();

            // 3. Replay only the events after the snapshot
            List<DomainEvent> newEvents = eventStore
                .getEvents(aggregateId, fromVersion + 1);
            for (DomainEvent event : newEvents) {
                aggregate.apply(event, false);
            }
            return aggregate;

        } else {
            // No snapshot: replay the full history
            List<DomainEvent> allEvents = eventStore.getEvents(aggregateId);
            return OrderAggregate.fromHistory(allEvents);
        }
    }

    public void saveIfNeeded(OrderAggregate aggregate) {
        if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
            snapshotRepository.save(new Snapshot(
                aggregate.getId(),
                aggregate.getVersion(),
                serialize(aggregate),
                Instant.now()
            ));
        }
    }
}

5-5. Event Sourcing vs CRUD

| Criterion | CRUD | Event Sourcing |
|---|---|---|
| Storage | Current state only | Every state-change event |
| Audit trail | Must be built separately | Built in (the events are the audit log) |
| Time travel | Impossible | Possible (restore state at any point in time) |
| Storage footprint | Relatively small | Grows as events accumulate |
| Complexity | Low | High (event design, snapshots, etc.) |
| Concurrency | Pessimistic/optimistic locking | Optimistic control via event versions |
| Performance | Reads are easy to optimize | Replay cost (mitigated by snapshots) |
| Schema changes | Requires migrations | Event upcasting |
| Pairing with CQRS | Optional | A natural combination |

5-6. When Not to Use Event Sourcing

  • Simple CRUD domains (blogs, settings management)
  • When strong consistency is mandatory (an ATM that must show the real-time balance)
  • Early phases where the event design is still unstable (frequent changes make event upcasting costly)
  • When the team lacks experience (the learning curve is steep)
  • When regulation requires data deletion (it conflicts with the GDPR right to be forgotten)

6. The Outbox Pattern (The Key to Reliability)

6-1. The Problem: Dual Writes

One of the most common mistakes in distributed systems is performing the database write and the message publish as two separate steps.

// Dangerous code: the dual-write problem
@Transactional
public void createOrder(Order order) {
    orderRepository.save(order);           // Step 1: save to the DB
    kafkaTemplate.send("orders", event);   // Step 2: publish the message
    // Step 1 succeeds + Step 2 fails = row in the DB, but no event published
    // Step 1 succeeds + Step 2 succeeds + commit fails = event published, but no row in the DB
}

The DB transaction and the message-broker send run on different infrastructure, so they cannot be made atomic.

6-2. The Solution: The Outbox Pattern

With the outbox pattern, the event is written to an outbox table inside the DB transaction, and a separate process relays it to the message broker.

1. Business logic + outbox INSERT = a single DB transaction (atomicity guaranteed)
2. A separate process (CDC/poller) reads the outbox table and publishes to Kafka
3. After a successful publish, the outbox record is marked as processed
-- Outbox table schema
CREATE TABLE outbox (
    id              UUID PRIMARY KEY,
    aggregate_type  VARCHAR(255) NOT NULL,
    aggregate_id    UUID NOT NULL,
    event_type      VARCHAR(255) NOT NULL,
    payload         JSONB NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMPTZ,
    status          VARCHAR(20) DEFAULT 'PENDING'
);

CREATE INDEX idx_outbox_pending ON outbox (status, created_at)
    WHERE status = 'PENDING';
// === Outbox pattern implementation ===

@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    @Transactional  // guaranteed to run as a single transaction
    public UUID createOrder(CreateOrderCommand command) {
        // 1. Run the business logic
        Order order = Order.create(command);
        orderRepository.save(order);

        // 2. In the same transaction, save the event to the outbox
        try {
            OutboxMessage message = OutboxMessage.builder()
                .aggregateType("Order")
                .aggregateId(order.getId())
                .eventType("OrderCreated")
                .payload(objectMapper.writeValueAsString(
                    new OrderCreatedEvent(order.getId(), order.getTotalAmount())
                ))
                .build();
            outboxRepository.save(message);
        } catch (JsonProcessingException e) {
            // writeValueAsString throws a checked exception; rethrow unchecked
            throw new IllegalStateException("Failed to serialize event", e);
        }

        return order.getId();
        // on commit, the order and the outbox row are persisted together
    }
}
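The relay step — a separate process reading the outbox table and publishing to Kafka — can be sketched in its polling form as follows. Everything here is an in-memory stand-in (a list for the outbox table, a `BiConsumer` for the Kafka producer), so it only illustrates the read-pending → publish → mark-published loop, not transactional polling against a real database:

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.function.BiConsumer;

// One row of the outbox table (in-memory stand-in).
class OutboxRow {
    final String eventType;
    final String payload;
    final Instant createdAt;
    String status = "PENDING";

    OutboxRow(String eventType, String payload, Instant createdAt) {
        this.eventType = eventType;
        this.payload = payload;
        this.createdAt = createdAt;
    }
}

class OutboxPoller {
    private final List<OutboxRow> table;             // stands in for the outbox table
    private final BiConsumer<String, String> broker; // stands in for the Kafka producer

    OutboxPoller(List<OutboxRow> table, BiConsumer<String, String> broker) {
        this.table = table;
        this.broker = broker;
    }

    // One polling tick: publish all PENDING rows oldest-first, then mark them done.
    int pollOnce() {
        List<OutboxRow> pending = table.stream()
            .filter(r -> r.status.equals("PENDING"))
            .sorted(Comparator.comparing(r -> r.createdAt))
            .toList();
        for (OutboxRow row : pending) {
            broker.accept(row.eventType, row.payload); // publish first...
            row.status = "PUBLISHED";                  // ...then mark done (at-least-once)
        }
        return pending.size();
    }
}
```

If the process dies between publishing and marking a row, that row is published again on the next tick — which is exactly why at-least-once delivery and consumer idempotency (section 7) go hand in hand.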

6-3. The CDC (Debezium) Approach

Debezium monitors the database's transaction log (e.g., the PostgreSQL WAL) to capture changes and forwards them to Kafka.

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.dbname": "orderdb",
    "database.server.name": "order-service",
    "table.include.list": "public.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "events.order"
  }
}

6-4. Polling vs CDC

| Criterion | Polling | CDC (Debezium) |
|---|---|---|
| Lag | Depends on the polling interval (seconds to minutes) | Near real time (milliseconds) |
| DB load | Periodic queries add load | Minimal: reads the WAL |
| Implementation complexity | Low (scheduler + query) | High (Debezium infrastructure required) |
| Ordering | Outbox table order | WAL order as-is |
| Operational burden | Low | Requires running a Kafka Connect cluster |
| Scalability | A single poller can bottleneck | Highly scalable |

7. Idempotency Patterns

In a distributed system, messages are delivered at least once. Network failures, broker restarts, and consumer redeployments all lead to duplicate deliveries. Idempotency guarantees that processing the same message multiple times produces the same result as processing it once.

7-1. Producer Idempotency (Kafka)

# Kafka producer configuration
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.properties.acks=all
spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
spring.kafka.producer.properties.retries=2147483647

Kafka's idempotent producer stamps each message with a sequence number so that the broker can detect duplicates.
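That broker-side bookkeeping can be approximated with a toy model. Real Kafka tracks sequence numbers per (producer id, partition) with producer epochs on top; the sketch below collapses all of that into a single map purely to show the dedup principle:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy broker-side duplicate detection: every producer stamps messages with an
// increasing sequence number, and the broker drops anything at or below the
// last sequence it accepted from that producer.
class DedupBroker {
    private final Map<String, Long> lastSeq = new HashMap<>(); // producerId -> last accepted seq
    private final List<String> log = new ArrayList<>();

    // Returns true if appended, false if dropped as a duplicate retry.
    boolean append(String producerId, long seq, String message) {
        long last = lastSeq.getOrDefault(producerId, -1L);
        if (seq <= last) {
            return false; // a retry of something already written
        }
        lastSeq.put(producerId, seq);
        log.add(message);
        return true;
    }

    List<String> log() { return log; }
}
```

Note that this only deduplicates the producer-to-broker leg; duplicates on the consumer side still need the techniques below.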

7-2. Consumer Idempotency

// === Idempotent consumer implementation ===

@Component
@RequiredArgsConstructor
public class IdempotentOrderEventHandler {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderProjectionRepository orderProjectionRepository;

    @KafkaListener(topics = "order-events", groupId = "order-projection")
    @Transactional
    public void handleOrderEvent(
            @Payload OrderEvent event,
            @Header(KafkaHeaders.RECEIVED_KEY) String key,
            @Header("eventId") String eventId) {

        // 1. Check whether this event has already been processed
        if (processedEventRepository.existsById(eventId)) {
            log.info("Ignoring duplicate event: eventId={}", eventId);
            return;
        }

        // 2. Run the business logic
        processEvent(event);

        // 3. Record completion (in the same transaction)
        processedEventRepository.save(new ProcessedEvent(
            eventId,
            event.getClass().getSimpleName(),
            Instant.now()
        ));
    }

    private void processEvent(OrderEvent event) {
        switch (event) {
            case OrderCreatedEvent e -> createOrderProjection(e);
            case OrderStatusChangedEvent e -> updateOrderStatus(e);
            default -> log.warn("Unknown event type");
        }
    }
}
-- Processed-event history table
CREATE TABLE processed_events (
    event_id    VARCHAR(255) PRIMARY KEY,
    event_type  VARCHAR(255) NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

7-3. Unique Constraint vs Redis SET NX

| Approach | Pros | Cons |
|---|---|---|
| DB unique constraint | Atomic with the transaction | Adds DB load |
| Redis SET NX | Very fast lookups | No guarantee if Redis goes down |
| DB + Redis combined | Fast first-pass filter plus a firm guarantee | Complex to implement |
// Fast duplicate check using Redis SET NX
@Component
@RequiredArgsConstructor
public class RedisIdempotencyGuard {

    private final StringRedisTemplate redisTemplate;
    private static final Duration TTL = Duration.ofHours(24);

    public boolean tryAcquire(String idempotencyKey) {
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(
                "idempotency:" + idempotencyKey,
                "processed",
                TTL
            );
        return Boolean.TRUE.equals(result);
    }
}

8. Dead Letter Queue Strategy

When message processing fails, retrying forever can paralyze the system. A DLQ (dead letter queue) isolates unprocessable messages into a separate queue, keeping the system stable.

8-1. Retry with Exponential Backoff

// === Spring Kafka retry + DLQ configuration ===

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public DefaultErrorHandler errorHandler(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // Recoverer that routes failures to the DLQ
        DeadLetterPublishingRecoverer recoverer =
            new DeadLetterPublishingRecoverer(kafkaTemplate,
                (record, ex) -> new TopicPartition(
                    record.topic() + ".DLQ", record.partition()));

        // Exponential backoff retry settings
        ExponentialBackOff backOff = new ExponentialBackOff();
        backOff.setInitialInterval(1000L);    // 1 second
        backOff.setMultiplier(2.0);            // double each time
        backOff.setMaxInterval(60000L);        // cap at 60 seconds
        backOff.setMaxElapsedTime(300000L);    // give up after 5 minutes in total

        DefaultErrorHandler errorHandler =
            new DefaultErrorHandler(recoverer, backOff);

        // Exceptions that should not be retried (business errors)
        errorHandler.addNotRetryableExceptions(
            InvalidOrderException.class,
            DuplicateEventException.class
        );

        return errorHandler;
    }
}
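To see what schedule such a policy actually produces, here is a small sketch that computes the retry intervals. It assumes a simplified stopping rule (stop once the next wait would push total elapsed time past the budget); Spring's `ExponentialBackOff` accounting differs slightly in detail:

```java
import java.util.ArrayList;
import java.util.List;

// Computes the wait intervals of an exponential backoff policy: start at
// initialMs, multiply each step, cap individual waits at maxMs, and stop once
// the cumulative waiting time would exceed maxElapsedMs.
class BackoffSchedule {
    static List<Long> intervals(long initialMs, double multiplier,
                                long maxMs, long maxElapsedMs) {
        List<Long> result = new ArrayList<>();
        long interval = initialMs;
        long elapsed = 0;
        while (elapsed + interval <= maxElapsedMs) {
            result.add(interval);
            elapsed += interval;
            interval = Math.min((long) (interval * multiplier), maxMs);
        }
        return result;
    }
}
```

With the values above (1s initial, x2 multiplier, 60s cap, 5-minute budget), the waits double up to the cap — 1s, 2s, 4s, 8s, 16s, 32s — and then stay at 60s until the budget is spent.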

8-2. DLQ Monitoring and Alerting

// === DLQ monitoring service ===

@Component
@RequiredArgsConstructor
public class DlqMonitoringService {

    private final MeterRegistry meterRegistry;
    private final AlertService alertService;

    @KafkaListener(topics = "order-events.DLQ", groupId = "dlq-monitor")
    public void monitorDlq(
            ConsumerRecord<String, Object> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String errorMessage,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic) {

        // Record metrics
        meterRegistry.counter("dlq.messages.received",
            "originalTopic", originalTopic,
            "errorType", classifyError(errorMessage)
        ).increment();

        // Send an alert
        alertService.sendAlert(DlqAlert.builder()
            .originalTopic(originalTopic)
            .messageKey(record.key())
            .errorMessage(errorMessage)
            .timestamp(Instant.ofEpochMilli(record.timestamp()))
            .severity(classifySeverity(errorMessage))
            .build()
        );

        log.error("DLQ message received: topic={}, key={}, error={}",
            originalTopic, record.key(), errorMessage);
    }
}

8-3. DLQ Reprocessing Pipeline

// === Reprocessing DLQ messages ===

@RestController
@RequestMapping("/api/dlq")
@RequiredArgsConstructor
public class DlqReprocessController {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final DlqMessageRepository dlqRepository;

    // 단건 재처리
    @PostMapping("/reprocess/single")
    public ResponseEntity<String> reprocessSingle(
            @RequestParam String messageId) {
        DlqMessage message = dlqRepository.findById(messageId).orElseThrow();

        // 원래 토픽으로 재발행
        kafkaTemplate.send(
            message.getOriginalTopic(),
            message.getKey(),
            message.getValue()
        );

        message.setStatus("REPROCESSED");
        message.setReprocessedAt(Instant.now());
        dlqRepository.save(message);

        return ResponseEntity.ok("재처리 요청 완료: " + messageId);
    }

    // 배치 재처리
    @PostMapping("/reprocess/batch")
    public ResponseEntity<String> reprocessBatch(
            @RequestParam String originalTopic,
            @RequestParam(defaultValue = "100") int batchSize) {
        List<DlqMessage> messages = dlqRepository
            .findByOriginalTopicAndStatus(originalTopic, "PENDING",
                PageRequest.of(0, batchSize));

        int count = 0;
        for (DlqMessage message : messages) {
            kafkaTemplate.send(message.getOriginalTopic(),
                message.getKey(), message.getValue());
            message.setStatus("REPROCESSED");
            count++;
        }
        dlqRepository.saveAll(messages);

        return ResponseEntity.ok(count + "건 재처리 요청 완료");
    }
}

9. Exactly-Once Semantics

메시지 전달 보장 수준은 세 가지가 있습니다.

| 보장 수준 | 설명 | 복잡도 |
|---|---|---|
| At-most-once | 최대 한 번 전달 (유실 가능) | 낮음 |
| At-least-once | 최소 한 번 전달 (중복 가능) | 중간 |
| Exactly-once | 정확히 한 번 전달 | 높음 |
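At-least-once is what brokers give you cheaply; the usual trick is to make the consumer idempotent so redeliveries are harmless. A minimal in-memory sketch (a real system would persist the seen ids in a database, inside the same transaction as the business logic):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentConsumer {

    // In-memory stand-in for a persistent idempotency store
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns true on first delivery, false for a redelivered duplicate -
    // turning at-least-once delivery into effectively-once processing
    public boolean shouldProcess(String messageId) {
        return seen.add(messageId);
    }
}
```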

9-1. Kafka Exactly-Once 구현

Kafka는 0.11부터 Exactly-Once Semantics(EOS)를 지원합니다.

// === Kafka Exactly-Once 설정 ===

@Configuration
public class KafkaExactlyOnceConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // transactional.id 프리픽스 - 프로듀서 인스턴스별로 고유해야 함
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(config);
    }
}

9-2. End-to-End Exactly-Once

진정한 End-to-End Exactly-Once는 Producer부터 Consumer의 최종 상태 저장까지 보장해야 합니다.

End-to-End Exactly-Once 체인:

Producer (idempotent + transactions)
Kafka Broker (transaction log)
Consumer (read_committed + manual offset)
Database (idempotency key + business logic)

모든 체인이 함께 동작해야 Exactly-Once가 보장됨
// === End-to-End Exactly-Once 처리 ===

@Component
@RequiredArgsConstructor
public class ExactlyOnceOrderProcessor {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final OrderRepository orderRepository;
    private final ProcessedOffsetRepository offsetRepository;

    @KafkaListener(topics = "payment-completed")
    // DB 트랜잭션과 Kafka 트랜잭션을 함께 묶는 체인 트랜잭션 매니저 (별도 빈 구성 필요)
    @Transactional("chainedKafkaTransactionManager")
    public void processPaymentCompleted(
            ConsumerRecord<String, PaymentCompletedEvent> record) {

        String offsetKey = record.topic() + "-" +
            record.partition() + "-" + record.offset();

        // 이미 처리된 오프셋인지 확인
        if (offsetRepository.existsById(offsetKey)) {
            return;
        }

        // 비즈니스 로직
        PaymentCompletedEvent event = record.value();
        Order order = orderRepository.findById(event.orderId()).orElseThrow();
        order.markAsPaid(event.paymentId());
        orderRepository.save(order);

        // 다음 단계 이벤트 발행 (같은 Kafka 트랜잭션)
        kafkaTemplate.send("order-paid",
            new OrderPaidEvent(order.getId(), event.paymentId()));

        // 처리된 오프셋 기록 (같은 DB 트랜잭션)
        offsetRepository.save(new ProcessedOffset(offsetKey, Instant.now()));
    }
}
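The offset-key logic above can be isolated into a small, testable unit; this in-memory sketch mirrors what ProcessedOffsetRepository does against the database:

```java
import java.util.HashSet;
import java.util.Set;

public class OffsetDeduplicator {

    private final Set<String> processed = new HashSet<>();

    // Same key shape as the listener above: topic-partition-offset
    static String offsetKey(String topic, int partition, long offset) {
        return topic + "-" + partition + "-" + offset;
    }

    // True only the first time a given record coordinate is seen;
    // records redelivered after a rebalance are skipped
    public boolean markProcessed(String topic, int partition, long offset) {
        return processed.add(offsetKey(topic, partition, offset));
    }
}
```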

10. Reactive Streams vs Virtual Threads (2025)

10-1. Project Reactor: 배압(Backpressure)이 핵심

리액티브 스트림은 논블로킹 I/O와 배압(Backpressure) 메커니즘을 핵심 설계 원칙으로 합니다.

// === Reactor 기반 비동기 처리 ===

@Slf4j
@Service
@RequiredArgsConstructor
public class ReactiveOrderService {

    // Reactor Kafka 수신기 (reactor-kafka)
    private final KafkaReceiver<String, OrderEvent> kafkaReceiver;

    public Flux<OrderEvent> processOrderStream() {
        return kafkaReceiver.receive()
            .flatMap(record -> {
                OrderEvent event = record.value();
                return processEvent(event)
                    .then(record.receiverOffset().commit())
                    .thenReturn(event);
            }, 16)  // 동시 처리 수 제한 (배압)
            .onErrorResume(e -> {
                log.error("처리 실패", e);
                return Mono.empty();
            })
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
    }

    private Mono<Void> processEvent(OrderEvent event) {
        // 블로킹 비즈니스 로직을 이벤트 루프 밖으로 오프로딩
        return Mono.fromRunnable(() -> {
            // 비즈니스 로직
        }).subscribeOn(Schedulers.boundedElastic()).then();
    }
}

10-2. Java 21 Virtual Threads: 더 단순한 코드

Virtual Threads는 기존 동기식 코드 스타일을 유지하면서 높은 동시성을 달성합니다.

// === Virtual Threads 기반 비동기 처리 ===

@Configuration
public class VirtualThreadConfig {

    @Bean
    public TomcatProtocolHandlerCustomizer<?> protocolHandlerCustomizer() {
        return protocolHandler -> {
            protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
        };
    }
}
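The same executor handed to Tomcat above can be observed in plain Java (21+); each submitted task runs on its own virtual thread:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadDemo {

    public static void main(String[] args) throws Exception {
        // One virtual thread per task - cheap enough to create by the million
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            boolean virtual = executor
                .submit(() -> Thread.currentThread().isVirtual())
                .get();
            System.out.println(virtual);  // true
        }
    }
}
```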

@Service
@RequiredArgsConstructor
public class VirtualThreadOrderService {

    private final PaymentService paymentService;
    private final InventoryService inventoryService;

    // 동기식 코드지만 Virtual Thread 위에서 실행
    // → 블로킹 I/O 시 OS 스레드가 아닌 Virtual Thread만 블로킹
    public OrderResult processOrder(UUID orderId)
            throws InterruptedException, ExecutionException {
        // 병렬 처리: StructuredTaskScope 활용
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

            StructuredTaskScope.Subtask<PaymentResult> paymentTask = scope.fork(() ->
                paymentService.processPayment(orderId)
            );
            StructuredTaskScope.Subtask<InventoryResult> inventoryTask = scope.fork(() ->
                inventoryService.checkInventory(orderId)
            );

            scope.join();
            scope.throwIfFailed();

            return new OrderResult(
                paymentTask.get(),
                inventoryTask.get()
            );
        }
    }
}

10-3. 선택 가이드: 의사결정 매트릭스

| 기준 | Reactive (Reactor/WebFlux) | Virtual Threads |
|---|---|---|
| 코드 복잡도 | 높음 (리액티브 연산자 학습) | 낮음 (기존 동기 코드) |
| 배압 제어 | 내장 (Flux/Mono) | 수동 구현 필요 |
| 디버깅 | 어려움 (스택 트레이스 복잡) | 쉬움 (전통적 스택 트레이스) |
| 메모리 효율 | 매우 좋음 | 좋음 (OS 스레드보다) |
| I/O 집약 작업 | 최적 | 매우 좋음 |
| CPU 집약 작업 | 부적합 | 부적합 (플랫폼 스레드 사용) |
| 기존 코드 마이그레이션 | 전면 재작성 필요 | 설정 변경만으로 가능 |
| 생태계 성숙도 | 높음 (5년+) | 성장 중 (2023~) |

권장 가이드:

  • 새 프로젝트 + 단순 I/O: Virtual Threads
  • 스트리밍 데이터 처리: Reactive Streams
  • 기존 Spring MVC 프로젝트: Virtual Threads (마이그레이션 비용 최소)
  • 실시간 데이터 파이프라인: Reactive Streams

11. 실전 아키텍처: 결제 시스템 설계

모든 패턴을 결합한 이커머스 결제 시스템의 전체 아키텍처를 설계합니다.

                         이커머스 결제 시스템 전체 아키텍처
                         ================================

[Client] → [API Gateway]
                |
        [Order Service] ─── Write DB (PostgreSQL)
           |                     |
           | (Outbox Pattern)    | (CDC - Debezium)
           ↓                     ↓
    [Saga Orchestrator]    [Kafka: order-events]
    (Temporal)               /       |        \
           |                ↓        ↓         ↓
           | commands   [Payment] [Inventory] [Notification]
           ↓             Service   Service     Service
    [payment-commands]      |          |
                            ↓          ↓
                    [payment-events] [inventory-events]
                            |
                     [DLQ + Monitoring]
                            |
                    [Alerting (PagerDuty)]

Read Side:
    [Kafka: order-events] → [Order Projection] → Read DB (MongoDB)
                          → [Analytics Service] → Data Warehouse
                          → [Search Service] → Elasticsearch

적용된 패턴 요약

| 패턴 | 적용 위치 | 목적 |
|---|---|---|
| CQRS | Order Service | 주문 쓰기/읽기 분리 |
| Saga (Orchestration) | Temporal Orchestrator | 결제-재고-배송 트랜잭션 관리 |
| Event Sourcing | Order Aggregate | 주문 상태 변경 이력 관리 |
| Outbox Pattern | Order Service → Kafka | DB-메시지 원자성 보장 |
| 멱등성 | 모든 Consumer | 중복 메시지 처리 방지 |
| DLQ | 모든 Kafka Consumer | 처리 실패 메시지 격리 |
| Exactly-Once | Payment 처리 | 이중 결제 방지 |

퀴즈

Q1. CQRS에서 Write Model과 Read Model이 서로 다른 데이터베이스를 사용할 때, 두 모델 간의 데이터 일관성을 보장하기 위해 가장 일반적으로 사용되는 접근 방식은?

정답: 이벤트 기반 비동기 동기화 (Eventual Consistency)

Write Model에서 상태 변경 시 이벤트를 발행하고, Read Model의 프로젝션 핸들러가 이 이벤트를 구독하여 Read DB를 업데이트합니다. 두 모델 사이에는 일시적 불일치(수 ms~수 초)가 존재할 수 있지만, 최종적으로 일관성이 보장됩니다.

강한 일관성이 필요한 경우에는 CQRS의 적합성을 재검토해야 합니다.

Q2. Choreography Saga에서 서비스 A가 이벤트를 발행하고 서비스 B가 처리에 실패했을 때, 서비스 A는 어떻게 이를 인지할 수 있나요?

정답: 서비스 B가 실패 이벤트(예: PaymentFailed)를 발행하면 서비스 A가 이를 구독하여 인지합니다. Choreography에서는 각 서비스가 성공/실패 이벤트를 모두 발행해야 하며, 관련 서비스들이 이를 구독하여 적절히 대응합니다.

이것이 Choreography의 단점 중 하나입니다. 서비스가 많아질수록 이벤트 흐름 추적이 복잡해지며, 타임아웃 기반 감지가 추가로 필요할 수 있습니다.

Q3. Event Sourcing에서 이벤트가 100만 개 쌓인 Aggregate를 로드할 때 성능 문제를 해결하는 핵심 기법은?

정답: Snapshot 패턴

주기적으로(예: 100개 이벤트마다) Aggregate의 현재 상태를 스냅샷으로 저장합니다. 로드 시 가장 최근 스냅샷에서 시작하여 그 이후의 이벤트만 재생하면 됩니다.

예: 100만 개 이벤트 중 마지막 스냅샷이 999,900번째라면 100개의 이벤트만 재생하면 됩니다.

Q4. Outbox Pattern에서 CDC(Change Data Capture) 방식이 Polling 방식보다 선호되는 주요 이유는?

정답: CDC는 DB의 트랜잭션 로그(WAL)를 직접 읽기 때문에 거의 실시간(수 ms)으로 변경을 감지합니다. Polling은 주기적으로 쿼리를 실행하므로 지연이 크고(수 초~수 분), 빈번한 쿼리로 DB에 부하를 줍니다.

또한 CDC는 WAL 순서 그대로 이벤트를 전달하여 순서 보장이 자연스럽고, DB 트랜잭션 로그를 읽으므로 추가적인 쿼리 부하가 거의 없습니다.

Q5. Kafka에서 Exactly-Once Semantics를 달성하려면 Producer, Broker, Consumer 각각에서 어떤 설정이 필요한가요?

정답:

  • Producer: enable.idempotence=true + transactional.id 설정 (멱등 프로듀서 + 트랜잭션)
  • Broker: 트랜잭션 로그 유지 (기본 활성화)
  • Consumer: isolation.level=read_committed + 수동 오프셋 커밋 + 멱등 처리

End-to-End Exactly-Once를 위해서는 Consumer가 DB에 결과를 저장할 때도 멱등성을 보장해야 합니다. Kafka 트랜잭션은 Kafka 내부의 Exactly-Once만 보장하며, 외부 시스템(DB)까지 포함하려면 Consumer 측 멱등성이 필수입니다.


참고 자료

  1. Martin Fowler — "Event Sourcing" (martinfowler.com)
  2. Chris Richardson — "Microservices Patterns" (Manning, 2018)
  3. Vaughn Vernon — "Implementing Domain-Driven Design" (Addison-Wesley, 2013)
  4. Greg Young — "CQRS Documents" (cqrs.files.wordpress.com)
  5. Apache Kafka Documentation — "Exactly-Once Semantics" (kafka.apache.org)
  6. Debezium Documentation — "Outbox Event Router" (debezium.io)
  7. Temporal.io Documentation — "Saga Pattern with Temporal" (docs.temporal.io)
  8. Netflix Tech Blog — "Evolution of the Netflix Data Pipeline" (netflixtechblog.com)
  9. Uber Engineering Blog — "Building Reliable Reprocessing and Dead Letter Queues" (eng.uber.com)
  10. Confluent Blog — "Exactly-Once Semantics Are Possible" (confluent.io)
  11. AWS Architecture Blog — "Saga Orchestration Pattern" (aws.amazon.com)
  12. Microsoft Azure Docs — "CQRS Pattern" (learn.microsoft.com)
  13. Spring for Apache Kafka Reference — "Error Handling and DLQ" (docs.spring.io)
  14. JEP 444 — "Virtual Threads" (openjdk.org)
  15. Project Reactor Reference — "Backpressure" (projectreactor.io)
  16. Pat Helland — "Idempotence Is Not a Medical Condition" (ACM Queue, 2012)

Async Processing Patterns Mastery: CQRS, Saga, Event Sourcing — Senior Backend Developer Essential Guide

1. Why Async? The Survival Strategy for Large-Scale Systems

Modern large-scale systems handle traffic volumes that synchronous request-response models simply cannot manage.

Real-World Examples:

| Company | Scale | Async Processing Approach |
|---|---|---|
| Netflix | 260M subscribers, millions of events/sec | Apache Kafka-based event streaming |
| Uber | Petabytes of daily real-time data | Apache Kafka + Apache Flink |
| LinkedIn | 7T+ messages/day via Kafka | Self-developed Kafka infrastructure |
| Shopify | 800K requests/sec on Black Friday | Async order processing pipeline |

In synchronous architecture, when Service A calls Service B, and B calls C, the total latency is the sum of each service call. If any single service slows down, the entire chain blocks.

In asynchronous architecture, a message broker decouples services, enabling each to scale independently.

Synchronous:  Client -> A(50ms) -> B(100ms) -> C(200ms) = 350ms total wait
Asynchronous: Client -> A(50ms) -> Message Broker -> (B, C process in parallel)
              Client gets response after 50ms, B and C process asynchronously

Key Benefits of Going Async:

  1. Temporal decoupling — Producers and consumers do not need to be running simultaneously
  2. Independent scaling — Read and write workloads can be scaled separately
  3. Fault isolation — A single service failure does not cascade to the entire system
  4. Peak traffic absorption — Message queues act as buffers

However, async is not free. Eventual Consistency, message ordering, duplicate handling, and distributed transactions are new challenges. The patterns covered in this guide are the answers to these problems.


2. Event-Driven Architecture Fundamentals

Event-Driven Architecture (EDA) is the foundation of async patterns. State changes in the system are expressed as "events," and interested services subscribe and react to them.

2-1. Domain Events vs Integration Events

| Aspect | Domain Event | Integration Event |
|---|---|---|
| Scope | Within a single bounded context | Between bounded contexts |
| Example | OrderItemAdded | OrderPlaced |
| Delivery | In-memory event bus | Message broker (Kafka, RabbitMQ) |
| Schema mgmt | Freely changeable (internal) | Contract management required |
// Domain Event - within bounded context
public record OrderItemAdded(
    UUID orderId,
    UUID productId,
    int quantity,
    BigDecimal price,
    Instant occurredAt
) implements DomainEvent {}

// Integration Event - published to external services
public record OrderPlacedEvent(
    UUID orderId,
    UUID customerId,
    BigDecimal totalAmount,
    List<OrderLineItem> items,
    Instant occurredAt,
    int version  // schema version management
) implements IntegrationEvent {}

2-2. Event Bus vs Event Store

An Event Bus (e.g., Kafka, RabbitMQ) is a pipeline that delivers events. Once consumers receive events, they are typically deleted (or expire after TTL).

An Event Store permanently persists all events. Events can be replayed to reconstruct state at any point in time.

Event Bus (Kafka):
  Producer -> [Topic: orders] -> Consumer Group A (Order Service)
                              -> Consumer Group B (Notification Service)
                              -> Consumer Group C (Analytics Service)

Event Store:
  [Event #1: OrderCreated]
  [Event #2: ItemAdded]
  [Event #3: PaymentReceived]
  [Event #4: OrderShipped]
  -> Replay events #1-#4 to reconstruct current order state

2-3. Understanding Eventual Consistency

Maintaining strong consistency in distributed systems requires distributed locks, 2PC (2-Phase Commit), etc., which significantly degrade performance and availability.

According to the CAP theorem, during network partitions you must choose between Consistency (C) and Availability (A). Most large-scale systems choose Availability and accept Eventual Consistency.

Eventual Consistency means "data may be inconsistent right now, but given enough time, all nodes will converge to the same state."

Practical Implications:

  • Inventory might not decrease immediately after an order (milliseconds to seconds delay)
  • Show users "Order is being processed" first, then update after confirmation
  • Minimizing the consistency window is the key concern

3. CQRS Deep Dive

CQRS (Command Query Responsibility Segregation) completely separates the read and write models.

3-1. Core Concept

In traditional CRUD, a single data model handles both reads and writes. In reality, reads and writes have very different requirements.

Traditional CRUD:
  Client -> [Single Model] -> Database
            (both read/write)

CQRS:
  Client --Command--> [Write Model] -> Command DB (normalized)
                            | Events
  Client <--Query---- [Read Model]  <- Read DB (denormalized, optimized)

Write Model (Command Side):

  • Business rule validation
  • Domain invariant enforcement
  • Normalized schema
  • Strong consistency

Read Model (Query Side):

  • Denormalized views optimized for queries
  • Multiple storage options (Elasticsearch, Redis, MongoDB)
  • Eventual Consistency acceptable

3-2. When to Use CQRS

Good fit:

  • Extreme read/write ratio (reads 100:1 or more)
  • Complex domain logic + diverse query requirements
  • Different scaling needs for reads and writes
  • When used together with Event Sourcing

Not a good fit:

  • Simple CRUD applications
  • When strong consistency is absolutely required
  • Simple domain with predictable query patterns

3-3. Implementation with Spring Boot + Kafka

// === Command Side: Order Creation ===

@Service
@RequiredArgsConstructor
public class OrderCommandService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @Transactional
    public UUID createOrder(CreateOrderCommand command) {
        // 1. Business rule validation
        validateOrder(command);

        // 2. Process in domain model
        Order order = Order.create(
            command.customerId(),
            command.items(),
            command.shippingAddress()
        );

        // 3. Save to Write DB
        orderRepository.save(order);

        // 4. Publish event (for Read Model sync)
        OrderCreatedEvent event = new OrderCreatedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getTotalAmount(),
            order.getItems().stream()
                .map(this::toEventItem)
                .toList(),
            Instant.now()
        );
        kafkaTemplate.send("order-events", order.getId().toString(), event);

        return order.getId();
    }

    private void validateOrder(CreateOrderCommand command) {
        if (command.items().isEmpty()) {
            throw new InvalidOrderException("Order items are empty");
        }
        // Check inventory, customer status, etc.
    }
}
// === Query Side: Order Query Projection ===

@Service
@RequiredArgsConstructor
public class OrderQueryService {

    private final OrderReadRepository readRepository;

    public OrderDetailView getOrderDetail(UUID orderId) {
        return readRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));
    }

    public Page<OrderSummaryView> getCustomerOrders(
            UUID customerId, Pageable pageable) {
        return readRepository.findByCustomerId(customerId, pageable);
    }
}

// Read Model - denormalized view optimized for queries
@Document(collection = "order_views")
public record OrderDetailView(
    UUID orderId,
    UUID customerId,
    String customerName,      // Denormalized: includes customer name
    String customerEmail,
    BigDecimal totalAmount,
    String status,
    List<OrderItemView> items,
    AddressView shippingAddress,
    Instant createdAt,
    Instant lastUpdatedAt
) {}
// === Event Handler: Write -> Read Sync ===

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderProjectionHandler {

    private final OrderReadRepository readRepository;
    private final CustomerQueryClient customerClient;

    @KafkaListener(topics = "order-events", groupId = "order-projection")
    public void handleOrderEvent(OrderEvent event) {
        switch (event) {
            case OrderCreatedEvent e -> handleOrderCreated(e);
            case OrderStatusChangedEvent e -> handleStatusChanged(e);
            case OrderCancelledEvent e -> handleOrderCancelled(e);
            default -> log.warn("Unknown event type: {}", event.getClass());
        }
    }

    private void handleOrderCreated(OrderCreatedEvent event) {
        // Fetch customer info to create denormalized view
        CustomerInfo customer = customerClient.getCustomer(event.customerId());

        OrderDetailView view = new OrderDetailView(
            event.orderId(),
            event.customerId(),
            customer.name(),
            customer.email(),
            event.totalAmount(),
            "CREATED",
            event.items().stream().map(this::toItemView).toList(),
            null,
            event.occurredAt(),
            event.occurredAt()
        );
        readRepository.save(view);
    }
}

3-4. Read Model Synchronization Strategies

| Strategy | Latency | Consistency | Complexity | Best For |
|---|---|---|---|---|
| Event Handler (Kafka) | ms to seconds | Eventual | Medium | Most cases |
| CDC (Debezium) | Seconds | Eventual | High | Legacy system integration |
| Polling (Scheduled) | Seconds to minutes | Eventual | Low | Simple cases |
| Synchronous update | Immediate | Strong | Low | Low performance requirements |

4. Saga Pattern Mastery

How do you manage business transactions spanning multiple services in microservices? Traditional DB transactions from monoliths are not available. The Saga pattern decomposes distributed transactions into a series of local transactions.

4-1. Choreography

In the Choreography approach, there is no central coordinator. Each service publishes events, and the next service subscribes to those events to perform its local transaction.

E-commerce Order Flow (Choreography):

Order Service --OrderPlaced--> Payment Service
                                    |
                            PaymentCompleted
                                    |
                                    v
                              Inventory Service
                                    |
                            InventoryReserved
                                    |
                                    v
                              Shipping Service
                                    |
                            ShipmentCreated
                                    |
                                    v
                              Order Service (status update)

On Failure (Compensating Transactions):
Inventory Service --InventoryReservationFailed--> Payment Service (refund)
                                                        |
                                                 PaymentRefunded
                                                        |
                                                        v
                                                 Order Service (cancel order)
// === Choreography Saga: Order Service ===

@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Transactional
    public UUID placeOrder(PlaceOrderCommand command) {
        Order order = Order.create(command);
        order.setStatus(OrderStatus.PENDING);
        orderRepository.save(order);

        // Publish event - Payment Service subscribes
        kafkaTemplate.send("order-events", new OrderPlacedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getTotalAmount(),
            order.getItems()
        ));

        return order.getId();
    }

    // Payment result listener. Two @KafkaListener methods with the same
    // groupId on one topic would split partitions between two consumers,
    // so a single listener dispatches on the event type instead
    // (PaymentEvent is assumed to be the common supertype of both events)
    @KafkaListener(topics = "payment-events", groupId = "order-service")
    public void onPaymentEvent(PaymentEvent event) {
        Order order = orderRepository.findById(event.orderId()).orElseThrow();
        switch (event) {
            case PaymentCompletedEvent e -> order.setStatus(OrderStatus.PAID);
            case PaymentFailedEvent e -> {
                order.setStatus(OrderStatus.CANCELLED);
                order.setCancellationReason(e.reason());
            }
            default -> throw new IllegalStateException(
                "Unexpected payment event: " + event.getClass());
        }
        orderRepository.save(order);
    }
}
// === Choreography Saga: Payment Service ===

@Service
@RequiredArgsConstructor
public class PaymentService {

    private final PaymentRepository paymentRepository;
    private final PaymentGateway paymentGateway;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @KafkaListener(topics = "order-events", groupId = "payment-service")
    @Transactional
    public void onOrderPlaced(OrderPlacedEvent event) {
        try {
            // Process payment
            PaymentResult result = paymentGateway.charge(
                event.customerId(),
                event.totalAmount()
            );

            Payment payment = Payment.create(
                event.orderId(),
                event.customerId(),
                event.totalAmount(),
                result.transactionId()
            );
            paymentRepository.save(payment);

            // Publish success event - Inventory Service subscribes
            kafkaTemplate.send("payment-events", new PaymentCompletedEvent(
                event.orderId(),
                payment.getId(),
                result.transactionId()
            ));

        } catch (PaymentException e) {
            // Publish failure event - Order Service subscribes and cancels
            kafkaTemplate.send("payment-events", new PaymentFailedEvent(
                event.orderId(),
                e.getMessage()
            ));
        }
    }
}

Choreography Pros and Cons:

| Pros | Cons |
|---|---|
| Loose coupling between services | Difficult to understand full flow |
| Each service independently deployable | Complex debugging |
| No Single Point of Failure (SPOF) | Possible circular dependencies |
| Relatively simple implementation | Event tracking complexity grows with services |

4-2. Orchestration

In the Orchestration approach, a central coordinator (Orchestrator) manages the entire workflow. It sends commands to each service and decides the next step based on responses.

E-commerce Order Flow (Orchestration):

                    Order Saga Orchestrator
                   /          |          \
                  v           v           v
          Payment Service  Inventory  Shipping
          (charge)         (reserve)  (create)
                  v           v           v
          (result)        (result)    (result)
                   \          |          /
                    Order Saga Orchestrator
                         (final decision)
// === Orchestration Saga: Using Temporal Framework ===

@WorkflowInterface
public interface OrderSagaWorkflow {

    @WorkflowMethod
    OrderResult processOrder(PlaceOrderCommand command);
}

@WorkflowImpl(workflows = OrderSagaWorkflow.class)
public class OrderSagaWorkflowImpl implements OrderSagaWorkflow {

    // Activity stubs for each service
    private final PaymentActivities paymentActivities =
        Workflow.newActivityStub(PaymentActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .setRetryOptions(RetryOptions.newBuilder()
                    .setMaximumAttempts(3)
                    .build())
                .build());

    private final InventoryActivities inventoryActivities =
        Workflow.newActivityStub(InventoryActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .build());

    private final ShippingActivities shippingActivities =
        Workflow.newActivityStub(ShippingActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(60))
                .build());

    @Override
    public OrderResult processOrder(PlaceOrderCommand command) {
        String paymentId = null;
        String reservationId = null;

        try {
            // Step 1: Payment
            paymentId = paymentActivities.processPayment(
                command.customerId(),
                command.totalAmount()
            );

            // Step 2: Inventory reservation
            reservationId = inventoryActivities.reserveInventory(
                command.items()
            );

            // Step 3: Create shipment
            String shipmentId = shippingActivities.createShipment(
                command.orderId(),
                command.shippingAddress()
            );

            return OrderResult.success(command.orderId(), shipmentId);

        } catch (Exception e) {
            // Execute compensating transactions
            compensate(paymentId, reservationId);
            return OrderResult.failed(command.orderId(), e.getMessage());
        }
    }

    private void compensate(String paymentId, String reservationId) {
        // Execute compensating transactions in reverse order
        if (reservationId != null) {
            inventoryActivities.cancelReservation(reservationId);
        }
        if (paymentId != null) {
            paymentActivities.refundPayment(paymentId);
        }
    }
}
// === Activity Implementation ===

@ActivityInterface
public interface PaymentActivities {
    String processPayment(UUID customerId, BigDecimal amount);
    void refundPayment(String paymentId);
}

@Component
@RequiredArgsConstructor
public class PaymentActivitiesImpl implements PaymentActivities {

    private final PaymentGateway paymentGateway;
    private final PaymentRepository paymentRepository;

    @Override
    public String processPayment(UUID customerId, BigDecimal amount) {
        PaymentResult result = paymentGateway.charge(customerId, amount);
        Payment payment = Payment.create(customerId, amount, result.transactionId());
        paymentRepository.save(payment);
        return payment.getId().toString();
    }

    @Override
    public void refundPayment(String paymentId) {
        Payment payment = paymentRepository.findById(UUID.fromString(paymentId))
            .orElseThrow();
        paymentGateway.refund(payment.getTransactionId());
        payment.setStatus(PaymentStatus.REFUNDED);
        paymentRepository.save(payment);
    }
}

Orchestration Pros and Cons:

| Pros | Cons |
|---|---|
| Entire workflow managed in one place | Orchestrator can become SPOF |
| Easy debugging and monitoring | Higher coupling than Choreography |
| Easier complex compensation logic | Risk of logic concentration in orchestrator |
| Framework support (Temporal/Cadence) | Additional infrastructure (workflow engine) needed |

4-3. Choreography vs Orchestration Selection Guide

| Criteria | Choreography | Orchestration |
|---|---|---|
| Number of services | 3-4 or fewer | 5 or more |
| Workflow complexity | Linear, simple | Conditional branching, parallel processing |
| Observability needs | Low | High |
| Team structure | Independent teams | Central platform team |
| Compensation logic | Simple | Complex (multiple paths) |

4-4. Compensating Transactions

Compensating transactions semantically undo already committed transactions. They do not physically roll back but perform reverse actions.

// Compensating transaction mapping - each forward action paired with
// the action that semantically undoes it
public enum SagaAction {
    PAYMENT_CHARGE("PAYMENT_REFUND"),
    INVENTORY_RESERVE("INVENTORY_RELEASE"),
    SHIPPING_CREATE("SHIPPING_CANCEL"),
    COUPON_APPLY("COUPON_RESTORE"),
    POINTS_DEDUCT("POINTS_RESTORE");

    public final String compensation;

    SagaAction(String compensation) { this.compensation = compensation; }
}

Semantic Compensation vs Exact Compensation:

  • Exact Compensation: Restores precisely to original state (e.g., payment cancel -> full refund)
  • Semantic Compensation: Provides a business-meaningful compensation (e.g., flight cancellation -> partial refund + credit)

In practice, Semantic Compensation is more common because exact restoration is often impossible after time passes.
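The reverse-order rule from the orchestrator's compensate() method generalizes to a stack of completed steps; a minimal sketch:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class CompensationStack {

    // Completed forward steps, most recent on top
    private final Deque<String> completed = new ArrayDeque<>();

    public void recordStep(String step) {
        completed.push(step);
    }

    // Compensations run in LIFO order: undo the last completed step first
    public List<String> compensationOrder() {
        return new ArrayList<>(completed);
    }
}
```

Recording steps as they succeed means a failure at any point only compensates what actually completed.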


5. Event Sourcing

Event Sourcing stores all events that caused state changes in order, instead of storing only the current state of an entity.

5-1. Core Concept

Traditional CRUD:
  Account table: id=1, balance=750

Event Sourcing:
  Event #1: AccountCreated(id=1, initialBalance=1000)
  Event #2: MoneyWithdrawn(id=1, amount=200)
  Event #3: MoneyDeposited(id=1, amount=150)
  Event #4: MoneyWithdrawn(id=1, amount=200)
  -> Replay: 1000 - 200 + 150 - 200 = 750 (current balance)

All state changes are appended to an immutable, append-only event log. No deletions or updates.
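The replay in the account example above is just a fold over the event log; a self-contained sketch with hypothetical event records:

```java
import java.util.List;

public class AccountReplay {

    sealed interface AccountEvent {}
    record AccountCreated(long initialBalance) implements AccountEvent {}
    record MoneyDeposited(long amount) implements AccountEvent {}
    record MoneyWithdrawn(long amount) implements AccountEvent {}

    // Fold the immutable event log into the current balance
    static long replay(List<AccountEvent> events) {
        long balance = 0;
        for (AccountEvent e : events) {
            balance = switch (e) {
                case AccountCreated c -> c.initialBalance();
                case MoneyDeposited d -> balance + d.amount();
                case MoneyWithdrawn w -> balance - w.amount();
            };
        }
        return balance;
    }

    public static void main(String[] args) {
        System.out.println(replay(List.of(
            new AccountCreated(1000),
            new MoneyWithdrawn(200),
            new MoneyDeposited(150),
            new MoneyWithdrawn(200)
        )));  // 750
    }
}
```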

5-2. Event Store Implementation

// === Event Store Interface ===

public interface EventStore {
    void saveEvents(UUID aggregateId, List<DomainEvent> events, int expectedVersion);
    List<DomainEvent> getEvents(UUID aggregateId);
    List<DomainEvent> getEvents(UUID aggregateId, int fromVersion);
}

// === PostgreSQL-based Event Store Implementation ===

@Repository
@RequiredArgsConstructor
public class PostgresEventStore implements EventStore {

    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper;

    @Override
    @Transactional
    public void saveEvents(UUID aggregateId, List<DomainEvent> events,
                          int expectedVersion) {
        // Optimistic concurrency control
        Integer currentVersion = jdbcTemplate.queryForObject(
            "SELECT COALESCE(MAX(version), 0) FROM event_store WHERE aggregate_id = ?",
            Integer.class, aggregateId
        );

        if (currentVersion != expectedVersion) {
            throw new ConcurrencyException(
                "Concurrency conflict: expected version " + expectedVersion +
                ", actual version " + currentVersion
            );
        }

        int version = expectedVersion;
        for (DomainEvent event : events) {
            version++;
            // writeValueAsString throws a checked JsonProcessingException;
            // wrap it so the transaction rolls back on serialization failure
            final String payload;
            try {
                payload = objectMapper.writeValueAsString(event);
            } catch (JsonProcessingException e) {
                throw new IllegalStateException("Failed to serialize event", e);
            }
            jdbcTemplate.update(
                """
                INSERT INTO event_store
                (event_id, aggregate_id, aggregate_type, event_type,
                 event_data, version, created_at)
                VALUES (?, ?, ?, ?, ?::jsonb, ?, ?)
                """,
                UUID.randomUUID(),
                aggregateId,
                event.getAggregateType(),
                event.getClass().getSimpleName(),
                payload,
                version,
                Instant.now()
            );
        }
    }

    @Override
    public List<DomainEvent> getEvents(UUID aggregateId) {
        return jdbcTemplate.query(
            "SELECT * FROM event_store WHERE aggregate_id = ? ORDER BY version",
            this::mapRowToEvent,
            aggregateId
        );
    }
}

-- Event Store table schema
CREATE TABLE event_store (
    event_id       UUID PRIMARY KEY,
    aggregate_id   UUID NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    event_type     VARCHAR(255) NOT NULL,
    event_data     JSONB NOT NULL,
    version        INTEGER NOT NULL,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    UNIQUE (aggregate_id, version)  -- optimistic concurrency control
);

CREATE INDEX idx_event_store_aggregate ON event_store (aggregate_id, version);
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);

5-3. Applying Events on Aggregates

// === Event-Sourced Aggregate ===

public class OrderAggregate {

    private UUID id;
    private UUID customerId;
    private OrderStatus status;
    private BigDecimal totalAmount;
    private List<OrderItem> items = new ArrayList<>();
    private int version = 0;

    // Uncommitted events list
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    // Restore state from event history
    public static OrderAggregate fromHistory(List<DomainEvent> history) {
        OrderAggregate aggregate = new OrderAggregate();
        for (DomainEvent event : history) {
            aggregate.apply(event, false);
        }
        return aggregate;
    }

    // Handle new commands
    public void placeOrder(UUID customerId, List<OrderItem> items) {
        if (this.status != null) {
            throw new IllegalStateException("Order already created");
        }

        BigDecimal total = items.stream()
            .map(item -> item.price().multiply(BigDecimal.valueOf(item.quantity())))
            .reduce(BigDecimal.ZERO, BigDecimal::add);

        apply(new OrderPlacedEvent(this.id, customerId, total, items, Instant.now()), true);
    }

    public void cancelOrder(String reason) {
        if (this.status == OrderStatus.SHIPPED) {
            throw new IllegalStateException("Cannot cancel shipped orders");
        }
        apply(new OrderCancelledEvent(this.id, reason, Instant.now()), true);
    }

    // Apply event (state change); public so a snapshot loader can replay
    // events onto a restored instance
    public void apply(DomainEvent event, boolean isNew) {
        when(event);  // State mutation
        this.version++;
        if (isNew) {
            uncommittedEvents.add(event);
        }
    }

    // State change logic per event type
    private void when(DomainEvent event) {
        switch (event) {
            case OrderPlacedEvent e -> {
                this.id = e.orderId();
                this.customerId = e.customerId();
                this.totalAmount = e.totalAmount();
                this.items = new ArrayList<>(e.items());
                this.status = OrderStatus.PLACED;
            }
            case OrderCancelledEvent e -> {
                this.status = OrderStatus.CANCELLED;
            }
            case OrderShippedEvent e -> {
                this.status = OrderStatus.SHIPPED;
            }
            default -> throw new IllegalArgumentException("Unknown event: " + event);
        }
    }

    public UUID getId() { return id; }

    public int getVersion() { return version; }

    public List<DomainEvent> getUncommittedEvents() {
        return Collections.unmodifiableList(uncommittedEvents);
    }

    public void clearUncommittedEvents() {
        uncommittedEvents.clear();
    }
}

5-4. Snapshot Pattern

When tens of thousands of events accumulate, replaying all of them every time is inefficient. Snapshots save the aggregate state at specific intervals, so only events after the snapshot need to be replayed.

// Snapshot save and restore
@Service
@RequiredArgsConstructor
public class SnapshotService {

    private static final int SNAPSHOT_INTERVAL = 100; // every 100 events

    private final SnapshotRepository snapshotRepository;
    private final EventStore eventStore;

    public OrderAggregate loadAggregate(UUID aggregateId) {
        // 1. Get latest snapshot
        Optional<Snapshot> snapshot = snapshotRepository
            .findLatest(aggregateId);

        if (snapshot.isPresent()) {
            // 2. Restore state from snapshot
            OrderAggregate aggregate = deserialize(snapshot.get().getData());
            int fromVersion = snapshot.get().getVersion();

            // 3. Replay only events after snapshot
            List<DomainEvent> newEvents = eventStore
                .getEvents(aggregateId, fromVersion + 1);
            for (DomainEvent event : newEvents) {
                aggregate.apply(event, false);
            }
            return aggregate;

        } else {
            // No snapshot, replay all
            List<DomainEvent> allEvents = eventStore.getEvents(aggregateId);
            return OrderAggregate.fromHistory(allEvents);
        }
    }

    public void saveIfNeeded(OrderAggregate aggregate) {
        if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
            snapshotRepository.save(new Snapshot(
                aggregate.getId(),
                aggregate.getVersion(),
                serialize(aggregate),
                Instant.now()
            ));
        }
    }
}

5-5. Event Sourcing vs CRUD Comparison

| Criteria | CRUD | Event Sourcing |
|---|---|---|
| Storage | Current state only | All state change events |
| Audit trail | Requires separate implementation | Built-in (events are the audit log) |
| Time travel | Not possible | Possible (reconstruct state at any point) |
| Storage volume | Relatively small | Grows with event accumulation |
| Complexity | Low | High (event design, snapshots, etc.) |
| Concurrency | Pessimistic/optimistic locking | Event version-based optimistic control |
| Performance | Easy read optimization | Replay cost (mitigated by snapshots) |
| Schema changes | Migration required | Event upcasting |
| CQRS integration | Optional | Natural combination |

5-6. When NOT to Use Event Sourcing

  • Simple CRUD domains (blogs, configuration management)
  • Strong consistency is mandatory (ATMs requiring real-time balance checks)
  • Unstable event design in early stages (frequent changes mean high upcasting cost)
  • Team lacks experience (steep learning curve)
  • Regulatory requirements for data deletion (conflicts with GDPR right to be forgotten)

6. Outbox Pattern (The Core of Reliability)

6-1. The Problem: Dual Write

One of the most common mistakes in distributed systems is performing database saves and message publishing as separate operations.

// Dangerous code: Dual write problem
@Transactional
public void createOrder(Order order) {
    orderRepository.save(order);           // Step 1: DB save
    kafkaTemplate.send("orders", event);   // Step 2: Message publish
    // Step 1 success + Step 2 failure = In DB but event not published
    // Step 1 success + Step 2 success + DB commit failure = Event published but not in DB
}

The DB transaction and the message broker send span different infrastructure, so they cannot be committed atomically.

6-2. Solution: Outbox Pattern

The Outbox Pattern stores events in an outbox table within the DB transaction, and a separate process delivers them to the message broker.

1. Business logic + outbox INSERT = single DB transaction (atomicity guaranteed)
2. Separate process (CDC/Poller) reads outbox table and publishes to Kafka
3. After successful publish, mark outbox record as completed

-- Outbox table schema
CREATE TABLE outbox (
    id              UUID PRIMARY KEY,
    aggregate_type  VARCHAR(255) NOT NULL,
    aggregate_id    UUID NOT NULL,
    event_type      VARCHAR(255) NOT NULL,
    payload         JSONB NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMPTZ,
    status          VARCHAR(20) DEFAULT 'PENDING'
);

CREATE INDEX idx_outbox_pending ON outbox (status, created_at)
    WHERE status = 'PENDING';

// === Outbox Pattern Implementation ===

@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    @Transactional  // Guaranteed by single transaction
    public UUID createOrder(CreateOrderCommand command) {
        // 1. Execute business logic
        Order order = Order.create(command);
        orderRepository.save(order);

        // 2. Save event to outbox in same transaction
        // (serialization failure rolls the whole transaction back)
        final String payload;
        try {
            payload = objectMapper.writeValueAsString(
                new OrderCreatedEvent(order.getId(), order.getTotalAmount()));
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Failed to serialize event", e);
        }
        OutboxMessage message = OutboxMessage.builder()
            .aggregateType("Order")
            .aggregateId(order.getId())
            .eventType("OrderCreated")
            .payload(payload)
            .build();
        outboxRepository.save(message);

        return order.getId();
        // When transaction commits, both order and outbox are saved together
    }
}

6-3. CDC (Debezium) Approach

Debezium monitors the DB transaction log (WAL) to capture changes and deliver them to Kafka.

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.dbname": "orderdb",
    "database.server.name": "order-service",
    "table.include.list": "public.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "events.order"
  }
}

6-4. Polling vs CDC Comparison

| Criteria | Polling | CDC (Debezium) |
|---|---|---|
| Latency | Depends on polling interval (seconds-minutes) | Near real-time (ms) |
| DB load | Periodic queries cause load | Minimal (reads WAL) |
| Implementation complexity | Low (scheduler + query) | High (Debezium infrastructure) |
| Order guarantee | Outbox table order | WAL order preserved |
| Operational overhead | Low | Kafka Connect cluster needed |
| Scalability | Single poller bottleneck | Highly scalable |
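Since only the CDC side is shown in config above, here is the Polling side as a drain loop. `OutboxRow` and the publisher callback are illustrative stand-ins for the real outbox repository and `KafkaTemplate`; in Spring this would typically run in a `@Scheduled` method with a fixed delay.

```java
import java.util.List;
import java.util.function.BiConsumer;

public class OutboxPoller {

    public static class OutboxRow {
        public final String eventType;
        public final String payload;
        public String status = "PENDING";
        public OutboxRow(String eventType, String payload) {
            this.eventType = eventType;
            this.payload = payload;
        }
    }

    // Publishes each PENDING row in table order (the poller's ordering
    // guarantee), then marks it PUBLISHED so the next poll skips it
    public static int drain(List<OutboxRow> rows,
                            BiConsumer<String, String> publisher) {
        int published = 0;
        for (OutboxRow row : rows) {
            if (!"PENDING".equals(row.status)) continue;
            publisher.accept(row.eventType, row.payload);  // e.g. send to Kafka
            row.status = "PUBLISHED";
            published++;
        }
        return published;
    }
}
```

If the process crashes between publish and the status update, the row is published again on the next poll, which is exactly why downstream consumers must be idempotent (Section 7).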

7. Idempotency Patterns

In distributed systems, messages are delivered at-least-once. Network failures, broker restarts, consumer redeployments cause duplicate delivery. Idempotency guarantees that processing the same message multiple times produces the same result as processing it once.

7-1. Producer Idempotency (Kafka)

# Kafka Producer configuration
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.properties.acks=all
spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
spring.kafka.producer.properties.retries=2147483647

Kafka's idempotent producer assigns sequence numbers to each message, enabling the broker to detect duplicates.
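A toy model of that broker-side check (a simplification for intuition, not Kafka's actual implementation): per producer id, the broker accepts only the next expected sequence number, so a retried duplicate with an already-seen sequence is silently dropped.

```java
import java.util.HashMap;
import java.util.Map;

public class SequenceDedup {

    // Last accepted sequence number per producer id
    private final Map<String, Long> lastSeq = new HashMap<>();

    // Returns true if the message is appended, false if rejected
    // (duplicate sequence, or an out-of-order gap)
    public boolean append(String producerId, long sequence) {
        long expected = lastSeq.getOrDefault(producerId, -1L) + 1;
        if (sequence != expected) {
            return false;
        }
        lastSeq.put(producerId, sequence);
        return true;
    }
}
```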

7-2. Consumer Idempotency

// === Idempotent Consumer Implementation ===

@Slf4j
@Component
@RequiredArgsConstructor
public class IdempotentOrderEventHandler {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderProjectionRepository orderProjectionRepository;

    @KafkaListener(topics = "order-events", groupId = "order-projection")
    @Transactional
    public void handleOrderEvent(
            @Payload OrderEvent event,
            @Header(KafkaHeaders.RECEIVED_KEY) String key,
            @Header("eventId") String eventId) {

        // 1. Check if event already processed
        if (processedEventRepository.existsById(eventId)) {
            log.info("Ignoring duplicate event: eventId={}", eventId);
            return;
        }

        // 2. Process business logic
        processEvent(event);

        // 3. Record as processed (same transaction)
        processedEventRepository.save(new ProcessedEvent(
            eventId,
            event.getClass().getSimpleName(),
            Instant.now()
        ));
    }

    private void processEvent(OrderEvent event) {
        switch (event) {
            case OrderCreatedEvent e -> createOrderProjection(e);
            case OrderStatusChangedEvent e -> updateOrderStatus(e);
            default -> log.warn("Unknown event type");
        }
    }
}

-- Processed events table
CREATE TABLE processed_events (
    event_id    VARCHAR(255) PRIMARY KEY,
    event_type  VARCHAR(255) NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

7-3. Unique Constraint vs Redis SET NX

| Approach | Pros | Cons |
|---|---|---|
| DB Unique Constraint | Atomic with transaction | Increases DB load |
| Redis SET NX | Very fast lookups | Not guaranteed if Redis fails |
| DB + Redis combo | Fast primary filter + reliable guarantee | Complex implementation |

// Redis SET NX for fast duplicate check
@Component
@RequiredArgsConstructor
public class RedisIdempotencyGuard {

    private final StringRedisTemplate redisTemplate;
    private static final Duration TTL = Duration.ofHours(24);

    public boolean tryAcquire(String idempotencyKey) {
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(
                "idempotency:" + idempotencyKey,
                "processed",
                TTL
            );
        return Boolean.TRUE.equals(result);
    }
}

8. Dead Letter Queue Strategy

When message processing fails, infinite retries can paralyze the system. DLQ (Dead Letter Queue) isolates unprocessable messages into a separate queue to ensure system stability.

8-1. Retry with Exponential Backoff

// === Spring Kafka Retry + DLQ Configuration ===

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public DefaultErrorHandler errorHandler(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // Recoverer that sends to DLQ
        DeadLetterPublishingRecoverer recoverer =
            new DeadLetterPublishingRecoverer(kafkaTemplate,
                (record, ex) -> new TopicPartition(
                    record.topic() + ".DLQ", record.partition()));

        // Exponential Backoff retry configuration
        ExponentialBackOff backOff = new ExponentialBackOff();
        backOff.setInitialInterval(1000L);    // 1 second
        backOff.setMultiplier(2.0);            // 2x increase
        backOff.setMaxInterval(60000L);        // max 60 seconds
        backOff.setMaxElapsedTime(300000L);    // max 5 minutes

        DefaultErrorHandler errorHandler =
            new DefaultErrorHandler(recoverer, backOff);

        // Non-retryable exceptions (business errors)
        errorHandler.addNotRetryableExceptions(
            InvalidOrderException.class,
            DuplicateEventException.class
        );

        return errorHandler;
    }
}
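For reference, the settings above produce a retry schedule of roughly 1s, 2s, 4s, ..., capped at 60s per wait, until about 5 minutes have elapsed in total. A plain-Java sketch of that arithmetic (Spring's exact stop condition may differ slightly; this only illustrates the shape of the schedule):

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSchedule {

    // initial: first wait; multiplier: growth factor per retry;
    // maxInterval: cap per wait; maxElapsed: total retry budget (all in ms)
    public static List<Long> intervalsMs(long initial, double multiplier,
                                         long maxInterval, long maxElapsed) {
        List<Long> intervals = new ArrayList<>();
        long next = initial, elapsed = 0;
        while (elapsed + next <= maxElapsed) {
            intervals.add(next);
            elapsed += next;
            next = Math.min((long) (next * multiplier), maxInterval);
        }
        return intervals;
    }
}
```

With the configured values (1s initial, 2.0 multiplier, 60s cap, 5min budget) this yields nine retries before the message is handed to the DLQ recoverer.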

8-2. DLQ Monitoring and Alerting

// === DLQ Monitoring Service ===

@Component
@RequiredArgsConstructor
public class DlqMonitoringService {

    private final MeterRegistry meterRegistry;
    private final AlertService alertService;

    @KafkaListener(topics = "order-events.DLQ", groupId = "dlq-monitor")
    public void monitorDlq(
            ConsumerRecord<String, Object> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String errorMessage,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic) {

        // Record metrics
        meterRegistry.counter("dlq.messages.received",
            "originalTopic", originalTopic,
            "errorType", classifyError(errorMessage)
        ).increment();

        // Send alert
        alertService.sendAlert(DlqAlert.builder()
            .originalTopic(originalTopic)
            .messageKey(record.key())
            .errorMessage(errorMessage)
            .timestamp(Instant.ofEpochMilli(record.timestamp()))
            .severity(classifySeverity(errorMessage))
            .build()
        );

        log.error("DLQ message received: topic={}, key={}, error={}",
            originalTopic, record.key(), errorMessage);
    }
}

8-3. DLQ Reprocessing Pipeline

// === DLQ Message Reprocessing ===

@RestController
@RequestMapping("/api/dlq")
@RequiredArgsConstructor
public class DlqReprocessController {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final DlqMessageRepository dlqRepository;

    // Single message reprocessing
    @PostMapping("/reprocess/single")
    public ResponseEntity<String> reprocessSingle(
            @RequestParam String messageId) {
        DlqMessage message = dlqRepository.findById(messageId).orElseThrow();

        // Republish to original topic
        kafkaTemplate.send(
            message.getOriginalTopic(),
            message.getKey(),
            message.getValue()
        );

        message.setStatus("REPROCESSED");
        message.setReprocessedAt(Instant.now());
        dlqRepository.save(message);

        return ResponseEntity.ok("Reprocessing requested: " + messageId);
    }

    // Batch reprocessing
    @PostMapping("/reprocess/batch")
    public ResponseEntity<String> reprocessBatch(
            @RequestParam String originalTopic,
            @RequestParam(defaultValue = "100") int batchSize) {
        List<DlqMessage> messages = dlqRepository
            .findByOriginalTopicAndStatus(originalTopic, "PENDING",
                PageRequest.of(0, batchSize));

        int count = 0;
        for (DlqMessage message : messages) {
            kafkaTemplate.send(message.getOriginalTopic(),
                message.getKey(), message.getValue());
            message.setStatus("REPROCESSED");
            count++;
        }
        dlqRepository.saveAll(messages);

        return ResponseEntity.ok(count + " messages reprocessing requested");
    }
}

9. Exactly-Once Semantics

There are three levels of message delivery guarantees:

| Guarantee Level | Description | Complexity |
|---|---|---|
| At-most-once | Delivered at most once (may be lost) | Low |
| At-least-once | Delivered at least once (may be duplicated) | Medium |
| Exactly-once | Delivered exactly once | High |
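Note that at-least-once delivery combined with an idempotent consumer yields what is often called effectively-once processing: redeliveries happen, but their effects do not repeat. A toy model (names are illustrative):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class EffectivelyOnce {

    private final Set<String> seen = new HashSet<>();
    private final List<String> applied = new ArrayList<>();

    // Called once per delivery; duplicate deliveries change nothing
    public void handle(String messageId, String payload) {
        if (!seen.add(messageId)) {
            return; // already processed -> ignore redelivery
        }
        applied.add(payload); // side effect happens once per message id
    }

    public List<String> applied() { return applied; }
}
```

This is the per-consumer half of the picture; the sections below cover the producer and broker configuration needed to complete the chain.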

9-1. Kafka Exactly-Once Implementation

Kafka supports Exactly-Once Semantics (EOS) since version 0.11.

// === Kafka Exactly-Once Configuration ===

@Configuration
public class KafkaExactlyOnceConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(config);
    }
}

9-2. End-to-End Exactly-Once

True end-to-end Exactly-Once must hold across the entire chain, from the Producer, through the Consumer, to the final state persistence.

End-to-End Exactly-Once Chain:

Producer (idempotent + transactions)
    -> Kafka Broker (transaction log)
        -> Consumer (read_committed + manual offset)
            -> Database (idempotency key + business logic)

All links in the chain must work together for Exactly-Once guarantee

// === End-to-End Exactly-Once Processing ===

@Component
@RequiredArgsConstructor
public class ExactlyOnceOrderProcessor {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final OrderRepository orderRepository;
    private final ProcessedOffsetRepository offsetRepository;

    @KafkaListener(topics = "payment-completed")
    @Transactional("chainedKafkaTransactionManager")
    public void processPaymentCompleted(
            ConsumerRecord<String, PaymentCompletedEvent> record) {

        String offsetKey = record.topic() + "-" +
            record.partition() + "-" + record.offset();

        // Check if offset already processed
        if (offsetRepository.existsById(offsetKey)) {
            return;
        }

        // Business logic
        PaymentCompletedEvent event = record.value();
        Order order = orderRepository.findById(event.orderId()).orElseThrow();
        order.markAsPaid(event.paymentId());
        orderRepository.save(order);

        // Publish next step event (same Kafka transaction)
        kafkaTemplate.send("order-paid",
            new OrderPaidEvent(order.getId(), event.paymentId()));

        // Record processed offset (same DB transaction)
        offsetRepository.save(new ProcessedOffset(offsetKey, Instant.now()));
    }
}

10. Reactive Streams vs Virtual Threads (2025)

10-1. Project Reactor: Backpressure as Core Design

Reactive Streams have non-blocking I/O and backpressure mechanisms as core design principles.

// === Reactor-based Async Processing ===

@Slf4j
@Service
@RequiredArgsConstructor
public class ReactiveOrderService {

    private final KafkaReceiver<String, OrderEvent> kafkaReceiver;

    public Flux<OrderEvent> processOrderStream() {
        return kafkaReceiver.receive()
            .flatMap(record -> {
                OrderEvent event = record.value();
                return processEvent(event)
                    .then(record.receiverOffset().commit())
                    .thenReturn(event);
            }, 16)  // Concurrency limit (backpressure)
            .onErrorResume(e -> {
                log.error("Processing failed", e);
                return Mono.empty();
            })
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
    }

    private Mono<Void> processEvent(OrderEvent event) {
        return Mono.fromCallable(() -> {
            // Business logic
            return null;
        }).subscribeOn(Schedulers.boundedElastic()).then();
    }
}

10-2. Java 21 Virtual Threads: Simpler Code

Virtual Threads achieve high concurrency while maintaining traditional synchronous code style.

// === Virtual Threads-based Async Processing ===

@Configuration
public class VirtualThreadConfig {

    @Bean
    public TomcatProtocolHandlerCustomizer<?> protocolHandlerCustomizer() {
        return protocolHandler -> {
            protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
        };
    }
}

@Service
@RequiredArgsConstructor
public class VirtualThreadOrderService {

    private final PaymentService paymentService;
    private final InventoryService inventoryService;

    // Synchronous code but running on Virtual Threads
    // -> blocking I/O only blocks the Virtual Thread, not an OS thread
    public OrderResult processOrder(UUID orderId)
            throws InterruptedException, ExecutionException {
        // Parallel processing: using StructuredTaskScope
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

            StructuredTaskScope.Subtask<PaymentResult> paymentTask = scope.fork(() ->
                paymentService.processPayment(orderId)
            );
            StructuredTaskScope.Subtask<InventoryResult> inventoryTask = scope.fork(() ->
                inventoryService.checkInventory(orderId)
            );

            scope.join();
            scope.throwIfFailed();

            return new OrderResult(
                paymentTask.get(),
                inventoryTask.get()
            );
        }
    }
}

10-3. Selection Guide: Decision Matrix

| Criteria | Reactive (Reactor/WebFlux) | Virtual Threads |
|---|---|---|
| Code complexity | High (reactive operator learning) | Low (existing sync code) |
| Backpressure control | Built-in (Flux/Mono) | Manual implementation needed |
| Debugging | Difficult (complex stack traces) | Easy (traditional stack traces) |
| Memory efficiency | Excellent | Good (better than OS threads) |
| I/O-intensive workloads | Optimal | Very good |
| CPU-intensive workloads | Not ideal | Not ideal (use platform threads) |
| Legacy code migration | Complete rewrite needed | Configuration change only |
| Ecosystem maturity | High (5+ years) | Growing (2023~) |

Recommendations:

  • New project + simple I/O: Virtual Threads
  • Streaming data processing: Reactive Streams
  • Existing Spring MVC project: Virtual Threads (minimal migration cost)
  • Real-time data pipeline: Reactive Streams

11. Production Architecture: Payment System Design

A complete e-commerce payment system architecture combining all patterns.

                     E-commerce Payment System Architecture
                     ======================================

[Client] -> [API Gateway]
                |
                v
        [Order Service] --- Write DB (PostgreSQL)
           |                     |
           | (Outbox Pattern)    | (CDC - Debezium)
           |                     v
           |              [Kafka: order-events]
           |                /        |        \
           v               v        v         v
    [Saga Orchestrator]  [Payment] [Inventory] [Notification]
    (Temporal)           Service   Service     Service
           |                |        |
           | commands       |        |
           v                v        v
    [payment-commands]  [payment-  [inventory-
                        events]    events]
                           |
                    [DLQ + Monitoring]
                           |
                    [Alerting (PagerDuty)]

Read Side:
    [Kafka: order-events] -> [Order Projection] -> Read DB (MongoDB)
                           -> [Analytics Service] -> Data Warehouse
                           -> [Search Service] -> Elasticsearch

Applied Patterns Summary

| Pattern | Applied At | Purpose |
|---|---|---|
| CQRS | Order Service | Separate order read/write |
| Saga (Orchestration) | Temporal Orchestrator | Payment-inventory-shipping transaction mgmt |
| Event Sourcing | Order Aggregate | Order state change history |
| Outbox Pattern | Order Service to Kafka | DB-message atomicity guarantee |
| Idempotency | All Consumers | Prevent duplicate message processing |
| DLQ | All Kafka Consumers | Isolate failed messages |
| Exactly-Once | Payment processing | Prevent double charging |

Quiz

Q1. When the Write Model and Read Model in CQRS use different databases, what is the most commonly used approach to ensure data consistency between the two models?

Answer: Event-based asynchronous synchronization (Eventual Consistency)

When the Write Model changes state, it publishes events. The Read Model's projection handler subscribes to these events and updates the Read DB. There may be temporary inconsistency (ms to seconds) between the two models, but consistency is eventually guaranteed.

If strong consistency is required, reconsider whether CQRS is the right fit.

Q2. In a Choreography Saga, when Service A publishes an event and Service B fails to process it, how does Service A become aware of the failure?

Answer: Service B publishes a failure event (e.g., PaymentFailed), and Service A subscribes to it to become aware. In Choreography, each service must publish both success and failure events, and related services subscribe to respond appropriately.

This is one of the downsides of Choreography. As the number of services grows, event flow tracking becomes increasingly complex, and timeout-based detection may be needed as an additional mechanism.

Q3. When an Aggregate in Event Sourcing has 1 million accumulated events, what is the key technique to solve the performance issue of loading it?

Answer: Snapshot Pattern

Periodically (e.g., every 100 events), save the current state of the Aggregate as a snapshot. When loading, start from the most recent snapshot and only replay events after that point.

Example: Out of 1 million events, if the last snapshot is at event 999,900, only 100 events need to be replayed.

Q4. In the Outbox Pattern, what is the main reason the CDC (Change Data Capture) approach is preferred over the Polling approach?

Answer: CDC reads the DB transaction log (WAL) directly, detecting changes in near real-time (milliseconds). Polling executes queries periodically, causing higher latency (seconds to minutes) and additional DB load from frequent queries.

Additionally, CDC delivers events in WAL order, naturally preserving ordering, and since it reads the transaction log, there is virtually no additional query overhead.

Q5. To achieve Exactly-Once Semantics in Kafka, what configuration is needed at the Producer, Broker, and Consumer levels respectively?

Answer:

  • Producer: enable.idempotence=true + transactional.id configuration (idempotent producer + transactions)
  • Broker: Transaction log maintenance (enabled by default)
  • Consumer: isolation.level=read_committed + manual offset commit + idempotent processing

For End-to-End Exactly-Once, the Consumer must also ensure idempotency when saving results to the DB. Kafka transactions only guarantee Exactly-Once within Kafka; to include external systems (DB), consumer-side idempotency is essential.


References

  1. Martin Fowler — "Event Sourcing" (martinfowler.com)
  2. Chris Richardson — "Microservices Patterns" (Manning, 2018)
  3. Vaughn Vernon — "Implementing Domain-Driven Design" (Addison-Wesley, 2013)
  4. Greg Young — "CQRS Documents" (cqrs.files.wordpress.com)
  5. Apache Kafka Documentation — "Exactly-Once Semantics" (kafka.apache.org)
  6. Debezium Documentation — "Outbox Event Router" (debezium.io)
  7. Temporal.io Documentation — "Saga Pattern with Temporal" (docs.temporal.io)
  8. Netflix Tech Blog — "Evolution of the Netflix Data Pipeline" (netflixtechblog.com)
  9. Uber Engineering Blog — "Building Reliable Reprocessing and Dead Letter Queues" (eng.uber.com)
  10. Confluent Blog — "Exactly-Once Semantics Are Possible" (confluent.io)
  11. AWS Architecture Blog — "Saga Orchestration Pattern" (aws.amazon.com)
  12. Microsoft Azure Docs — "CQRS Pattern" (learn.microsoft.com)
  13. Spring for Apache Kafka Reference — "Error Handling and DLQ" (docs.spring.io)
  14. JEP 444 — "Virtual Threads" (openjdk.org)
  15. Project Reactor Reference — "Backpressure" (projectreactor.io)
  16. Pat Helland — "Idempotence Is Not a Medical Condition" (ACM Queue, 2012)