Skip to content
Published on

비동기 처리 패턴 완전 정복: CQRS, Saga, Event Sourcing — 시니어 백엔드 개발자의 필수 무기

Authors

1. 왜 비동기인가? 대규모 시스템의 생존 전략

현대 대규모 시스템은 동기식 요청-응답 모델로는 감당할 수 없는 규모의 트래픽을 처리합니다.

실제 기업 사례:

기업규모비동기 처리 방식
Netflix2.6억 구독자, 초당 수백만 이벤트Apache Kafka 기반 이벤트 스트리밍
Uber일일 페타바이트 실시간 데이터Apache Kafka + Apache Flink
LinkedIn7조+ 메시지/일 Kafka 처리자체 개발 Kafka 인프라
Shopify블랙프라이데이 초당 80만 요청비동기 주문 처리 파이프라인

동기식 아키텍처에서 서비스 A가 서비스 B를 호출하고, B가 C를 호출하면 전체 레이턴시는 각 서비스 호출 시간의 합이 됩니다. 하나의 서비스가 느려지면 전체 체인이 블로킹됩니다.

비동기 아키텍처에서는 메시지 브로커를 통해 서비스 간 결합도를 낮추고, 각 서비스가 독립적으로 확장 가능합니다.

동기식:  ClientA(50ms)B(100ms)C(200ms) = 350ms 총 대기
비동기식: ClientA(50ms)Message Broker  (B, C 병렬 처리)
         Client는 50ms 후 응답, BC는 비동기로 처리

비동기 전환의 핵심 이점:

  1. 시간적 결합 제거 — 생산자와 소비자가 동시에 가동될 필요 없음
  2. 독립적 확장 — 읽기와 쓰기 워크로드를 별도로 스케일링
  3. 장애 격리 — 하나의 서비스 장애가 전체에 전파되지 않음
  4. 피크 트래픽 흡수 — 메시지 큐가 버퍼 역할

그러나 비동기는 공짜가 아닙니다. Eventual Consistency(최종 일관성), 메시지 순서 보장, 중복 처리, 분산 트랜잭션 같은 새로운 문제가 생깁니다. 이 글에서 다루는 패턴들은 바로 이 문제들에 대한 해답입니다.


2. Event-Driven Architecture 기초

이벤트 주도 아키텍처(EDA)는 비동기 패턴의 토대입니다. 시스템의 상태 변경을 "이벤트"로 표현하고, 관심 있는 서비스가 이를 구독하여 반응합니다.

2-1. Domain Events vs Integration Events

구분Domain EventIntegration Event
범위단일 바운디드 컨텍스트 내부바운디드 컨텍스트 간
예시OrderItemAddedOrderPlaced
전달 방식인메모리 이벤트 버스메시지 브로커 (Kafka, RabbitMQ)
스키마 관리내부적이므로 자유롭게 변경 가능계약(Contract) 관리 필수
// Domain Event - 바운디드 컨텍스트 내부
public record OrderItemAdded(
    UUID orderId,
    UUID productId,
    int quantity,
    BigDecimal price,
    Instant occurredAt
) implements DomainEvent {}

// Integration Event - 외부 서비스에 발행
public record OrderPlacedEvent(
    UUID orderId,
    UUID customerId,
    BigDecimal totalAmount,
    List<OrderLineItem> items,
    Instant occurredAt,
    int version  // 스키마 버전 관리
) implements IntegrationEvent {}

2-2. Event Bus vs Event Store

Event Bus (예: Kafka, RabbitMQ)는 이벤트를 전달하는 파이프라인입니다. 소비자가 이벤트를 수신하면 보통 브로커에서 삭제(또는 TTL 이후 만료)됩니다.

Event Store는 모든 이벤트를 영구적으로 보관하는 저장소입니다. 이벤트를 재생(replay)하여 어떤 시점의 상태든 복원할 수 있습니다.

Event Bus (Kafka):
  Producer[Topic: orders]Consumer Group A (주문 서비스)
Consumer Group B (알림 서비스)
Consumer Group C (분석 서비스)

Event Store:
  [Event #1: OrderCreated]
  [Event #2: ItemAdded]
  [Event #3: PaymentReceived]
  [Event #4: OrderShipped]
  → 이벤트 #1~#4를 재생하면 현재 주문 상태 복원 가능

2-3. Eventual Consistency 이해

분산 시스템에서 강한 일관성(Strong Consistency)을 유지하려면 분산 잠금, 2PC(2-Phase Commit) 등이 필요하지만 성능과 가용성이 크게 떨어집니다.

CAP 정리에 따라, 네트워크 파티션 상황에서 일관성(C)과 가용성(A) 중 하나만 선택해야 합니다. 대부분의 대규모 시스템은 가용성(A)을 선택하고 Eventual Consistency를 수용합니다.

Eventual Consistency란 "지금 당장은 데이터가 불일치할 수 있지만, 충분한 시간이 지나면 모든 노드가 동일한 상태로 수렴한다"는 것입니다.

실무에서의 의미:

  • 주문 후 재고가 즉시 줄어들지 않을 수 있음 (수 밀리초~수 초 지연)
  • 사용자에게 "주문이 처리 중입니다" 상태를 먼저 보여주고, 확정 후 업데이트
  • 일관성 창(Consistency Window)을 최소화하는 것이 핵심

3. CQRS 딥다이브

CQRS(Command Query Responsibility Segregation)는 읽기와 쓰기 모델을 완전히 분리하는 패턴입니다.

3-1. 기본 개념

전통적인 CRUD 모델에서는 하나의 데이터 모델이 읽기와 쓰기를 모두 담당합니다. 하지만 실제로 읽기와 쓰기는 매우 다른 요구사항을 가집니다.

전통적 CRUD:
  Client[단일 모델]Database
           (읽기/쓰기 모두)

CQRS:
  Client ──Command──→ [Write Model]Command DB (정규화)
Events
  Client ←──Query───← [Read Model]Read DB (비정규화, 최적화)

Write Model (명령 측):

  • 비즈니스 규칙 검증
  • 도메인 불변식(Invariant) 보장
  • 정규화된 스키마
  • 강한 일관성

Read Model (질의 측):

  • 조회에 최적화된 비정규화 뷰
  • 다양한 저장소 사용 가능 (Elasticsearch, Redis, MongoDB)
  • Eventual Consistency 허용

3-2. 언제 CQRS를 사용해야 하는가?

적합한 경우:

  • 읽기/쓰기 비율이 극단적으로 다를 때 (읽기가 100:1 이상)
  • 복잡한 도메인 로직 + 다양한 조회 요구사항
  • 읽기와 쓰기의 스케일링 요구가 다를 때
  • Event Sourcing과 함께 사용할 때

부적합한 경우:

  • 단순한 CRUD 애플리케이션
  • 강한 일관성이 절대적으로 필요한 경우
  • 도메인이 단순하고 조회 패턴이 예측 가능한 경우

3-3. Spring Boot + Kafka 구현

// === Command 측: 주문 생성 ===

@Service
@RequiredArgsConstructor
public class OrderCommandService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @Transactional
    public UUID createOrder(CreateOrderCommand command) {
        // 1. 비즈니스 규칙 검증
        validateOrder(command);

        // 2. 도메인 모델에서 처리
        Order order = Order.create(
            command.customerId(),
            command.items(),
            command.shippingAddress()
        );

        // 3. Write DB에 저장
        orderRepository.save(order);

        // 4. 이벤트 발행 (Read Model 동기화용)
        OrderCreatedEvent event = new OrderCreatedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getTotalAmount(),
            order.getItems().stream()
                .map(this::toEventItem)
                .toList(),
            Instant.now()
        );
        kafkaTemplate.send("order-events", order.getId().toString(), event);

        return order.getId();
    }

    private void validateOrder(CreateOrderCommand command) {
        if (command.items().isEmpty()) {
            throw new InvalidOrderException("주문 항목이 비어있습니다");
        }
        // 재고 확인, 고객 상태 확인 등
    }
}
// === Query 측: 주문 조회 프로젝션 ===

@Service
@RequiredArgsConstructor
public class OrderQueryService {

    private final OrderReadRepository readRepository;

    public OrderDetailView getOrderDetail(UUID orderId) {
        return readRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));
    }

    public Page<OrderSummaryView> getCustomerOrders(
            UUID customerId, Pageable pageable) {
        return readRepository.findByCustomerId(customerId, pageable);
    }
}

