- Published on
비동기 처리 패턴 완전 정복: CQRS, Saga, Event Sourcing — 시니어 백엔드 개발자의 필수 무기
- Authors

- Name
- Youngju Kim
- @fjvbn20031
- 1. 왜 비동기인가? 대규모 시스템의 생존 전략
- 2. Event-Driven Architecture 기초
- 3. CQRS 딥다이브
- 4. Saga 패턴 완전 정복
- 5. Event Sourcing
- 6. Outbox Pattern (신뢰성의 핵심)
- 7. 멱등성 패턴
- 8. Dead Letter Queue 전략
- 9. Exactly-Once Semantics
- 10. Reactive Streams vs Virtual Threads (2025)
- 11. 실전 아키텍처: 결제 시스템 설계
- 퀴즈
- 참고 자료
1. 왜 비동기인가? 대규모 시스템의 생존 전략
현대 대규모 시스템은 동기식 요청-응답 모델로는 감당할 수 없는 규모의 트래픽을 처리합니다.
실제 기업 사례:
| 기업 | 규모 | 비동기 처리 방식 |
|---|---|---|
| Netflix | 2.6억 구독자, 초당 수백만 이벤트 | Apache Kafka 기반 이벤트 스트리밍 |
| Uber | 일일 페타바이트 실시간 데이터 | Apache Kafka + Apache Flink |
| 7조+ 메시지/일 Kafka 처리 | 자체 개발 Kafka 인프라 | |
| Shopify | 블랙프라이데이 초당 80만 요청 | 비동기 주문 처리 파이프라인 |
동기식 아키텍처에서 서비스 A가 서비스 B를 호출하고, B가 C를 호출하면 전체 레이턴시는 각 서비스 호출 시간의 합이 됩니다. 하나의 서비스가 느려지면 전체 체인이 블로킹됩니다.
비동기 아키텍처에서는 메시지 브로커를 통해 서비스 간 결합도를 낮추고, 각 서비스가 독립적으로 확장 가능합니다.
동기식: Client → A(50ms) → B(100ms) → C(200ms) = 350ms 총 대기
비동기식: Client → A(50ms) → Message Broker → (B, C 병렬 처리)
Client는 50ms 후 응답, B와 C는 비동기로 처리
비동기 전환의 핵심 이점:
- 시간적 결합 제거 — 생산자와 소비자가 동시에 가동될 필요 없음
- 독립적 확장 — 읽기와 쓰기 워크로드를 별도로 스케일링
- 장애 격리 — 하나의 서비스 장애가 전체에 전파되지 않음
- 피크 트래픽 흡수 — 메시지 큐가 버퍼 역할
그러나 비동기는 공짜가 아닙니다. Eventual Consistency(최종 일관성), 메시지 순서 보장, 중복 처리, 분산 트랜잭션 같은 새로운 문제가 생깁니다. 이 글에서 다루는 패턴들은 바로 이 문제들에 대한 해답입니다.
2. Event-Driven Architecture 기초
이벤트 주도 아키텍처(EDA)는 비동기 패턴의 토대입니다. 시스템의 상태 변경을 "이벤트"로 표현하고, 관심 있는 서비스가 이를 구독하여 반응합니다.
2-1. Domain Events vs Integration Events
| 구분 | Domain Event | Integration Event |
|---|---|---|
| 범위 | 단일 바운디드 컨텍스트 내부 | 바운디드 컨텍스트 간 |
| 예시 | OrderItemAdded | OrderPlaced |
| 전달 방식 | 인메모리 이벤트 버스 | 메시지 브로커 (Kafka, RabbitMQ) |
| 스키마 관리 | 내부적이므로 자유롭게 변경 가능 | 계약(Contract) 관리 필수 |
// Domain Event - 바운디드 컨텍스트 내부
public record OrderItemAdded(
UUID orderId,
UUID productId,
int quantity,
BigDecimal price,
Instant occurredAt
) implements DomainEvent {}
// Integration Event - 외부 서비스에 발행
public record OrderPlacedEvent(
UUID orderId,
UUID customerId,
BigDecimal totalAmount,
List<OrderLineItem> items,
Instant occurredAt,
int version // 스키마 버전 관리
) implements IntegrationEvent {}
2-2. Event Bus vs Event Store
Event Bus (예: Kafka, RabbitMQ)는 이벤트를 전달하는 파이프라인입니다. 소비자가 이벤트를 수신하면 보통 브로커에서 삭제(또는 TTL 이후 만료)됩니다.
Event Store는 모든 이벤트를 영구적으로 보관하는 저장소입니다. 이벤트를 재생(replay)하여 어떤 시점의 상태든 복원할 수 있습니다.
Event Bus (Kafka):
Producer → [Topic: orders] → Consumer Group A (주문 서비스)
→ Consumer Group B (알림 서비스)
→ Consumer Group C (분석 서비스)
Event Store:
[Event #1: OrderCreated]
[Event #2: ItemAdded]
[Event #3: PaymentReceived]
[Event #4: OrderShipped]
→ 이벤트 #1~#4를 재생하면 현재 주문 상태 복원 가능
2-3. Eventual Consistency 이해
분산 시스템에서 강한 일관성(Strong Consistency)을 유지하려면 분산 잠금, 2PC(2-Phase Commit) 등이 필요하지만 성능과 가용성이 크게 떨어집니다.
CAP 정리에 따라, 네트워크 파티션 상황에서 일관성(C)과 가용성(A) 중 하나만 선택해야 합니다. 대부분의 대규모 시스템은 가용성(A)을 선택하고 Eventual Consistency를 수용합니다.
Eventual Consistency란 "지금 당장은 데이터가 불일치할 수 있지만, 충분한 시간이 지나면 모든 노드가 동일한 상태로 수렴한다"는 것입니다.
실무에서의 의미:
- 주문 후 재고가 즉시 줄어들지 않을 수 있음 (수 밀리초~수 초 지연)
- 사용자에게 "주문이 처리 중입니다" 상태를 먼저 보여주고, 확정 후 업데이트
- 일관성 창(Consistency Window)을 최소화하는 것이 핵심
3. CQRS 딥다이브
CQRS(Command Query Responsibility Segregation)는 읽기와 쓰기 모델을 완전히 분리하는 패턴입니다.
3-1. 기본 개념
전통적인 CRUD 모델에서는 하나의 데이터 모델이 읽기와 쓰기를 모두 담당합니다. 하지만 실제로 읽기와 쓰기는 매우 다른 요구사항을 가집니다.
전통적 CRUD:
Client → [단일 모델] → Database
(읽기/쓰기 모두)
CQRS:
Client ──Command──→ [Write Model] → Command DB (정규화)
↓ Events
Client ←──Query───← [Read Model] ← Read DB (비정규화, 최적화)
Write Model (명령 측):
- 비즈니스 규칙 검증
- 도메인 불변식(Invariant) 보장
- 정규화된 스키마
- 강한 일관성
Read Model (질의 측):
- 조회에 최적화된 비정규화 뷰
- 다양한 저장소 사용 가능 (Elasticsearch, Redis, MongoDB)
- Eventual Consistency 허용
3-2. 언제 CQRS를 사용해야 하는가?
적합한 경우:
- 읽기/쓰기 비율이 극단적으로 다를 때 (읽기가 100:1 이상)
- 복잡한 도메인 로직 + 다양한 조회 요구사항
- 읽기와 쓰기의 스케일링 요구가 다를 때
- Event Sourcing과 함께 사용할 때
부적합한 경우:
- 단순한 CRUD 애플리케이션
- 강한 일관성이 절대적으로 필요한 경우
- 도메인이 단순하고 조회 패턴이 예측 가능한 경우
3-3. Spring Boot + Kafka 구현
// === Command 측: 주문 생성 ===
@Service
@RequiredArgsConstructor
public class OrderCommandService {
private final OrderRepository orderRepository;
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
@Transactional
public UUID createOrder(CreateOrderCommand command) {
// 1. 비즈니스 규칙 검증
validateOrder(command);
// 2. 도메인 모델에서 처리
Order order = Order.create(
command.customerId(),
command.items(),
command.shippingAddress()
);
// 3. Write DB에 저장
orderRepository.save(order);
// 4. 이벤트 발행 (Read Model 동기화용)
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getCustomerId(),
order.getTotalAmount(),
order.getItems().stream()
.map(this::toEventItem)
.toList(),
Instant.now()
);
kafkaTemplate.send("order-events", order.getId().toString(), event);
return order.getId();
}
private void validateOrder(CreateOrderCommand command) {
if (command.items().isEmpty()) {
throw new InvalidOrderException("주문 항목이 비어있습니다");
}
// 재고 확인, 고객 상태 확인 등
}
}
// === Query 측: 주문 조회 프로젝션 ===
@Service
@RequiredArgsConstructor
public class OrderQueryService {
private final OrderReadRepository readRepository;
public OrderDetailView getOrderDetail(UUID orderId) {
return readRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
}
public Page<OrderSummaryView> getCustomerOrders(
UUID customerId, Pageable pageable) {
return readRepository.findByCustomerId(customerId, pageable);
}
}
// Read Model - 조회에 최적화된 비정규화 뷰
@Document(collection = "order_views")
public record OrderDetailView(
UUID orderId,
UUID customerId,
String customerName, // 비정규화: 고객명 포함
String customerEmail,
BigDecimal totalAmount,
String status,
List<OrderItemView> items,
AddressView shippingAddress,
Instant createdAt,
Instant lastUpdatedAt
) {}
// === 이벤트 핸들러: Write → Read 동기화 ===
@Component
@RequiredArgsConstructor
public class OrderProjectionHandler {
private final OrderReadRepository readRepository;
private final CustomerQueryClient customerClient;
@KafkaListener(topics = "order-events", groupId = "order-projection")
public void handleOrderEvent(OrderEvent event) {
switch (event) {
case OrderCreatedEvent e -> handleOrderCreated(e);
case OrderStatusChangedEvent e -> handleStatusChanged(e);
case OrderCancelledEvent e -> handleOrderCancelled(e);
default -> log.warn("알 수 없는 이벤트 타입: {}", event.getClass());
}
}
private void handleOrderCreated(OrderCreatedEvent event) {
// 고객 정보 조회하여 비정규화된 뷰 생성
CustomerInfo customer = customerClient.getCustomer(event.customerId());
OrderDetailView view = new OrderDetailView(
event.orderId(),
event.customerId(),
customer.name(),
customer.email(),
event.totalAmount(),
"CREATED",
event.items().stream().map(this::toItemView).toList(),
null,
event.occurredAt(),
event.occurredAt()
);
readRepository.save(view);
}
}
3-4. Read Model 동기화 전략
| 전략 | 지연 | 일관성 | 복잡도 | 적합한 경우 |
|---|---|---|---|---|
| 이벤트 핸들러 (Kafka) | 수 ms ~ 수 초 | Eventual | 중간 | 대부분의 경우 |
| CDC (Debezium) | 수 초 | Eventual | 높음 | 레거시 시스템 연동 |
| 폴링 (Scheduled) | 수 초 ~ 수 분 | Eventual | 낮음 | 단순한 경우 |
| 동기식 업데이트 | 즉시 | Strong | 낮음 | 성능 요구 낮을 때 |
4. Saga 패턴 완전 정복
마이크로서비스에서 여러 서비스에 걸친 비즈니스 트랜잭션은 어떻게 관리할까요? 기존 모놀리스의 DB 트랜잭션은 사용할 수 없습니다. Saga 패턴은 분산 트랜잭션을 일련의 로컬 트랜잭션으로 분해합니다.
4-1. Choreography (안무형)
Choreography 방식에서는 중앙 조율자가 없습니다. 각 서비스가 이벤트를 발행하면 다음 서비스가 해당 이벤트를 구독하여 자신의 로컬 트랜잭션을 수행합니다.
이커머스 주문 흐름 (Choreography):
Order Service ──OrderPlaced──→ Payment Service
│
PaymentCompleted
│
↓
Inventory Service
│
InventoryReserved
│
↓
Shipping Service
│
ShipmentCreated
│
↓
Order Service (상태 업데이트)
실패 시 (보상 트랜잭션):
Inventory Service ──InventoryReservationFailed──→ Payment Service (환불)
│
PaymentRefunded
│
↓
Order Service (주문 취소)
// === Choreography Saga: 주문 서비스 ===
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final KafkaTemplate<String, Object> kafkaTemplate;
@Transactional
public UUID placeOrder(PlaceOrderCommand command) {
Order order = Order.create(command);
order.setStatus(OrderStatus.PENDING);
orderRepository.save(order);
// 이벤트 발행 - 결제 서비스가 구독
kafkaTemplate.send("order-events", new OrderPlacedEvent(
order.getId(),
order.getCustomerId(),
order.getTotalAmount(),
order.getItems()
));
return order.getId();
}
// 결제 완료 이벤트 수신
@KafkaListener(topics = "payment-events", groupId = "order-service")
public void onPaymentCompleted(PaymentCompletedEvent event) {
Order order = orderRepository.findById(event.orderId()).orElseThrow();
order.setStatus(OrderStatus.PAID);
orderRepository.save(order);
}
// 결제 실패 이벤트 수신
@KafkaListener(topics = "payment-events", groupId = "order-service")
public void onPaymentFailed(PaymentFailedEvent event) {
Order order = orderRepository.findById(event.orderId()).orElseThrow();
order.setStatus(OrderStatus.CANCELLED);
order.setCancellationReason(event.reason());
orderRepository.save(order);
}
}
// === Choreography Saga: 결제 서비스 ===
@Service
@RequiredArgsConstructor
public class PaymentService {
private final PaymentRepository paymentRepository;
private final PaymentGateway paymentGateway;
private final KafkaTemplate<String, Object> kafkaTemplate;
@KafkaListener(topics = "order-events", groupId = "payment-service")
@Transactional
public void onOrderPlaced(OrderPlacedEvent event) {
try {
// 결제 처리
PaymentResult result = paymentGateway.charge(
event.customerId(),
event.totalAmount()
);
Payment payment = Payment.create(
event.orderId(),
event.customerId(),
event.totalAmount(),
result.transactionId()
);
paymentRepository.save(payment);
// 성공 이벤트 발행 - 재고 서비스가 구독
kafkaTemplate.send("payment-events", new PaymentCompletedEvent(
event.orderId(),
payment.getId(),
result.transactionId()
));
} catch (PaymentException e) {
// 실패 이벤트 발행 - 주문 서비스가 구독하여 취소
kafkaTemplate.send("payment-events", new PaymentFailedEvent(
event.orderId(),
e.getMessage()
));
}
}
}
Choreography 장단점:
| 장점 | 단점 |
|---|---|
| 서비스 간 느슨한 결합 | 전체 흐름 파악이 어려움 |
| 각 서비스가 독립적으로 배포 가능 | 디버깅이 복잡함 |
| 단일 장애 지점(SPOF) 없음 | 순환 의존성 발생 가능 |
| 구현이 비교적 단순 | 서비스가 많아지면 이벤트 추적 난이도 상승 |
4-2. Orchestration (지휘형)
Orchestration 방식에서는 중앙 조율자(Orchestrator)가 전체 워크플로우를 관리합니다. 각 서비스에 명령을 보내고, 응답에 따라 다음 단계를 결정합니다.
이커머스 주문 흐름 (Orchestration):
Order Saga Orchestrator
/ | \
↓ ↓ ↓
Payment Service Inventory Shipping
(charge) (reserve) (create)
↓ ↓ ↓
(result) (result) (result)
\ | /
Order Saga Orchestrator
(최종 결정)
// === Orchestration Saga: Temporal 프레임워크 사용 ===
@WorkflowInterface
public interface OrderSagaWorkflow {
@WorkflowMethod
OrderResult processOrder(PlaceOrderCommand command);
}
@WorkflowImpl(workflows = OrderSagaWorkflow.class)
public class OrderSagaWorkflowImpl implements OrderSagaWorkflow {
// 각 서비스의 Activity 참조
private final PaymentActivities paymentActivities =
Workflow.newActivityStub(PaymentActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.setRetryOptions(RetryOptions.newBuilder()
.setMaximumAttempts(3)
.build())
.build());
private final InventoryActivities inventoryActivities =
Workflow.newActivityStub(InventoryActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.build());
private final ShippingActivities shippingActivities =
Workflow.newActivityStub(ShippingActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(60))
.build());
@Override
public OrderResult processOrder(PlaceOrderCommand command) {
String paymentId = null;
String reservationId = null;
try {
// Step 1: 결제
paymentId = paymentActivities.processPayment(
command.customerId(),
command.totalAmount()
);
// Step 2: 재고 예약
reservationId = inventoryActivities.reserveInventory(
command.items()
);
// Step 3: 배송 생성
String shipmentId = shippingActivities.createShipment(
command.orderId(),
command.shippingAddress()
);
return OrderResult.success(command.orderId(), shipmentId);
} catch (Exception e) {
// 보상 트랜잭션 실행
compensate(paymentId, reservationId);
return OrderResult.failed(command.orderId(), e.getMessage());
}
}
private void compensate(String paymentId, String reservationId) {
// 역순으로 보상 트랜잭션 실행
if (reservationId != null) {
inventoryActivities.cancelReservation(reservationId);
}
if (paymentId != null) {
paymentActivities.refundPayment(paymentId);
}
}
}
// === Activity 구현 ===
@ActivityInterface
public interface PaymentActivities {
String processPayment(UUID customerId, BigDecimal amount);
void refundPayment(String paymentId);
}
@Component
@RequiredArgsConstructor
public class PaymentActivitiesImpl implements PaymentActivities {
private final PaymentGateway paymentGateway;
private final PaymentRepository paymentRepository;
@Override
public String processPayment(UUID customerId, BigDecimal amount) {
PaymentResult result = paymentGateway.charge(customerId, amount);
Payment payment = Payment.create(customerId, amount, result.transactionId());
paymentRepository.save(payment);
return payment.getId().toString();
}
@Override
public void refundPayment(String paymentId) {
Payment payment = paymentRepository.findById(UUID.fromString(paymentId))
.orElseThrow();
paymentGateway.refund(payment.getTransactionId());
payment.setStatus(PaymentStatus.REFUNDED);
paymentRepository.save(payment);
}
}
Orchestration 장단점:
| 장점 | 단점 |
|---|---|
| 전체 워크플로우가 한 곳에서 관리됨 | 오케스트레이터가 SPOF 될 수 있음 |
| 디버깅과 모니터링이 용이 | 서비스 간 결합도가 Choreography보다 높음 |
| 복잡한 보상 로직 관리가 쉬움 | 오케스트레이터에 로직이 집중될 위험 |
| Temporal/Cadence 등 프레임워크 지원 | 추가 인프라(워크플로우 엔진) 필요 |
4-3. Choreography vs Orchestration 선택 가이드
| 기준 | Choreography | Orchestration |
|---|---|---|
| 서비스 수 | 3~4개 이하 | 5개 이상 |
| 워크플로우 복잡도 | 선형적, 단순 | 조건 분기, 병렬 처리 |
| 관찰 가능성 요구 | 낮음 | 높음 |
| 팀 구조 | 독립적 팀 | 중앙 플랫폼 팀 |
| 보상 로직 | 단순 | 복잡 (다중 경로) |
4-4. Compensating Transactions (보상 트랜잭션)
보상 트랜잭션은 이미 커밋된 트랜잭션을 의미론적으로 되돌리는 작업입니다. 물리적 롤백이 아니라 역작용(reverse action)을 수행합니다.
// 보상 트랜잭션 예시
public enum CompensationAction {
PAYMENT_CHARGE -> PAYMENT_REFUND,
INVENTORY_RESERVE -> INVENTORY_RELEASE,
SHIPPING_CREATE -> SHIPPING_CANCEL,
COUPON_APPLY -> COUPON_RESTORE,
POINTS_DEDUCT -> POINTS_RESTORE
}
Semantic Compensation vs Exact Compensation:
- Exact Compensation: 정확히 원래 상태로 복원 (예: 결제 취소 → 전액 환불)
- Semantic Compensation: 비즈니스적으로 의미 있는 보상 (예: 항공권 취소 → 부분 환불 + 크레딧)
실무에서는 Semantic Compensation이 더 일반적입니다. 시간이 지나면 정확한 복원이 불가능한 경우가 많기 때문입니다.
5. Event Sourcing
Event Sourcing은 엔티티의 현재 상태를 저장하는 대신, 상태 변경을 일으킨 모든 이벤트를 순서대로 저장하는 패턴입니다.
5-1. 핵심 개념
전통적 CRUD:
Account 테이블: id=1, balance=750
Event Sourcing:
Event #1: AccountCreated(id=1, initialBalance=1000)
Event #2: MoneyWithdrawn(id=1, amount=200)
Event #3: MoneyDeposited(id=1, amount=150)
Event #4: MoneyWithdrawn(id=1, amount=200)
→ 재생: 1000 - 200 + 150 - 200 = 750 (현재 잔고)
모든 상태 변경이 불변(immutable) 이벤트 로그에 추가(append-only)됩니다. 삭제나 수정은 없습니다.
5-2. Event Store 구현
// === Event Store 인터페이스 ===
public interface EventStore {
void saveEvents(UUID aggregateId, List<DomainEvent> events, int expectedVersion);
List<DomainEvent> getEvents(UUID aggregateId);
List<DomainEvent> getEvents(UUID aggregateId, int fromVersion);
}
// === PostgreSQL 기반 Event Store 구현 ===
@Repository
@RequiredArgsConstructor
public class PostgresEventStore implements EventStore {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
@Override
@Transactional
public void saveEvents(UUID aggregateId, List<DomainEvent> events,
int expectedVersion) {
// 낙관적 동시성 제어
Integer currentVersion = jdbcTemplate.queryForObject(
"SELECT COALESCE(MAX(version), 0) FROM event_store WHERE aggregate_id = ?",
Integer.class, aggregateId
);
if (currentVersion != expectedVersion) {
throw new ConcurrencyException(
"동시성 충돌: 예상 버전 " + expectedVersion +
", 실제 버전 " + currentVersion
);
}
int version = expectedVersion;
for (DomainEvent event : events) {
version++;
jdbcTemplate.update(
"""
INSERT INTO event_store
(event_id, aggregate_id, aggregate_type, event_type,
event_data, version, created_at)
VALUES (?, ?, ?, ?, ?::jsonb, ?, ?)
""",
UUID.randomUUID(),
aggregateId,
event.getAggregateType(),
event.getClass().getSimpleName(),
objectMapper.writeValueAsString(event),
version,
Instant.now()
);
}
}
@Override
public List<DomainEvent> getEvents(UUID aggregateId) {
return jdbcTemplate.query(
"SELECT * FROM event_store WHERE aggregate_id = ? ORDER BY version",
this::mapRowToEvent,
aggregateId
);
}
}
-- Event Store 테이블 스키마
CREATE TABLE event_store (
event_id UUID PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
version INTEGER NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (aggregate_id, version) -- 낙관적 동시성 제어
);
CREATE INDEX idx_event_store_aggregate ON event_store (aggregate_id, version);
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);
5-3. Aggregate에서 이벤트 적용
// === Event-Sourced Aggregate ===
public class OrderAggregate {
private UUID id;
private UUID customerId;
private OrderStatus status;
private BigDecimal totalAmount;
private List<OrderItem> items = new ArrayList<>();
private int version = 0;
// 미적용 이벤트 목록
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
// 이벤트 재생으로 상태 복원
public static OrderAggregate fromHistory(List<DomainEvent> history) {
OrderAggregate aggregate = new OrderAggregate();
for (DomainEvent event : history) {
aggregate.apply(event, false);
}
return aggregate;
}
// 새 명령 처리
public void placeOrder(UUID customerId, List<OrderItem> items) {
if (this.status != null) {
throw new IllegalStateException("이미 생성된 주문입니다");
}
BigDecimal total = items.stream()
.map(item -> item.price().multiply(BigDecimal.valueOf(item.quantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
apply(new OrderPlacedEvent(this.id, customerId, total, items, Instant.now()), true);
}
public void cancelOrder(String reason) {
if (this.status == OrderStatus.SHIPPED) {
throw new IllegalStateException("배송된 주문은 취소할 수 없습니다");
}
apply(new OrderCancelledEvent(this.id, reason, Instant.now()), true);
}
// 이벤트 적용 (상태 변경)
private void apply(DomainEvent event, boolean isNew) {
when(event); // 상태 변경
this.version++;
if (isNew) {
uncommittedEvents.add(event);
}
}
// 이벤트별 상태 변경 로직
private void when(DomainEvent event) {
switch (event) {
case OrderPlacedEvent e -> {
this.id = e.orderId();
this.customerId = e.customerId();
this.totalAmount = e.totalAmount();
this.items = new ArrayList<>(e.items());
this.status = OrderStatus.PLACED;
}
case OrderCancelledEvent e -> {
this.status = OrderStatus.CANCELLED;
}
case OrderShippedEvent e -> {
this.status = OrderStatus.SHIPPED;
}
default -> throw new IllegalArgumentException("알 수 없는 이벤트: " + event);
}
}
public List<DomainEvent> getUncommittedEvents() {
return Collections.unmodifiableList(uncommittedEvents);
}
public void clearUncommittedEvents() {
uncommittedEvents.clear();
}
}
5-4. Snapshot 패턴
이벤트가 수만 개 쌓이면 매번 전체를 재생하는 것은 비효율적입니다. Snapshot을 사용하여 특정 시점의 상태를 저장해두면 해당 시점 이후의 이벤트만 재생하면 됩니다.
// Snapshot 저장 및 복원
@Service
public class SnapshotService {
private static final int SNAPSHOT_INTERVAL = 100; // 100개 이벤트마다
public OrderAggregate loadAggregate(UUID aggregateId) {
// 1. 최신 스냅샷 조회
Optional<Snapshot> snapshot = snapshotRepository
.findLatest(aggregateId);
if (snapshot.isPresent()) {
// 2. 스냅샷에서 상태 복원
OrderAggregate aggregate = deserialize(snapshot.get().getData());
int fromVersion = snapshot.get().getVersion();
// 3. 스냅샷 이후 이벤트만 재생
List<DomainEvent> newEvents = eventStore
.getEvents(aggregateId, fromVersion + 1);
for (DomainEvent event : newEvents) {
aggregate.apply(event, false);
}
return aggregate;
} else {
// 스냅샷 없으면 전체 재생
List<DomainEvent> allEvents = eventStore.getEvents(aggregateId);
return OrderAggregate.fromHistory(allEvents);
}
}
public void saveIfNeeded(OrderAggregate aggregate) {
if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
snapshotRepository.save(new Snapshot(
aggregate.getId(),
aggregate.getVersion(),
serialize(aggregate),
Instant.now()
));
}
}
}
5-5. Event Sourcing vs CRUD 비교
| 기준 | CRUD | Event Sourcing |
|---|---|---|
| 저장 방식 | 현재 상태만 저장 | 모든 상태 변경 이벤트 저장 |
| 감사 추적 | 별도 구현 필요 | 내장 (이벤트 자체가 감사 로그) |
| 시간 여행 | 불가능 | 가능 (특정 시점 상태 복원) |
| 저장소 용량 | 상대적으로 적음 | 이벤트 누적으로 증가 |
| 복잡도 | 낮음 | 높음 (이벤트 설계, 스냅샷 등) |
| 동시성 처리 | 비관적/낙관적 잠금 | 이벤트 버전 기반 낙관적 제어 |
| 성능 | 읽기 최적화 쉬움 | 이벤트 재생 비용 (스냅샷으로 완화) |
| 스키마 변경 | 마이그레이션 필요 | 이벤트 업캐스팅(Upcasting) |
| CQRS 연계 | 선택사항 | 자연스러운 조합 |
5-6. Event Sourcing을 사용하지 말아야 할 때
- 단순한 CRUD 도메인 (블로그, 설정 관리)
- 강한 일관성이 필수인 경우 (실시간 잔고 확인이 필요한 ATM)
- 이벤트 설계가 불안정한 초기 단계 (잦은 변경 시 이벤트 업캐스팅 비용)
- 팀의 경험이 부족한 경우 (학습 곡선이 가파름)
- 규제 요건으로 데이터 삭제가 필요한 경우 (GDPR right to be forgotten과 충돌)
6. Outbox Pattern (신뢰성의 핵심)
6-1. 문제: 이중 쓰기 (Dual Write)
분산 시스템에서 가장 흔한 실수 중 하나는 데이터베이스 저장과 메시지 발행을 별도로 수행하는 것입니다.
// 위험한 코드: 이중 쓰기 문제
@Transactional
public void createOrder(Order order) {
orderRepository.save(order); // Step 1: DB 저장
kafkaTemplate.send("orders", event); // Step 2: 메시지 발행
// Step 1 성공 + Step 2 실패 = DB에는 있지만 이벤트는 발행 안 됨
// Step 1 성공 + Step 2 성공 + DB 커밋 실패 = 이벤트는 발행됐지만 DB에 없음
}
DB 트랜잭션과 메시지 브로커 전송은 서로 다른 인프라이므로 원자적으로 처리할 수 없습니다.
6-2. 해결: Outbox 패턴
Outbox 패턴은 이벤트를 DB 트랜잭션 내에서 outbox 테이블에 저장하고, 별도 프로세스가 이를 메시지 브로커로 전달합니다.
1. 비즈니스 로직 + outbox INSERT = 단일 DB 트랜잭션 (원자성 보장)
2. 별도 프로세스(CDC/Poller)가 outbox 테이블 읽어서 Kafka로 발행
3. 발행 성공 후 outbox 레코드 처리 완료로 표시
-- Outbox 테이블 스키마
CREATE TABLE outbox (
id UUID PRIMARY KEY,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id UUID NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ,
status VARCHAR(20) DEFAULT 'PENDING'
);
CREATE INDEX idx_outbox_pending ON outbox (status, created_at)
WHERE status = 'PENDING';
// === Outbox 패턴 구현 ===
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxRepository outboxRepository;
@Transactional // 단일 트랜잭션으로 보장
public UUID createOrder(CreateOrderCommand command) {
// 1. 비즈니스 로직 수행
Order order = Order.create(command);
orderRepository.save(order);
// 2. 같은 트랜잭션에서 outbox에 이벤트 저장
OutboxMessage message = OutboxMessage.builder()
.aggregateType("Order")
.aggregateId(order.getId())
.eventType("OrderCreated")
.payload(objectMapper.writeValueAsString(
new OrderCreatedEvent(order.getId(), order.getTotalAmount())
))
.build();
outboxRepository.save(message);
return order.getId();
// 트랜잭션 커밋 시 order와 outbox 모두 함께 저장됨
}
}
6-3. CDC (Debezium) 방식
Debezium은 DB의 트랜잭션 로그(WAL)를 모니터링하여 변경을 캡처하고 Kafka로 전달합니다.
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "dbz_password",
"database.dbname": "orderdb",
"database.server.name": "order-service",
"table.include.list": "public.outbox",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "events.order"
}
}
6-4. Polling vs CDC 비교
| 기준 | Polling | CDC (Debezium) |
|---|---|---|
| 지연 | 폴링 주기에 의존 (수 초~수 분) | 거의 실시간 (수 ms) |
| DB 부하 | 주기적 쿼리로 부하 발생 | WAL 읽기로 부하 최소 |
| 구현 복잡도 | 낮음 (스케줄러 + 쿼리) | 높음 (Debezium 인프라 필요) |
| 순서 보장 | outbox 테이블 순서 | WAL 순서 그대로 |
| 운영 부담 | 낮음 | Kafka Connect 클러스터 운영 필요 |
| 확장성 | 단일 폴러 병목 가능 | 높은 확장성 |
7. 멱등성 패턴
분산 시스템에서 메시지는 최소 한 번(at-least-once) 전달됩니다. 네트워크 장애, 브로커 재시작, 소비자 재배포 등으로 중복 수신이 발생합니다. 멱등성은 같은 메시지를 여러 번 처리해도 결과가 한 번 처리한 것과 동일하도록 보장합니다.
7-1. Producer 멱등성 (Kafka)
# Kafka Producer 설정
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.properties.acks=all
spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
spring.kafka.producer.properties.retries=2147483647
Kafka의 멱등 프로듀서는 각 메시지에 시퀀스 번호를 부여하여 브로커가 중복을 감지합니다.
7-2. Consumer 멱등성
// === 멱등 소비자 구현 ===
@Component
@RequiredArgsConstructor
public class IdempotentOrderEventHandler {
private final ProcessedEventRepository processedEventRepository;
private final OrderProjectionRepository orderProjectionRepository;
@KafkaListener(topics = "order-events", groupId = "order-projection")
@Transactional
public void handleOrderEvent(
@Payload OrderEvent event,
@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Header("eventId") String eventId) {
// 1. 이미 처리된 이벤트인지 확인
if (processedEventRepository.existsById(eventId)) {
log.info("이벤트 중복 수신 무시: eventId={}", eventId);
return;
}
// 2. 비즈니스 로직 처리
processEvent(event);
// 3. 처리 완료 기록 (같은 트랜잭션)
processedEventRepository.save(new ProcessedEvent(
eventId,
event.getClass().getSimpleName(),
Instant.now()
));
}
private void processEvent(OrderEvent event) {
switch (event) {
case OrderCreatedEvent e -> createOrderProjection(e);
case OrderStatusChangedEvent e -> updateOrderStatus(e);
default -> log.warn("알 수 없는 이벤트 타입");
}
}
}
-- 이벤트 처리 이력 테이블
CREATE TABLE processed_events (
event_id VARCHAR(255) PRIMARY KEY,
event_type VARCHAR(255) NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
7-3. Unique Constraint vs Redis SET NX
| 접근법 | 장점 | 단점 |
|---|---|---|
| DB Unique Constraint | 트랜잭션과 함께 원자적 | DB 부하 증가 |
| Redis SET NX | 매우 빠른 조회 | Redis 장애 시 보장 안 됨 |
| DB + Redis 조합 | 빠른 1차 필터 + 확실한 보장 | 구현 복잡 |
// Redis SET NX를 이용한 빠른 중복 체크
@Component
public class RedisIdempotencyGuard {
private final StringRedisTemplate redisTemplate;
private static final Duration TTL = Duration.ofHours(24);
public boolean tryAcquire(String idempotencyKey) {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(
"idempotency:" + idempotencyKey,
"processed",
TTL
);
return Boolean.TRUE.equals(result);
}
}
8. Dead Letter Queue 전략
메시지 처리에 실패하면 무한 재시도는 시스템을 마비시킵니다. DLQ(Dead Letter Queue)는 처리 불가능한 메시지를 별도 큐로 격리하여 시스템의 안정성을 보장합니다.
8-1. Retry with Exponential Backoff
// === Spring Kafka 재시도 + DLQ 설정 ===
@Configuration
public class KafkaConsumerConfig {
@Bean
public DefaultErrorHandler errorHandler(
KafkaTemplate<String, Object> kafkaTemplate) {
// DLQ로 보내는 recoverer
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(
record.topic() + ".DLQ", record.partition()));
// Exponential Backoff 재시도 설정
ExponentialBackOff backOff = new ExponentialBackOff();
backOff.setInitialInterval(1000L); // 1초
backOff.setMultiplier(2.0); // 2배씩 증가
backOff.setMaxInterval(60000L); // 최대 60초
backOff.setMaxElapsedTime(300000L); // 최대 5분
DefaultErrorHandler errorHandler =
new DefaultErrorHandler(recoverer, backOff);
// 재시도하지 않을 예외 (비즈니스 오류)
errorHandler.addNotRetryableExceptions(
InvalidOrderException.class,
DuplicateEventException.class
);
return errorHandler;
}
}
8-2. DLQ 모니터링 및 알림
// === DLQ 모니터링 서비스 ===
@Component
@RequiredArgsConstructor
public class DlqMonitoringService {
private final MeterRegistry meterRegistry;
private final AlertService alertService;
@KafkaListener(topics = "order-events.DLQ", groupId = "dlq-monitor")
public void monitorDlq(
ConsumerRecord<String, Object> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String errorMessage,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic) {
// 메트릭 기록
meterRegistry.counter("dlq.messages.received",
"originalTopic", originalTopic,
"errorType", classifyError(errorMessage)
).increment();
// 알림 발송
alertService.sendAlert(DlqAlert.builder()
.originalTopic(originalTopic)
.messageKey(record.key())
.errorMessage(errorMessage)
.timestamp(Instant.ofEpochMilli(record.timestamp()))
.severity(classifySeverity(errorMessage))
.build()
);
log.error("DLQ 메시지 수신: topic={}, key={}, error={}",
originalTopic, record.key(), errorMessage);
}
}
8-3. DLQ 재처리 파이프라인
// === DLQ 메시지 재처리 ===
@RestController
@RequestMapping("/api/dlq")
@RequiredArgsConstructor
public class DlqReprocessController {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final DlqMessageRepository dlqRepository;
// 단건 재처리
@PostMapping("/reprocess/single")
public ResponseEntity<String> reprocessSingle(
@RequestParam String messageId) {
DlqMessage message = dlqRepository.findById(messageId).orElseThrow();
// 원래 토픽으로 재발행
kafkaTemplate.send(
message.getOriginalTopic(),
message.getKey(),
message.getValue()
);
message.setStatus("REPROCESSED");
message.setReprocessedAt(Instant.now());
dlqRepository.save(message);
return ResponseEntity.ok("재처리 요청 완료: " + messageId);
}
// 배치 재처리
@PostMapping("/reprocess/batch")
public ResponseEntity<String> reprocessBatch(
@RequestParam String originalTopic,
@RequestParam(defaultValue = "100") int batchSize) {
List<DlqMessage> messages = dlqRepository
.findByOriginalTopicAndStatus(originalTopic, "PENDING",
PageRequest.of(0, batchSize));
int count = 0;
for (DlqMessage message : messages) {
kafkaTemplate.send(message.getOriginalTopic(),
message.getKey(), message.getValue());
message.setStatus("REPROCESSED");
count++;
}
dlqRepository.saveAll(messages);
return ResponseEntity.ok(count + "건 재처리 요청 완료");
}
}
9. Exactly-Once Semantics
메시지 전달 보장 수준은 세 가지가 있습니다.
| 보장 수준 | 설명 | 복잡도 |
|---|---|---|
| At-most-once | 최대 한 번 전달 (유실 가능) | 낮음 |
| At-least-once | 최소 한 번 전달 (중복 가능) | 중간 |
| Exactly-once | 정확히 한 번 전달 | 높음 |
9-1. Kafka Exactly-Once 구현
Kafka는 0.11부터 Exactly-Once Semantics(EOS)를 지원합니다.
// === Kafka Exactly-Once 설정 ===
@Configuration
public class KafkaExactlyOnceConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-");
config.put(ProducerConfig.ACKS_CONFIG, "all");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
ProducerFactory<String, Object> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(config);
}
}
9-2. End-to-End Exactly-Once
진정한 End-to-End Exactly-Once는 Producer부터 Consumer의 최종 상태 저장까지 보장해야 합니다.
End-to-End Exactly-Once 체인:
Producer (idempotent + transactions)
→ Kafka Broker (transaction log)
→ Consumer (read_committed + manual offset)
→ Database (idempotency key + business logic)
모든 체인이 함께 동작해야 Exactly-Once가 보장됨
// === End-to-End Exactly-Once 처리 ===
@Component
@RequiredArgsConstructor
public class ExactlyOnceOrderProcessor {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final OrderRepository orderRepository;
private final ProcessedOffsetRepository offsetRepository;
@KafkaListener(topics = "payment-completed")
@Transactional("chainedKafkaTransactionManager")
public void processPaymentCompleted(
ConsumerRecord<String, PaymentCompletedEvent> record) {
String offsetKey = record.topic() + "-" +
record.partition() + "-" + record.offset();
// 이미 처리된 오프셋인지 확인
if (offsetRepository.existsById(offsetKey)) {
return;
}
// 비즈니스 로직
PaymentCompletedEvent event = record.value();
Order order = orderRepository.findById(event.orderId()).orElseThrow();
order.markAsPaid(event.paymentId());
orderRepository.save(order);
// 다음 단계 이벤트 발행 (같은 Kafka 트랜잭션)
kafkaTemplate.send("order-paid",
new OrderPaidEvent(order.getId(), event.paymentId()));
// 처리된 오프셋 기록 (같은 DB 트랜잭션)
offsetRepository.save(new ProcessedOffset(offsetKey, Instant.now()));
}
}
10. Reactive Streams vs Virtual Threads (2025)
10-1. Project Reactor: 배압(Backpressure)이 핵심
리액티브 스트림은 논블로킹 I/O와 배압(Backpressure) 메커니즘을 핵심 설계 원칙으로 합니다.
// === Reactor 기반 비동기 처리 ===
@Service
public class ReactiveOrderService {
public Flux<OrderEvent> processOrderStream() {
return kafkaReceiver.receive()
.flatMap(record -> {
OrderEvent event = record.value();
return processEvent(event)
.then(record.receiverOffset().commit())
.thenReturn(event);
}, 16) // 동시 처리 수 제한 (배압)
.onErrorResume(e -> {
log.error("처리 실패", e);
return Mono.empty();
})
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
}
private Mono<Void> processEvent(OrderEvent event) {
return Mono.fromCallable(() -> {
// 비즈니스 로직
return null;
}).subscribeOn(Schedulers.boundedElastic()).then();
}
}
10-2. Java 21 Virtual Threads: 더 단순한 코드
Virtual Threads는 기존 동기식 코드 스타일을 유지하면서 높은 동시성을 달성합니다.
// === Virtual Threads 기반 비동기 처리 ===
@Configuration
public class VirtualThreadConfig {
@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandlerCustomizer() {
return protocolHandler -> {
protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
};
}
}
@Service
public class VirtualThreadOrderService {
// 동기식 코드지만 Virtual Thread 위에서 실행
// → 블로킹 I/O 시 OS 스레드가 아닌 Virtual Thread만 블로킹
public OrderResult processOrder(UUID orderId) {
// 병렬 처리: StructuredTaskScope 활용
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Subtask<PaymentResult> paymentTask = scope.fork(() ->
paymentService.processPayment(orderId)
);
Subtask<InventoryResult> inventoryTask = scope.fork(() ->
inventoryService.checkInventory(orderId)
);
scope.join();
scope.throwIfFailed();
return new OrderResult(
paymentTask.get(),
inventoryTask.get()
);
}
}
}
10-3. 선택 가이드: 의사결정 매트릭스
| 기준 | Reactive (Reactor/WebFlux) | Virtual Threads |
|---|---|---|
| 코드 복잡도 | 높음 (리액티브 연산자 학습) | 낮음 (기존 동기 코드) |
| 배압 제어 | 내장 (Flux/Mono) | 수동 구현 필요 |
| 디버깅 | 어려움 (스택 트레이스 복잡) | 쉬움 (전통적 스택 트레이스) |
| 메모리 효율 | 매우 좋음 | 좋음 (OS 스레드보다) |
| I/O 집약 작업 | 최적 | 매우 좋음 |
| CPU 집약 작업 | 부적합 | 부적합 (플랫폼 스레드 사용) |
| 기존 코드 마이그레이션 | 전면 재작성 필요 | 설정 변경만으로 가능 |
| 생태계 성숙도 | 높음 (5년+) | 성장 중 (2023~) |
권장 가이드:
- 새 프로젝트 + 단순 I/O: Virtual Threads
- 스트리밍 데이터 처리: Reactive Streams
- 기존 Spring MVC 프로젝트: Virtual Threads (마이그레이션 비용 최소)
- 실시간 데이터 파이프라인: Reactive Streams
11. 실전 아키텍처: 결제 시스템 설계
모든 패턴을 결합한 이커머스 결제 시스템의 전체 아키텍처를 설계합니다.
이커머스 결제 시스템 전체 아키텍처
================================
[Client] → [API Gateway]
|
↓
[Order Service] ─── Write DB (PostgreSQL)
| |
| (Outbox Pattern) | (CDC - Debezium)
| ↓
| [Kafka: order-events]
| / | \
↓ ↓ ↓ ↓
[Saga Orchestrator] [Payment] [Inventory] [Notification]
(Temporal) Service Service Service
| | |
| commands | |
↓ ↓ ↓
[payment-commands] [payment- [inventory-
events] events]
|
[DLQ + Monitoring]
|
[Alerting (PagerDuty)]
Read Side:
[Kafka: order-events] → [Order Projection] → Read DB (MongoDB)
→ [Analytics Service] → Data Warehouse
→ [Search Service] → Elasticsearch
적용된 패턴 요약
| 패턴 | 적용 위치 | 목적 |
|---|---|---|
| CQRS | Order Service | 주문 쓰기/읽기 분리 |
| Saga (Orchestration) | Temporal Orchestrator | 결제-재고-배송 트랜잭션 관리 |
| Event Sourcing | Order Aggregate | 주문 상태 변경 이력 관리 |
| Outbox Pattern | Order Service → Kafka | DB-메시지 원자성 보장 |
| 멱등성 | 모든 Consumer | 중복 메시지 처리 방지 |
| DLQ | 모든 Kafka Consumer | 처리 실패 메시지 격리 |
| Exactly-Once | Payment 처리 | 이중 결제 방지 |
퀴즈
Q1. CQRS에서 Write Model과 Read Model이 서로 다른 데이터베이스를 사용할 때, 두 모델 간의 데이터 일관성을 보장하기 위해 가장 일반적으로 사용되는 접근 방식은?
정답: 이벤트 기반 비동기 동기화 (Eventual Consistency)
Write Model에서 상태 변경 시 이벤트를 발행하고, Read Model의 프로젝션 핸들러가 이 이벤트를 구독하여 Read DB를 업데이트합니다. 두 모델 사이에는 일시적 불일치(수 ms~수 초)가 존재할 수 있지만, 최종적으로 일관성이 보장됩니다.
강한 일관성이 필요한 경우에는 CQRS의 적합성을 재검토해야 합니다.
Q2. Choreography Saga에서 서비스 A가 이벤트를 발행하고 서비스 B가 처리에 실패했을 때, 서비스 A는 어떻게 이를 인지할 수 있나요?
정답: 서비스 B가 실패 이벤트(예: PaymentFailed)를 발행하면 서비스 A가 이를 구독하여 인지합니다. Choreography에서는 각 서비스가 성공/실패 이벤트를 모두 발행해야 하며, 관련 서비스들이 이를 구독하여 적절히 대응합니다.
이것이 Choreography의 단점 중 하나입니다. 서비스가 많아질수록 이벤트 흐름 추적이 복잡해지며, 타임아웃 기반 감지가 추가로 필요할 수 있습니다.
Q3. Event Sourcing에서 이벤트가 100만 개 쌓인 Aggregate를 로드할 때 성능 문제를 해결하는 핵심 기법은?
정답: Snapshot 패턴
주기적으로(예: 100개 이벤트마다) Aggregate의 현재 상태를 스냅샷으로 저장합니다. 로드 시 가장 최근 스냅샷에서 시작하여 그 이후의 이벤트만 재생하면 됩니다.
예: 100만 개 이벤트 중 마지막 스냅샷이 999,900번째라면 100개의 이벤트만 재생하면 됩니다.
Q4. Outbox Pattern에서 CDC(Change Data Capture) 방식이 Polling 방식보다 선호되는 주요 이유는?
정답: CDC는 DB의 트랜잭션 로그(WAL)를 직접 읽기 때문에 거의 실시간(수 ms)으로 변경을 감지합니다. Polling은 주기적으로 쿼리를 실행하므로 지연이 크고(수 초~수 분), 빈번한 쿼리로 DB에 부하를 줍니다.
또한 CDC는 WAL 순서 그대로 이벤트를 전달하여 순서 보장이 자연스럽고, DB 트랜잭션 로그를 읽으므로 추가적인 쿼리 부하가 거의 없습니다.
Q5. Kafka에서 Exactly-Once Semantics를 달성하려면 Producer, Broker, Consumer 각각에서 어떤 설정이 필요한가요?
정답:
- Producer: enable.idempotence=true + transactional.id 설정 (멱등 프로듀서 + 트랜잭션)
- Broker: 트랜잭션 로그 유지 (기본 활성화)
- Consumer: isolation.level=read_committed + 수동 오프셋 커밋 + 멱등 처리
End-to-End Exactly-Once를 위해서는 Consumer가 DB에 결과를 저장할 때도 멱등성을 보장해야 합니다. Kafka 트랜잭션은 Kafka 내부의 Exactly-Once만 보장하며, 외부 시스템(DB)까지 포함하려면 Consumer 측 멱등성이 필수입니다.
참고 자료
- Martin Fowler — "Event Sourcing" (martinfowler.com)
- Chris Richardson — "Microservices Patterns" (Manning, 2018)
- Vaughn Vernon — "Implementing Domain-Driven Design" (Addison-Wesley, 2013)
- Greg Young — "CQRS Documents" (cqrs.files.wordpress.com)
- Apache Kafka Documentation — "Exactly-Once Semantics" (kafka.apache.org)
- Debezium Documentation — "Outbox Event Router" (debezium.io)
- Temporal.io Documentation — "Saga Pattern with Temporal" (docs.temporal.io)
- Netflix Tech Blog — "Evolution of the Netflix Data Pipeline" (netflixtechblog.com)
- Uber Engineering Blog — "Building Reliable Reprocessing and Dead Letter Queues" (eng.uber.com)
- Confluent Blog — "Exactly-Once Semantics Are Possible" (confluent.io)
- AWS Architecture Blog — "Saga Orchestration Pattern" (aws.amazon.com)
- Microsoft Azure Docs — "CQRS Pattern" (learn.microsoft.com)
- Spring for Apache Kafka Reference — "Error Handling and DLQ" (docs.spring.io)
- JEP 444 — "Virtual Threads" (openjdk.org)
- Project Reactor Reference — "Backpressure" (projectreactor.io)
- Pat Helland — "Idempotence Is Not a Medical Condition" (ACM Queue, 2012)