- Published on
CQRS + Event Sourcing 마이크로서비스 아키텍처: Kafka 기반 구현과 Saga 패턴 실전 가이드
- Authors
- Name
- 들어가며 — 왜 CQRS + Event Sourcing인가
- CQRS 패턴 핵심 원리
- Event Sourcing 개념과 이벤트 스토어
- Kafka를 이벤트 스토어로 활용하기
- Aggregate와 Command Handler 구현
- Saga 패턴으로 분산 트랜잭션 관리
- Read Model 프로젝션과 동기화
- 아키텍처 비교: CQRS vs 전통적 CRUD vs Event-Driven
- 운영 시 주의사항
- 실패 사례와 복구 절차
- 체크리스트
- 참고자료

들어가며 — 왜 CQRS + Event Sourcing인가
마이크로서비스 아키텍처가 성숙해지면서, 서비스 간 데이터 정합성과 상태 관리에 대한 요구 수준이 달라졌다. 전통적인 CRUD 기반 아키텍처는 단일 데이터베이스에 현재 상태만 보관하기 때문에 "왜 이 상태가 되었는지"를 역추적할 수 없다. 특히 금융, 이커머스, 물류처럼 감사 추적(Audit Trail)이 중요한 도메인에서는 상태 변경의 전체 이력을 보존해야 한다.
CQRS(Command Query Responsibility Segregation)는 쓰기 모델과 읽기 모델을 분리하여, 각각의 모델을 독립적으로 최적화할 수 있게 한다. Event Sourcing은 현재 상태 대신 상태 변경 이벤트의 불변 시퀀스를 저장하여, 어떤 시점의 상태든 이벤트 리플레이로 재구성할 수 있게 한다. 이 두 패턴을 결합하면, 쓰기 측은 이벤트를 append-only로 기록하고 읽기 측은 이벤트로부터 프로젝션된 뷰를 조회하는, 확장성과 추적성을 동시에 확보한 아키텍처를 구현할 수 있다.
Apache Kafka는 분산 커밋 로그라는 본질적 특성 덕분에 이벤트 스토어 역할을 자연스럽게 수행한다. 토픽 파티션별 순서 보장, 컨슈머 그룹 기반 수평 확장, log compaction을 통한 스냅샷 유지 등이 Event Sourcing에 적합한 기반을 제공한다. 이 글에서는 Kafka를 이벤트 스토어로 활용하여 CQRS + Event Sourcing 아키텍처를 Spring Boot(Java/Kotlin) 기반으로 구현하고, Saga 패턴을 통한 분산 트랜잭션 관리, 프로덕션 환경에서의 운영 노하우와 장애 복구 절차까지 다룬다.
CQRS 패턴 핵심 원리
Command와 Query의 분리
CQRS의 핵심은 Bertrand Meyer의 CQS(Command Query Separation) 원칙을 아키텍처 레벨로 확장한 것이다. CQS는 메서드 레벨에서 "상태를 변경하는 메서드(Command)는 값을 반환하지 않고, 값을 반환하는 메서드(Query)는 상태를 변경하지 않는다"는 원칙이다. CQRS는 이를 시스템 레벨로 끌어올려, 쓰기 경로와 읽기 경로를 완전히 독립된 모델(심지어 독립된 데이터 저장소)로 분리한다.
**Command 측(Write Model)**은 도메인 불변식(invariant)을 검증하고, 비즈니스 로직을 실행하며, 상태 변경 이벤트를 생성한다. Aggregate 단위로 일관성 경계를 설정하며, 이벤트 스토어에 이벤트를 append한다.
**Query 측(Read Model)**은 사전에 프로젝션된 비정규화 뷰에서 데이터를 조회한다. 읽기 성능에 최적화된 스키마를 자유롭게 설계할 수 있으며, 쓰기 측의 도메인 모델과 독립적으로 변경 가능하다.
Write Model과 Read Model 분리의 이점
쓰기와 읽기를 분리하면 다음과 같은 이점이 생긴다.
- 독립적 스케일링: 읽기 트래픽이 쓰기 트래픽의 10배 이상인 시스템에서, 읽기 측만 수평 확장할 수 있다
- 모델 최적화: 쓰기 측은 정규화된 도메인 모델로 불변식을 강하게 보호하고, 읽기 측은 비정규화된 뷰로 조회 성능을 극대화한다
- 다중 프로젝션: 동일한 이벤트 스트림에서 검색용 Elasticsearch 인덱스, 분석용 데이터 웨어하우스, 실시간 대시보드 등 다양한 읽기 모델을 동시에 구축할 수 있다
- 기술 스택 다양화: 쓰기 측은 PostgreSQL, 읽기 측은 MongoDB나 Redis 등 용도에 맞는 저장소를 선택할 수 있다
Event Sourcing 개념과 이벤트 스토어
불변 이벤트 로그
Event Sourcing에서 시스템의 진실의 원천(Source of Truth)은 현재 상태가 아니라 도메인 이벤트의 불변 시퀀스다. 모든 상태 변경은 이벤트로 표현되며, 한번 기록된 이벤트는 절대 수정하거나 삭제하지 않는다. 현재 상태는 해당 Aggregate에 속한 모든 이벤트를 순서대로 재생(replay)하여 재구성한다.
OrderCreated { orderId: "ORD-001", customerId: "CUST-42", items: [...] }
→ OrderItemAdded { orderId: "ORD-001", productId: "PROD-7", qty: 2 }
→ PaymentReceived { orderId: "ORD-001", amount: 59000, method: "CARD" }
→ OrderShipped { orderId: "ORD-001", trackingNumber: "KR1234567890" }
위 이벤트 스트림을 처음부터 재생하면, 주문 ORD-001의 현재 상태(배송 완료, 결제 완료 등)를 정확하게 복원할 수 있다. 이벤트 중간에 OrderItemAdded가 어떤 맥락에서 발생했는지, 결제가 언제 이루어졌는지 등 상태 변경의 전체 이력을 완벽하게 보존할 수 있다.
상태 재구성(State Reconstruction)
Aggregate의 현재 상태를 구하려면, 해당 Aggregate의 모든 이벤트를 처음부터 순서대로 적용(apply)해야 한다. 이벤트 수가 많아지면 재생 비용이 증가하므로, 일정 간격으로 스냅샷(Snapshot)을 저장하여 재생 시작점을 단축한다.
[Snapshot at version 50: { status: "PAID", total: 59000, ... }]
→ [Event 51] OrderItemRemoved { ... }
→ [Event 52] RefundInitiated { ... }
→ ... (현재 버전까지)
스냅샷은 성능 최적화 수단일 뿐이며, 이벤트 로그 자체가 진실의 원천이라는 점은 변하지 않는다.
Kafka를 이벤트 스토어로 활용하기
왜 Kafka인가
Apache Kafka는 분산 커밋 로그 시스템으로, 다음과 같은 특성이 Event Sourcing과 잘 맞는다.
- Append-only 로그: 파티션 내에서 메시지는 순서가 보장되고, 한번 쓰인 메시지는 변경되지 않는다
- 내구성(Durability): 복제 팩터(replication factor) 설정으로 데이터 유실을 방지한다
- 수평 확장: 토픽 파티션을 늘려 처리량을 선형적으로 확장할 수 있다
- 컨슈머 그룹: 여러 읽기 모델이 독립적으로 이벤트를 소비할 수 있다
- Log Compaction: 키별 최신 메시지만 유지하여 스냅샷 역할을 수행할 수 있다
토픽 설계 전략
Event Sourcing에서 Kafka 토픽 설계는 아키텍처의 핵심 결정이다. Aggregate 타입별로 토픽을 분리하고, Aggregate ID를 메시지 키로 사용하여 동일 Aggregate의 이벤트가 같은 파티션에 순서대로 저장되도록 한다.
# kafka-topics.yml - 토픽 설계 예시
topics:
# 주문 Aggregate 이벤트 토픽
- name: order-events
partitions: 12
replication-factor: 3
config:
retention.ms: -1 # 무기한 보존 (이벤트 스토어)
cleanup.policy: delete # 이벤트 로그는 삭제 정책
min.insync.replicas: 2 # 최소 2개 ISR 유지
max.message.bytes: 1048576 # 1MB 메시지 크기 제한
message.timestamp.type: CreateTime
# 주문 스냅샷 토픽 (Log Compaction)
- name: order-snapshots
partitions: 12
replication-factor: 3
config:
cleanup.policy: compact # 키별 최신 메시지만 유지
min.compaction.lag.ms: 86400000 # 최소 24시간 후 compaction
delete.retention.ms: 604800000 # 삭제 마커 7일 유지
# Saga 상태 토픽
- name: order-saga-events
partitions: 12
replication-factor: 3
config:
retention.ms: 2592000000 # 30일 보존
cleanup.policy: delete
Kafka 프로듀서/컨슈머 구현 (Spring Boot + Kotlin)
Kafka를 이벤트 스토어로 활용하는 Spring Boot 프로젝트의 기본 설정과 프로듀서 구현이다.
// build.gradle.kts
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.kafka:spring-kafka")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
}
// application.yml
// spring:
// kafka:
// bootstrap-servers: kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092
// producer:
// acks: all
// retries: 3
// key-serializer: org.apache.kafka.common.serialization.StringSerializer
// value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
// properties:
// enable.idempotence: true
// max.in.flight.requests.per.connection: 5
// consumer:
// group-id: order-projection
// auto-offset-reset: earliest
// enable-auto-commit: false
// key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
// value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
// EventPublisher.kt - Kafka 기반 이벤트 발행
@Component
class KafkaEventPublisher(
private val kafkaTemplate: KafkaTemplate<String, DomainEvent>,
private val objectMapper: ObjectMapper
) {
private val logger = LoggerFactory.getLogger(javaClass)
/**
* 도메인 이벤트를 Kafka 토픽에 발행한다.
* Aggregate ID를 키로 사용하여 파티션 내 순서를 보장한다.
*/
suspend fun publish(event: DomainEvent): RecordMetadata {
val topic = resolveTopicName(event)
val key = event.aggregateId
val record = ProducerRecord<String, DomainEvent>(topic, key, event).apply {
headers().add("eventType", event.eventType.toByteArray())
headers().add("eventVersion", event.version.toString().toByteArray())
headers().add("correlationId", event.metadata.correlationId.toByteArray())
headers().add("timestamp", Instant.now().toString().toByteArray())
}
return try {
val result = kafkaTemplate.send(record).get()
logger.info(
"이벤트 발행 완료: topic={}, partition={}, offset={}, key={}",
result.recordMetadata.topic(),
result.recordMetadata.partition(),
result.recordMetadata.offset(),
key
)
result.recordMetadata
} catch (e: Exception) {
logger.error("이벤트 발행 실패: topic={}, key={}, error={}", topic, key, e.message)
throw EventPublishException("이벤트 발행에 실패했습니다: ${e.message}", e)
}
}
private fun resolveTopicName(event: DomainEvent): String {
return when (event) {
is OrderEvent -> "order-events"
is PaymentEvent -> "payment-events"
is InventoryEvent -> "inventory-events"
else -> throw IllegalArgumentException("알 수 없는 이벤트 타입: ${event::class}")
}
}
}
컨슈머 그룹 기반 프로젝션 구현
// OrderProjectionConsumer.kt - 읽기 모델 프로젝션
@Component
class OrderProjectionConsumer(
private val orderReadRepository: OrderReadRepository,
private val acknowledgment: Acknowledgment? // 수동 커밋
) {
private val logger = LoggerFactory.getLogger(javaClass)
@KafkaListener(
topics = ["order-events"],
groupId = "order-read-projection",
containerFactory = "kafkaListenerContainerFactory"
)
fun onOrderEvent(
record: ConsumerRecord<String, DomainEvent>,
ack: Acknowledgment
) {
val event = record.value()
val aggregateId = record.key()
logger.debug(
"이벤트 수신: type={}, aggregateId={}, offset={}",
event.eventType, aggregateId, record.offset()
)
try {
when (event) {
is OrderCreated -> handleOrderCreated(event)
is OrderItemAdded -> handleOrderItemAdded(event)
is PaymentReceived -> handlePaymentReceived(event)
is OrderShipped -> handleOrderShipped(event)
is OrderCancelled -> handleOrderCancelled(event)
}
// 프로젝션 성공 후 오프셋 수동 커밋
ack.acknowledge()
} catch (e: Exception) {
logger.error(
"프로젝션 실패: type={}, aggregateId={}, error={}",
event.eventType, aggregateId, e.message
)
// 재시도 로직 또는 DLQ로 전송
throw e
}
}
private fun handleOrderCreated(event: OrderCreated) {
val orderView = OrderReadModel(
orderId = event.aggregateId,
customerId = event.customerId,
status = OrderStatus.CREATED,
totalAmount = BigDecimal.ZERO,
items = mutableListOf(),
createdAt = event.timestamp,
updatedAt = event.timestamp
)
orderReadRepository.save(orderView)
}
private fun handlePaymentReceived(event: PaymentReceived) {
orderReadRepository.findById(event.aggregateId)?.let { order ->
order.status = OrderStatus.PAID
order.paymentMethod = event.method
order.paidAmount = event.amount
order.updatedAt = event.timestamp
orderReadRepository.save(order)
}
}
// 나머지 핸들러 구현 생략
}
Aggregate와 Command Handler 구현
도메인 이벤트 정의 (Kotlin)
// DomainEvent.kt - 도메인 이벤트 기반 클래스
sealed class DomainEvent(
open val aggregateId: String,
open val eventType: String,
open val version: Int,
open val timestamp: Instant,
open val metadata: EventMetadata
)
data class EventMetadata(
val correlationId: String = UUID.randomUUID().toString(),
val causationId: String = "",
val userId: String = ""
)
// OrderEvents.kt - 주문 도메인 이벤트
sealed class OrderEvent : DomainEvent()
data class OrderCreated(
override val aggregateId: String,
val customerId: String,
val items: List<OrderItemData>,
override val version: Int = 1,
override val timestamp: Instant = Instant.now(),
override val metadata: EventMetadata = EventMetadata()
) : OrderEvent() {
override val eventType: String = "OrderCreated"
}
data class PaymentReceived(
override val aggregateId: String,
val amount: BigDecimal,
val method: String,
val transactionId: String,
override val version: Int = 1,
override val timestamp: Instant = Instant.now(),
override val metadata: EventMetadata = EventMetadata()
) : OrderEvent() {
override val eventType: String = "PaymentReceived"
}
data class OrderShipped(
override val aggregateId: String,
val trackingNumber: String,
val carrier: String,
override val version: Int = 1,
override val timestamp: Instant = Instant.now(),
override val metadata: EventMetadata = EventMetadata()
) : OrderEvent() {
override val eventType: String = "OrderShipped"
}
data class OrderCancelled(
override val aggregateId: String,
val reason: String,
val cancelledBy: String,
override val version: Int = 1,
override val timestamp: Instant = Instant.now(),
override val metadata: EventMetadata = EventMetadata()
) : OrderEvent() {
override val eventType: String = "OrderCancelled"
}
Order Aggregate 구현
// OrderAggregate.kt - 이벤트 소싱 기반 Aggregate
class OrderAggregate private constructor() {
lateinit var orderId: String
private set
lateinit var customerId: String
private set
var status: OrderStatus = OrderStatus.DRAFT
private set
var totalAmount: BigDecimal = BigDecimal.ZERO
private set
val items: MutableList<OrderItem> = mutableListOf()
var version: Long = 0
private set
// 미발행 이벤트 목록
private val uncommittedEvents: MutableList<DomainEvent> = mutableListOf()
companion object {
/**
* 이벤트 스트림에서 Aggregate 상태를 복원한다.
*/
fun rehydrate(events: List<DomainEvent>): OrderAggregate {
val aggregate = OrderAggregate()
events.forEach { event -> aggregate.apply(event) }
return aggregate
}
}
// ──────────── Command Handlers ────────────
fun createOrder(command: CreateOrderCommand): OrderCreated {
require(status == OrderStatus.DRAFT) {
"이미 생성된 주문입니다: orderId=$orderId"
}
val event = OrderCreated(
aggregateId = command.orderId,
customerId = command.customerId,
items = command.items,
metadata = EventMetadata(
correlationId = command.correlationId,
userId = command.userId
)
)
applyAndRecord(event)
return event
}
fun receivePayment(command: ReceivePaymentCommand): PaymentReceived {
require(status == OrderStatus.CREATED) {
"결제 가능한 상태가 아닙니다: status=$status"
}
require(command.amount >= totalAmount) {
"결제 금액이 부족합니다: required=$totalAmount, received=${command.amount}"
}
val event = PaymentReceived(
aggregateId = orderId,
amount = command.amount,
method = command.method,
transactionId = command.transactionId
)
applyAndRecord(event)
return event
}
fun cancelOrder(command: CancelOrderCommand): OrderCancelled {
require(status != OrderStatus.SHIPPED && status != OrderStatus.CANCELLED) {
"취소할 수 없는 상태입니다: status=$status"
}
val event = OrderCancelled(
aggregateId = orderId,
reason = command.reason,
cancelledBy = command.cancelledBy
)
applyAndRecord(event)
return event
}
// ──────────── Event Handlers (상태 변경) ────────────
private fun apply(event: DomainEvent) {
when (event) {
is OrderCreated -> {
orderId = event.aggregateId
customerId = event.customerId
status = OrderStatus.CREATED
totalAmount = event.items.sumOf { it.price * it.quantity.toBigDecimal() }
items.addAll(event.items.map { OrderItem(it.productId, it.quantity, it.price) })
}
is PaymentReceived -> {
status = OrderStatus.PAID
}
is OrderShipped -> {
status = OrderStatus.SHIPPED
}
is OrderCancelled -> {
status = OrderStatus.CANCELLED
}
else -> { /* 알 수 없는 이벤트는 무시 */ }
}
version++
}
private fun applyAndRecord(event: DomainEvent) {
apply(event)
uncommittedEvents.add(event)
}
fun getUncommittedEvents(): List<DomainEvent> = uncommittedEvents.toList()
fun markEventsAsCommitted() {
uncommittedEvents.clear()
}
}
enum class OrderStatus {
DRAFT, CREATED, PAID, SHIPPED, DELIVERED, CANCELLED
}
Command Handler 서비스
// OrderCommandHandler.java - 커맨드 처리 서비스 (Java 예제)
@Service
@Transactional
public class OrderCommandHandler {
private final KafkaEventPublisher eventPublisher;
private final OrderEventStore eventStore;
private final SnapshotStore snapshotStore;
private static final int SNAPSHOT_INTERVAL = 50;
public OrderCommandHandler(
KafkaEventPublisher eventPublisher,
OrderEventStore eventStore,
SnapshotStore snapshotStore) {
this.eventPublisher = eventPublisher;
this.eventStore = eventStore;
this.snapshotStore = snapshotStore;
}
public String handleCreateOrder(CreateOrderCommand command) {
// 1. 새로운 Aggregate 생성
OrderAggregate aggregate = OrderAggregate.rehydrate(Collections.emptyList());
// 2. 커맨드 실행 (도메인 불변식 검증 포함)
OrderCreated event = aggregate.createOrder(command);
// 3. 이벤트를 Kafka에 발행
eventPublisher.publish(event);
// 4. 이벤트 커밋 완료 마킹
aggregate.markEventsAsCommitted();
return event.getAggregateId();
}
public void handleReceivePayment(ReceivePaymentCommand command) {
// 1. 이벤트 스토어에서 Aggregate 복원
OrderAggregate aggregate = loadAggregate(command.getOrderId());
// 2. 커맨드 실행
PaymentReceived event = aggregate.receivePayment(command);
// 3. 이벤트 발행
eventPublisher.publish(event);
// 4. 스냅샷 조건 확인
if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
snapshotStore.saveSnapshot(aggregate);
}
aggregate.markEventsAsCommitted();
}
private OrderAggregate loadAggregate(String aggregateId) {
// 스냅샷이 있으면 스냅샷부터 로드
Optional<Snapshot> snapshot = snapshotStore.findLatest(aggregateId);
long fromVersion = snapshot.map(Snapshot::getVersion).orElse(0L);
// 스냅샷 이후의 이벤트만 로드
List<DomainEvent> events = eventStore.loadEvents(aggregateId, fromVersion);
if (snapshot.isPresent()) {
OrderAggregate aggregate = snapshot.get().toAggregate();
events.forEach(aggregate::apply);
return aggregate;
}
return OrderAggregate.rehydrate(events);
}
}
Saga 패턴으로 분산 트랜잭션 관리
마이크로서비스 환경에서 주문 처리는 주문 서비스, 결제 서비스, 재고 서비스, 배송 서비스 등 여러 서비스에 걸친 분산 트랜잭션을 필요로 한다. 전통적인 2PC(Two-Phase Commit)는 성능 저하와 가용성 문제로 마이크로서비스에 적합하지 않다. Saga 패턴은 각 서비스의 로컬 트랜잭션을 순차적으로 실행하고, 실패 시 보상 트랜잭션(Compensating Transaction)을 역순으로 실행하여 일관성을 유지한다.
Orchestration vs Choreography
Saga 구현 방식에는 두 가지 접근법이 있다.
Choreography(코레오그래피) 방식은 중앙 조정자 없이, 각 서비스가 이벤트를 발행하고 다른 서비스가 이벤트를 구독하여 자신의 작업을 수행한다. 서비스 간 결합도가 낮지만, Saga의 전체 흐름을 파악하기 어렵고 복잡한 시나리오에서 디버깅이 까다롭다.
Orchestration(오케스트레이션) 방식은 중앙의 Saga Orchestrator가 각 단계의 실행과 보상을 조율한다. Saga의 전체 흐름이 한 곳에 집중되어 가시성이 높고, 복잡한 비즈니스 로직을 관리하기 용이하다.
| 비교 항목 | Choreography | Orchestration |
|---|---|---|
| 결합도 | 낮음 (이벤트 기반) | 중간 (오케스트레이터 의존) |
| 가시성 | 낮음 (흐름 추적 어려움) | 높음 (중앙 집중 제어) |
| 복잡도 관리 | 단순 Saga에 적합 | 복잡한 Saga에 적합 |
| 단일 장애점 | 없음 | 오케스트레이터가 SPOF 가능 |
| 디버깅 | 이벤트 체인 추적 필요 | 오케스트레이터 로그로 쉽게 추적 |
| 테스트 용이성 | 통합 테스트 복잡 | 오케스트레이터 단위 테스트 가능 |
| 적합한 서비스 수 | 2-4개 | 4개 이상 |
Kafka 기반 Saga Orchestrator 구현
// OrderSagaOrchestrator.kt - 주문 처리 Saga
@Component
class OrderSagaOrchestrator(
private val kafkaTemplate: KafkaTemplate<String, SagaCommand>,
private val sagaStateStore: SagaStateStore
) {
private val logger = LoggerFactory.getLogger(javaClass)
/**
* Saga 시작: 주문 생성 이벤트를 수신하면 Saga를 시작한다.
*/
@KafkaListener(topics = ["order-events"], groupId = "order-saga-orchestrator")
fun onOrderCreated(record: ConsumerRecord<String, DomainEvent>, ack: Acknowledgment) {
val event = record.value()
if (event !is OrderCreated) return
val sagaId = UUID.randomUUID().toString()
val sagaState = SagaState(
sagaId = sagaId,
orderId = event.aggregateId,
currentStep = SagaStep.RESERVE_INVENTORY,
status = SagaStatus.STARTED,
startedAt = Instant.now()
)
sagaStateStore.save(sagaState)
// Step 1: 재고 예약 커맨드 발행
val reserveCommand = ReserveInventoryCommand(
sagaId = sagaId,
orderId = event.aggregateId,
items = event.items
)
kafkaTemplate.send("inventory-commands", event.aggregateId, reserveCommand)
logger.info("Saga 시작: sagaId={}, orderId={}, step=RESERVE_INVENTORY", sagaId, event.aggregateId)
ack.acknowledge()
}
/**
* Step 2: 재고 예약 성공 시 결제 요청
*/
@KafkaListener(topics = ["inventory-events"], groupId = "order-saga-orchestrator")
fun onInventoryReserved(record: ConsumerRecord<String, DomainEvent>, ack: Acknowledgment) {
val event = record.value()
if (event !is InventoryReserved) return
val sagaState = sagaStateStore.findBySagaId(event.sagaId) ?: return
sagaState.currentStep = SagaStep.PROCESS_PAYMENT
sagaStateStore.save(sagaState)
val paymentCommand = ProcessPaymentCommand(
sagaId = event.sagaId,
orderId = sagaState.orderId,
amount = sagaState.totalAmount
)
kafkaTemplate.send("payment-commands", sagaState.orderId, paymentCommand)
logger.info("Saga 진행: sagaId={}, step=PROCESS_PAYMENT", event.sagaId)
ack.acknowledge()
}
/**
* 보상 트랜잭션: 결제 실패 시 재고 예약 취소
*/
@KafkaListener(topics = ["payment-events"], groupId = "order-saga-orchestrator")
fun onPaymentFailed(record: ConsumerRecord<String, DomainEvent>, ack: Acknowledgment) {
val event = record.value()
if (event !is PaymentFailed) return
val sagaState = sagaStateStore.findBySagaId(event.sagaId) ?: return
sagaState.status = SagaStatus.COMPENSATING
sagaStateStore.save(sagaState)
// 보상: 재고 예약 취소
val cancelReservation = CancelInventoryReservationCommand(
sagaId = event.sagaId,
orderId = sagaState.orderId
)
kafkaTemplate.send("inventory-commands", sagaState.orderId, cancelReservation)
// 보상: 주문 취소
val cancelOrder = CancelOrderCommand(
orderId = sagaState.orderId,
reason = "결제 실패: ${event.failureReason}",
cancelledBy = "SAGA_ORCHESTRATOR"
)
kafkaTemplate.send("order-commands", sagaState.orderId, cancelOrder)
logger.warn("Saga 보상 실행: sagaId={}, reason={}", event.sagaId, event.failureReason)
ack.acknowledge()
}
}
Read Model 프로젝션과 동기화
Materialized View 패턴
CQRS의 읽기 측은 이벤트를 소비하여 Materialized View(구체화 뷰)를 구축한다. 이 프로젝션은 특정 쿼리 패턴에 최적화된 비정규화 데이터 구조다. 하나의 이벤트 스트림에서 여러 프로젝션을 동시에 유지할 수 있으며, 각 프로젝션은 자신만의 컨슈머 그룹과 저장소를 가진다.
주문 이벤트 스트림에서 생성할 수 있는 프로젝션 예시는 다음과 같다.
- 주문 상세 뷰: MongoDB에 주문의 모든 정보를 비정규화하여 저장. 주문 상세 화면에서 단일 조회로 모든 데이터를 반환
- 고객별 주문 목록 뷰: Redis Sorted Set으로 고객별 주문 목록을 최신순으로 유지. 마이페이지 주문 내역에 최적화
- 검색 인덱스: Elasticsearch에 주문 데이터를 인덱싱. 복합 조건 검색과 전문 검색 지원
- 분석 뷰: ClickHouse 같은 OLAP 데이터베이스에 주문 통계를 적재. 매출 분석 대시보드용
프로젝션 동기화와 Eventual Consistency
CQRS에서 읽기 모델은 쓰기 모델과 최종적으로 일관성(Eventually Consistent)을 유지한다. 이벤트가 발행되고 프로젝션이 갱신되기까지 시간 차이가 존재한다. 이 지연은 일반적으로 밀리초에서 초 단위이지만, 컨슈머 랙(Consumer Lag)이 발생하면 더 커질 수 있다.
Eventual Consistency를 다루는 전략은 다음과 같다.
- Read-your-writes 보장: 쓰기 후 읽기 요청에 이벤트 버전을 포함하고, 읽기 측에서 해당 버전 이상의 프로젝션이 완료될 때까지 대기하거나 쓰기 측 데이터를 직접 반환
- UI 낙관적 업데이트(Optimistic Update): 클라이언트에서 커맨드 전송 후 서버 응답을 기다리지 않고 UI를 먼저 갱신. 이후 프로젝션 결과로 UI를 동기화
- Polling/WebSocket: 프로젝션 갱신 완료 시 클라이언트에 알림
아키텍처 비교: CQRS vs 전통적 CRUD vs Event-Driven
각 아키텍처 패턴의 특성을 상세 비교한다. 프로젝트의 요구사항과 팀의 역량에 맞는 패턴을 선택해야 한다.
| 비교 항목 | 전통적 CRUD | Event-Driven (이벤트 기반) | CQRS + Event Sourcing |
|---|---|---|---|
| 데이터 저장 | 현재 상태만 저장 | 현재 상태 + 일부 이벤트 로그 | 전체 이벤트 히스토리 (불변 로그) |
| 읽기/쓰기 모델 | 단일 모델 공유 | 단일 또는 분리 | 완전 분리 |
| 감사 추적(Audit) | 별도 구현 필요 | 부분적 지원 | 네이티브 지원 (이벤트 자체가 감사 로그) |
| 스케일링 | 수직 확장 위주 | 수평 확장 가능 | 읽기/쓰기 독립적 수평 확장 |
| 상태 복원 | 불가 (현재 상태만 존재) | 부분적 가능 | 임의 시점 상태 완전 복원 가능 |
| 구현 복잡도 | 낮음 | 중간 | 높음 |
| 학습 곡선 | 낮음 | 중간 | 높음 |
| 디버깅 | 직관적 (DB 조회) | 이벤트 추적 필요 | 이벤트 리플레이로 상태 추적 |
| Eventual Consistency | 해당 없음 (즉시 일관성) | 발생 가능 | 읽기 모델에서 항상 발생 |
| 적합한 도메인 | 단순 CRUD, 내부 관리 시스템 | 서비스 간 비동기 통신 | 금융, 이커머스, 물류, 감사 필요 도메인 |
| 저장소 비용 | 낮음 | 중간 | 높음 (모든 이벤트 보관) |
| 트랜잭션 관리 | ACID (로컬 트랜잭션) | Eventually Consistent | Saga 패턴 필요 |
언제 CQRS + Event Sourcing을 도입해야 하는가
CQRS + Event Sourcing은 강력하지만, 모든 시스템에 필요한 것은 아니다. 도입을 고려해야 하는 상황은 다음과 같다.
- 상태 변경의 전체 이력 보존이 법적/사업적으로 요구되는 경우 (금융 거래, 의료 기록 등)
- 읽기와 쓰기의 트래픽 패턴이 극적으로 다른 경우 (읽기가 쓰기의 100배 이상)
- 여러 서비스가 동일한 이벤트를 기반으로 각기 다른 뷰를 생성해야 하는 경우
- 시간 여행(Time Travel) 쿼리가 필요한 경우 (특정 시점의 상태 조회)
- Temporal 쿼리가 핵심 비즈니스 기능인 경우 (과거 가격 이력, 재고 변동 이력)
반대로, 단순 CRUD 중심 애플리케이션이나, 팀이 이벤트 소싱 경험이 없고 일정이 촉박한 경우에는 전통적 아키텍처가 더 현실적인 선택이다.
운영 시 주의사항
Eventual Consistency 다루기
CQRS 시스템에서 Eventual Consistency는 본질적인 특성이다. 다음 상황에 대비해야 한다.
- 읽기 모델 동기화 지연: 쓰기 완료 후 읽기 모델이 즉시 갱신되지 않을 수 있다. 컨슈머 그룹의 Consumer Lag 모니터링은 필수다
- 사용자 혼란 방지: 주문 후 즉시 주문 목록을 조회했을 때 방금 주문이 보이지 않으면 사용자 민원이 발생한다. Read-your-writes 보장 또는 UI 낙관적 업데이트를 반드시 구현해야 한다
- 데이터 불일치 감지: 주기적으로 이벤트 스토어와 읽기 모델 간 정합성 검증 배치를 실행해야 한다
멱등성(Idempotency) 보장
네트워크 장애, 컨슈머 리밸런싱, Kafka의 at-least-once 전달 보장 등으로 인해 동일 이벤트가 중복 전달될 수 있다. 프로젝션 핸들러는 반드시 멱등하게 구현해야 한다.
멱등성 보장 전략은 다음과 같다.
- 이벤트 ID 기반 중복 방지: 처리한 이벤트 ID를 별도 테이블에 기록하고, 중복 이벤트 수신 시 무시한다
- Upsert 패턴: 읽기 모델 갱신 시 INSERT OR UPDATE(UPSERT)를 사용하여, 동일 이벤트를 여러 번 적용해도 결과가 같도록 한다
- 오프셋 기반 검증: 컨슈머가 마지막으로 처리한 오프셋을 저장소에 함께 기록하고, 이전 오프셋의 이벤트는 건너뛴다
이벤트 버전 관리(Event Versioning)
이벤트 스키마는 시간이 지나면서 반드시 변경된다. 불변 이벤트 로그에서 스키마를 변경하려면, 기존 이벤트를 수정하는 것이 아니라 호환 가능한 방식으로 진화시켜야 한다.
- 하위 호환(Backward Compatible) 변경: 새 필드를 옵셔널로 추가하는 것은 안전하다. 이전 이벤트에는 해당 필드가 없으므로 기본값을 사용한다
- Upcaster 패턴: 이전 버전의 이벤트를 최신 버전으로 변환하는 Upcaster를 등록한다. 이벤트 로드 시 자동으로 변환이 적용된다
- 새 이벤트 타입 추가: 기존 이벤트 타입의 의미가 크게 달라지면, 새 이벤트 타입을 정의하고 컨슈머에서 두 타입 모두를 처리한다
- 버전 필드 필수: 모든 이벤트에
version필드를 포함하여, 컨슈머가 이벤트 버전에 따라 분기 처리할 수 있도록 한다
Kafka 운영 관련 주의사항
- retention.ms 설정: 이벤트 스토어 토픽은
retention.ms=-1(무기한 보존)로 설정해야 한다. 기본값(7일)이 적용되면 이벤트가 삭제되어 상태 복원이 불가능해진다 - 파티션 수 결정: 파티션 수는 최대 컨슈머 병렬도를 결정한다. 한번 늘린 파티션은 줄일 수 없으므로 신중하게 결정해야 한다. Aggregate ID 기반 파티셔닝에서 파티션 수를 변경하면 동일 Aggregate의 이벤트가 다른 파티션으로 분산될 수 있어, 순서 보장이 깨진다
- min.insync.replicas: 프로덕션에서는 반드시 2 이상으로 설정한다.
acks=all과 함께 사용하여 데이터 유실을 방지한다 - Consumer Lag 모니터링:
kafka-consumer-groups.sh또는 Burrow 등의 도구로 컨슈머 랙을 지속적으로 모니터링한다. 랙이 지속적으로 증가하면 컨슈머 처리 능력을 확장해야 한다
# Consumer Lag 모니터링 스크립트
#!/bin/bash
BOOTSTRAP_SERVERS="kafka-broker-1:9092,kafka-broker-2:9092"
CONSUMER_GROUPS=("order-read-projection" "order-saga-orchestrator" "search-indexer")
ALERT_THRESHOLD=10000
for GROUP in "${CONSUMER_GROUPS[@]}"; do
echo "=== Consumer Group: $GROUP ==="
# 컨슈머 그룹 상태 조회
kafka-consumer-groups.sh \
--bootstrap-server "$BOOTSTRAP_SERVERS" \
--describe \
--group "$GROUP" 2>/dev/null
# 총 Lag 계산
TOTAL_LAG=$(kafka-consumer-groups.sh \
--bootstrap-server "$BOOTSTRAP_SERVERS" \
--describe \
--group "$GROUP" 2>/dev/null \
| tail -n +3 \
| awk '{sum += $6} END {print sum}')
echo "Total Lag: $TOTAL_LAG"
# 임계값 초과 시 알림
if [ "$TOTAL_LAG" -gt "$ALERT_THRESHOLD" ]; then
echo "[ALERT] Consumer group $GROUP lag ($TOTAL_LAG) exceeds threshold ($ALERT_THRESHOLD)"
# Slack 웹훅 또는 PagerDuty 호출
curl -s -X POST "$SLACK_WEBHOOK_URL" \
-H 'Content-Type: application/json' \
-d "{\"text\":\"[ALERT] Kafka Consumer Lag 임계값 초과\\nGroup: $GROUP\\nLag: $TOTAL_LAG\\nThreshold: $ALERT_THRESHOLD\"}"
fi
echo ""
done
실패 사례와 복구 절차
사례 1: 프로젝션 버그로 인한 읽기 모델 오염
프로젝션 핸들러의 버그로 읽기 모델이 잘못 갱신되는 것은 CQRS 시스템에서 가장 흔한 장애 시나리오다. Event Sourcing의 핵심 장점은, 이 경우 이벤트 스트림에서 읽기 모델을 완전히 재구축할 수 있다는 점이다.
복구 절차는 다음과 같다.
- 문제가 있는 프로젝션 컨슈머를 중지한다
- 프로젝션 핸들러의 버그를 수정하고 배포한다
- 읽기 모델 저장소(MongoDB, Elasticsearch 등)의 해당 인덱스/컬렉션을 삭제한다
- 컨슈머 그룹의 오프셋을 earliest로 리셋한다
- 수정된 프로젝션 컨슈머를 재시작하여 이벤트를 처음부터 재생한다
# 컨슈머 그룹 오프셋 리셋 (주의: 컨슈머가 중지된 상태에서만 실행)
# --to-earliest: 토픽의 가장 처음부터 재생
kafka-consumer-groups.sh \
--bootstrap-server kafka-broker-1:9092 \
--group order-read-projection \
--topic order-events \
--reset-offsets \
--to-earliest \
--execute
# 실행 결과 확인
kafka-consumer-groups.sh \
--bootstrap-server kafka-broker-1:9092 \
--describe \
--group order-read-projection
이벤트 수가 수백만 건 이상이면 전체 리플레이에 상당한 시간이 소요된다. 이 기간 동안 읽기 모델은 최신 상태와 차이가 발생하므로, 리플레이 진행 상황을 모니터링하고 사용자에게 데이터 갱신 중임을 안내해야 한다.
사례 2: 스냅샷 손상(Snapshot Corruption)
스냅샷 저장소에 손상된 스냅샷이 저장되면, Aggregate 로딩 시 잘못된 상태에서 이벤트 재생이 시작되어 비즈니스 로직 오류가 발생한다.
증상: 특정 Aggregate ID에 대한 커맨드가 예상과 다른 불변식 검증 오류를 발생시킨다. 예를 들어, 이미 결제된 주문에 대해 "결제 가능한 상태가 아닙니다" 오류 대신 "이미 생성된 주문입니다" 오류가 발생한다.
복구 절차는 다음과 같다.
- 해당 Aggregate의 스냅샷을 삭제한다
- 이벤트 스토어에서 해당 Aggregate의 전체 이벤트를 처음부터 재생하여 상태를 복원한다
- 정상 상태가 확인되면 새 스냅샷을 생성한다
- 스냅샷 생성 로직의 버그를 조사하고 수정한다
사례 3: Kafka 파티션 리밸런싱 중 이벤트 순서 역전
컨슈머 그룹 리밸런싱이 발생하면, 특정 파티션의 소유자가 변경된다. 수동 커밋 모드에서 처리 완료된 이벤트의 오프셋이 커밋되기 전에 리밸런싱이 발생하면, 새 컨슈머가 이미 처리된 이벤트를 다시 수신할 수 있다.
대응 전략은 다음과 같다.
- 프로젝션 핸들러를 멱등하게 구현하여, 중복 이벤트 처리에도 최종 상태가 동일하도록 보장한다
ConsumerRebalanceListener를 구현하여 리밸런싱 시 처리 중인 이벤트의 오프셋을 즉시 커밋한다max.poll.interval.ms를 적절히 설정하여, 이벤트 처리 시간이 긴 경우 불필요한 리밸런싱을 방지한다
사례 4: 이벤트 스키마 변경 시 하위 호환성 위반
이벤트의 필수 필드를 제거하거나 타입을 변경하면, 기존 이벤트의 역직렬화가 실패한다. 이벤트 로그는 불변이므로, 기존 이벤트의 스키마를 바꿀 수 없다.
예방 원칙은 다음과 같다.
- 필드 추가는 항상 옵셔널(Optional)로 한다
- 필드 제거 대신 deprecated 처리하고, 컨슈머에서 무시하도록 한다
- Schema Registry(Confluent Schema Registry)를 도입하여 호환성을 자동 검증한다
- CI/CD 파이프라인에서 이벤트 스키마 호환성 테스트를 필수 게이트로 설정한다
체크리스트
CQRS + Event Sourcing + Kafka 아키텍처를 프로덕션에 도입할 때 점검해야 할 항목이다.
설계 단계
- Aggregate 경계가 비즈니스 불변식 기준으로 올바르게 설정되었는가
- 이벤트 이름이 과거형 동사로 비즈니스 의도를 명확히 표현하는가
- 이벤트 스키마에 버전 필드가 포함되어 있는가
- Saga가 필요한 분산 트랜잭션이 식별되었는가
- 각 Saga의 보상 트랜잭션이 모든 단계에 대해 정의되었는가
- 읽기 모델의 쿼리 패턴이 사전 정의되었는가
Kafka 설정
- 이벤트 스토어 토픽의
retention.ms가-1(무기한)로 설정되었는가 -
min.insync.replicas가 2 이상이고,acks=all이 설정되었는가 - 프로듀서의
enable.idempotence=true가 설정되었는가 - 파티션 수가 향후 확장을 고려하여 충분히 설정되었는가
- Aggregate ID를 메시지 키로 사용하여 파티션 내 순서를 보장하는가
프로젝션 및 컨슈머
- 프로젝션 핸들러가 멱등하게 구현되었는가
- 수동 오프셋 커밋이 설정되었는가 (
enable.auto.commit=false) - DLQ(Dead Letter Queue)가 설정되어 실패한 이벤트를 격리하는가
- Consumer Lag 모니터링과 알림이 구성되었는가
- 전체 리플레이(Full Replay) 절차가 문서화되고 테스트되었는가
운영
- 이벤트 스키마 호환성 검증이 CI/CD에 포함되었는가
- 스냅샷 전략(간격, 저장소)이 결정되었는가
- Eventual Consistency에 대한 사용자 경험 대책이 마련되었는가 (UI 낙관적 업데이트 등)
- Kafka 클러스터의 디스크 용량 계획이 수립되었는가 (무기한 보존 고려)
- 장애 복구 런북(Runbook)이 작성되었는가
참고자료
- Event Sourcing, CQRS, Stream Processing and Apache Kafka: What's the Connection? - Confluent Blog
- Event Sourcing and CQRS - Confluent Developer Course
- Saga Pattern - Microservices.io
- Saga Pattern Demystified: Orchestration vs Choreography - ByteByteGo Blog
- CQRS Pattern - Microsoft Azure Architecture Docs
- Apache Kafka Documentation - Topic Configuration
- Martin Fowler - CQRS