// Read Model - 조회에 최적화된 비정규화 뷰
@Document(collection = "order_views")
public record OrderDetailView(
    UUID orderId,
    UUID customerId,
    String customerName,      // 비정규화: 고객명 포함
    String customerEmail,
    BigDecimal totalAmount,
    String status,
    List<OrderItemView> items,
    AddressView shippingAddress,
    Instant createdAt,
    Instant lastUpdatedAt
) {}
// === 이벤트 핸들러: Write → Read 동기화 ===

@Component
@RequiredArgsConstructor
public class OrderProjectionHandler {

    private final OrderReadRepository readRepository;
    private final CustomerQueryClient customerClient;

    @KafkaListener(topics = "order-events", groupId = "order-projection")
    public void handleOrderEvent(OrderEvent event) {
        switch (event) {
            case OrderCreatedEvent e -> handleOrderCreated(e);
            case OrderStatusChangedEvent e -> handleStatusChanged(e);
            case OrderCancelledEvent e -> handleOrderCancelled(e);
            default -> log.warn("알 수 없는 이벤트 타입: {}", event.getClass());
        }
    }

    private void handleOrderCreated(OrderCreatedEvent event) {
        // 고객 정보 조회하여 비정규화된 뷰 생성
        CustomerInfo customer = customerClient.getCustomer(event.customerId());

        OrderDetailView view = new OrderDetailView(
            event.orderId(),
            event.customerId(),
            customer.name(),
            customer.email(),
            event.totalAmount(),
            "CREATED",
            event.items().stream().map(this::toItemView).toList(),
            null,
            event.occurredAt(),
            event.occurredAt()
        );
        readRepository.save(view);
    }
}

3-4. Read Model 동기화 전략

전략지연일관성복잡도적합한 경우
이벤트 핸들러 (Kafka)수 ms ~ 수 초Eventual중간대부분의 경우
CDC (Debezium)수 초Eventual높음레거시 시스템 연동
폴링 (Scheduled)수 초 ~ 수 분Eventual낮음단순한 경우
동기식 업데이트즉시Strong낮음성능 요구 낮을 때

4. Saga 패턴 완전 정복

마이크로서비스에서 여러 서비스에 걸친 비즈니스 트랜잭션은 어떻게 관리할까요? 기존 모놀리스의 DB 트랜잭션은 사용할 수 없습니다. Saga 패턴은 분산 트랜잭션을 일련의 로컬 트랜잭션으로 분해합니다.

4-1. Choreography (안무형)

Choreography 방식에서는 중앙 조율자가 없습니다. 각 서비스가 이벤트를 발행하면 다음 서비스가 해당 이벤트를 구독하여 자신의 로컬 트랜잭션을 수행합니다.

이커머스 주문 흐름 (Choreography):

Order Service ──OrderPlaced──→ Payment Service
                            PaymentCompleted
                              Inventory Service
                            InventoryReserved
                              Shipping Service
                            ShipmentCreated
                              Order Service (상태 업데이트)

실패  (보상 트랜잭션):
Inventory Service ──InventoryReservationFailed──→ Payment Service (환불)
                                                 PaymentRefunded
                                                 Order Service (주문 취소)
// === Choreography Saga: 주문 서비스 ===

@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Transactional
    public UUID placeOrder(PlaceOrderCommand command) {
        Order order = Order.create(command);
        order.setStatus(OrderStatus.PENDING);
        orderRepository.save(order);

        // 이벤트 발행 - 결제 서비스가 구독
        kafkaTemplate.send("order-events", new OrderPlacedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getTotalAmount(),
            order.getItems()
        ));

        return order.getId();
    }

    // 결제 완료 이벤트 수신
    @KafkaListener(topics = "payment-events", groupId = "order-service")
    public void onPaymentCompleted(PaymentCompletedEvent event) {
        Order order = orderRepository.findById(event.orderId()).orElseThrow();
        order.setStatus(OrderStatus.PAID);
        orderRepository.save(order);
    }

    // 결제 실패 이벤트 수신
    @KafkaListener(topics = "payment-events", groupId = "order-service")
    public void onPaymentFailed(PaymentFailedEvent event) {
        Order order = orderRepository.findById(event.orderId()).orElseThrow();
        order.setStatus(OrderStatus.CANCELLED);
        order.setCancellationReason(event.reason());
        orderRepository.save(order);
    }
}
// === Choreography Saga: 결제 서비스 ===

