비동기 처리 패턴 완전 정복: CQRS, Saga, Event Sourcing — 시니어 백엔드 개발자의 필수 무기
- 1. 왜 비동기인가? 대규모 시스템의 생존 전략
- 2. Event-Driven Architecture 기초
- 3. CQRS 딥다이브
- 4. Saga 패턴 완전 정복
- 5. Event Sourcing
- 6. Outbox Pattern (신뢰성의 핵심)
- 7. 멱등성 패턴
- 8. Dead Letter Queue 전략
- 9. Exactly-Once Semantics
- 10. Reactive Streams vs Virtual Threads (2025)
- 11. 실전 아키텍처: 결제 시스템 설계
- 퀴즈
- 참고 자료
1. 왜 비동기인가? 대규모 시스템의 생존 전략
현대 대규모 시스템은 동기식 요청-응답 모델로는 감당할 수 없는 규모의 트래픽을 처리합니다.
실제 기업 사례:
| 기업 | 규모 | 비동기 처리 방식 |
|---|---|---|
| Netflix | 2.6억 구독자, 초당 수백만 이벤트 | Apache Kafka 기반 이벤트 스트리밍 |
| Uber | 일일 페타바이트 실시간 데이터 | Apache Kafka + Apache Flink |
| LinkedIn | 7조+ 메시지/일 Kafka 처리 | 자체 개발 Kafka 인프라 |
| Shopify | 블랙프라이데이 초당 80만 요청 | 비동기 주문 처리 파이프라인 |
동기식 아키텍처에서 서비스 A가 서비스 B를 호출하고, B가 C를 호출하면 전체 레이턴시는 각 서비스 호출 시간의 합이 됩니다. 하나의 서비스가 느려지면 전체 체인이 블로킹됩니다.
비동기 아키텍처에서는 메시지 브로커를 통해 서비스 간 결합도를 낮추고, 각 서비스가 독립적으로 확장 가능합니다.
동기식: Client → A(50ms) → B(100ms) → C(200ms) = 350ms 총 대기
비동기식: Client → A(50ms) → Message Broker → (B, C 병렬 처리)
Client는 50ms 후 응답, B와 C는 비동기로 처리
비동기 전환의 핵심 이점:
- 시간적 결합 제거 — 생산자와 소비자가 동시에 가동될 필요 없음
- 독립적 확장 — 읽기와 쓰기 워크로드를 별도로 스케일링
- 장애 격리 — 하나의 서비스 장애가 전체에 전파되지 않음
- 피크 트래픽 흡수 — 메시지 큐가 버퍼 역할
그러나 비동기는 공짜가 아닙니다. Eventual Consistency(최종 일관성), 메시지 순서 보장, 중복 처리, 분산 트랜잭션 같은 새로운 문제가 생깁니다. 이 글에서 다루는 패턴들은 바로 이 문제들에 대한 해답입니다.
2. Event-Driven Architecture 기초
이벤트 주도 아키텍처(EDA)는 비동기 패턴의 토대입니다. 시스템의 상태 변경을 "이벤트"로 표현하고, 관심 있는 서비스가 이를 구독하여 반응합니다.
2-1. Domain Events vs Integration Events
| 구분 | Domain Event | Integration Event |
|---|---|---|
| 범위 | 단일 바운디드 컨텍스트 내부 | 바운디드 컨텍스트 간 |
| 예시 | OrderItemAdded | OrderPlaced |
| 전달 방식 | 인메모리 이벤트 버스 | 메시지 브로커 (Kafka, RabbitMQ) |
| 스키마 관리 | 내부적이므로 자유롭게 변경 가능 | 계약(Contract) 관리 필수 |
// Domain Event - 바운디드 컨텍스트 내부
public record OrderItemAdded(
UUID orderId,
UUID productId,
int quantity,
BigDecimal price,
Instant occurredAt
) implements DomainEvent {}
// Integration Event - 외부 서비스에 발행
public record OrderPlacedEvent(
UUID orderId,
UUID customerId,
BigDecimal totalAmount,
List<OrderLineItem> items,
Instant occurredAt,
int version // 스키마 버전 관리
) implements IntegrationEvent {}
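표에서 말한 "인메모리 이벤트 버스"의 최소 구조를 프레임워크 없이 스케치하면 다음과 같습니다. Spring의 ApplicationEventPublisher 같은 기능을 단순화한 것이며, 클래스·메서드 이름은 설명용 가정입니다.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// 바운디드 컨텍스트 내부용 인메모리 이벤트 버스 (최소 스케치)
class InMemoryEventBus {
    // 이벤트 타입 → 구독자(핸들러) 목록
    private final Map<Class<?>, List<Consumer<Object>>> subscribers = new ConcurrentHashMap<>();

    // 특정 타입의 도메인 이벤트 구독
    <E> void subscribe(Class<E> eventType, Consumer<E> handler) {
        subscribers.computeIfAbsent(eventType, k -> new ArrayList<>())
                   .add(e -> handler.accept(eventType.cast(e)));
    }

    // 발행: 같은 프로세스 안의 구독자에게 동기적으로 전달
    void publish(Object event) {
        subscribers.getOrDefault(event.getClass(), List.of())
                   .forEach(handler -> handler.accept(event));
    }
}
```

도메인 이벤트는 이렇게 프로세스 내부에서 소비되고, 외부로 나가야 하는 Integration Event만 메시지 브로커를 탑니다.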
2-2. Event Bus vs Event Store
Event Bus (예: Kafka, RabbitMQ)는 이벤트를 전달하는 파이프라인입니다. 소비자가 이벤트를 수신하면 보통 브로커에서 삭제(또는 TTL 이후 만료)됩니다.
Event Store는 모든 이벤트를 영구적으로 보관하는 저장소입니다. 이벤트를 재생(replay)하여 어떤 시점의 상태든 복원할 수 있습니다.
Event Bus (Kafka):
Producer → [Topic: orders] → Consumer Group A (주문 서비스)
→ Consumer Group B (알림 서비스)
→ Consumer Group C (분석 서비스)
Event Store:
[Event #1: OrderCreated]
[Event #2: ItemAdded]
[Event #3: PaymentReceived]
[Event #4: OrderShipped]
→ 이벤트 #1~#4를 재생하면 현재 주문 상태 복원 가능
2-3. Eventual Consistency 이해
분산 시스템에서 강한 일관성(Strong Consistency)을 유지하려면 분산 잠금, 2PC(2-Phase Commit) 등이 필요하지만 성능과 가용성이 크게 떨어집니다.
CAP 정리에 따라, 네트워크 파티션 상황에서 일관성(C)과 가용성(A) 중 하나만 선택해야 합니다. 대부분의 대규모 시스템은 가용성(A)을 선택하고 Eventual Consistency를 수용합니다.
Eventual Consistency란 "지금 당장은 데이터가 불일치할 수 있지만, 충분한 시간이 지나면 모든 노드가 동일한 상태로 수렴한다"는 것입니다.
실무에서의 의미:
- 주문 후 재고가 즉시 줄어들지 않을 수 있음 (수 밀리초~수 초 지연)
- 사용자에게 "주문이 처리 중입니다" 상태를 먼저 보여주고, 확정 후 업데이트
- 일관성 창(Consistency Window)을 최소화하는 것이 핵심
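일관성 창이 어떻게 생기고 닫히는지를 인메모리로 단순화한 스케치입니다. 쓰기 모델은 즉시 반영되지만, 읽기 모델은 이벤트가 소비된 뒤에야 수렴합니다 (클래스·메서드 이름은 설명용 가정).

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Eventual Consistency 스케치: 읽기 모델은 이벤트 소비 후에야 쓰기 모델에 수렴한다
class EventuallyConsistentStock {
    private final Map<String, Integer> writeModel = new HashMap<>();
    private final Map<String, Integer> readModel = new HashMap<>();
    private final Queue<String> pendingEvents = new ArrayDeque<>(); // 미반영 이벤트 = 일관성 창

    void initStock(String productId, int quantity) {
        writeModel.put(productId, quantity);
        readModel.put(productId, quantity);
    }

    void decreaseStock(String productId) {
        writeModel.merge(productId, -1, Integer::sum); // 쓰기 모델은 즉시 반영
        pendingEvents.add(productId);                  // 읽기 모델 반영은 비동기
    }

    Integer readStock(String productId) {
        return readModel.get(productId); // 수렴 전에는 stale 값을 반환할 수 있음
    }

    // 실제로는 별도 소비자(프로젝션 핸들러)가 수행하는 동기화
    void drainEvents() {
        while (!pendingEvents.isEmpty()) {
            String productId = pendingEvents.poll();
            readModel.put(productId, writeModel.get(productId));
        }
    }
}
```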
3. CQRS 딥다이브
CQRS(Command Query Responsibility Segregation)는 읽기와 쓰기 모델을 완전히 분리하는 패턴입니다.
3-1. 기본 개념
전통적인 CRUD 모델에서는 하나의 데이터 모델이 읽기와 쓰기를 모두 담당합니다. 하지만 실제로 읽기와 쓰기는 매우 다른 요구사항을 가집니다.
전통적 CRUD:
Client → [단일 모델] → Database
(읽기/쓰기 모두)
CQRS:
Client ──Command──→ [Write Model] → Command DB (정규화)
↓ Events
Client ←──Query───← [Read Model] ← Read DB (비정규화, 최적화)
Write Model (명령 측):
- 비즈니스 규칙 검증
- 도메인 불변식(Invariant) 보장
- 정규화된 스키마
- 강한 일관성
Read Model (질의 측):
- 조회에 최적화된 비정규화 뷰
- 다양한 저장소 사용 가능 (Elasticsearch, Redis, MongoDB)
- Eventual Consistency 허용
3-2. 언제 CQRS를 사용해야 하는가?
적합한 경우:
- 읽기/쓰기 비율이 극단적으로 다를 때 (읽기가 100:1 이상)
- 복잡한 도메인 로직 + 다양한 조회 요구사항
- 읽기와 쓰기의 스케일링 요구가 다를 때
- Event Sourcing과 함께 사용할 때
부적합한 경우:
- 단순한 CRUD 애플리케이션
- 강한 일관성이 절대적으로 필요한 경우
- 도메인이 단순하고 조회 패턴이 예측 가능한 경우
3-3. Spring Boot + Kafka 구현
// === Command 측: 주문 생성 ===
@Service
@RequiredArgsConstructor
public class OrderCommandService {
private final OrderRepository orderRepository;
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
@Transactional
public UUID createOrder(CreateOrderCommand command) {
// 1. 비즈니스 규칙 검증
validateOrder(command);
// 2. 도메인 모델에서 처리
Order order = Order.create(
command.customerId(),
command.items(),
command.shippingAddress()
);
// 3. Write DB에 저장
orderRepository.save(order);
// 4. 이벤트 발행 (Read Model 동기화용)
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getCustomerId(),
order.getTotalAmount(),
order.getItems().stream()
.map(this::toEventItem)
.toList(),
Instant.now()
);
kafkaTemplate.send("order-events", order.getId().toString(), event);
return order.getId();
}
private void validateOrder(CreateOrderCommand command) {
if (command.items().isEmpty()) {
throw new InvalidOrderException("주문 항목이 비어있습니다");
}
// 재고 확인, 고객 상태 확인 등
}
}
// === Query 측: 주문 조회 프로젝션 ===
@Service
@RequiredArgsConstructor
public class OrderQueryService {
private final OrderReadRepository readRepository;
public OrderDetailView getOrderDetail(UUID orderId) {
return readRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
}
public Page<OrderSummaryView> getCustomerOrders(
UUID customerId, Pageable pageable) {
return readRepository.findByCustomerId(customerId, pageable);
}
}
// Read Model - 조회에 최적화된 비정규화 뷰
@Document(collection = "order_views")
public record OrderDetailView(
UUID orderId,
UUID customerId,
String customerName, // 비정규화: 고객명 포함
String customerEmail,
BigDecimal totalAmount,
String status,
List<OrderItemView> items,
AddressView shippingAddress,
Instant createdAt,
Instant lastUpdatedAt
) {}
// === 이벤트 핸들러: Write → Read 동기화 ===
@Component
@Slf4j
@RequiredArgsConstructor
public class OrderProjectionHandler {
private final OrderReadRepository readRepository;
private final CustomerQueryClient customerClient;
@KafkaListener(topics = "order-events", groupId = "order-projection")
public void handleOrderEvent(OrderEvent event) {
switch (event) {
case OrderCreatedEvent e -> handleOrderCreated(e);
case OrderStatusChangedEvent e -> handleStatusChanged(e);
case OrderCancelledEvent e -> handleOrderCancelled(e);
default -> log.warn("알 수 없는 이벤트 타입: {}", event.getClass());
}
}
private void handleOrderCreated(OrderCreatedEvent event) {
// 고객 정보 조회하여 비정규화된 뷰 생성
CustomerInfo customer = customerClient.getCustomer(event.customerId());
OrderDetailView view = new OrderDetailView(
event.orderId(),
event.customerId(),
customer.name(),
customer.email(),
event.totalAmount(),
"CREATED",
event.items().stream().map(this::toItemView).toList(),
null,
event.occurredAt(),
event.occurredAt()
);
readRepository.save(view);
}
}
3-4. Read Model 동기화 전략
| 전략 | 지연 | 일관성 | 복잡도 | 적합한 경우 |
|---|---|---|---|---|
| 이벤트 핸들러 (Kafka) | 수 ms ~ 수 초 | Eventual | 중간 | 대부분의 경우 |
| CDC (Debezium) | 수 초 | Eventual | 높음 | 레거시 시스템 연동 |
| 폴링 (Scheduled) | 수 초 ~ 수 분 | Eventual | 낮음 | 단순한 경우 |
| 동기식 업데이트 | 즉시 | Strong | 낮음 | 성능 요구 낮을 때 |
4. Saga 패턴 완전 정복
마이크로서비스에서 여러 서비스에 걸친 비즈니스 트랜잭션은 어떻게 관리할까요? 기존 모놀리스의 DB 트랜잭션은 사용할 수 없습니다. Saga 패턴은 분산 트랜잭션을 일련의 로컬 트랜잭션으로 분해합니다.
4-1. Choreography (안무형)
Choreography 방식에서는 중앙 조율자가 없습니다. 각 서비스가 이벤트를 발행하면 다음 서비스가 해당 이벤트를 구독하여 자신의 로컬 트랜잭션을 수행합니다.
이커머스 주문 흐름 (Choreography):
Order Service ──OrderPlaced──→ Payment Service
│
PaymentCompleted
│
↓
Inventory Service
│
InventoryReserved
│
↓
Shipping Service
│
ShipmentCreated
│
↓
Order Service (상태 업데이트)
실패 시 (보상 트랜잭션):
Inventory Service ──InventoryReservationFailed──→ Payment Service (환불)
│
PaymentRefunded
│
↓
Order Service (주문 취소)
// === Choreography Saga: 주문 서비스 ===
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final KafkaTemplate<String, Object> kafkaTemplate;
@Transactional
public UUID placeOrder(PlaceOrderCommand command) {
Order order = Order.create(command);
order.setStatus(OrderStatus.PENDING);
orderRepository.save(order);
// 이벤트 발행 - 결제 서비스가 구독
kafkaTemplate.send("order-events", new OrderPlacedEvent(
order.getId(),
order.getCustomerId(),
order.getTotalAmount(),
order.getItems()
));
return order.getId();
}
    // 결제 결과 이벤트 수신
    // 같은 topic + groupId로 리스너 메서드를 두 개 두면 파티션이 나뉘어
    // 각 메서드가 자신이 처리할 수 없는 타입의 이벤트를 받을 수 있으므로,
    // 단일 리스너에서 타입별로 분기한다
    @KafkaListener(topics = "payment-events", groupId = "order-service")
    public void onPaymentEvent(PaymentEvent event) {
        switch (event) {
            case PaymentCompletedEvent e -> {
                Order order = orderRepository.findById(e.orderId()).orElseThrow();
                order.setStatus(OrderStatus.PAID);
                orderRepository.save(order);
            }
            case PaymentFailedEvent e -> {
                Order order = orderRepository.findById(e.orderId()).orElseThrow();
                order.setStatus(OrderStatus.CANCELLED);
                order.setCancellationReason(e.reason());
                orderRepository.save(order);
            }
            default -> { } // 그 외 결제 이벤트는 무시
        }
    }
}
// === Choreography Saga: 결제 서비스 ===
@Service
@RequiredArgsConstructor
public class PaymentService {
private final PaymentRepository paymentRepository;
private final PaymentGateway paymentGateway;
private final KafkaTemplate<String, Object> kafkaTemplate;
@KafkaListener(topics = "order-events", groupId = "payment-service")
@Transactional
public void onOrderPlaced(OrderPlacedEvent event) {
try {
// 결제 처리
PaymentResult result = paymentGateway.charge(
event.customerId(),
event.totalAmount()
);
Payment payment = Payment.create(
event.orderId(),
event.customerId(),
event.totalAmount(),
result.transactionId()
);
paymentRepository.save(payment);
// 성공 이벤트 발행 - 재고 서비스가 구독
kafkaTemplate.send("payment-events", new PaymentCompletedEvent(
event.orderId(),
payment.getId(),
result.transactionId()
));
} catch (PaymentException e) {
// 실패 이벤트 발행 - 주문 서비스가 구독하여 취소
kafkaTemplate.send("payment-events", new PaymentFailedEvent(
event.orderId(),
e.getMessage()
));
}
}
}
Choreography 장단점:
| 장점 | 단점 |
|---|---|
| 서비스 간 느슨한 결합 | 전체 흐름 파악이 어려움 |
| 각 서비스가 독립적으로 배포 가능 | 디버깅이 복잡함 |
| 단일 장애 지점(SPOF) 없음 | 순환 의존성 발생 가능 |
| 구현이 비교적 단순 | 서비스가 많아지면 이벤트 추적 난이도 상승 |
4-2. Orchestration (지휘형)
Orchestration 방식에서는 중앙 조율자(Orchestrator)가 전체 워크플로우를 관리합니다. 각 서비스에 명령을 보내고, 응답에 따라 다음 단계를 결정합니다.
이커머스 주문 흐름 (Orchestration):
Order Saga Orchestrator
/ | \
↓ ↓ ↓
Payment Service Inventory Shipping
(charge) (reserve) (create)
↓ ↓ ↓
(result) (result) (result)
\ | /
Order Saga Orchestrator
(최종 결정)
// === Orchestration Saga: Temporal 프레임워크 사용 ===
@WorkflowInterface
public interface OrderSagaWorkflow {
@WorkflowMethod
OrderResult processOrder(PlaceOrderCommand command);
}
@WorkflowImpl(taskQueues = "order-saga-queue") // Temporal Spring Boot 연동; 태스크 큐 이름은 예시
public class OrderSagaWorkflowImpl implements OrderSagaWorkflow {
// 각 서비스의 Activity 참조
private final PaymentActivities paymentActivities =
Workflow.newActivityStub(PaymentActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.setRetryOptions(RetryOptions.newBuilder()
.setMaximumAttempts(3)
.build())
.build());
private final InventoryActivities inventoryActivities =
Workflow.newActivityStub(InventoryActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.build());
private final ShippingActivities shippingActivities =
Workflow.newActivityStub(ShippingActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(60))
.build());
@Override
public OrderResult processOrder(PlaceOrderCommand command) {
String paymentId = null;
String reservationId = null;
try {
// Step 1: 결제
paymentId = paymentActivities.processPayment(
command.customerId(),
command.totalAmount()
);
// Step 2: 재고 예약
reservationId = inventoryActivities.reserveInventory(
command.items()
);
// Step 3: 배송 생성
String shipmentId = shippingActivities.createShipment(
command.orderId(),
command.shippingAddress()
);
return OrderResult.success(command.orderId(), shipmentId);
} catch (Exception e) {
// 보상 트랜잭션 실행
compensate(paymentId, reservationId);
return OrderResult.failed(command.orderId(), e.getMessage());
}
}
private void compensate(String paymentId, String reservationId) {
// 역순으로 보상 트랜잭션 실행
if (reservationId != null) {
inventoryActivities.cancelReservation(reservationId);
}
if (paymentId != null) {
paymentActivities.refundPayment(paymentId);
}
}
}
// === Activity 구현 ===
@ActivityInterface
public interface PaymentActivities {
String processPayment(UUID customerId, BigDecimal amount);
void refundPayment(String paymentId);
}
@Component
@RequiredArgsConstructor
public class PaymentActivitiesImpl implements PaymentActivities {
private final PaymentGateway paymentGateway;
private final PaymentRepository paymentRepository;
@Override
public String processPayment(UUID customerId, BigDecimal amount) {
PaymentResult result = paymentGateway.charge(customerId, amount);
Payment payment = Payment.create(customerId, amount, result.transactionId());
paymentRepository.save(payment);
return payment.getId().toString();
}
@Override
public void refundPayment(String paymentId) {
Payment payment = paymentRepository.findById(UUID.fromString(paymentId))
.orElseThrow();
paymentGateway.refund(payment.getTransactionId());
payment.setStatus(PaymentStatus.REFUNDED);
paymentRepository.save(payment);
}
}
Orchestration 장단점:
| 장점 | 단점 |
|---|---|
| 전체 워크플로우가 한 곳에서 관리됨 | 오케스트레이터가 SPOF 될 수 있음 |
| 디버깅과 모니터링이 용이 | 서비스 간 결합도가 Choreography보다 높음 |
| 복잡한 보상 로직 관리가 쉬움 | 오케스트레이터에 로직이 집중될 위험 |
| Temporal/Cadence 등 프레임워크 지원 | 추가 인프라(워크플로우 엔진) 필요 |
4-3. Choreography vs Orchestration 선택 가이드
| 기준 | Choreography | Orchestration |
|---|---|---|
| 서비스 수 | 3~4개 이하 | 5개 이상 |
| 워크플로우 복잡도 | 선형적, 단순 | 조건 분기, 병렬 처리 |
| 관찰 가능성 요구 | 낮음 | 높음 |
| 팀 구조 | 독립적 팀 | 중앙 플랫폼 팀 |
| 보상 로직 | 단순 | 복잡 (다중 경로) |
4-4. Compensating Transactions (보상 트랜잭션)
보상 트랜잭션은 이미 커밋된 트랜잭션을 의미론적으로 되돌리는 작업입니다. 물리적 롤백이 아니라 역작용(reverse action)을 수행합니다.
// 보상 트랜잭션 매핑 예시: 각 단계가 자신의 역작용(보상 액션)을 가리킨다
public enum SagaStep {
    PAYMENT_CHARGE("PAYMENT_REFUND"),
    INVENTORY_RESERVE("INVENTORY_RELEASE"),
    SHIPPING_CREATE("SHIPPING_CANCEL"),
    COUPON_APPLY("COUPON_RESTORE"),
    POINTS_DEDUCT("POINTS_RESTORE");

    private final String compensationAction;
    SagaStep(String action) { this.compensationAction = action; }
    public String compensationAction() { return compensationAction; }
}
Semantic Compensation vs Exact Compensation:
- Exact Compensation: 정확히 원래 상태로 복원 (예: 결제 취소 → 전액 환불)
- Semantic Compensation: 비즈니스적으로 의미 있는 보상 (예: 항공권 취소 → 부분 환불 + 크레딧)
실무에서는 Semantic Compensation이 더 일반적입니다. 시간이 지나면 정확한 복원이 불가능한 경우가 많기 때문입니다.
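완료된 단계의 보상 액션을 기록해 두었다가 실패 시 역순(LIFO)으로 실행하는 것이 보상 트랜잭션 실행의 핵심 구조입니다. 프레임워크 없이 이 구조만 뽑아낸 스케치이며, 클래스·메서드 이름은 설명용 가정입니다.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// 완료된 단계의 보상 액션을 스택에 쌓고, 실패 시 역순으로 실행하는 스케치
class CompensationLog {
    private final Deque<Runnable> compensations = new ArrayDeque<>();

    // 로컬 트랜잭션 성공 직후 해당 보상 액션을 등록
    void registerCompensation(Runnable compensation) {
        compensations.push(compensation);
    }

    // 이후 단계 실패 시: 가장 최근 단계부터 역순으로 되돌린다
    void compensateAll() {
        while (!compensations.isEmpty()) {
            compensations.pop().run();
        }
    }
}
```

앞의 Temporal 예시에서 compensate() 메서드가 하는 일도 본질적으로 이 역순 실행입니다.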
5. Event Sourcing
Event Sourcing은 엔티티의 현재 상태를 저장하는 대신, 상태 변경을 일으킨 모든 이벤트를 순서대로 저장하는 패턴입니다.
5-1. 핵심 개념
전통적 CRUD:
Account 테이블: id=1, balance=750
Event Sourcing:
Event #1: AccountCreated(id=1, initialBalance=1000)
Event #2: MoneyWithdrawn(id=1, amount=200)
Event #3: MoneyDeposited(id=1, amount=150)
Event #4: MoneyWithdrawn(id=1, amount=200)
→ 재생: 1000 - 200 + 150 - 200 = 750 (현재 잔고)
모든 상태 변경이 불변(immutable) 이벤트 로그에 추가(append-only)됩니다. 삭제나 수정은 없습니다.
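위 잔고 예시를 그대로 코드로 옮기면, 현재 상태는 이벤트 로그를 처음부터 순서대로 접어서(fold) 얻습니다.

```java
import java.util.List;

// 계좌 이벤트 로그를 재생하여 현재 잔고를 계산하는 최소 스케치
sealed interface AccountEvent permits AccountCreated, MoneyDeposited, MoneyWithdrawn {}
record AccountCreated(long initialBalance) implements AccountEvent {}
record MoneyDeposited(long amount) implements AccountEvent {}
record MoneyWithdrawn(long amount) implements AccountEvent {}

class AccountReplay {
    // append-only 로그를 버전 순서대로 적용
    static long replay(List<AccountEvent> events) {
        long balance = 0;
        for (AccountEvent event : events) {
            balance = switch (event) {
                case AccountCreated e -> e.initialBalance();
                case MoneyDeposited e -> balance + e.amount();
                case MoneyWithdrawn e -> balance - e.amount();
            };
        }
        return balance;
    }
}
```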
5-2. Event Store 구현
// === Event Store 인터페이스 ===
public interface EventStore {
void saveEvents(UUID aggregateId, List<DomainEvent> events, int expectedVersion);
List<DomainEvent> getEvents(UUID aggregateId);
List<DomainEvent> getEvents(UUID aggregateId, int fromVersion);
}
// === PostgreSQL 기반 Event Store 구현 ===
@Repository
@RequiredArgsConstructor
public class PostgresEventStore implements EventStore {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
@Override
@Transactional
public void saveEvents(UUID aggregateId, List<DomainEvent> events,
int expectedVersion) {
// 낙관적 동시성 제어
Integer currentVersion = jdbcTemplate.queryForObject(
"SELECT COALESCE(MAX(version), 0) FROM event_store WHERE aggregate_id = ?",
Integer.class, aggregateId
);
if (currentVersion != expectedVersion) {
throw new ConcurrencyException(
"동시성 충돌: 예상 버전 " + expectedVersion +
", 실제 버전 " + currentVersion
);
}
        int version = expectedVersion;
        for (DomainEvent event : events) {
            version++;
            String payload;
            try {
                payload = objectMapper.writeValueAsString(event);
            } catch (JsonProcessingException ex) {
                // 체크 예외를 런타임 예외로 감싸 트랜잭션 롤백 유도
                throw new IllegalStateException("이벤트 직렬화 실패", ex);
            }
            jdbcTemplate.update(
                """
                INSERT INTO event_store
                  (event_id, aggregate_id, aggregate_type, event_type,
                   event_data, version, created_at)
                VALUES (?, ?, ?, ?, ?::jsonb, ?, ?)
                """,
                UUID.randomUUID(),
                aggregateId,
                event.getAggregateType(),
                event.getClass().getSimpleName(),
                payload,
                version,
                Instant.now()
            );
        }
}
@Override
public List<DomainEvent> getEvents(UUID aggregateId) {
return jdbcTemplate.query(
"SELECT * FROM event_store WHERE aggregate_id = ? ORDER BY version",
this::mapRowToEvent,
aggregateId
);
}
}
-- Event Store 테이블 스키마
CREATE TABLE event_store (
event_id UUID PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
version INTEGER NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (aggregate_id, version) -- 낙관적 동시성 제어
);
CREATE INDEX idx_event_store_aggregate ON event_store (aggregate_id, version);
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);
5-3. Aggregate에서 이벤트 적용
// === Event-Sourced Aggregate ===
public class OrderAggregate {
private UUID id;
private UUID customerId;
private OrderStatus status;
private BigDecimal totalAmount;
private List<OrderItem> items = new ArrayList<>();
private int version = 0;
// 미적용 이벤트 목록
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
// 이벤트 재생으로 상태 복원
public static OrderAggregate fromHistory(List<DomainEvent> history) {
OrderAggregate aggregate = new OrderAggregate();
for (DomainEvent event : history) {
aggregate.apply(event, false);
}
return aggregate;
}
// 새 명령 처리
public void placeOrder(UUID customerId, List<OrderItem> items) {
if (this.status != null) {
throw new IllegalStateException("이미 생성된 주문입니다");
}
BigDecimal total = items.stream()
.map(item -> item.price().multiply(BigDecimal.valueOf(item.quantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
        // 새 애그리거트는 아직 id가 없으므로 여기서 발급한다 (when()에서 this.id에 반영)
        apply(new OrderPlacedEvent(UUID.randomUUID(), customerId, total, items, Instant.now()), true);
}
public void cancelOrder(String reason) {
if (this.status == OrderStatus.SHIPPED) {
throw new IllegalStateException("배송된 주문은 취소할 수 없습니다");
}
apply(new OrderCancelledEvent(this.id, reason, Instant.now()), true);
}
// 이벤트 적용 (상태 변경)
private void apply(DomainEvent event, boolean isNew) {
when(event); // 상태 변경
this.version++;
if (isNew) {
uncommittedEvents.add(event);
}
}
// 이벤트별 상태 변경 로직
private void when(DomainEvent event) {
switch (event) {
case OrderPlacedEvent e -> {
this.id = e.orderId();
this.customerId = e.customerId();
this.totalAmount = e.totalAmount();
this.items = new ArrayList<>(e.items());
this.status = OrderStatus.PLACED;
}
case OrderCancelledEvent e -> {
this.status = OrderStatus.CANCELLED;
}
case OrderShippedEvent e -> {
this.status = OrderStatus.SHIPPED;
}
default -> throw new IllegalArgumentException("알 수 없는 이벤트: " + event);
}
}
public List<DomainEvent> getUncommittedEvents() {
return Collections.unmodifiableList(uncommittedEvents);
}
public void clearUncommittedEvents() {
uncommittedEvents.clear();
}
}
5-4. Snapshot 패턴
이벤트가 수만 개 쌓이면 매번 전체를 재생하는 것은 비효율적입니다. Snapshot을 사용하여 특정 시점의 상태를 저장해두면 해당 시점 이후의 이벤트만 재생하면 됩니다.
// Snapshot 저장 및 복원
@Service
@RequiredArgsConstructor
public class SnapshotService {
    private static final int SNAPSHOT_INTERVAL = 100; // 100개 이벤트마다
    private final SnapshotRepository snapshotRepository;
    private final EventStore eventStore;
public OrderAggregate loadAggregate(UUID aggregateId) {
// 1. 최신 스냅샷 조회
Optional<Snapshot> snapshot = snapshotRepository
.findLatest(aggregateId);
if (snapshot.isPresent()) {
// 2. 스냅샷에서 상태 복원
OrderAggregate aggregate = deserialize(snapshot.get().getData());
int fromVersion = snapshot.get().getVersion();
// 3. 스냅샷 이후 이벤트만 재생
List<DomainEvent> newEvents = eventStore
.getEvents(aggregateId, fromVersion + 1);
            for (DomainEvent event : newEvents) {
                aggregate.apply(event, false); // 재생을 위해 apply를 패키지 수준으로 노출했다고 가정
            }
return aggregate;
} else {
// 스냅샷 없으면 전체 재생
List<DomainEvent> allEvents = eventStore.getEvents(aggregateId);
return OrderAggregate.fromHistory(allEvents);
}
}
public void saveIfNeeded(OrderAggregate aggregate) {
if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
snapshotRepository.save(new Snapshot(
aggregate.getId(),
aggregate.getVersion(),
serialize(aggregate),
Instant.now()
));
}
}
}
5-5. Event Sourcing vs CRUD 비교
| 기준 | CRUD | Event Sourcing |
|---|---|---|
| 저장 방식 | 현재 상태만 저장 | 모든 상태 변경 이벤트 저장 |
| 감사 추적 | 별도 구현 필요 | 내장 (이벤트 자체가 감사 로그) |
| 시간 여행 | 불가능 | 가능 (특정 시점 상태 복원) |
| 저장소 용량 | 상대적으로 적음 | 이벤트 누적으로 증가 |
| 복잡도 | 낮음 | 높음 (이벤트 설계, 스냅샷 등) |
| 동시성 처리 | 비관적/낙관적 잠금 | 이벤트 버전 기반 낙관적 제어 |
| 성능 | 읽기 최적화 쉬움 | 이벤트 재생 비용 (스냅샷으로 완화) |
| 스키마 변경 | 마이그레이션 필요 | 이벤트 업캐스팅(Upcasting) |
| CQRS 연계 | 선택사항 | 자연스러운 조합 |
5-6. Event Sourcing을 사용하지 말아야 할 때
- 단순한 CRUD 도메인 (블로그, 설정 관리)
- 강한 일관성이 필수인 경우 (실시간 잔고 확인이 필요한 ATM)
- 이벤트 설계가 불안정한 초기 단계 (잦은 변경 시 이벤트 업캐스팅 비용)
- 팀의 경험이 부족한 경우 (학습 곡선이 가파름)
- 규제 요건으로 데이터 삭제가 필요한 경우 (GDPR right to be forgotten과 충돌)
6. Outbox Pattern (신뢰성의 핵심)
6-1. 문제: 이중 쓰기 (Dual Write)
분산 시스템에서 가장 흔한 실수 중 하나는 데이터베이스 저장과 메시지 발행을 별도로 수행하는 것입니다.
// 위험한 코드: 이중 쓰기 문제
@Transactional
public void createOrder(Order order) {
orderRepository.save(order); // Step 1: DB 저장
kafkaTemplate.send("orders", event); // Step 2: 메시지 발행
// Step 1 성공 + Step 2 실패 = DB에는 있지만 이벤트는 발행 안 됨
// Step 1 성공 + Step 2 성공 + DB 커밋 실패 = 이벤트는 발행됐지만 DB에 없음
}
DB 트랜잭션과 메시지 브로커 전송은 서로 다른 인프라이므로 원자적으로 처리할 수 없습니다.
6-2. 해결: Outbox 패턴
Outbox 패턴은 이벤트를 DB 트랜잭션 내에서 outbox 테이블에 저장하고, 별도 프로세스가 이를 메시지 브로커로 전달합니다.
1. 비즈니스 로직 + outbox INSERT = 단일 DB 트랜잭션 (원자성 보장)
2. 별도 프로세스(CDC/Poller)가 outbox 테이블 읽어서 Kafka로 발행
3. 발행 성공 후 outbox 레코드 처리 완료로 표시
-- Outbox 테이블 스키마
CREATE TABLE outbox (
id UUID PRIMARY KEY,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id UUID NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ,
status VARCHAR(20) DEFAULT 'PENDING'
);
CREATE INDEX idx_outbox_pending ON outbox (status, created_at)
WHERE status = 'PENDING';
// === Outbox 패턴 구현 ===
@Service
@RequiredArgsConstructor
public class OrderService {
    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;
@Transactional // 단일 트랜잭션으로 보장
public UUID createOrder(CreateOrderCommand command) {
// 1. 비즈니스 로직 수행
Order order = Order.create(command);
orderRepository.save(order);
        // 2. 같은 트랜잭션에서 outbox에 이벤트 저장
        String payload;
        try {
            payload = objectMapper.writeValueAsString(
                new OrderCreatedEvent(order.getId(), order.getTotalAmount())
            );
        } catch (JsonProcessingException ex) {
            throw new IllegalStateException("이벤트 직렬화 실패", ex);
        }
        OutboxMessage message = OutboxMessage.builder()
            .aggregateType("Order")
            .aggregateId(order.getId())
            .eventType("OrderCreated")
            .payload(payload)
            .build();
        outboxRepository.save(message);
return order.getId();
// 트랜잭션 커밋 시 order와 outbox 모두 함께 저장됨
}
}
6-3. CDC (Debezium) 방식
Debezium은 DB의 트랜잭션 로그(WAL)를 모니터링하여 변경을 캡처하고 Kafka로 전달합니다.
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "dbz_password",
"database.dbname": "orderdb",
"database.server.name": "order-service",
"table.include.list": "public.outbox",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "events.order"
}
}
6-4. Polling vs CDC 비교
| 기준 | Polling | CDC (Debezium) |
|---|---|---|
| 지연 | 폴링 주기에 의존 (수 초~수 분) | 거의 실시간 (수 ms) |
| DB 부하 | 주기적 쿼리로 부하 발생 | WAL 읽기로 부하 최소 |
| 구현 복잡도 | 낮음 (스케줄러 + 쿼리) | 높음 (Debezium 인프라 필요) |
| 순서 보장 | outbox 테이블 순서 | WAL 순서 그대로 |
| 운영 부담 | 낮음 | Kafka Connect 클러스터 운영 필요 |
| 확장성 | 단일 폴러 병목 가능 | 높은 확장성 |
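폴링 방식의 핵심 루프를 DB와 브로커를 인메모리 컬렉션으로 대체해 단순화하면 다음과 같습니다. 실제로는 @Scheduled 폴러가 outbox 테이블의 PENDING 행을 조회해 Kafka로 발행합니다 (클래스·필드 이름은 설명용 가정).

```java
import java.util.ArrayList;
import java.util.List;

// Outbox 폴링 발행 루프 스케치 (outbox 테이블과 브로커를 인메모리로 대체)
class OutboxPoller {
    record OutboxRow(String eventType, String payload, boolean published) {}

    private final List<OutboxRow> outbox = new ArrayList<>(); // outbox 테이블 역할
    private final List<String> broker = new ArrayList<>();    // 메시지 브로커 역할

    // 비즈니스 트랜잭션에서 호출: PENDING 상태로 적재
    void append(String eventType, String payload) {
        outbox.add(new OutboxRow(eventType, payload, false));
    }

    // 폴러 1회 실행: 미발행 행을 브로커로 보내고 published로 표시
    int pollOnce() {
        int publishedCount = 0;
        for (int i = 0; i < outbox.size(); i++) {
            OutboxRow row = outbox.get(i);
            if (!row.published()) {
                broker.add(row.payload()); // 발행 성공 가정
                outbox.set(i, new OutboxRow(row.eventType(), row.payload(), true));
                publishedCount++;
            }
        }
        return publishedCount;
    }

    List<String> brokerMessages() { return broker; }
}
```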
7. 멱등성 패턴
분산 시스템에서 메시지는 최소 한 번(at-least-once) 전달됩니다. 네트워크 장애, 브로커 재시작, 소비자 재배포 등으로 중복 수신이 발생합니다. 멱등성은 같은 메시지를 여러 번 처리해도 결과가 한 번 처리한 것과 동일하도록 보장합니다.
7-1. Producer 멱등성 (Kafka)
# Kafka Producer 설정
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.properties.acks=all
spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
spring.kafka.producer.properties.retries=2147483647
Kafka의 멱등 프로듀서는 각 메시지에 시퀀스 번호를 부여하여 브로커가 중복을 감지합니다.
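브로커가 (producerId, 시퀀스 번호) 쌍으로 재시도 중복을 걸러내는 원리를 단순화하면 다음과 같습니다. 실제 Kafka 내부 구현을 옮긴 것이 아니라 원리만 보여주는 스케치입니다.

```java
import java.util.HashMap;
import java.util.Map;

// 멱등 프로듀서 원리 스케치: 프로듀서별 시퀀스 번호로 중복 쓰기를 감지한다
class IdempotentBrokerSketch {
    private final Map<String, Integer> lastSequence = new HashMap<>(); // producerId → 마지막 시퀀스
    private int acceptedCount = 0;

    // 재시도로 같은 시퀀스가 다시 오면 거부, 새 시퀀스면 수락
    boolean tryAppend(String producerId, int sequence) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // 이미 기록된 메시지 — 중복으로 판단
        }
        lastSequence.put(producerId, sequence);
        acceptedCount++;
        return true;
    }

    int acceptedCount() { return acceptedCount; }
}
```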
7-2. Consumer 멱등성
// === 멱등 소비자 구현 ===
@Component
@Slf4j
@RequiredArgsConstructor
public class IdempotentOrderEventHandler {
private final ProcessedEventRepository processedEventRepository;
private final OrderProjectionRepository orderProjectionRepository;
@KafkaListener(topics = "order-events", groupId = "order-projection")
@Transactional
public void handleOrderEvent(
@Payload OrderEvent event,
@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Header("eventId") String eventId) {
// 1. 이미 처리된 이벤트인지 확인
if (processedEventRepository.existsById(eventId)) {
log.info("이벤트 중복 수신 무시: eventId={}", eventId);
return;
}
// 2. 비즈니스 로직 처리
processEvent(event);
// 3. 처리 완료 기록 (같은 트랜잭션)
processedEventRepository.save(new ProcessedEvent(
eventId,
event.getClass().getSimpleName(),
Instant.now()
));
}
private void processEvent(OrderEvent event) {
switch (event) {
case OrderCreatedEvent e -> createOrderProjection(e);
case OrderStatusChangedEvent e -> updateOrderStatus(e);
default -> log.warn("알 수 없는 이벤트 타입");
}
}
}
-- 이벤트 처리 이력 테이블
CREATE TABLE processed_events (
event_id VARCHAR(255) PRIMARY KEY,
event_type VARCHAR(255) NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
7-3. Unique Constraint vs Redis SET NX
| 접근법 | 장점 | 단점 |
|---|---|---|
| DB Unique Constraint | 트랜잭션과 함께 원자적 | DB 부하 증가 |
| Redis SET NX | 매우 빠른 조회 | Redis 장애 시 보장 안 됨 |
| DB + Redis 조합 | 빠른 1차 필터 + 확실한 보장 | 구현 복잡 |
// Redis SET NX를 이용한 빠른 중복 체크
@Component
@RequiredArgsConstructor
public class RedisIdempotencyGuard {
private final StringRedisTemplate redisTemplate;
private static final Duration TTL = Duration.ofHours(24);
public boolean tryAcquire(String idempotencyKey) {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(
"idempotency:" + idempotencyKey,
"processed",
TTL
);
return Boolean.TRUE.equals(result);
}
}
8. Dead Letter Queue 전략
메시지 처리에 실패하면 무한 재시도는 시스템을 마비시킵니다. DLQ(Dead Letter Queue)는 처리 불가능한 메시지를 별도 큐로 격리하여 시스템의 안정성을 보장합니다.
8-1. Retry with Exponential Backoff
// === Spring Kafka 재시도 + DLQ 설정 ===
@Configuration
public class KafkaConsumerConfig {
@Bean
public DefaultErrorHandler errorHandler(
KafkaTemplate<String, Object> kafkaTemplate) {
// DLQ로 보내는 recoverer
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(
record.topic() + ".DLQ", record.partition()));
// Exponential Backoff 재시도 설정
ExponentialBackOff backOff = new ExponentialBackOff();
backOff.setInitialInterval(1000L); // 1초
backOff.setMultiplier(2.0); // 2배씩 증가
backOff.setMaxInterval(60000L); // 최대 60초
backOff.setMaxElapsedTime(300000L); // 최대 5분
DefaultErrorHandler errorHandler =
new DefaultErrorHandler(recoverer, backOff);
// 재시도하지 않을 예외 (비즈니스 오류)
errorHandler.addNotRetryableExceptions(
InvalidOrderException.class,
DuplicateEventException.class
);
return errorHandler;
}
}
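위 설정이 만들어내는 재시도 간격은 interval_n = min(initialInterval × multiplier^n, maxInterval) 공식으로 계산할 수 있습니다. 직접 확인해 보면:

```java
// Exponential Backoff의 n번째 재시도 간격 계산
class BackoffSchedule {
    static long intervalMillis(int attempt, long initialMillis,
                               double multiplier, long maxMillis) {
        double interval = initialMillis * Math.pow(multiplier, attempt);
        return (long) Math.min(interval, maxMillis); // 최대 간격으로 상한 적용
    }
}
```

1초에서 시작해 2배씩 늘면 1초 → 2초 → 4초 → … → 32초까지 커진 뒤 60초 상한에 걸립니다.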
8-2. DLQ 모니터링 및 알림
// === DLQ 모니터링 서비스 ===
@Component
@Slf4j
@RequiredArgsConstructor
public class DlqMonitoringService {
private final MeterRegistry meterRegistry;
private final AlertService alertService;
@KafkaListener(topics = "order-events.DLQ", groupId = "dlq-monitor")
public void monitorDlq(
ConsumerRecord<String, Object> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String errorMessage,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic) {
// 메트릭 기록
meterRegistry.counter("dlq.messages.received",
"originalTopic", originalTopic,
"errorType", classifyError(errorMessage)
).increment();
// 알림 발송
alertService.sendAlert(DlqAlert.builder()
.originalTopic(originalTopic)
.messageKey(record.key())
.errorMessage(errorMessage)
.timestamp(Instant.ofEpochMilli(record.timestamp()))
.severity(classifySeverity(errorMessage))
.build()
);
log.error("DLQ 메시지 수신: topic={}, key={}, error={}",
originalTopic, record.key(), errorMessage);
}
}
8-3. DLQ 재처리 파이프라인
// === DLQ 메시지 재처리 ===
@RestController
@RequestMapping("/api/dlq")
@RequiredArgsConstructor
public class DlqReprocessController {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final DlqMessageRepository dlqRepository;
// 단건 재처리
@PostMapping("/reprocess/single")
public ResponseEntity<String> reprocessSingle(
@RequestParam String messageId) {
DlqMessage message = dlqRepository.findById(messageId).orElseThrow();
// 원래 토픽으로 재발행
kafkaTemplate.send(
message.getOriginalTopic(),
message.getKey(),
message.getValue()
);
message.setStatus("REPROCESSED");
message.setReprocessedAt(Instant.now());
dlqRepository.save(message);
return ResponseEntity.ok("재처리 요청 완료: " + messageId);
}
// 배치 재처리
@PostMapping("/reprocess/batch")
public ResponseEntity<String> reprocessBatch(
@RequestParam String originalTopic,
@RequestParam(defaultValue = "100") int batchSize) {
List<DlqMessage> messages = dlqRepository
.findByOriginalTopicAndStatus(originalTopic, "PENDING",
PageRequest.of(0, batchSize));
int count = 0;
for (DlqMessage message : messages) {
kafkaTemplate.send(message.getOriginalTopic(),
message.getKey(), message.getValue());
message.setStatus("REPROCESSED");
count++;
}
dlqRepository.saveAll(messages);
return ResponseEntity.ok(count + "건 재처리 요청 완료");
}
}
9. Exactly-Once Semantics
메시지 전달 보장 수준은 세 가지가 있습니다.
| 보장 수준 | 설명 | 복잡도 |
|---|---|---|
| At-most-once | 최대 한 번 전달 (유실 가능) | 낮음 |
| At-least-once | 최소 한 번 전달 (중복 가능) | 중간 |
| Exactly-once | 정확히 한 번 전달 | 높음 |
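실무에서 Exactly-once 효과는 보통 "at-least-once 전달 + 멱등 소비"의 조합으로 얻습니다. 중복 전달이 있어도 비즈니스 효과는 한 번만 반영되는 것을 단순화한 스케치입니다 (7장의 멱등성 패턴과 같은 원리).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// at-least-once 전달 + 멱등 소비 = exactly-once 효과 스케치
class IdempotentConsumerSketch {
    private final Set<String> processedEventIds = new HashSet<>(); // 처리 이력 역할
    private int effectCount = 0; // 비즈니스 효과가 실제 반영된 횟수

    void consume(String eventId) {
        if (!processedEventIds.add(eventId)) {
            return; // 중복 수신 — 효과 없이 무시
        }
        effectCount++; // 비즈니스 로직은 이벤트당 정확히 한 번만 수행
    }

    int effectCount() { return effectCount; }
}
```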
9-1. Kafka Exactly-Once 구현
Kafka는 0.11부터 Exactly-Once Semantics(EOS)를 지원합니다.
// === Kafka Exactly-Once 설정 ===
@Configuration
public class KafkaExactlyOnceConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        DefaultKafkaProducerFactory<String, Object> factory =
            new DefaultKafkaProducerFactory<>(config);
        // 트랜잭션 프로듀서 활성화: 접두사 뒤에 인스턴스별 고유 ID가 붙는다
        factory.setTransactionIdPrefix("order-tx-");
        return factory;
}
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
ProducerFactory<String, Object> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(config);
}
}
9-2. End-to-End Exactly-Once
진정한 End-to-End Exactly-Once는 Producer부터 Consumer의 최종 상태 저장까지 보장해야 합니다.
End-to-End Exactly-Once 체인:
Producer (idempotent + transactions)
→ Kafka Broker (transaction log)
→ Consumer (read_committed + manual offset)
→ Database (idempotency key + business logic)
모든 체인이 함께 동작해야 Exactly-Once가 보장됨
// === End-to-End Exactly-Once 처리 ===
@Component
@RequiredArgsConstructor
public class ExactlyOnceOrderProcessor {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final OrderRepository orderRepository;
private final ProcessedOffsetRepository offsetRepository;
@KafkaListener(topics = "payment-completed")
@Transactional("chainedKafkaTransactionManager")
public void processPaymentCompleted(
ConsumerRecord<String, PaymentCompletedEvent> record) {
String offsetKey = record.topic() + "-" +
record.partition() + "-" + record.offset();
// 이미 처리된 오프셋인지 확인
if (offsetRepository.existsById(offsetKey)) {
return;
}
// 비즈니스 로직
PaymentCompletedEvent event = record.value();
Order order = orderRepository.findById(event.orderId()).orElseThrow();
order.markAsPaid(event.paymentId());
orderRepository.save(order);
// 다음 단계 이벤트 발행 (같은 Kafka 트랜잭션)
kafkaTemplate.send("order-paid",
new OrderPaidEvent(order.getId(), event.paymentId()));
// 처리된 오프셋 기록 (같은 DB 트랜잭션)
offsetRepository.save(new ProcessedOffset(offsetKey, Instant.now()));
}
}
10. Reactive Streams vs Virtual Threads (2025)
10-1. Project Reactor: 배압(Backpressure)이 핵심
리액티브 스트림은 논블로킹 I/O와 배압(Backpressure) 메커니즘을 핵심 설계 원칙으로 합니다.
// === Reactor 기반 비동기 처리 ===
@Service
@Slf4j
@RequiredArgsConstructor
public class ReactiveOrderService {
    private final KafkaReceiver<String, OrderEvent> kafkaReceiver; // reactor-kafka
public Flux<OrderEvent> processOrderStream() {
return kafkaReceiver.receive()
.flatMap(record -> {
OrderEvent event = record.value();
return processEvent(event)
.then(record.receiverOffset().commit())
.thenReturn(event);
}, 16) // 동시 처리 수 제한 (배압)
.onErrorResume(e -> {
log.error("처리 실패", e);
return Mono.empty();
})
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
}
private Mono<Void> processEvent(OrderEvent event) {
return Mono.fromCallable(() -> {
// 비즈니스 로직
return null;
}).subscribeOn(Schedulers.boundedElastic()).then();
}
}
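배압의 본질은 "소비자가 감당 가능한 만큼만 생산을 허용"하는 것입니다. Reactor 밖에서 같은 아이디어를 표현하면, 유한 버퍼가 가득 찼을 때 생산이 거부되는 형태가 됩니다. 아래는 그 원리만 보여주는 프레임워크 무관 스케치입니다(클래스명은 예시).

```java
import java.util.concurrent.ArrayBlockingQueue;

// 유한 큐 = 배압: 큐가 가득 차면 offer()가 false를 반환해 생산 속도를 제한한다
public class BackpressureDemo {
    private final ArrayBlockingQueue<String> buffer;

    public BackpressureDemo(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    public boolean tryProduce(String event) {
        return buffer.offer(event); // 가득 차면 즉시 false (논블로킹 거부)
    }

    public String consume() {
        return buffer.poll(); // 하나 소비하면 생산 여유가 다시 생긴다
    }
}
```

Reactor의 `flatMap(mapper, concurrency)`도 결국 같은 원리로, 동시에 진행 중인 내부 시퀀스 수를 제한해 상류에 수요(demand)를 조절합니다.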
10-2. Java 21 Virtual Threads: 더 단순한 코드
Virtual Threads는 기존 동기식 코드 스타일을 유지하면서 높은 동시성을 달성합니다.
// === Virtual Threads 기반 비동기 처리 ===
@Configuration
public class VirtualThreadConfig {
@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandlerCustomizer() {
return protocolHandler -> {
protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
};
}
}
@Service
@RequiredArgsConstructor
public class VirtualThreadOrderService {
private final PaymentService paymentService;
private final InventoryService inventoryService;
// 동기식 코드지만 Virtual Thread 위에서 실행
// → 블로킹 I/O 시 OS 스레드가 아닌 Virtual Thread만 블로킹
// 참고: StructuredTaskScope는 Java 21 기준 프리뷰 API (--enable-preview 필요)
public OrderResult processOrder(UUID orderId)
throws InterruptedException, ExecutionException {
// 병렬 처리: StructuredTaskScope 활용
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
StructuredTaskScope.Subtask<PaymentResult> paymentTask = scope.fork(() ->
paymentService.processPayment(orderId)
);
StructuredTaskScope.Subtask<InventoryResult> inventoryTask = scope.fork(() ->
inventoryService.checkInventory(orderId)
);
scope.join();
scope.throwIfFailed();
return new OrderResult(
paymentTask.get(),
inventoryTask.get()
);
}
}
}
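StructuredTaskScope는 Java 21에서는 프리뷰 기능이라 --enable-preview 없이는 컴파일되지 않습니다. 프리뷰 없이 안정 API만으로 같은 "병렬 호출 후 결과 합치기"를 표현하면 다음과 같습니다. paymentService 등 외부 의존성은 Callable로 대체했다는 가정 하의 스케치입니다.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class VirtualThreadFanOut {
    // 두 블로킹 호출을 Virtual Thread에서 병렬 실행하고 결과를 합친다
    public static String processInParallel(Callable<String> payment,
                                           Callable<String> inventory) throws Exception {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            Future<String> paymentF = executor.submit(payment);
            Future<String> inventoryF = executor.submit(inventory);
            // 블로킹 get()이지만 Virtual Thread 위라 OS 스레드를 점유하지 않는다
            return paymentF.get() + "," + inventoryF.get();
        } // try-with-resources의 close()가 남은 작업 완료를 대기 (Java 19+)
    }
}
```

StructuredTaskScope가 정식화되면 실패 전파(ShutdownOnFailure)까지 선언적으로 처리할 수 있어 이 수동 조합보다 안전해집니다.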
10-3. 선택 가이드: 의사결정 매트릭스
| 기준 | Reactive (Reactor/WebFlux) | Virtual Threads |
|---|---|---|
| 코드 복잡도 | 높음 (리액티브 연산자 학습) | 낮음 (기존 동기 코드) |
| 배압 제어 | 내장 (Flux/Mono) | 수동 구현 필요 |
| 디버깅 | 어려움 (스택 트레이스 복잡) | 쉬움 (전통적 스택 트레이스) |
| 메모리 효율 | 매우 좋음 | 좋음 (OS 스레드보다) |
| I/O 집약 작업 | 최적 | 매우 좋음 |
| CPU 집약 작업 | 부적합 | 부적합 (플랫폼 스레드 사용) |
| 기존 코드 마이그레이션 | 전면 재작성 필요 | 설정 변경만으로 가능 |
| 생태계 성숙도 | 높음 (5년+) | 성장 중 (2023~) |
권장 가이드:
- 새 프로젝트 + 단순 I/O: Virtual Threads
- 스트리밍 데이터 처리: Reactive Streams
- 기존 Spring MVC 프로젝트: Virtual Threads (마이그레이션 비용 최소)
- 실시간 데이터 파이프라인: Reactive Streams
11. 실전 아키텍처: 결제 시스템 설계
모든 패턴을 결합한 이커머스 결제 시스템의 전체 아키텍처를 설계합니다.
이커머스 결제 시스템 전체 아키텍처
================================
[Client] → [API Gateway]
                 |
                 ↓
          [Order Service] ───────────────── Write DB (PostgreSQL)
                 |                                  |
                 | (Outbox Pattern)                 | (CDC - Debezium)
                 |                                  ↓
                 |                       [Kafka: order-events]
                 |                        /         |         \
                 ↓                       ↓          ↓          ↓
       [Saga Orchestrator]          [Payment] [Inventory] [Notification]
           (Temporal)                Service    Service      Service
                 |                      |           |
                 | commands             |           |
                 ↓                      ↓           ↓
       [payment-commands]    [payment-events] [inventory-events]
                                        |
                              [DLQ + Monitoring]
                                        |
                            [Alerting (PagerDuty)]

Read Side:
[Kafka: order-events] → [Order Projection]  → Read DB (MongoDB)
                      → [Analytics Service] → Data Warehouse
                      → [Search Service]    → Elasticsearch
적용된 패턴 요약
| 패턴 | 적용 위치 | 목적 |
|---|---|---|
| CQRS | Order Service | 주문 쓰기/읽기 분리 |
| Saga (Orchestration) | Temporal Orchestrator | 결제-재고-배송 트랜잭션 관리 |
| Event Sourcing | Order Aggregate | 주문 상태 변경 이력 관리 |
| Outbox Pattern | Order Service → Kafka | DB-메시지 원자성 보장 |
| 멱등성 | 모든 Consumer | 중복 메시지 처리 방지 |
| DLQ | 모든 Kafka Consumer | 처리 실패 메시지 격리 |
| Exactly-Once | Payment 처리 | 이중 결제 방지 |
퀴즈
Q1. CQRS에서 Write Model과 Read Model이 서로 다른 데이터베이스를 사용할 때, 두 모델 간의 데이터 일관성을 보장하기 위해 가장 일반적으로 사용되는 접근 방식은?
정답: 이벤트 기반 비동기 동기화 (Eventual Consistency)
Write Model에서 상태 변경 시 이벤트를 발행하고, Read Model의 프로젝션 핸들러가 이 이벤트를 구독하여 Read DB를 업데이트합니다. 두 모델 사이에는 일시적 불일치(수 ms~수 초)가 존재할 수 있지만, 최종적으로 일관성이 보장됩니다.
강한 일관성이 필요한 경우에는 CQRS의 적합성을 재검토해야 합니다.
Q2. Choreography Saga에서 서비스 A가 이벤트를 발행하고 서비스 B가 처리에 실패했을 때, 서비스 A는 어떻게 이를 인지할 수 있나요?
정답: 서비스 B가 실패 이벤트(예: PaymentFailed)를 발행하면 서비스 A가 이를 구독하여 인지합니다. Choreography에서는 각 서비스가 성공/실패 이벤트를 모두 발행해야 하며, 관련 서비스들이 이를 구독하여 적절히 대응합니다.
이것이 Choreography의 단점 중 하나입니다. 서비스가 많아질수록 이벤트 흐름 추적이 복잡해지며, 타임아웃 기반 감지가 추가로 필요할 수 있습니다.
Q3. Event Sourcing에서 이벤트가 100만 개 쌓인 Aggregate를 로드할 때 성능 문제를 해결하는 핵심 기법은?
정답: Snapshot 패턴
주기적으로(예: 100개 이벤트마다) Aggregate의 현재 상태를 스냅샷으로 저장합니다. 로드 시 가장 최근 스냅샷에서 시작하여 그 이후의 이벤트만 재생하면 됩니다.
예: 100만 개 이벤트 중 마지막 스냅샷이 999,900번째라면 100개의 이벤트만 재생하면 됩니다.
Q4. Outbox Pattern에서 CDC(Change Data Capture) 방식이 Polling 방식보다 선호되는 주요 이유는?
정답: CDC는 DB의 트랜잭션 로그(WAL)를 직접 읽기 때문에 거의 실시간(수 ms)으로 변경을 감지합니다. Polling은 주기적으로 쿼리를 실행하므로 지연이 크고(수 초~수 분), 빈번한 쿼리로 DB에 부하를 줍니다.
또한 CDC는 WAL 순서 그대로 이벤트를 전달하여 순서 보장이 자연스럽고, DB 트랜잭션 로그를 읽으므로 추가적인 쿼리 부하가 거의 없습니다.
Q5. Kafka에서 Exactly-Once Semantics를 달성하려면 Producer, Broker, Consumer 각각에서 어떤 설정이 필요한가요?
정답:
- Producer: enable.idempotence=true + transactional.id 설정 (멱등 프로듀서 + 트랜잭션)
- Broker: 트랜잭션 로그 유지 (기본 활성화)
- Consumer: isolation.level=read_committed + 수동 오프셋 커밋 + 멱등 처리
End-to-End Exactly-Once를 위해서는 Consumer가 DB에 결과를 저장할 때도 멱등성을 보장해야 합니다. Kafka 트랜잭션은 Kafka 내부의 Exactly-Once만 보장하며, 외부 시스템(DB)까지 포함하려면 Consumer 측 멱등성이 필수입니다.
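Q5에서 언급한 설정을 Properties 형태로 정리하면 다음과 같습니다. 키 이름은 Kafka 공식 설정이며, transactional.id 같은 값 자체는 예시로 가정한 것입니다.

```java
import java.util.Properties;

public class ExactlyOnceConfig {
    // Producer: 멱등 프로듀서 + 트랜잭션
    public static Properties producerProps() {
        Properties p = new Properties();
        p.put("enable.idempotence", "true");
        p.put("transactional.id", "order-service-tx-1"); // 인스턴스별 고유 ID (예시 값)
        p.put("acks", "all"); // 멱등 프로듀서의 전제 조건
        return p;
    }

    // Consumer: 커밋된 메시지만 읽기 + 오프셋 수동 커밋
    public static Properties consumerProps() {
        Properties p = new Properties();
        p.put("isolation.level", "read_committed");
        p.put("enable.auto.commit", "false");
        return p;
    }
}
```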
참고 자료
- Martin Fowler — "Event Sourcing" (martinfowler.com)
- Chris Richardson — "Microservices Patterns" (Manning, 2018)
- Vaughn Vernon — "Implementing Domain-Driven Design" (Addison-Wesley, 2013)
- Greg Young — "CQRS Documents" (cqrs.files.wordpress.com)
- Apache Kafka Documentation — "Exactly-Once Semantics" (kafka.apache.org)
- Debezium Documentation — "Outbox Event Router" (debezium.io)
- Temporal.io Documentation — "Saga Pattern with Temporal" (docs.temporal.io)
- Netflix Tech Blog — "Evolution of the Netflix Data Pipeline" (netflixtechblog.com)
- Uber Engineering Blog — "Building Reliable Reprocessing and Dead Letter Queues" (eng.uber.com)
- Confluent Blog — "Exactly-Once Semantics Are Possible" (confluent.io)
- AWS Architecture Blog — "Saga Orchestration Pattern" (aws.amazon.com)
- Microsoft Azure Docs — "CQRS Pattern" (learn.microsoft.com)
- Spring for Apache Kafka Reference — "Error Handling and DLQ" (docs.spring.io)
- JEP 444 — "Virtual Threads" (openjdk.org)
- Project Reactor Reference — "Backpressure" (projectreactor.io)
- Pat Helland — "Idempotence Is Not a Medical Condition" (ACM Queue, 2012)
Async Processing Patterns Mastery: CQRS, Saga, Event Sourcing — Senior Backend Developer Essential Guide
- 1. Why Async? The Survival Strategy for Large-Scale Systems
- 2. Event-Driven Architecture Fundamentals
- 3. CQRS Deep Dive
- 4. Saga Pattern Mastery
- 5. Event Sourcing
- 6. Outbox Pattern (The Core of Reliability)
- 7. Idempotency Patterns
- 8. Dead Letter Queue Strategy
- 9. Exactly-Once Semantics
- 10. Reactive Streams vs Virtual Threads (2025)
- 11. Production Architecture: Payment System Design
- Quiz
- References
1. Why Async? The Survival Strategy for Large-Scale Systems
Modern large-scale systems handle traffic volumes that synchronous request-response models simply cannot manage.
Real-World Examples:
| Company | Scale | Async Processing Approach |
|---|---|---|
| Netflix | 260M subscribers, millions of events/sec | Apache Kafka-based event streaming |
| Uber | Petabytes of daily real-time data | Apache Kafka + Apache Flink |
| LinkedIn | 7T+ messages/day via Kafka | Self-developed Kafka infrastructure |
| Shopify | 800K requests/sec on Black Friday | Async order processing pipeline |
In synchronous architecture, when Service A calls Service B, and B calls C, the total latency is the sum of each service call. If any single service slows down, the entire chain blocks.
In asynchronous architecture, a message broker decouples services, enabling each to scale independently.
Synchronous: Client -> A(50ms) -> B(100ms) -> C(200ms) = 350ms total wait
Asynchronous: Client -> A(50ms) -> Message Broker -> (B, C process in parallel)
Client gets response after 50ms, B and C process asynchronously
Key Benefits of Going Async:
- Temporal decoupling — Producers and consumers do not need to be running simultaneously
- Independent scaling — Read and write workloads can be scaled separately
- Fault isolation — A single service failure does not cascade to the entire system
- Peak traffic absorption — Message queues act as buffers
However, async is not free. Eventual Consistency, message ordering, duplicate handling, and distributed transactions are new challenges. The patterns covered in this guide are the answers to these problems.
2. Event-Driven Architecture Fundamentals
Event-Driven Architecture (EDA) is the foundation of async patterns. State changes in the system are expressed as "events," and interested services subscribe and react to them.
2-1. Domain Events vs Integration Events
| Aspect | Domain Event | Integration Event |
|---|---|---|
| Scope | Within a single bounded context | Between bounded contexts |
| Example | OrderItemAdded | OrderPlaced |
| Delivery | In-memory event bus | Message broker (Kafka, RabbitMQ) |
| Schema mgmt | Freely changeable (internal) | Contract management required |
// Domain Event - within bounded context
public record OrderItemAdded(
UUID orderId,
UUID productId,
int quantity,
BigDecimal price,
Instant occurredAt
) implements DomainEvent {}
// Integration Event - published to external services
public record OrderPlacedEvent(
UUID orderId,
UUID customerId,
BigDecimal totalAmount,
List<OrderLineItem> items,
Instant occurredAt,
int version // schema version management
) implements IntegrationEvent {}
2-2. Event Bus vs Event Store
An Event Bus (e.g., Kafka, RabbitMQ) is a pipeline that delivers events. Delivered events are not kept indefinitely: RabbitMQ removes messages once they are acknowledged, and Kafka expires them after a configured retention period regardless of whether they were consumed.
An Event Store permanently persists all events. Events can be replayed to reconstruct state at any point in time.
Event Bus (Kafka):
Producer -> [Topic: orders] -> Consumer Group A (Order Service)
-> Consumer Group B (Notification Service)
-> Consumer Group C (Analytics Service)
Event Store:
[Event #1: OrderCreated]
[Event #2: ItemAdded]
[Event #3: PaymentReceived]
[Event #4: OrderShipped]
-> Replay events #1-#4 to reconstruct current order state
2-3. Understanding Eventual Consistency
Maintaining strong consistency in distributed systems requires distributed locks, 2PC (2-Phase Commit), etc., which significantly degrade performance and availability.
According to the CAP theorem, during network partitions you must choose between Consistency (C) and Availability (A). Most large-scale systems choose Availability and accept Eventual Consistency.
Eventual Consistency means "data may be inconsistent right now, but given enough time, all nodes will converge to the same state."
Practical Implications:
- Inventory might not decrease immediately after an order (milliseconds to seconds delay)
- Show users "Order is being processed" first, then update after confirmation
- Minimizing the consistency window is the key concern
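One practical way to keep that window safe is to make projection updates version-aware, so late or out-of-order events can never overwrite newer state. A minimal framework-free sketch (class and field names are illustrative):

```java
// A read-model row that only accepts updates carrying a newer version,
// so replayed or out-of-order events cannot regress the view.
public class VersionedProjection {
    private long version = 0;
    private String status = "UNKNOWN";

    // Returns true if the update was applied, false if it was stale.
    public boolean applyUpdate(long eventVersion, String newStatus) {
        if (eventVersion <= version) {
            return false; // stale event: ignore, the view already moved past it
        }
        this.version = eventVersion;
        this.status = newStatus;
        return true;
    }

    public String status() { return status; }
}
```

With this guard, duplicate delivery and reordering become harmless, which is exactly what Eventual Consistency requires of consumers.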
3. CQRS Deep Dive
CQRS (Command Query Responsibility Segregation) completely separates the read and write models.
3-1. Core Concept
In traditional CRUD, a single data model handles both reads and writes. In reality, reads and writes have very different requirements.
Traditional CRUD:
Client -> [Single Model] -> Database
(both read/write)
CQRS:
Client --Command--> [Write Model] -> Command DB (normalized)
| Events
Client <--Query---- [Read Model] <- Read DB (denormalized, optimized)
Write Model (Command Side):
- Business rule validation
- Domain invariant enforcement
- Normalized schema
- Strong consistency
Read Model (Query Side):
- Denormalized views optimized for queries
- Multiple storage options (Elasticsearch, Redis, MongoDB)
- Eventual Consistency acceptable
3-2. When to Use CQRS
Good fit:
- Extreme read/write ratio (reads 100:1 or more)
- Complex domain logic + diverse query requirements
- Different scaling needs for reads and writes
- When used together with Event Sourcing
Not a good fit:
- Simple CRUD applications
- When strong consistency is absolutely required
- Simple domain with predictable query patterns
3-3. Implementation with Spring Boot + Kafka
// === Command Side: Order Creation ===
@Service
@RequiredArgsConstructor
public class OrderCommandService {
private final OrderRepository orderRepository;
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
@Transactional
public UUID createOrder(CreateOrderCommand command) {
// 1. Business rule validation
validateOrder(command);
// 2. Process in domain model
Order order = Order.create(
command.customerId(),
command.items(),
command.shippingAddress()
);
// 3. Save to Write DB
orderRepository.save(order);
// 4. Publish event (for Read Model sync)
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getCustomerId(),
order.getTotalAmount(),
order.getItems().stream()
.map(this::toEventItem)
.toList(),
Instant.now()
);
kafkaTemplate.send("order-events", order.getId().toString(), event);
return order.getId();
}
private void validateOrder(CreateOrderCommand command) {
if (command.items().isEmpty()) {
throw new InvalidOrderException("Order items are empty");
}
// Check inventory, customer status, etc.
}
}
// === Query Side: Order Query Projection ===
@Service
@RequiredArgsConstructor
public class OrderQueryService {
private final OrderReadRepository readRepository;
public OrderDetailView getOrderDetail(UUID orderId) {
return readRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
}
public Page<OrderSummaryView> getCustomerOrders(
UUID customerId, Pageable pageable) {
return readRepository.findByCustomerId(customerId, pageable);
}
}
// Read Model - denormalized view optimized for queries
@Document(collection = "order_views")
public record OrderDetailView(
UUID orderId,
UUID customerId,
String customerName, // Denormalized: includes customer name
String customerEmail,
BigDecimal totalAmount,
String status,
List<OrderItemView> items,
AddressView shippingAddress,
Instant createdAt,
Instant lastUpdatedAt
) {}
// === Event Handler: Write -> Read Sync ===
@Component
@Slf4j // provides the 'log' field used in handleOrderEvent
@RequiredArgsConstructor
public class OrderProjectionHandler {
private final OrderReadRepository readRepository;
private final CustomerQueryClient customerClient;
@KafkaListener(topics = "order-events", groupId = "order-projection")
public void handleOrderEvent(OrderEvent event) {
switch (event) {
case OrderCreatedEvent e -> handleOrderCreated(e);
case OrderStatusChangedEvent e -> handleStatusChanged(e);
case OrderCancelledEvent e -> handleOrderCancelled(e);
default -> log.warn("Unknown event type: {}", event.getClass());
}
}
private void handleOrderCreated(OrderCreatedEvent event) {
// Fetch customer info to create denormalized view
CustomerInfo customer = customerClient.getCustomer(event.customerId());
OrderDetailView view = new OrderDetailView(
event.orderId(),
event.customerId(),
customer.name(),
customer.email(),
event.totalAmount(),
"CREATED",
event.items().stream().map(this::toItemView).toList(),
null,
event.occurredAt(),
event.occurredAt()
);
readRepository.save(view);
}
}
3-4. Read Model Synchronization Strategies
| Strategy | Latency | Consistency | Complexity | Best For |
|---|---|---|---|---|
| Event Handler (Kafka) | ms to seconds | Eventual | Medium | Most cases |
| CDC (Debezium) | Seconds | Eventual | High | Legacy system integration |
| Polling (Scheduled) | Seconds to minutes | Eventual | Low | Simple cases |
| Synchronous update | Immediate | Strong | Low | Low performance requirements |
4. Saga Pattern Mastery
How do you manage business transactions spanning multiple services in microservices? Traditional DB transactions from monoliths are not available. The Saga pattern decomposes distributed transactions into a series of local transactions.
4-1. Choreography
In the Choreography approach, there is no central coordinator. Each service publishes events, and the next service subscribes to those events to perform its local transaction.
E-commerce Order Flow (Choreography):
Order Service --OrderPlaced--> Payment Service
|
PaymentCompleted
|
v
Inventory Service
|
InventoryReserved
|
v
Shipping Service
|
ShipmentCreated
|
v
Order Service (status update)
On Failure (Compensating Transactions):
Inventory Service --InventoryReservationFailed--> Payment Service (refund)
|
PaymentRefunded
|
v
Order Service (cancel order)
// === Choreography Saga: Order Service ===
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final KafkaTemplate<String, Object> kafkaTemplate;
@Transactional
public UUID placeOrder(PlaceOrderCommand command) {
Order order = Order.create(command);
order.setStatus(OrderStatus.PENDING);
orderRepository.save(order);
// Publish event - Payment Service subscribes
kafkaTemplate.send("order-events", new OrderPlacedEvent(
order.getId(),
order.getCustomerId(),
order.getTotalAmount(),
order.getItems()
));
return order.getId();
}
// Note: two @KafkaListener methods with the same topic and groupId compete
// as separate consumers in one group, so each record reaches only one of them.
// A single listener that dispatches on the event type is safer
// (assumes the payment events share a common PaymentEvent interface).
@KafkaListener(topics = "payment-events", groupId = "order-service")
public void onPaymentEvent(PaymentEvent event) {
Order order = orderRepository.findById(event.orderId()).orElseThrow();
switch (event) {
case PaymentCompletedEvent e -> order.setStatus(OrderStatus.PAID);
case PaymentFailedEvent e -> {
order.setStatus(OrderStatus.CANCELLED);
order.setCancellationReason(e.reason());
}
default -> { /* ignore unknown payment events */ }
}
orderRepository.save(order);
}
}
// === Choreography Saga: Payment Service ===
@Service
@RequiredArgsConstructor
public class PaymentService {
private final PaymentRepository paymentRepository;
private final PaymentGateway paymentGateway;
private final KafkaTemplate<String, Object> kafkaTemplate;
@KafkaListener(topics = "order-events", groupId = "payment-service")
@Transactional
public void onOrderPlaced(OrderPlacedEvent event) {
try {
// Process payment
PaymentResult result = paymentGateway.charge(
event.customerId(),
event.totalAmount()
);
Payment payment = Payment.create(
event.orderId(),
event.customerId(),
event.totalAmount(),
result.transactionId()
);
paymentRepository.save(payment);
// Publish success event - Inventory Service subscribes
kafkaTemplate.send("payment-events", new PaymentCompletedEvent(
event.orderId(),
payment.getId(),
result.transactionId()
));
} catch (PaymentException e) {
// Publish failure event - Order Service subscribes and cancels
kafkaTemplate.send("payment-events", new PaymentFailedEvent(
event.orderId(),
e.getMessage()
));
}
}
}
Choreography Pros and Cons:
| Pros | Cons |
|---|---|
| Loose coupling between services | Difficult to understand full flow |
| Each service independently deployable | Complex debugging |
| No Single Point of Failure (SPOF) | Possible circular dependencies |
| Relatively simple implementation | Event tracking complexity grows with services |
4-2. Orchestration
In the Orchestration approach, a central coordinator (Orchestrator) manages the entire workflow. It sends commands to each service and decides the next step based on responses.
E-commerce Order Flow (Orchestration):
Order Saga Orchestrator
/ | \
v v v
Payment Service Inventory Shipping
(charge) (reserve) (create)
v v v
(result) (result) (result)
\ | /
Order Saga Orchestrator
(final decision)
// === Orchestration Saga: Using Temporal Framework ===
@WorkflowInterface
public interface OrderSagaWorkflow {
@WorkflowMethod
OrderResult processOrder(PlaceOrderCommand command);
}
@WorkflowImpl(workflows = OrderSagaWorkflow.class)
public class OrderSagaWorkflowImpl implements OrderSagaWorkflow {
// Activity stubs for each service
private final PaymentActivities paymentActivities =
Workflow.newActivityStub(PaymentActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.setRetryOptions(RetryOptions.newBuilder()
.setMaximumAttempts(3)
.build())
.build());
private final InventoryActivities inventoryActivities =
Workflow.newActivityStub(InventoryActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.build());
private final ShippingActivities shippingActivities =
Workflow.newActivityStub(ShippingActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(60))
.build());
@Override
public OrderResult processOrder(PlaceOrderCommand command) {
String paymentId = null;
String reservationId = null;
try {
// Step 1: Payment
paymentId = paymentActivities.processPayment(
command.customerId(),
command.totalAmount()
);
// Step 2: Inventory reservation
reservationId = inventoryActivities.reserveInventory(
command.items()
);
// Step 3: Create shipment
String shipmentId = shippingActivities.createShipment(
command.orderId(),
command.shippingAddress()
);
return OrderResult.success(command.orderId(), shipmentId);
} catch (Exception e) {
// Execute compensating transactions
compensate(paymentId, reservationId);
return OrderResult.failed(command.orderId(), e.getMessage());
}
}
private void compensate(String paymentId, String reservationId) {
// Execute compensating transactions in reverse order
if (reservationId != null) {
inventoryActivities.cancelReservation(reservationId);
}
if (paymentId != null) {
paymentActivities.refundPayment(paymentId);
}
}
}
// === Activity Implementation ===
@ActivityInterface
public interface PaymentActivities {
String processPayment(UUID customerId, BigDecimal amount);
void refundPayment(String paymentId);
}
@Component
@RequiredArgsConstructor
public class PaymentActivitiesImpl implements PaymentActivities {
private final PaymentGateway paymentGateway;
private final PaymentRepository paymentRepository;
@Override
public String processPayment(UUID customerId, BigDecimal amount) {
PaymentResult result = paymentGateway.charge(customerId, amount);
Payment payment = Payment.create(customerId, amount, result.transactionId());
paymentRepository.save(payment);
return payment.getId().toString();
}
@Override
public void refundPayment(String paymentId) {
Payment payment = paymentRepository.findById(UUID.fromString(paymentId))
.orElseThrow();
paymentGateway.refund(payment.getTransactionId());
payment.setStatus(PaymentStatus.REFUNDED);
paymentRepository.save(payment);
}
}
Orchestration Pros and Cons:
| Pros | Cons |
|---|---|
| Entire workflow managed in one place | Orchestrator can become SPOF |
| Easy debugging and monitoring | Higher coupling than Choreography |
| Easier complex compensation logic | Risk of logic concentration in orchestrator |
| Framework support (Temporal/Cadence) | Additional infrastructure (workflow engine) needed |
4-3. Choreography vs Orchestration Selection Guide
| Criteria | Choreography | Orchestration |
|---|---|---|
| Number of services | 3-4 or fewer | 5 or more |
| Workflow complexity | Linear, simple | Conditional branching, parallel processing |
| Observability needs | Low | High |
| Team structure | Independent teams | Central platform team |
| Compensation logic | Simple | Complex (multiple paths) |
4-4. Compensating Transactions
Compensating transactions semantically undo already committed transactions. They do not physically roll back but perform reverse actions.
// Compensating transaction mapping (each forward action declares its undo)
public enum SagaAction {
PAYMENT_CHARGE("PAYMENT_REFUND"),
INVENTORY_RESERVE("INVENTORY_RELEASE"),
SHIPPING_CREATE("SHIPPING_CANCEL"),
COUPON_APPLY("COUPON_RESTORE"),
POINTS_DEDUCT("POINTS_RESTORE");
public final String compensation; // name of the compensating action
SagaAction(String compensation) { this.compensation = compensation; }
}
Semantic Compensation vs Exact Compensation:
- Exact Compensation: Restores precisely to original state (e.g., payment cancel -> full refund)
- Semantic Compensation: Provides a business-meaningful compensation (e.g., flight cancellation -> partial refund + credit)
In practice, Semantic Compensation is more common because exact restoration is often impossible after time passes.
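Whichever compensation style you use, compensations must run in reverse completion order, which is the shape the Temporal workflow above implements by hand. That ordering can be sketched with a stack of registered undo actions (names are illustrative):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Records a compensation for every completed step; on failure,
// runs them LIFO so the most recent side effect is undone first.
public class CompensationStack {
    private final Deque<Runnable> compensations = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();

    public void completedStep(String name, Runnable compensation) {
        compensations.push(compensation);
        log.add("done:" + name);
    }

    public void compensateAll() {
        while (!compensations.isEmpty()) {
            compensations.pop().run(); // reverse order of completion
        }
    }

    public List<String> log() { return log; }
}
```

Registering the undo at the moment each step succeeds means a failure at any point only compensates what actually happened.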
5. Event Sourcing
Event Sourcing stores all events that caused state changes in order, instead of storing only the current state of an entity.
5-1. Core Concept
Traditional CRUD:
Account table: id=1, balance=750
Event Sourcing:
Event #1: AccountCreated(id=1, initialBalance=1000)
Event #2: MoneyWithdrawn(id=1, amount=200)
Event #3: MoneyDeposited(id=1, amount=150)
Event #4: MoneyWithdrawn(id=1, amount=200)
-> Replay: 1000 - 200 + 150 - 200 = 750 (current balance)
All state changes are appended to an immutable, append-only event log. No deletions or updates.
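The replay above is just a left fold over the event log. A minimal sketch, with event types simplified to a record (names are illustrative):

```java
import java.util.List;

public class BalanceReplay {
    // Simplified event: the type string selects how the amount is applied
    public record MoneyEvent(String type, long amount) {}

    // Current state = fold(apply, initialState, events)
    public static long replay(long initialBalance, List<MoneyEvent> events) {
        long balance = initialBalance;
        for (MoneyEvent e : events) {
            balance += switch (e.type()) {
                case "MoneyDeposited" -> e.amount();
                case "MoneyWithdrawn" -> -e.amount();
                default -> throw new IllegalArgumentException("Unknown: " + e.type());
            };
        }
        return balance;
    }
}
```

Replaying the log from the example (1000, -200, +150, -200) reconstructs the balance of 750 without ever storing it directly.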
5-2. Event Store Implementation
// === Event Store Interface ===
public interface EventStore {
void saveEvents(UUID aggregateId, List<DomainEvent> events, int expectedVersion);
List<DomainEvent> getEvents(UUID aggregateId);
List<DomainEvent> getEvents(UUID aggregateId, int fromVersion);
}
// === PostgreSQL-based Event Store Implementation ===
@Repository
@RequiredArgsConstructor
public class PostgresEventStore implements EventStore {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
@Override
@Transactional
public void saveEvents(UUID aggregateId, List<DomainEvent> events,
int expectedVersion) {
// Optimistic concurrency control
Integer currentVersion = jdbcTemplate.queryForObject(
"SELECT COALESCE(MAX(version), 0) FROM event_store WHERE aggregate_id = ?",
Integer.class, aggregateId
);
if (currentVersion != expectedVersion) {
throw new ConcurrencyException(
"Concurrency conflict: expected version " + expectedVersion +
", actual version " + currentVersion
);
}
int version = expectedVersion;
for (DomainEvent event : events) {
version++;
String payload;
try {
payload = objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
// writeValueAsString throws a checked exception; fail the transaction
throw new IllegalStateException("Event serialization failed", e);
}
jdbcTemplate.update(
"""
INSERT INTO event_store
(event_id, aggregate_id, aggregate_type, event_type,
event_data, version, created_at)
VALUES (?, ?, ?, ?, ?::jsonb, ?, ?)
""",
UUID.randomUUID(),
aggregateId,
event.getAggregateType(),
event.getClass().getSimpleName(),
payload,
version,
Instant.now()
);
}
}
@Override
public List<DomainEvent> getEvents(UUID aggregateId) {
return jdbcTemplate.query(
"SELECT * FROM event_store WHERE aggregate_id = ? ORDER BY version",
this::mapRowToEvent,
aggregateId
);
}
}
-- Event Store table schema
CREATE TABLE event_store (
event_id UUID PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
version INTEGER NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (aggregate_id, version) -- optimistic concurrency control
);
CREATE INDEX idx_event_store_aggregate ON event_store (aggregate_id, version);
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);
5-3. Applying Events on Aggregates
// === Event-Sourced Aggregate ===
public class OrderAggregate {
private UUID id;
private UUID customerId;
private OrderStatus status;
private BigDecimal totalAmount;
private List<OrderItem> items = new ArrayList<>();
private int version = 0;
// Uncommitted events list
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
// Restore state from event history
public static OrderAggregate fromHistory(List<DomainEvent> history) {
OrderAggregate aggregate = new OrderAggregate();
for (DomainEvent event : history) {
aggregate.apply(event, false);
}
return aggregate;
}
// Handle new commands
public void placeOrder(UUID customerId, List<OrderItem> items) {
if (this.status != null) {
throw new IllegalStateException("Order already created");
}
BigDecimal total = items.stream()
.map(item -> item.price().multiply(BigDecimal.valueOf(item.quantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
// Generate the id here: 'this.id' is still null for a brand-new aggregate
apply(new OrderPlacedEvent(UUID.randomUUID(), customerId, total, items, Instant.now()), true);
}
public void cancelOrder(String reason) {
if (this.status == OrderStatus.SHIPPED) {
throw new IllegalStateException("Cannot cancel shipped orders");
}
apply(new OrderCancelledEvent(this.id, reason, Instant.now()), true);
}
// Apply event (state change) — package-private so SnapshotService can replay events
void apply(DomainEvent event, boolean isNew) {
when(event); // State mutation
this.version++;
if (isNew) {
uncommittedEvents.add(event);
}
}
// State change logic per event type
private void when(DomainEvent event) {
switch (event) {
case OrderPlacedEvent e -> {
this.id = e.orderId();
this.customerId = e.customerId();
this.totalAmount = e.totalAmount();
this.items = new ArrayList<>(e.items());
this.status = OrderStatus.PLACED;
}
case OrderCancelledEvent e -> {
this.status = OrderStatus.CANCELLED;
}
case OrderShippedEvent e -> {
this.status = OrderStatus.SHIPPED;
}
default -> throw new IllegalArgumentException("Unknown event: " + event);
}
}
public List<DomainEvent> getUncommittedEvents() {
return Collections.unmodifiableList(uncommittedEvents);
}
public void clearUncommittedEvents() {
uncommittedEvents.clear();
}
}
5-4. Snapshot Pattern
When tens of thousands of events accumulate, replaying all of them every time is inefficient. Snapshots save the aggregate state at specific intervals, so only events after the snapshot need to be replayed.
// Snapshot save and restore
@Service
@RequiredArgsConstructor
public class SnapshotService {
private final SnapshotRepository snapshotRepository;
private final EventStore eventStore;
private static final int SNAPSHOT_INTERVAL = 100; // every 100 events
public OrderAggregate loadAggregate(UUID aggregateId) {
// 1. Get latest snapshot
Optional<Snapshot> snapshot = snapshotRepository
.findLatest(aggregateId);
if (snapshot.isPresent()) {
// 2. Restore state from snapshot
OrderAggregate aggregate = deserialize(snapshot.get().getData());
int fromVersion = snapshot.get().getVersion();
// 3. Replay only events after snapshot
List<DomainEvent> newEvents = eventStore
.getEvents(aggregateId, fromVersion + 1);
for (DomainEvent event : newEvents) {
aggregate.apply(event, false);
}
return aggregate;
} else {
// No snapshot, replay all
List<DomainEvent> allEvents = eventStore.getEvents(aggregateId);
return OrderAggregate.fromHistory(allEvents);
}
}
public void saveIfNeeded(OrderAggregate aggregate) {
if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
snapshotRepository.save(new Snapshot(
aggregate.getId(),
aggregate.getVersion(),
serialize(aggregate),
Instant.now()
));
}
}
}
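The payoff of the snapshot interval is easy to quantify: on load, only the events after the most recent snapshot are replayed. A small sketch of that load-cost calculation (assuming a snapshot is taken exactly every SNAPSHOT_INTERVAL events, as above):

```java
public class SnapshotMath {
    // Given the aggregate's current version and the snapshot interval,
    // returns how many events must be replayed after loading the latest snapshot.
    public static int eventsToReplay(int currentVersion, int snapshotInterval) {
        int lastSnapshotVersion = (currentVersion / snapshotInterval) * snapshotInterval;
        return currentVersion - lastSnapshotVersion;
    }
}
```

With an interval of 100, even an aggregate at version 1,000,000 replays at most 99 events on load instead of the full log.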
5-5. Event Sourcing vs CRUD Comparison
| Criteria | CRUD | Event Sourcing |
|---|---|---|
| Storage | Current state only | All state change events |
| Audit trail | Requires separate implementation | Built-in (events are the audit log) |
| Time travel | Not possible | Possible (reconstruct state at any point) |
| Storage volume | Relatively small | Grows with event accumulation |
| Complexity | Low | High (event design, snapshots, etc.) |
| Concurrency | Pessimistic/optimistic locking | Event version-based optimistic control |
| Performance | Easy read optimization | Replay cost (mitigated by snapshots) |
| Schema changes | Migration required | Event upcasting |
| CQRS integration | Optional | Natural combination |
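The table's "Event upcasting" row deserves a concrete shape: old event versions stored in the log are rewritten into the current schema at read time instead of migrating the store. A minimal sketch, where the v1/v2 shapes and the defaulted field are illustrative assumptions:

```java
import java.util.HashMap;
import java.util.Map;

// Upcaster: rewrites a persisted v1 event payload into the v2 shape at read time.
// Here v2 added a "currency" field that v1 events lacked.
public class OrderEventUpcaster {
    public static Map<String, Object> upcast(Map<String, Object> payload, int version) {
        if (version >= 2) {
            return payload; // already current
        }
        // v1 -> v2: default the missing field instead of rewriting the event store
        Map<String, Object> upcasted = new HashMap<>(payload);
        upcasted.put("currency", "KRW"); // assumed default for legacy events
        return upcasted;
    }
}
```

Upcasters are typically chained (v1→v2→v3) so each schema change only needs one new transformation.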
5-6. When NOT to Use Event Sourcing
- Simple CRUD domains (blogs, configuration management)
- Strong consistency is mandatory (ATMs requiring real-time balance checks)
- Unstable event design in early stages (frequent changes mean high upcasting cost)
- Team lacks experience (steep learning curve)
- Regulatory requirements for data deletion (conflicts with GDPR right to be forgotten)
6. Outbox Pattern (The Core of Reliability)
6-1. The Problem: Dual Write
One of the most common mistakes in distributed systems is performing database saves and message publishing as separate operations.
// Dangerous code: Dual write problem
@Transactional
public void createOrder(Order order) {
orderRepository.save(order); // Step 1: DB save
kafkaTemplate.send("orders", new OrderCreatedEvent(order.getId())); // Step 2: Message publish
// Step 1 success + Step 2 failure = In DB but event not published
// Step 1 success + Step 2 success + DB commit failure = Event published but not in DB
}
DB transactions and message broker sends are different infrastructure, so they cannot be processed atomically.
6-2. Solution: Outbox Pattern
The Outbox Pattern stores events in an outbox table within the DB transaction, and a separate process delivers them to the message broker.
1. Business logic + outbox INSERT = single DB transaction (atomicity guaranteed)
2. Separate process (CDC/Poller) reads outbox table and publishes to Kafka
3. After successful publish, mark outbox record as completed
-- Outbox table schema
CREATE TABLE outbox (
id UUID PRIMARY KEY,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id UUID NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ,
status VARCHAR(20) DEFAULT 'PENDING'
);
CREATE INDEX idx_outbox_pending ON outbox (status, created_at)
WHERE status = 'PENDING';
// === Outbox Pattern Implementation ===
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper; // used to serialize the outbox payload
@Transactional // Guaranteed by single transaction
public UUID createOrder(CreateOrderCommand command) {
// 1. Execute business logic
Order order = Order.create(command);
orderRepository.save(order);
// 2. Save event to outbox in same transaction
String payload;
try {
payload = objectMapper.writeValueAsString(
new OrderCreatedEvent(order.getId(), order.getTotalAmount()));
} catch (JsonProcessingException e) {
throw new IllegalStateException("Failed to serialize outbox event", e);
}
OutboxMessage message = OutboxMessage.builder()
.aggregateType("Order")
.aggregateId(order.getId())
.eventType("OrderCreated")
.payload(payload)
.build();
outboxRepository.save(message);
return order.getId();
// When transaction commits, both order and outbox are saved together
}
}
6-3. CDC (Debezium) Approach
Debezium monitors the DB transaction log (WAL) to capture changes and deliver them to Kafka.
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "dbz_password",
"database.dbname": "orderdb",
"database.server.name": "order-service",
"table.include.list": "public.outbox",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "events.order"
}
}
6-4. Polling vs CDC Comparison
| Criteria | Polling | CDC (Debezium) |
|---|---|---|
| Latency | Depends on polling interval (seconds-minutes) | Near real-time (ms) |
| DB load | Periodic queries cause load | Minimal (reads WAL) |
| Implementation complexity | Low (scheduler + query) | High (Debezium infrastructure) |
| Order guarantee | Outbox table order | WAL order preserved |
| Operational overhead | Low | Kafka Connect cluster needed |
| Scalability | Single poller bottleneck | Highly scalable |
7. Idempotency Patterns
In distributed systems, messages are typically delivered at-least-once: network failures, broker restarts, and consumer redeployments all cause duplicate delivery. Idempotency guarantees that processing the same message multiple times produces the same result as processing it once.
7-1. Producer Idempotency (Kafka)
# Kafka Producer configuration
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.properties.acks=all
spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
spring.kafka.producer.properties.retries=2147483647
Kafka's idempotent producer assigns sequence numbers to each message, enabling the broker to detect duplicates.
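The mechanism can be illustrated with a small simulation (a simplified model of the broker-side check, not Kafka's actual implementation): the broker remembers the highest sequence number accepted per producer ID and drops retried duplicates.

```java
import java.util.*;

// Simplified model of idempotent-producer dedup: each producer numbers its
// messages, and the broker rejects any sequence it has already accepted.
class SequenceDedupBroker {
    private final Map<Long, Integer> lastSeq = new HashMap<>(); // producerId -> last accepted seq
    private final List<String> log = new ArrayList<>();

    // Returns true if appended, false if recognized as a duplicate retry.
    boolean append(long producerId, int sequence, String record) {
        if (sequence <= lastSeq.getOrDefault(producerId, -1)) {
            return false; // retry of an already-persisted message
        }
        lastSeq.put(producerId, sequence);
        log.add(record);
        return true;
    }

    List<String> log() { return log; }
}
```

This is why a producer retry after a lost acknowledgment does not duplicate the record in the partition log.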
7-2. Consumer Idempotency
// === Idempotent Consumer Implementation ===
@Slf4j
@Component
@RequiredArgsConstructor
public class IdempotentOrderEventHandler {
private final ProcessedEventRepository processedEventRepository;
private final OrderProjectionRepository orderProjectionRepository;
@KafkaListener(topics = "order-events", groupId = "order-projection")
@Transactional
public void handleOrderEvent(
@Payload OrderEvent event,
@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Header("eventId") String eventId) {
// 1. Check if event already processed
if (processedEventRepository.existsById(eventId)) {
log.info("Ignoring duplicate event: eventId={}", eventId);
return;
}
// 2. Process business logic
processEvent(event);
// 3. Record as processed (same transaction)
processedEventRepository.save(new ProcessedEvent(
eventId,
event.getClass().getSimpleName(),
Instant.now()
));
}
private void processEvent(OrderEvent event) {
switch (event) {
case OrderCreatedEvent e -> createOrderProjection(e);
case OrderStatusChangedEvent e -> updateOrderStatus(e);
default -> log.warn("Unknown event type: {}", event.getClass().getSimpleName());
}
}
}
-- Processed events table
CREATE TABLE processed_events (
event_id VARCHAR(255) PRIMARY KEY,
event_type VARCHAR(255) NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
7-3. Unique Constraint vs Redis SET NX
| Approach | Pros | Cons |
|---|---|---|
| DB Unique Constraint | Atomic with transaction | Increases DB load |
| Redis SET NX | Very fast lookups | Not guaranteed if Redis fails |
| DB + Redis combo | Fast primary filter + reliable guarantee | Complex implementation |
// Redis SET NX for fast duplicate check
@Component
@RequiredArgsConstructor
public class RedisIdempotencyGuard {
private final StringRedisTemplate redisTemplate;
private static final Duration TTL = Duration.ofHours(24);
public boolean tryAcquire(String idempotencyKey) {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(
"idempotency:" + idempotencyKey,
"processed",
TTL
);
return Boolean.TRUE.equals(result);
}
}
8. Dead Letter Queue Strategy
When message processing fails, infinite retries can paralyze the system. DLQ (Dead Letter Queue) isolates unprocessable messages into a separate queue to ensure system stability.
8-1. Retry with Exponential Backoff
// === Spring Kafka Retry + DLQ Configuration ===
@Configuration
public class KafkaConsumerConfig {
@Bean
public DefaultErrorHandler errorHandler(
KafkaTemplate<String, Object> kafkaTemplate) {
// Recoverer that sends to DLQ
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(
record.topic() + ".DLQ", record.partition()));
// Exponential Backoff retry configuration
ExponentialBackOff backOff = new ExponentialBackOff();
backOff.setInitialInterval(1000L); // 1 second
backOff.setMultiplier(2.0); // 2x increase
backOff.setMaxInterval(60000L); // max 60 seconds
backOff.setMaxElapsedTime(300000L); // max 5 minutes
DefaultErrorHandler errorHandler =
new DefaultErrorHandler(recoverer, backOff);
// Non-retryable exceptions (business errors)
errorHandler.addNotRetryableExceptions(
InvalidOrderException.class,
DuplicateEventException.class
);
return errorHandler;
}
}
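With the values above, the concrete retry schedule is easy to compute. The helper below mirrors the four parameters (initial interval, multiplier, max interval, max elapsed time) but is not Spring's `ExponentialBackOff` implementation, so treat the exact stop condition as an approximation:

```java
import java.util.*;

// Computes the retry delays of an exponential backoff policy:
// start at initialMs, multiply each attempt, cap at maxIntervalMs,
// and stop once total waiting would exceed maxElapsedMs.
class BackoffSchedule {
    static List<Long> delays(long initialMs, double multiplier,
                             long maxIntervalMs, long maxElapsedMs) {
        List<Long> out = new ArrayList<>();
        long delay = initialMs;
        long elapsed = 0;
        while (elapsed + delay <= maxElapsedMs) {
            out.add(delay);
            elapsed += delay;
            delay = Math.min((long) (delay * multiplier), maxIntervalMs);
        }
        return out;
    }
}
```

With the settings from the configuration (1s initial, 2.0 multiplier, 60s cap, 5min total), this yields 1s, 2s, 4s, 8s, 16s, 32s, then 60s steps — nine retries before the message goes to the DLQ.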
8-2. DLQ Monitoring and Alerting
// === DLQ Monitoring Service ===
@Slf4j
@Component
@RequiredArgsConstructor
public class DlqMonitoringService {
private final MeterRegistry meterRegistry;
private final AlertService alertService;
@KafkaListener(topics = "order-events.DLQ", groupId = "dlq-monitor")
public void monitorDlq(
ConsumerRecord<String, Object> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String errorMessage,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic) {
// Record metrics
meterRegistry.counter("dlq.messages.received",
"originalTopic", originalTopic,
"errorType", classifyError(errorMessage)
).increment();
// Send alert
alertService.sendAlert(DlqAlert.builder()
.originalTopic(originalTopic)
.messageKey(record.key())
.errorMessage(errorMessage)
.timestamp(Instant.ofEpochMilli(record.timestamp()))
.severity(classifySeverity(errorMessage))
.build()
);
log.error("DLQ message received: topic={}, key={}, error={}",
originalTopic, record.key(), errorMessage);
}
}
8-3. DLQ Reprocessing Pipeline
// === DLQ Message Reprocessing ===
@RestController
@RequestMapping("/api/dlq")
@RequiredArgsConstructor
public class DlqReprocessController {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final DlqMessageRepository dlqRepository;
// Single message reprocessing
@PostMapping("/reprocess/single")
public ResponseEntity<String> reprocessSingle(
@RequestParam String messageId) {
DlqMessage message = dlqRepository.findById(messageId).orElseThrow();
// Republish to original topic
kafkaTemplate.send(
message.getOriginalTopic(),
message.getKey(),
message.getValue()
);
message.setStatus("REPROCESSED");
message.setReprocessedAt(Instant.now());
dlqRepository.save(message);
return ResponseEntity.ok("Reprocessing requested: " + messageId);
}
// Batch reprocessing
@PostMapping("/reprocess/batch")
public ResponseEntity<String> reprocessBatch(
@RequestParam String originalTopic,
@RequestParam(defaultValue = "100") int batchSize) {
List<DlqMessage> messages = dlqRepository
.findByOriginalTopicAndStatus(originalTopic, "PENDING",
PageRequest.of(0, batchSize));
int count = 0;
for (DlqMessage message : messages) {
kafkaTemplate.send(message.getOriginalTopic(),
message.getKey(), message.getValue());
message.setStatus("REPROCESSED");
count++;
}
dlqRepository.saveAll(messages);
return ResponseEntity.ok(count + " messages reprocessing requested");
}
}
9. Exactly-Once Semantics
There are three levels of message delivery guarantees:
| Guarantee Level | Description | Complexity |
|---|---|---|
| At-most-once | Delivered at most once (may be lost) | Low |
| At-least-once | Delivered at least once (may be duplicated) | Medium |
| Exactly-once | Delivered exactly once | High |
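The practical difference between at-least-once and exactly-once shows up at the consumer. A small simulation (with an in-memory `Set` standing in for the `processed_events` table from section 7) demonstrates how dedup turns redelivery into an effectively-once outcome:

```java
import java.util.*;

// At-least-once delivery may hand the same message to a consumer twice.
// Tracking processed event IDs makes the *effect* happen exactly once.
class DedupConsumer {
    private final Set<String> processed = new HashSet<>();
    private int effects = 0;

    void handle(String eventId) {
        if (!processed.add(eventId)) return; // duplicate: skip silently
        effects++; // the business side effect runs once per unique event
    }

    int effectCount() { return effects; }
}
```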
9-1. Kafka Exactly-Once Implementation
Kafka has supported Exactly-Once Semantics (EOS) since version 0.11.
// === Kafka Exactly-Once Configuration ===
@Configuration
public class KafkaExactlyOnceConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.ACKS_CONFIG, "all");
DefaultKafkaProducerFactory<String, Object> factory =
new DefaultKafkaProducerFactory<>(config);
// Spring requires a transaction-id prefix on the factory for transactions;
// it appends a unique suffix per producer instance
factory.setTransactionIdPrefix("order-tx-");
return factory;
}
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
ProducerFactory<String, Object> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(config);
}
}
9-2. End-to-End Exactly-Once
True end-to-end Exactly-Once must be guaranteed along the whole path: from the Producer, through the broker and Consumer, to the final state persisted in the database.
End-to-End Exactly-Once Chain:
Producer (idempotent + transactions)
-> Kafka Broker (transaction log)
-> Consumer (read_committed + manual offset)
-> Database (idempotency key + business logic)
All links in the chain must work together for Exactly-Once guarantee
// === End-to-End Exactly-Once Processing ===
@Component
@RequiredArgsConstructor
public class ExactlyOnceOrderProcessor {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final OrderRepository orderRepository;
private final ProcessedOffsetRepository offsetRepository;
@KafkaListener(topics = "payment-completed")
@Transactional("chainedKafkaTransactionManager")
public void processPaymentCompleted(
ConsumerRecord<String, PaymentCompletedEvent> record) {
String offsetKey = record.topic() + "-" +
record.partition() + "-" + record.offset();
// Check if offset already processed
if (offsetRepository.existsById(offsetKey)) {
return;
}
// Business logic
PaymentCompletedEvent event = record.value();
Order order = orderRepository.findById(event.orderId()).orElseThrow();
order.markAsPaid(event.paymentId());
orderRepository.save(order);
// Publish next step event (same Kafka transaction)
kafkaTemplate.send("order-paid",
new OrderPaidEvent(order.getId(), event.paymentId()));
// Record processed offset (same DB transaction)
offsetRepository.save(new ProcessedOffset(offsetKey, Instant.now()));
}
}
10. Reactive Streams vs Virtual Threads (2025)
10-1. Project Reactor: Backpressure as Core Design
Reactive Streams treat non-blocking I/O and backpressure as core design principles.
// === Reactor-based Async Processing ===
@Slf4j
@Service
@RequiredArgsConstructor
public class ReactiveOrderService {
private final KafkaReceiver<String, OrderEvent> kafkaReceiver; // reactor-kafka
public Flux<OrderEvent> processOrderStream() {
return kafkaReceiver.receive()
.flatMap(record -> {
OrderEvent event = record.value();
return processEvent(event)
.then(record.receiverOffset().commit())
.thenReturn(event);
}, 16) // Concurrency limit (backpressure)
.onErrorResume(e -> {
log.error("Processing failed", e);
return Mono.empty();
})
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
}
private Mono<Void> processEvent(OrderEvent event) {
// Offload any blocking work to the boundedElastic scheduler
return Mono.fromRunnable(() -> {
// Business logic
}).subscribeOn(Schedulers.boundedElastic()).then();
}
}
10-2. Java 21 Virtual Threads: Simpler Code
Virtual Threads achieve high concurrency while maintaining traditional synchronous code style.
// === Virtual Threads-based Async Processing ===
@Configuration
public class VirtualThreadConfig {
@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandlerCustomizer() {
return protocolHandler -> {
protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
};
}
}
@Service
@RequiredArgsConstructor
public class VirtualThreadOrderService {
private final PaymentService paymentService;
private final InventoryService inventoryService;
// Synchronous code, but running on Virtual Threads:
// blocking I/O parks only the Virtual Thread, not an OS thread
public OrderResult processOrder(UUID orderId) {
// Parallel fan-out with StructuredTaskScope (a preview API in Java 21)
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Subtask<PaymentResult> paymentTask = scope.fork(() ->
paymentService.processPayment(orderId)
);
Subtask<InventoryResult> inventoryTask = scope.fork(() ->
inventoryService.checkInventory(orderId)
);
scope.join();
scope.throwIfFailed();
return new OrderResult(
paymentTask.get(),
inventoryTask.get()
);
}
}
}
10-3. Selection Guide: Decision Matrix
| Criteria | Reactive (Reactor/WebFlux) | Virtual Threads |
|---|---|---|
| Code complexity | High (reactive operator learning) | Low (existing sync code) |
| Backpressure control | Built-in (Flux/Mono) | Manual implementation needed |
| Debugging | Difficult (complex stack traces) | Easy (traditional stack traces) |
| Memory efficiency | Excellent | Good (better than OS threads) |
| I/O-intensive workloads | Optimal | Very good |
| CPU-intensive workloads | Not ideal | Not ideal (use platform threads) |
| Legacy code migration | Complete rewrite needed | Configuration change only |
| Ecosystem maturity | High (5+ years) | Growing (2023~) |
Recommendations:
- New project + simple I/O: Virtual Threads
- Streaming data processing: Reactive Streams
- Existing Spring MVC project: Virtual Threads (minimal migration cost)
- Real-time data pipeline: Reactive Streams
11. Production Architecture: Payment System Design
A complete e-commerce payment system architecture combining all patterns.
E-commerce Payment System Architecture
======================================
[Client] -> [API Gateway]
                 |
                 v
          [Order Service] ---------- Write DB (PostgreSQL)
                 |                         |
                 | (Outbox Pattern)        | (CDC - Debezium)
                 |                         v
                 |               [Kafka: order-events]
                 |                /        |         \
                 v               v         v          v
   [Saga Orchestrator]      [Payment] [Inventory] [Notification]
       (Temporal)            Service    Service     Service
           |                    |          |
           | commands           |          |
           v                    v          v
   [payment-commands]   [payment-events] [inventory-events]
                                |
                        [DLQ + Monitoring]
                                |
                       [Alerting (PagerDuty)]

Read Side:
[Kafka: order-events] -> [Order Projection]  -> Read DB (MongoDB)
                      -> [Analytics Service] -> Data Warehouse
                      -> [Search Service]    -> Elasticsearch
Applied Patterns Summary
| Pattern | Applied At | Purpose |
|---|---|---|
| CQRS | Order Service | Separate order read/write |
| Saga (Orchestration) | Temporal Orchestrator | Payment-inventory-shipping transaction mgmt |
| Event Sourcing | Order Aggregate | Order state change history |
| Outbox Pattern | Order Service to Kafka | DB-message atomicity guarantee |
| Idempotency | All Consumers | Prevent duplicate message processing |
| DLQ | All Kafka Consumers | Isolate failed messages |
| Exactly-Once | Payment processing | Prevent double charging |
Quiz
Q1. When the Write Model and Read Model in CQRS use different databases, what is the most commonly used approach to ensure data consistency between the two models?
Answer: Event-based asynchronous synchronization (Eventual Consistency)
When the Write Model changes state, it publishes events. The Read Model's projection handler subscribes to these events and updates the Read DB. There may be temporary inconsistency (ms to seconds) between the two models, but consistency is eventually guaranteed.
If strong consistency is required, reconsider whether CQRS is the right fit.
Q2. In a Choreography Saga, when Service A publishes an event and Service B fails to process it, how does Service A become aware of the failure?
Answer: Service B publishes a failure event (e.g., PaymentFailed), and Service A subscribes to it to become aware. In Choreography, each service must publish both success and failure events, and related services subscribe to respond appropriately.
This is one of the downsides of Choreography. As the number of services grows, event flow tracking becomes increasingly complex, and timeout-based detection may be needed as an additional mechanism.
Q3. When an Aggregate in Event Sourcing has 1 million accumulated events, what is the key technique to solve the performance issue of loading it?
Answer: Snapshot Pattern
Periodically (e.g., every 100 events), save the current state of the Aggregate as a snapshot. When loading, start from the most recent snapshot and only replay events after that point.
Example: Out of 1 million events, if the last snapshot is at event 999,900, only 100 events need to be replayed.
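The snapshot-then-replay load can be sketched with a toy aggregate (a hypothetical counter, not a specific event-store API): rehydration starts from the snapshot state and applies only the tail of events recorded after it.

```java
import java.util.*;

// Toy event-sourced rehydration: restore from the latest snapshot,
// then replay only the events that occurred after the snapshot version.
class SnapshotReplay {
    record Snapshot(long version, long state) {}

    static long replayed; // how many events the last load replayed

    static long load(Snapshot snapshot, List<Long> eventsAfterSnapshot) {
        long state = snapshot.state();
        replayed = 0;
        for (long delta : eventsAfterSnapshot) {
            state += delta; // apply one event
            replayed++;
        }
        return state;
    }
}
```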
Q4. In the Outbox Pattern, what is the main reason the CDC (Change Data Capture) approach is preferred over the Polling approach?
Answer: CDC reads the DB transaction log (WAL) directly, detecting changes in near real-time (milliseconds). Polling executes queries periodically, causing higher latency (seconds to minutes) and additional DB load from frequent queries.
Additionally, CDC delivers events in WAL order, naturally preserving ordering, and since it reads the transaction log, there is virtually no additional query overhead.
Q5. To achieve Exactly-Once Semantics in Kafka, what configuration is needed at the Producer, Broker, and Consumer levels respectively?
Answer:
- Producer: enable.idempotence=true + transactional.id configuration (idempotent producer + transactions)
- Broker: Transaction log maintenance (enabled by default)
- Consumer: isolation.level=read_committed + manual offset commit + idempotent processing
For End-to-End Exactly-Once, the Consumer must also ensure idempotency when saving results to the DB. Kafka transactions only guarantee Exactly-Once within Kafka; to include external systems (DB), consumer-side idempotency is essential.
References
- Martin Fowler — "Event Sourcing" (martinfowler.com)
- Chris Richardson — "Microservices Patterns" (Manning, 2018)
- Vaughn Vernon — "Implementing Domain-Driven Design" (Addison-Wesley, 2013)
- Greg Young — "CQRS Documents" (cqrs.files.wordpress.com)
- Apache Kafka Documentation — "Exactly-Once Semantics" (kafka.apache.org)
- Debezium Documentation — "Outbox Event Router" (debezium.io)
- Temporal.io Documentation — "Saga Pattern with Temporal" (docs.temporal.io)
- Netflix Tech Blog — "Evolution of the Netflix Data Pipeline" (netflixtechblog.com)
- Uber Engineering Blog — "Building Reliable Reprocessing and Dead Letter Queues" (eng.uber.com)
- Confluent Blog — "Exactly-Once Semantics Are Possible" (confluent.io)
- AWS Architecture Blog — "Saga Orchestration Pattern" (aws.amazon.com)
- Microsoft Azure Docs — "CQRS Pattern" (learn.microsoft.com)
- Spring for Apache Kafka Reference — "Error Handling and DLQ" (docs.spring.io)
- JEP 444 — "Virtual Threads" (openjdk.org)
- Project Reactor Reference — "Backpressure" (projectreactor.io)
- Pat Helland — "Idempotence Is Not a Medical Condition" (ACM Queue, 2012)