@Service
@RequiredArgsConstructor
public class PaymentService {

    private final PaymentRepository paymentRepository;
    private final PaymentGateway paymentGateway;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @KafkaListener(topics = "order-events", groupId = "payment-service")
    @Transactional
    public void onOrderPlaced(OrderPlacedEvent event) {
        try {
            // 결제 처리
            PaymentResult result = paymentGateway.charge(
                event.customerId(),
                event.totalAmount()
            );

            Payment payment = Payment.create(
                event.orderId(),
                event.customerId(),
                event.totalAmount(),
                result.transactionId()
            );
            paymentRepository.save(payment);

            // 성공 이벤트 발행 - 재고 서비스가 구독
            kafkaTemplate.send("payment-events", new PaymentCompletedEvent(
                event.orderId(),
                payment.getId(),
                result.transactionId()
            ));

        } catch (PaymentException e) {
            // 실패 이벤트 발행 - 주문 서비스가 구독하여 취소
            kafkaTemplate.send("payment-events", new PaymentFailedEvent(
                event.orderId(),
                e.getMessage()
            ));
        }
    }
}

Choreography 장단점:

장점단점
서비스 간 느슨한 결합전체 흐름 파악이 어려움
각 서비스가 독립적으로 배포 가능디버깅이 복잡함
단일 장애 지점(SPOF) 없음순환 의존성 발생 가능
구현이 비교적 단순서비스가 많아지면 이벤트 추적 난이도 상승

4-2. Orchestration (지휘형)

Orchestration 방식에서는 중앙 조율자(Orchestrator)가 전체 워크플로우를 관리합니다. 각 서비스에 명령을 보내고, 응답에 따라 다음 단계를 결정합니다.

이커머스 주문 흐름 (Orchestration):

                    Order Saga Orchestrator
                   /          |          \
                  ↓           ↓           ↓
          Payment Service  Inventory  Shipping
          (charge)         (reserve)  (create)
                  ↓           ↓           
          (result)        (result)    (result)
                   \          |          /
                    Order Saga Orchestrator
                         (최종 결정)
// === Orchestration Saga: Temporal 프레임워크 사용 ===

@WorkflowInterface
public interface OrderSagaWorkflow {

    @WorkflowMethod
    OrderResult processOrder(PlaceOrderCommand command);
}

@WorkflowImpl(workflows = OrderSagaWorkflow.class)
public class OrderSagaWorkflowImpl implements OrderSagaWorkflow {

    // 각 서비스의 Activity 참조
    private final PaymentActivities paymentActivities =
        Workflow.newActivityStub(PaymentActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .setRetryOptions(RetryOptions.newBuilder()
                    .setMaximumAttempts(3)
                    .build())
                .build());

    private final InventoryActivities inventoryActivities =
        Workflow.newActivityStub(InventoryActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .build());

    private final ShippingActivities shippingActivities =
        Workflow.newActivityStub(ShippingActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(60))
                .build());

    @Override
    public OrderResult processOrder(PlaceOrderCommand command) {
        String paymentId = null;
        String reservationId = null;

        try {
            // Step 1: 결제
            paymentId = paymentActivities.processPayment(
                command.customerId(),
                command.totalAmount()
            );

            // Step 2: 재고 예약
            reservationId = inventoryActivities.reserveInventory(
                command.items()
            );

            // Step 3: 배송 생성
            String shipmentId = shippingActivities.createShipment(
                command.orderId(),
                command.shippingAddress()
            );

            return OrderResult.success(command.orderId(), shipmentId);

        } catch (Exception e) {
            // 보상 트랜잭션 실행
            compensate(paymentId, reservationId);
            return OrderResult.failed(command.orderId(), e.getMessage());
        }
    }

    private void compensate(String paymentId, String reservationId) {
        // 역순으로 보상 트랜잭션 실행
        if (reservationId != null) {
            inventoryActivities.cancelReservation(reservationId);
        }
        if (paymentId != null) {
            paymentActivities.refundPayment(paymentId);
        }
    }
}
// === Activity 구현 ===

@ActivityInterface
public interface PaymentActivities {
    String processPayment(UUID customerId, BigDecimal amount);
    void refundPayment(String paymentId);
}

@Component
@RequiredArgsConstructor
public class PaymentActivitiesImpl implements PaymentActivities {

    private final PaymentGateway paymentGateway;
    private final PaymentRepository paymentRepository;

    @Override
    public String processPayment(UUID customerId, BigDecimal amount) {
        PaymentResult result = paymentGateway.charge(customerId, amount);
        Payment payment = Payment.create(customerId, amount, result.transactionId());
        paymentRepository.save(payment);
        return payment.getId().toString();
    }

    @Override
    public void refundPayment(String paymentId) {
        Payment payment = paymentRepository.findById(UUID.fromString(paymentId))
            .orElseThrow();
        paymentGateway.refund(payment.getTransactionId());
        payment.setStatus(PaymentStatus.REFUNDED);
        paymentRepository.save(payment);
    }
}

Orchestration 장단점:

장점단점
전체 워크플로우가 한 곳에서 관리됨오케스트레이터가 SPOF 될 수 있음
디버깅과 모니터링이 용이서비스 간 결합도가 Choreography보다 높음
복잡한 보상 로직 관리가 쉬움오케스트레이터에 로직이 집중될 위험
Temporal/Cadence 등 프레임워크 지원추가 인프라(워크플로우 엔진) 필요

4-3. Choreography vs Orchestration 선택 가이드

기준ChoreographyOrchestration
서비스 수3~4개 이하5개 이상
워크플로우 복잡도선형적, 단순조건 분기, 병렬 처리
관찰 가능성 요구낮음높음
팀 구조독립적 팀중앙 플랫폼 팀
보상 로직단순복잡 (다중 경로)

4-4. Compensating Transactions (보상 트랜잭션)

보상 트랜잭션은 이미 커밋된 트랜잭션을 의미론적으로 되돌리는 작업입니다. 물리적 롤백이 아니라 역작용(reverse action)을 수행합니다.

// 보상 트랜잭션 예시
public enum CompensationAction {
    PAYMENT_CHARGE    -> PAYMENT_REFUND,
    INVENTORY_RESERVE -> INVENTORY_RELEASE,
    SHIPPING_CREATE   -> SHIPPING_CANCEL,
    COUPON_APPLY      -> COUPON_RESTORE,
    POINTS_DEDUCT     -> POINTS_RESTORE
}

Semantic Compensation vs Exact Compensation:

  • Exact Compensation: 정확히 원래 상태로 복원 (예: 결제 취소 → 전액 환불)
  • Semantic Compensation: 비즈니스적으로 의미 있는 보상 (예: 항공권 취소 → 부분 환불 + 크레딧)

실무에서는 Semantic Compensation이 더 일반적입니다. 시간이 지나면 정확한 복원이 불가능한 경우가 많기 때문입니다.


5. Event Sourcing

Event Sourcing은 엔티티의 현재 상태를 저장하는 대신, 상태 변경을 일으킨 모든 이벤트를 순서대로 저장하는 패턴입니다.

5-1. 핵심 개념

전통적 CRUD:
  Account 테이블: id=1, balance=750

Event Sourcing:
  Event #1: AccountCreated(id=1, initialBalance=1000)
  Event #2: MoneyWithdrawn(id=1, amount=200)
  Event #3: MoneyDeposited(id=1, amount=150)
  Event #4: MoneyWithdrawn(id=1, amount=200)
  → 재생: 1000 - 200 + 150 - 200 = 750 (현재 잔고)

모든 상태 변경이 불변(immutable) 이벤트 로그에 추가(append-only)됩니다. 삭제나 수정은 없습니다.

5-2. Event Store 구현

// === Event Store 인터페이스 ===

public interface EventStore {
    void saveEvents(UUID aggregateId, List<DomainEvent> events, int expectedVersion);
    List<DomainEvent> getEvents(UUID aggregateId);
    List<DomainEvent> getEvents(UUID aggregateId, int fromVersion);
}

// === PostgreSQL 기반 Event Store 구현 ===

@Repository
@RequiredArgsConstructor
public class PostgresEventStore implements EventStore {

    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper;

    @Override
    @Transactional
    public void saveEvents(UUID aggregateId, List<DomainEvent> events,
                          int expectedVersion) {
        // 낙관적 동시성 제어
        Integer currentVersion = jdbcTemplate.queryForObject(
            "SELECT COALESCE(MAX(version), 0) FROM event_store WHERE aggregate_id = ?",
            Integer.class, aggregateId
        );

        if (currentVersion != expectedVersion) {
            throw new ConcurrencyException(
                "동시성 충돌: 예상 버전 " + expectedVersion +
                ", 실제 버전 " + currentVersion
            );
        }

        int version = expectedVersion;
        for (DomainEvent event : events) {
            version++;
            jdbcTemplate.update(
                """
                INSERT INTO event_store
                (event_id, aggregate_id, aggregate_type, event_type,
                 event_data, version, created_at)
                VALUES (?, ?, ?, ?, ?::jsonb, ?, ?)
                """,
                UUID.randomUUID(),
                aggregateId,
                event.getAggregateType(),
                event.getClass().getSimpleName(),
                objectMapper.writeValueAsString(event),
                version,
                Instant.now()
            );
        }
    }

    @Override
    public List<DomainEvent> getEvents(UUID aggregateId) {
        return jdbcTemplate.query(
            "SELECT * FROM event_store WHERE aggregate_id = ? ORDER BY version",
            this::mapRowToEvent,
            aggregateId
        );
    }
}
-- Event Store 테이블 스키마
CREATE TABLE event_store (
    event_id       UUID PRIMARY KEY,
    aggregate_id   UUID NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    event_type     VARCHAR(255) NOT NULL,
    event_data     JSONB NOT NULL,
    version        INTEGER NOT NULL,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    UNIQUE (aggregate_id, version)  -- 낙관적 동시성 제어
);

CREATE INDEX idx_event_store_aggregate ON event_store (aggregate_id, version);
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);

5-3. Aggregate에서 이벤트 적용

// === Event-Sourced Aggregate ===

public class OrderAggregate {

    private UUID id;
    private UUID customerId;
    private OrderStatus status;
    private BigDecimal totalAmount;
    private List<OrderItem> items = new ArrayList<>();
    private int version = 0;

    // 미적용 이벤트 목록
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    // 이벤트 재생으로 상태 복원
    public static OrderAggregate fromHistory(List<DomainEvent> history) {
        OrderAggregate aggregate = new OrderAggregate();
        for (DomainEvent event : history) {
            aggregate.apply(event, false);
        }
        return aggregate;
    }

    // 새 명령 처리
    public void placeOrder(UUID customerId, List<OrderItem> items) {
        if (this.status != null) {
            throw new IllegalStateException("이미 생성된 주문입니다");
        }

        BigDecimal total = items.stream()
            .map(item -> item.price().multiply(BigDecimal.valueOf(item.quantity())))
            .reduce(BigDecimal.ZERO, BigDecimal::add);

        apply(new OrderPlacedEvent(this.id, customerId, total, items, Instant.now()), true);
    }

    public void cancelOrder(String reason) {
        if (this.status == OrderStatus.SHIPPED) {
            throw new IllegalStateException("배송된 주문은 취소할 수 없습니다");
        }
        apply(new OrderCancelledEvent(this.id, reason, Instant.now()), true);
    }

    // 이벤트 적용 (상태 변경)
    private void apply(DomainEvent event, boolean isNew) {
        when(event);  // 상태 변경
        this.version++;
        if (isNew) {
            uncommittedEvents.add(event);
        }
    }

    // 이벤트별 상태 변경 로직
    private void when(DomainEvent event) {
        switch (event) {
            case OrderPlacedEvent e -> {
                this.id = e.orderId();
                this.customerId = e.customerId();
                this.totalAmount = e.totalAmount();
                this.items = new ArrayList<>(e.items());
                this.status = OrderStatus.PLACED;
            }
            case OrderCancelledEvent e -> {
                this.status = OrderStatus.CANCELLED;
            }
            case OrderShippedEvent e -> {
                this.status = OrderStatus.SHIPPED;
            }
            default -> throw new IllegalArgumentException("알 수 없는 이벤트: " + event);
        }
    }

    public List<DomainEvent> getUncommittedEvents() {
        return Collections.unmodifiableList(uncommittedEvents);
    }

    public void clearUncommittedEvents() {
        uncommittedEvents.clear();
    }
}

5-4. Snapshot 패턴

이벤트가 수만 개 쌓이면 매번 전체를 재생하는 것은 비효율적입니다. Snapshot을 사용하여 특정 시점의 상태를 저장해두면 해당 시점 이후의 이벤트만 재생하면 됩니다.

// Snapshot 저장 및 복원
@Service
public class SnapshotService {

    private static final int SNAPSHOT_INTERVAL = 100; // 100개 이벤트마다

    public OrderAggregate loadAggregate(UUID aggregateId) {
        // 1. 최신 스냅샷 조회
        Optional<Snapshot> snapshot = snapshotRepository
            .findLatest(aggregateId);

        if (snapshot.isPresent()) {
            // 2. 스냅샷에서 상태 복원
            OrderAggregate aggregate = deserialize(snapshot.get().getData());
            int fromVersion = snapshot.get().getVersion();

            // 3. 스냅샷 이후 이벤트만 재생
            List<DomainEvent> newEvents = eventStore
                .getEvents(aggregateId, fromVersion + 1);
            for (DomainEvent event : newEvents) {
                aggregate.apply(event, false);
            }
            return aggregate;

        } else {
            // 스냅샷 없으면 전체 재생
            List<DomainEvent> allEvents = eventStore.getEvents(aggregateId);
            return OrderAggregate.fromHistory(allEvents);
        }
    }

    public void saveIfNeeded(OrderAggregate aggregate) {
        if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
            snapshotRepository.save(new Snapshot(
                aggregate.getId(),
                aggregate.getVersion(),
                serialize(aggregate),
                Instant.now()
            ));
        }
    }
}

5-5. Event Sourcing vs CRUD 비교

기준CRUDEvent Sourcing
저장 방식현재 상태만 저장모든 상태 변경 이벤트 저장
감사 추적별도 구현 필요내장 (이벤트 자체가 감사 로그)
시간 여행불가능가능 (특정 시점 상태 복원)
저장소 용량상대적으로 적음이벤트 누적으로 증가
복잡도낮음높음 (이벤트 설계, 스냅샷 등)
동시성 처리비관적/낙관적 잠금이벤트 버전 기반 낙관적 제어
성능읽기 최적화 쉬움이벤트 재생 비용 (스냅샷으로 완화)
스키마 변경마이그레이션 필요이벤트 업캐스팅(Upcasting)
CQRS 연계선택사항자연스러운 조합

5-6. Event Sourcing을 사용하지 말아야 할 때

  • 단순한 CRUD 도메인 (블로그, 설정 관리)
  • 강한 일관성이 필수인 경우 (실시간 잔고 확인이 필요한 ATM)
  • 이벤트 설계가 불안정한 초기 단계 (잦은 변경 시 이벤트 업캐스팅 비용)
  • 팀의 경험이 부족한 경우 (학습 곡선이 가파름)
  • 규제 요건으로 데이터 삭제가 필요한 경우 (GDPR right to be forgotten과 충돌)

6. Outbox Pattern (신뢰성의 핵심)

6-1. 문제: 이중 쓰기 (Dual Write)

분산 시스템에서 가장 흔한 실수 중 하나는 데이터베이스 저장과 메시지 발행을 별도로 수행하는 것입니다.

// 위험한 코드: 이중 쓰기 문제
@Transactional
public void createOrder(Order order) {
    orderRepository.save(order);           // Step 1: DB 저장
    kafkaTemplate.send("orders", event);   // Step 2: 메시지 발행
    // Step 1 성공 + Step 2 실패 = DB에는 있지만 이벤트는 발행 안 됨
    // Step 1 성공 + Step 2 성공 + DB 커밋 실패 = 이벤트는 발행됐지만 DB에 없음
}

DB 트랜잭션과 메시지 브로커 전송은 서로 다른 인프라이므로 원자적으로 처리할 수 없습니다.

6-2. 해결: Outbox 패턴

Outbox 패턴은 이벤트를 DB 트랜잭션 내에서 outbox 테이블에 저장하고, 별도 프로세스가 이를 메시지 브로커로 전달합니다.

1. 비즈니스 로직 + outbox INSERT = 단일 DB 트랜잭션 (원자성 보장)
2. 별도 프로세스(CDC/Poller)가 outbox 테이블 읽어서 Kafka로 발행
3. 발행 성공 후 outbox 레코드 처리 완료로 표시
-- Outbox 테이블 스키마
CREATE TABLE outbox (
    id              UUID PRIMARY KEY,
    aggregate_type  VARCHAR(255) NOT NULL,
    aggregate_id    UUID NOT NULL,
    event_type      VARCHAR(255) NOT NULL,
    payload         JSONB NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMPTZ,
    status          VARCHAR(20) DEFAULT 'PENDING'
);

CREATE INDEX idx_outbox_pending ON outbox (status, created_at)
    WHERE status = 'PENDING';
// === Outbox 패턴 구현 ===

@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;

    @Transactional  // 단일 트랜잭션으로 보장
    public UUID createOrder(CreateOrderCommand command) {
        // 1. 비즈니스 로직 수행
        Order order = Order.create(command);
        orderRepository.save(order);

        // 2. 같은 트랜잭션에서 outbox에 이벤트 저장
        OutboxMessage message = OutboxMessage.builder()
            .aggregateType("Order")
            .aggregateId(order.getId())
            .eventType("OrderCreated")
            .payload(objectMapper.writeValueAsString(
                new OrderCreatedEvent(order.getId(), order.getTotalAmount())
            ))
            .build();
        outboxRepository.save(message);

        return order.getId();
        // 트랜잭션 커밋 시 order와 outbox 모두 함께 저장됨
    }
}

6-3. CDC (Debezium) 방식

Debezium은 DB의 트랜잭션 로그(WAL)를 모니터링하여 변경을 캡처하고 Kafka로 전달합니다.

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.dbname": "orderdb",
    "database.server.name": "order-service",
    "table.include.list": "public.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "events.order"
  }
}

6-4. Polling vs CDC 비교

기준PollingCDC (Debezium)
지연폴링 주기에 의존 (수 초~수 분)거의 실시간 (수 ms)
DB 부하주기적 쿼리로 부하 발생WAL 읽기로 부하 최소
구현 복잡도낮음 (스케줄러 + 쿼리)높음 (Debezium 인프라 필요)
순서 보장outbox 테이블 순서WAL 순서 그대로
운영 부담낮음Kafka Connect 클러스터 운영 필요
확장성단일 폴러 병목 가능높은 확장성

7. 멱등성 패턴

분산 시스템에서 메시지는 최소 한 번(at-least-once) 전달됩니다. 네트워크 장애, 브로커 재시작, 소비자 재배포 등으로 중복 수신이 발생합니다. 멱등성은 같은 메시지를 여러 번 처리해도 결과가 한 번 처리한 것과 동일하도록 보장합니다.

7-1. Producer 멱등성 (Kafka)

# Kafka Producer 설정
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.properties.acks=all
spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
spring.kafka.producer.properties.retries=2147483647

Kafka의 멱등 프로듀서는 각 메시지에 시퀀스 번호를 부여하여 브로커가 중복을 감지합니다.

7-2. Consumer 멱등성

// === 멱등 소비자 구현 ===

@Component
@RequiredArgsConstructor
public class IdempotentOrderEventHandler {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderProjectionRepository orderProjectionRepository;

    @KafkaListener(topics = "order-events", groupId = "order-projection")
    @Transactional
    public void handleOrderEvent(
            @Payload OrderEvent event,
            @Header(KafkaHeaders.RECEIVED_KEY) String key,
            @Header("eventId") String eventId) {

        // 1. 이미 처리된 이벤트인지 확인
        if (processedEventRepository.existsById(eventId)) {
            log.info("이벤트 중복 수신 무시: eventId={}", eventId);
            return;
        }

        // 2. 비즈니스 로직 처리
        processEvent(event);

        // 3. 처리 완료 기록 (같은 트랜잭션)
        processedEventRepository.save(new ProcessedEvent(
            eventId,
            event.getClass().getSimpleName(),
            Instant.now()
        ));
    }

    private void processEvent(OrderEvent event) {
        switch (event) {
            case OrderCreatedEvent e -> createOrderProjection(e);
            case OrderStatusChangedEvent e -> updateOrderStatus(e);
            default -> log.warn("알 수 없는 이벤트 타입");
        }
    }
}
-- 이벤트 처리 이력 테이블
CREATE TABLE processed_events (
    event_id    VARCHAR(255) PRIMARY KEY,
    event_type  VARCHAR(255) NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

7-3. Unique Constraint vs Redis SET NX

접근법장점단점
DB Unique Constraint트랜잭션과 함께 원자적DB 부하 증가
Redis SET NX매우 빠른 조회Redis 장애 시 보장 안 됨
DB + Redis 조합빠른 1차 필터 + 확실한 보장구현 복잡
// Redis SET NX를 이용한 빠른 중복 체크
@Component
public class RedisIdempotencyGuard {

    private final StringRedisTemplate redisTemplate;
    private static final Duration TTL = Duration.ofHours(24);

    public boolean tryAcquire(String idempotencyKey) {
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(
                "idempotency:" + idempotencyKey,
                "processed",
                TTL
            );
        return Boolean.TRUE.equals(result);
    }
}

8. Dead Letter Queue 전략

메시지 처리에 실패하면 무한 재시도는 시스템을 마비시킵니다. DLQ(Dead Letter Queue)는 처리 불가능한 메시지를 별도 큐로 격리하여 시스템의 안정성을 보장합니다.

8-1. Retry with Exponential Backoff

// === Spring Kafka 재시도 + DLQ 설정 ===

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public DefaultErrorHandler errorHandler(
            KafkaTemplate<String, Object> kafkaTemplate) {

        // DLQ로 보내는 recoverer
        DeadLetterPublishingRecoverer recoverer =
            new DeadLetterPublishingRecoverer(kafkaTemplate,
                (record, ex) -> new TopicPartition(
                    record.topic() + ".DLQ", record.partition()));

        // Exponential Backoff 재시도 설정
        ExponentialBackOff backOff = new ExponentialBackOff();
        backOff.setInitialInterval(1000L);    // 1초
        backOff.setMultiplier(2.0);            // 2배씩 증가
        backOff.setMaxInterval(60000L);        // 최대 60초
        backOff.setMaxElapsedTime(300000L);    // 최대 5분

        DefaultErrorHandler errorHandler =
            new DefaultErrorHandler(recoverer, backOff);

        // 재시도하지 않을 예외 (비즈니스 오류)
        errorHandler.addNotRetryableExceptions(
            InvalidOrderException.class,
            DuplicateEventException.class
        );

        return errorHandler;
    }
}

8-2. DLQ 모니터링 및 알림

// === DLQ 모니터링 서비스 ===

@Component
@RequiredArgsConstructor
public class DlqMonitoringService {

    private final MeterRegistry meterRegistry;
    private final AlertService alertService;

    @KafkaListener(topics = "order-events.DLQ", groupId = "dlq-monitor")
    public void monitorDlq(
            ConsumerRecord<String, Object> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String errorMessage,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic) {

        // 메트릭 기록
        meterRegistry.counter("dlq.messages.received",
            "originalTopic", originalTopic,
            "errorType", classifyError(errorMessage)
        ).increment();

        // 알림 발송
        alertService.sendAlert(DlqAlert.builder()
            .originalTopic(originalTopic)
            .messageKey(record.key())
            .errorMessage(errorMessage)
            .timestamp(Instant.ofEpochMilli(record.timestamp()))
            .severity(classifySeverity(errorMessage))
            .build()
        );

        log.error("DLQ 메시지 수신: topic={}, key={}, error={}",
            originalTopic, record.key(), errorMessage);
    }
}

8-3. DLQ 재처리 파이프라인

// === DLQ 메시지 재처리 ===

@RestController
@RequestMapping("/api/dlq")
@RequiredArgsConstructor
public class DlqReprocessController {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final DlqMessageRepository dlqRepository;

    // 단건 재처리
    @PostMapping("/reprocess/single")
    public ResponseEntity<String> reprocessSingle(
            @RequestParam String messageId) {
        DlqMessage message = dlqRepository.findById(messageId).orElseThrow();

        // 원래 토픽으로 재발행
        kafkaTemplate.send(
            message.getOriginalTopic(),
            message.getKey(),
            message.getValue()
        );

        message.setStatus("REPROCESSED");
        message.setReprocessedAt(Instant.now());
        dlqRepository.save(message);

        return ResponseEntity.ok("재처리 요청 완료: " + messageId);
    }

    // 배치 재처리
    @PostMapping("/reprocess/batch")
    public ResponseEntity<String> reprocessBatch(
            @RequestParam String originalTopic,
            @RequestParam(defaultValue = "100") int batchSize) {
        List<DlqMessage> messages = dlqRepository
            .findByOriginalTopicAndStatus(originalTopic, "PENDING",
                PageRequest.of(0, batchSize));

        int count = 0;
        for (DlqMessage message : messages) {
            kafkaTemplate.send(message.getOriginalTopic(),
                message.getKey(), message.getValue());
            message.setStatus("REPROCESSED");
            count++;
        }
        dlqRepository.saveAll(messages);

        return ResponseEntity.ok(count + "건 재처리 요청 완료");
    }
}

9. Exactly-Once Semantics

메시지 전달 보장 수준은 세 가지가 있습니다.

보장 수준설명복잡도
At-most-once최대 한 번 전달 (유실 가능)낮음
At-least-once최소 한 번 전달 (중복 가능)중간
Exactly-once정확히 한 번 전달높음

9-1. Kafka Exactly-Once 구현

Kafka는 0.11부터 Exactly-Once Semantics(EOS)를 지원합니다.

// === Kafka Exactly-Once 설정 ===

@Configuration
public class KafkaExactlyOnceConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(config);
    }
}

9-2. End-to-End Exactly-Once

진정한 End-to-End Exactly-Once는 Producer부터 Consumer의 최종 상태 저장까지 보장해야 합니다.

End-to-End Exactly-Once 체인:

Producer (idempotent + transactions)
Kafka Broker (transaction log)
Consumer (read_committed + manual offset)
Database (idempotency key + business logic)

모든 체인이 함께 동작해야 Exactly-Once가 보장됨
// === End-to-End Exactly-Once 처리 ===

@Component
@RequiredArgsConstructor
public class ExactlyOnceOrderProcessor {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final OrderRepository orderRepository;
    private final ProcessedOffsetRepository offsetRepository;

    @KafkaListener(topics = "payment-completed")
    @Transactional("chainedKafkaTransactionManager")
    public void processPaymentCompleted(
            ConsumerRecord<String, PaymentCompletedEvent> record) {

        String offsetKey = record.topic() + "-" +
            record.partition() + "-" + record.offset();

        // 이미 처리된 오프셋인지 확인
        if (offsetRepository.existsById(offsetKey)) {
            return;
        }

        // 비즈니스 로직
        PaymentCompletedEvent event = record.value();
        Order order = orderRepository.findById(event.orderId()).orElseThrow();
        order.markAsPaid(event.paymentId());
        orderRepository.save(order);

        // 다음 단계 이벤트 발행 (같은 Kafka 트랜잭션)
        kafkaTemplate.send("order-paid",
            new OrderPaidEvent(order.getId(), event.paymentId()));

        // 처리된 오프셋 기록 (같은 DB 트랜잭션)
        offsetRepository.save(new ProcessedOffset(offsetKey, Instant.now()));
    }
}

10. Reactive Streams vs Virtual Threads (2025)

10-1. Project Reactor: 배압(Backpressure)이 핵심

리액티브 스트림은 논블로킹 I/O와 배압(Backpressure) 메커니즘을 핵심 설계 원칙으로 합니다.

// === Reactor 기반 비동기 처리 ===

@Service
public class ReactiveOrderService {

    public Flux<OrderEvent> processOrderStream() {
        return kafkaReceiver.receive()
            .flatMap(record -> {
                OrderEvent event = record.value();
                return processEvent(event)
                    .then(record.receiverOffset().commit())
                    .thenReturn(event);
            }, 16)  // 동시 처리 수 제한 (배압)
            .onErrorResume(e -> {
                log.error("처리 실패", e);
                return Mono.empty();
            })
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
    }

    private Mono<Void> processEvent(OrderEvent event) {
        return Mono.fromCallable(() -> {
            // 비즈니스 로직
            return null;
        }).subscribeOn(Schedulers.boundedElastic()).then();
    }
}

10-2. Java 21 Virtual Threads: 더 단순한 코드

Virtual Threads는 기존 동기식 코드 스타일을 유지하면서 높은 동시성을 달성합니다.

// === Virtual Threads 기반 비동기 처리 ===

@Configuration
public class VirtualThreadConfig {

    @Bean
    public TomcatProtocolHandlerCustomizer<?> protocolHandlerCustomizer() {
        return protocolHandler -> {
            protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
        };
    }
}

@Service
public class VirtualThreadOrderService {

    // 동기식 코드지만 Virtual Thread 위에서 실행
    // → 블로킹 I/O 시 OS 스레드가 아닌 Virtual Thread만 블로킹
    public OrderResult processOrder(UUID orderId) {
        // 병렬 처리: StructuredTaskScope 활용
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

            Subtask<PaymentResult> paymentTask = scope.fork(() ->
                paymentService.processPayment(orderId)
            );
            Subtask<InventoryResult> inventoryTask = scope.fork(() ->
                inventoryService.checkInventory(orderId)
            );

            scope.join();
            scope.throwIfFailed();

            return new OrderResult(
                paymentTask.get(),
                inventoryTask.get()
            );
        }
    }
}

10-3. 선택 가이드: 의사결정 매트릭스

기준Reactive (Reactor/WebFlux)Virtual Threads
코드 복잡도높음 (리액티브 연산자 학습)낮음 (기존 동기 코드)
배압 제어내장 (Flux/Mono)수동 구현 필요
디버깅어려움 (스택 트레이스 복잡)쉬움 (전통적 스택 트레이스)
메모리 효율매우 좋음좋음 (OS 스레드보다)
I/O 집약 작업최적매우 좋음
CPU 집약 작업부적합부적합 (플랫폼 스레드 사용)
기존 코드 마이그레이션전면 재작성 필요설정 변경만으로 가능
생태계 성숙도높음 (5년+)성장 중 (2023~)

권장 가이드:

  • 새 프로젝트 + 단순 I/O: Virtual Threads
  • 스트리밍 데이터 처리: Reactive Streams
  • 기존 Spring MVC 프로젝트: Virtual Threads (마이그레이션 비용 최소)
  • 실시간 데이터 파이프라인: Reactive Streams

11. 실전 아키텍처: 결제 시스템 설계

모든 패턴을 결합한 이커머스 결제 시스템의 전체 아키텍처를 설계합니다.

                         이커머스 결제 시스템 전체 아키텍처
                         ================================

[Client][API Gateway]
                |
        [Order Service] ─── Write DB (PostgreSQL)
           |                     |
           | (Outbox Pattern)    | (CDC - Debezium)
           |           |              [Kafka: order-events]
           |                /        |        \
           ↓               ↓        ↓         ↓
    [Saga Orchestrator]  [Payment] [Inventory] [Notification]
    (Temporal)           Service   Service     Service
           |                |        |
           | commands       |        |
           ↓                ↓        ↓
    [payment-commands]  [payment-  [inventory-
                        events]    events]
                           |
                    [DLQ + Monitoring]
                           |
                    [Alerting (PagerDuty)]

Read Side:
    [Kafka: order-events][Order Projection]Read DB (MongoDB)
[Analytics Service]Data Warehouse
[Search Service]Elasticsearch

적용된 패턴 요약

패턴적용 위치목적
CQRSOrder Service주문 쓰기/읽기 분리
Saga (Orchestration)Temporal Orchestrator결제-재고-배송 트랜잭션 관리
Event SourcingOrder Aggregate주문 상태 변경 이력 관리
Outbox PatternOrder Service → KafkaDB-메시지 원자성 보장
멱등성모든 Consumer중복 메시지 처리 방지
DLQ모든 Kafka Consumer처리 실패 메시지 격리
Exactly-OncePayment 처리이중 결제 방지

퀴즈

Q1. CQRS에서 Write Model과 Read Model이 서로 다른 데이터베이스를 사용할 때, 두 모델 간의 데이터 일관성을 보장하기 위해 가장 일반적으로 사용되는 접근 방식은?

정답: 이벤트 기반 비동기 동기화 (Eventual Consistency)

Write Model에서 상태 변경 시 이벤트를 발행하고, Read Model의 프로젝션 핸들러가 이 이벤트를 구독하여 Read DB를 업데이트합니다. 두 모델 사이에는 일시적 불일치(수 ms~수 초)가 존재할 수 있지만, 최종적으로 일관성이 보장됩니다.

강한 일관성이 필요한 경우에는 CQRS의 적합성을 재검토해야 합니다.

Q2. Choreography Saga에서 서비스 A가 이벤트를 발행하고 서비스 B가 처리에 실패했을 때, 서비스 A는 어떻게 이를 인지할 수 있나요?

정답: 서비스 B가 실패 이벤트(예: PaymentFailed)를 발행하면 서비스 A가 이를 구독하여 인지합니다. Choreography에서는 각 서비스가 성공/실패 이벤트를 모두 발행해야 하며, 관련 서비스들이 이를 구독하여 적절히 대응합니다.

이것이 Choreography의 단점 중 하나입니다. 서비스가 많아질수록 이벤트 흐름 추적이 복잡해지며, 타임아웃 기반 감지가 추가로 필요할 수 있습니다.

Q3. Event Sourcing에서 이벤트가 100만 개 쌓인 Aggregate를 로드할 때 성능 문제를 해결하는 핵심 기법은?

정답: Snapshot 패턴

주기적으로(예: 100개 이벤트마다) Aggregate의 현재 상태를 스냅샷으로 저장합니다. 로드 시 가장 최근 스냅샷에서 시작하여 그 이후의 이벤트만 재생하면 됩니다.

예: 100만 개 이벤트 중 마지막 스냅샷이 999,900번째라면 100개의 이벤트만 재생하면 됩니다.

Q4. Outbox Pattern에서 CDC(Change Data Capture) 방식이 Polling 방식보다 선호되는 주요 이유는?

정답: CDC는 DB의 트랜잭션 로그(WAL)를 직접 읽기 때문에 거의 실시간(수 ms)으로 변경을 감지합니다. Polling은 주기적으로 쿼리를 실행하므로 지연이 크고(수 초~수 분), 빈번한 쿼리로 DB에 부하를 줍니다.

또한 CDC는 WAL 순서 그대로 이벤트를 전달하여 순서 보장이 자연스럽고, DB 트랜잭션 로그를 읽으므로 추가적인 쿼리 부하가 거의 없습니다.

Q5. Kafka에서 Exactly-Once Semantics를 달성하려면 Producer, Broker, Consumer 각각에서 어떤 설정이 필요한가요?

정답:

  • Producer: enable.idempotence=true + transactional.id 설정 (멱등 프로듀서 + 트랜잭션)
  • Broker: 트랜잭션 로그 유지 (기본 활성화)
  • Consumer: isolation.level=read_committed + 수동 오프셋 커밋 + 멱등 처리

End-to-End Exactly-Once를 위해서는 Consumer가 DB에 결과를 저장할 때도 멱등성을 보장해야 합니다. Kafka 트랜잭션은 Kafka 내부의 Exactly-Once만 보장하며, 외부 시스템(DB)까지 포함하려면 Consumer 측 멱등성이 필수입니다.


참고 자료

  1. Martin Fowler — "Event Sourcing" (martinfowler.com)
  2. Chris Richardson — "Microservices Patterns" (Manning, 2018)
  3. Vaughn Vernon — "Implementing Domain-Driven Design" (Addison-Wesley, 2013)
  4. Greg Young — "CQRS Documents" (cqrs.files.wordpress.com)
  5. Apache Kafka Documentation — "Exactly-Once Semantics" (kafka.apache.org)
  6. Debezium Documentation — "Outbox Event Router" (debezium.io)
  7. Temporal.io Documentation — "Saga Pattern with Temporal" (docs.temporal.io)
  8. Netflix Tech Blog — "Evolution of the Netflix Data Pipeline" (netflixtechblog.com)
  9. Uber Engineering Blog — "Building Reliable Reprocessing and Dead Letter Queues" (eng.uber.com)
  10. Confluent Blog — "Exactly-Once Semantics Are Possible" (confluent.io)
  11. AWS Architecture Blog — "Saga Orchestration Pattern" (aws.amazon.com)
  12. Microsoft Azure Docs — "CQRS Pattern" (learn.microsoft.com)
  13. Spring for Apache Kafka Reference — "Error Handling and DLQ" (docs.spring.io)
  14. JEP 444 — "Virtual Threads" (openjdk.org)
  15. Project Reactor Reference — "Backpressure" (projectreactor.io)
  16. Pat Helland — "Idempotence Is Not a Medical Condition" (ACM Queue, 2012)