CQRS + Event Sourcing Microservices Architecture: Kafka-Based Implementation and Saga Pattern Practical Guide


Introduction -- Why CQRS + Event Sourcing

As microservices architecture matures, the requirements for inter-service data consistency and state management have evolved. Traditional CRUD-based architectures store only the current state in a single database, making it impossible to trace back "why this state came to be." Especially in domains like finance, e-commerce, and logistics where audit trails are critical, the complete history of state changes must be preserved.

CQRS (Command Query Responsibility Segregation) separates write and read models, allowing each to be independently optimized. Event Sourcing stores an immutable sequence of state change events instead of the current state, enabling any point-in-time state to be reconstructed through event replay. Combining these two patterns creates an architecture where the write side records events in an append-only fashion and the read side queries projections derived from events, achieving both scalability and traceability.

Apache Kafka naturally serves as an event store thanks to its fundamental characteristic as a distributed commit log. Per-partition ordering guarantees, consumer group-based horizontal scaling, and log compaction for snapshot maintenance provide a suitable foundation for Event Sourcing. This article covers implementing a CQRS + Event Sourcing architecture using Kafka as the event store with Spring Boot (Java/Kotlin), distributed transaction management through the Saga pattern, and production operation best practices and failure recovery procedures.

CQRS Pattern Core Principles

Separating Command and Query

The essence of CQRS is extending Bertrand Meyer's CQS (Command Query Separation) principle to the architecture level. CQS is a method-level principle stating "methods that change state (Commands) should not return values, and methods that return values (Queries) should not change state." CQRS elevates this to the system level, separating write and read paths into completely independent models (and potentially independent data stores).

The Command side (Write Model) validates domain invariants, executes business logic, and generates state change events. It sets consistency boundaries at the Aggregate level and appends events to the event store.

The Query side (Read Model) retrieves data from pre-projected denormalized views. Read schemas can be freely designed to optimize for read performance and can change independently from the write-side domain model.
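The split described above can be illustrated without any framework. The following is a minimal, hedged Java sketch (the class names `OrderCommandService` and `OrderQueryService` and the in-memory maps standing in for the event store and read store are all illustrative): the command side validates and mutates, the query side only reads its own denormalized view.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal CQRS sketch: command side mutates and returns nothing to read;
// query side answers from a separate view it maintains from change notifications.
public class CqrsSketch {
    static class OrderCommandService {
        private final Map<String, String> eventLog = new HashMap<>(); // stand-in event store
        private final OrderQueryService projection;

        OrderCommandService(OrderQueryService projection) { this.projection = projection; }

        // Command: validates the invariant and records the change
        void createOrder(String orderId, String customerId) {
            if (eventLog.containsKey(orderId)) throw new IllegalStateException("duplicate order");
            eventLog.put(orderId, "OrderCreated:" + customerId);
            projection.onOrderCreated(orderId, customerId); // feed the read model
        }
    }

    static class OrderQueryService {
        private final Map<String, String> customerByOrder = new HashMap<>(); // denormalized view

        void onOrderCreated(String orderId, String customerId) {
            customerByOrder.put(orderId, customerId);
        }

        // Query: reads the view, never touches the write model
        String findCustomerOf(String orderId) { return customerByOrder.get(orderId); }
    }

    public static void main(String[] args) {
        OrderQueryService query = new OrderQueryService();
        OrderCommandService command = new OrderCommandService(query);
        command.createOrder("ORD-001", "CUST-42");
        System.out.println(query.findCustomerOf("ORD-001")); // CUST-42
    }
}
```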

Benefits of Write Model and Read Model Separation

Separating writes and reads provides the following benefits:

  • Independent Scaling: In systems where read traffic exceeds write traffic by 10x or more, only the read side can be scaled horizontally
  • Model Optimization: The write side uses a normalized domain model to strongly protect invariants, while the read side uses denormalized views to maximize query performance
  • Multiple Projections: From a single event stream, multiple read models can be built simultaneously -- Elasticsearch indices for search, data warehouses for analytics, real-time dashboards, etc.
  • Technology Stack Diversity: The write side can use PostgreSQL while the read side uses MongoDB or Redis, choosing storage suited to each purpose

Event Sourcing Concepts and Event Store

Immutable Event Log

In Event Sourcing, the Source of Truth is not the current state but an immutable sequence of domain events. All state changes are expressed as events, and once recorded, events are never modified or deleted. The current state is reconstructed by replaying all events belonging to that Aggregate in order.

OrderCreated { orderId: "ORD-001", customerId: "CUST-42", items: [...] }
OrderItemAdded { orderId: "ORD-001", productId: "PROD-7", qty: 2 }
PaymentReceived { orderId: "ORD-001", amount: 59000, method: "CARD" }
OrderShipped { orderId: "ORD-001", trackingNumber: "KR1234567890" }

Replaying the event stream above from the beginning accurately restores order ORD-001's current state (shipped, payment completed, etc.). The complete history of state changes is perfectly preserved -- including the context in which OrderItemAdded occurred and when the payment was made.

State Reconstruction

To obtain the current state of an Aggregate, all events belonging to it must be applied in order from the beginning. As the number of events grows, replay cost increases, so snapshots are saved at regular intervals and replay starts from the latest snapshot rather than from the first event.

[Snapshot at version 50: { status: "PAID", total: 59000, ... }]
[Event 51] OrderItemRemoved { ... }
[Event 52] RefundInitiated { ... }
... (up to current version)

Snapshots are merely a performance optimization; the event log itself remains the source of truth.
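The snapshot-plus-tail replay above reduces to a few lines of logic. A framework-free Java sketch (the `Event` and `Snapshot` records and the toy status transition are invented for illustration) showing that replay starts at the snapshot version, not at event 1:

```java
import java.util.List;

public class SnapshotReplay {
    record Event(long version, String type) {}
    record Snapshot(long version, String status) {}

    // Start from the snapshot's state, then fold in only events after its version
    static String replay(Snapshot snapshot, List<Event> tail) {
        String status = snapshot.status();
        long version = snapshot.version();
        for (Event e : tail) {
            if (e.version() <= version) continue; // already folded into the snapshot
            if (e.type().equals("RefundInitiated")) status = "REFUND_PENDING";
            // OrderItemRemoved changes items, not status, in this sketch
            version = e.version();
        }
        return status;
    }

    public static void main(String[] args) {
        Snapshot snap = new Snapshot(50, "PAID"); // snapshot at version 50
        List<Event> tail = List.of(new Event(51, "OrderItemRemoved"),
                                   new Event(52, "RefundInitiated"));
        System.out.println(replay(snap, tail)); // REFUND_PENDING
    }
}
```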

Using Kafka as an Event Store

Why Kafka

Apache Kafka is a distributed commit log system with characteristics that align well with Event Sourcing:

  • Append-only log: Messages within a partition are ordered, and once written, messages are never modified
  • Durability: Replication factor configuration prevents data loss
  • Horizontal scaling: Throughput scales linearly by increasing topic partitions
  • Consumer groups: Multiple read models can consume events independently
  • Log Compaction: Retains only the latest message per key, serving as a snapshot mechanism

Topic Design Strategy

In Event Sourcing, Kafka topic design is a core architectural decision. Topics are separated by Aggregate type, and Aggregate IDs are used as message keys to ensure events for the same Aggregate are stored in order within the same partition.

# kafka-topics.yml - Topic design example
topics:
  # Order Aggregate event topic
  - name: order-events
    partitions: 12
    replication-factor: 3
    config:
      retention.ms: -1 # Indefinite retention (event store)
      cleanup.policy: delete # delete (not compact); with retention.ms=-1 nothing is ever removed
      min.insync.replicas: 2 # Maintain at least 2 ISR
      max.message.bytes: 1048576 # 1MB message size limit
      message.timestamp.type: CreateTime

  # Order snapshot topic (Log Compaction)
  - name: order-snapshots
    partitions: 12
    replication-factor: 3
    config:
      cleanup.policy: compact # Retain only latest message per key
      min.compaction.lag.ms: 86400000 # Minimum 24 hours before compaction
      delete.retention.ms: 604800000 # Retain delete markers for 7 days

  # Saga state topic
  - name: order-saga-events
    partitions: 12
    replication-factor: 3
    config:
      retention.ms: 2592000000 # 30-day retention
      cleanup.policy: delete

Kafka Producer/Consumer Implementation (Spring Boot + Kotlin)

Here are the basic configuration and producer implementation for a Spring Boot project using Kafka as an event store.

// build.gradle.kts
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.kafka:spring-kafka")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
}

# application.yml
spring:
  kafka:
    bootstrap-servers: kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092
    producer:
      acks: all
      retries: 3
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5
    consumer:
      group-id: order-projection
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

// EventPublisher.kt - Kafka-based event publishing
@Component
class KafkaEventPublisher(
    private val kafkaTemplate: KafkaTemplate<String, DomainEvent>,
    private val objectMapper: ObjectMapper
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Publishes a domain event to a Kafka topic.
     * Uses Aggregate ID as the key to guarantee ordering within a partition.
     */
    fun publish(event: DomainEvent): RecordMetadata {
        val topic = resolveTopicName(event)
        val key = event.aggregateId

        val record = ProducerRecord<String, DomainEvent>(topic, key, event).apply {
            headers().add("eventType", event.eventType.toByteArray())
            headers().add("eventVersion", event.version.toString().toByteArray())
            headers().add("correlationId", event.metadata.correlationId.toByteArray())
            headers().add("timestamp", Instant.now().toString().toByteArray())
        }

        return try {
            val result = kafkaTemplate.send(record).get()
            logger.info(
                "Event published: topic={}, partition={}, offset={}, key={}",
                result.recordMetadata.topic(),
                result.recordMetadata.partition(),
                result.recordMetadata.offset(),
                key
            )
            result.recordMetadata
        } catch (e: Exception) {
            logger.error("Event publishing failed: topic={}, key={}, error={}", topic, key, e.message)
            throw EventPublishException("Event publishing failed: ${e.message}", e)
        }
    }

    private fun resolveTopicName(event: DomainEvent): String {
        return when (event) {
            is OrderEvent -> "order-events"
            is PaymentEvent -> "payment-events"
            is InventoryEvent -> "inventory-events"
            else -> throw IllegalArgumentException("Unknown event type: ${event::class}")
        }
    }
}

Consumer Group-Based Projection Implementation

// OrderProjectionConsumer.kt - Read model projection
@Component
class OrderProjectionConsumer(
    private val orderReadRepository: OrderReadRepository
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaListener(
        topics = ["order-events"],
        groupId = "order-read-projection",
        containerFactory = "kafkaListenerContainerFactory"
    )
    fun onOrderEvent(
        record: ConsumerRecord<String, DomainEvent>,
        ack: Acknowledgment
    ) {
        val event = record.value()
        val aggregateId = record.key()

        logger.debug(
            "Event received: type={}, aggregateId={}, offset={}",
            event.eventType, aggregateId, record.offset()
        )

        try {
            when (event) {
                is OrderCreated -> handleOrderCreated(event)
                is OrderItemAdded -> handleOrderItemAdded(event)
                is PaymentReceived -> handlePaymentReceived(event)
                is OrderShipped -> handleOrderShipped(event)
                is OrderCancelled -> handleOrderCancelled(event)
            }
            // Manual offset commit after successful projection
            ack.acknowledge()
        } catch (e: Exception) {
            logger.error(
                "Projection failed: type={}, aggregateId={}, error={}",
                event.eventType, aggregateId, e.message
            )
            // Retry logic or send to DLQ
            throw e
        }
    }

    private fun handleOrderCreated(event: OrderCreated) {
        val orderView = OrderReadModel(
            orderId = event.aggregateId,
            customerId = event.customerId,
            status = OrderStatus.CREATED,
            totalAmount = BigDecimal.ZERO,
            items = mutableListOf(),
            createdAt = event.timestamp,
            updatedAt = event.timestamp
        )
        orderReadRepository.save(orderView)
    }

    private fun handlePaymentReceived(event: PaymentReceived) {
        orderReadRepository.findById(event.aggregateId)?.let { order ->
            order.status = OrderStatus.PAID
            order.paymentMethod = event.method
            order.paidAmount = event.amount
            order.updatedAt = event.timestamp
            orderReadRepository.save(order)
        }
    }

    // Remaining handler implementations omitted
}

Aggregate and Command Handler Implementation

Domain Event Definition (Kotlin)

// DomainEvent.kt - Domain event base class
// (abstract properties instead of constructor parameters, so subclasses
// like `sealed class OrderEvent : DomainEvent()` compile)
sealed class DomainEvent {
    abstract val aggregateId: String
    abstract val eventType: String
    abstract val version: Int
    abstract val timestamp: Instant
    abstract val metadata: EventMetadata
}

data class EventMetadata(
    val correlationId: String = UUID.randomUUID().toString(),
    val causationId: String = "",
    val userId: String = ""
)

// OrderEvents.kt - Order domain events
sealed class OrderEvent : DomainEvent()

data class OrderCreated(
    override val aggregateId: String,
    val customerId: String,
    val items: List<OrderItemData>,
    override val version: Int = 1,
    override val timestamp: Instant = Instant.now(),
    override val metadata: EventMetadata = EventMetadata()
) : OrderEvent() {
    override val eventType: String = "OrderCreated"
}

data class PaymentReceived(
    override val aggregateId: String,
    val amount: BigDecimal,
    val method: String,
    val transactionId: String,
    override val version: Int = 1,
    override val timestamp: Instant = Instant.now(),
    override val metadata: EventMetadata = EventMetadata()
) : OrderEvent() {
    override val eventType: String = "PaymentReceived"
}

data class OrderShipped(
    override val aggregateId: String,
    val trackingNumber: String,
    val carrier: String,
    override val version: Int = 1,
    override val timestamp: Instant = Instant.now(),
    override val metadata: EventMetadata = EventMetadata()
) : OrderEvent() {
    override val eventType: String = "OrderShipped"
}

data class OrderCancelled(
    override val aggregateId: String,
    val reason: String,
    val cancelledBy: String,
    override val version: Int = 1,
    override val timestamp: Instant = Instant.now(),
    override val metadata: EventMetadata = EventMetadata()
) : OrderEvent() {
    override val eventType: String = "OrderCancelled"
}

Order Aggregate Implementation

// OrderAggregate.kt - Event sourcing-based Aggregate
class OrderAggregate private constructor() {
    lateinit var orderId: String
        private set
    lateinit var customerId: String
        private set
    var status: OrderStatus = OrderStatus.DRAFT
        private set
    var totalAmount: BigDecimal = BigDecimal.ZERO
        private set
    val items: MutableList<OrderItem> = mutableListOf()
    var version: Long = 0
        private set

    // List of uncommitted events
    private val uncommittedEvents: MutableList<DomainEvent> = mutableListOf()

    companion object {
        /**
         * Restores Aggregate state from an event stream.
         */
        @JvmStatic // callable as OrderAggregate.rehydrate(...) from Java
        fun rehydrate(events: List<DomainEvent>): OrderAggregate {
            val aggregate = OrderAggregate()
            events.forEach { event -> aggregate.apply(event) }
            return aggregate
        }
    }

    // ──────────── Command Handlers ────────────

    fun createOrder(command: CreateOrderCommand): OrderCreated {
        require(status == OrderStatus.DRAFT) {
            "Order already created: orderId=$orderId"
        }
        val event = OrderCreated(
            aggregateId = command.orderId,
            customerId = command.customerId,
            items = command.items,
            metadata = EventMetadata(
                correlationId = command.correlationId,
                userId = command.userId
            )
        )
        applyAndRecord(event)
        return event
    }

    fun receivePayment(command: ReceivePaymentCommand): PaymentReceived {
        require(status == OrderStatus.CREATED) {
            "Not in a payable state: status=$status"
        }
        require(command.amount >= totalAmount) {
            "Insufficient payment amount: required=$totalAmount, received=${command.amount}"
        }
        val event = PaymentReceived(
            aggregateId = orderId,
            amount = command.amount,
            method = command.method,
            transactionId = command.transactionId
        )
        applyAndRecord(event)
        return event
    }

    fun cancelOrder(command: CancelOrderCommand): OrderCancelled {
        require(status != OrderStatus.SHIPPED && status != OrderStatus.CANCELLED) {
            "Cannot cancel in this state: status=$status"
        }
        val event = OrderCancelled(
            aggregateId = orderId,
            reason = command.reason,
            cancelledBy = command.cancelledBy
        )
        applyAndRecord(event)
        return event
    }

    // ──────────── Event Handlers (State Changes) ────────────

    fun apply(event: DomainEvent) { // not private: snapshot loading replays tail events via apply()
        when (event) {
            is OrderCreated -> {
                orderId = event.aggregateId
                customerId = event.customerId
                status = OrderStatus.CREATED
                totalAmount = event.items.sumOf { it.price * it.quantity.toBigDecimal() }
                items.addAll(event.items.map { OrderItem(it.productId, it.quantity, it.price) })
            }
            is PaymentReceived -> {
                status = OrderStatus.PAID
            }
            is OrderShipped -> {
                status = OrderStatus.SHIPPED
            }
            is OrderCancelled -> {
                status = OrderStatus.CANCELLED
            }
            else -> { /* Ignore unknown events */ }
        }
        version++
    }

    private fun applyAndRecord(event: DomainEvent) {
        apply(event)
        uncommittedEvents.add(event)
    }

    fun getUncommittedEvents(): List<DomainEvent> = uncommittedEvents.toList()

    fun markEventsAsCommitted() {
        uncommittedEvents.clear()
    }
}

enum class OrderStatus {
    DRAFT, CREATED, PAID, SHIPPED, DELIVERED, CANCELLED
}

Command Handler Service

// OrderCommandHandler.java - Command processing service (Java example)
@Service
@Transactional
public class OrderCommandHandler {

    private final KafkaEventPublisher eventPublisher;
    private final OrderEventStore eventStore;
    private final SnapshotStore snapshotStore;

    private static final int SNAPSHOT_INTERVAL = 50;

    public OrderCommandHandler(
            KafkaEventPublisher eventPublisher,
            OrderEventStore eventStore,
            SnapshotStore snapshotStore) {
        this.eventPublisher = eventPublisher;
        this.eventStore = eventStore;
        this.snapshotStore = snapshotStore;
    }

    public String handleCreateOrder(CreateOrderCommand command) {
        // 1. Create a new Aggregate
        OrderAggregate aggregate = OrderAggregate.rehydrate(Collections.emptyList());

        // 2. Execute command (including domain invariant validation)
        OrderCreated event = aggregate.createOrder(command);

        // 3. Publish event to Kafka
        eventPublisher.publish(event);

        // 4. Mark events as committed
        aggregate.markEventsAsCommitted();

        return event.getAggregateId();
    }

    public void handleReceivePayment(ReceivePaymentCommand command) {
        // 1. Restore Aggregate from event store
        OrderAggregate aggregate = loadAggregate(command.getOrderId());

        // 2. Execute command
        PaymentReceived event = aggregate.receivePayment(command);

        // 3. Publish event
        eventPublisher.publish(event);

        // 4. Check snapshot condition
        if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
            snapshotStore.saveSnapshot(aggregate);
        }

        aggregate.markEventsAsCommitted();
    }

    private OrderAggregate loadAggregate(String aggregateId) {
        // Load from snapshot if available
        Optional<Snapshot> snapshot = snapshotStore.findLatest(aggregateId);
        long fromVersion = snapshot.map(Snapshot::getVersion).orElse(0L);

        // Load only events after the snapshot
        List<DomainEvent> events = eventStore.loadEvents(aggregateId, fromVersion);

        if (snapshot.isPresent()) {
            OrderAggregate aggregate = snapshot.get().toAggregate();
            events.forEach(aggregate::apply);
            return aggregate;
        }

        return OrderAggregate.rehydrate(events);
    }
}

Distributed Transaction Management with the Saga Pattern

In a microservices environment, order processing requires distributed transactions across multiple services -- order service, payment service, inventory service, shipping service, etc. Traditional 2PC (Two-Phase Commit) is unsuitable for microservices due to performance degradation and availability issues. The Saga pattern sequentially executes local transactions in each service and, upon failure, executes compensating transactions in reverse order to maintain consistency.
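The execute-forward, compensate-backward mechanics can be shown independently of Kafka. A minimal Java sketch (the `Step` record is hypothetical; suppliers stand in for each service's local transaction) that runs compensations in reverse order when a step fails:

```java
import java.util.*;
import java.util.function.Supplier;

public class SagaSketch {
    // A saga step: a local transaction plus its compensating action
    record Step(String name, Supplier<Boolean> action, Runnable compensation) {}

    static List<String> run(List<Step> steps) {
        List<String> log = new ArrayList<>();
        Deque<Step> completed = new ArrayDeque<>(); // LIFO: compensate in reverse
        for (Step step : steps) {
            if (step.action().get()) {
                log.add("done:" + step.name());
                completed.push(step);
            } else {
                log.add("failed:" + step.name());
                while (!completed.isEmpty()) {
                    Step s = completed.pop();
                    s.compensation().run();
                    log.add("compensated:" + s.name());
                }
                break;
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> log = run(List.of(
            new Step("reserveInventory", () -> true, () -> {}),
            new Step("processPayment", () -> false, () -> {}))); // payment fails
        System.out.println(log); // [done:reserveInventory, failed:processPayment, compensated:reserveInventory]
    }
}
```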

Orchestration vs Choreography

There are two approaches to Saga implementation.

The Choreography approach works without a central coordinator -- each service publishes events and other services subscribe to events to perform their tasks. Coupling between services is low, but it's difficult to understand the overall Saga flow and debugging in complex scenarios is challenging.

The Orchestration approach uses a central Saga Orchestrator to coordinate the execution and compensation of each step. The entire Saga flow is concentrated in one place, providing high visibility and making complex business logic easier to manage.

| Comparison | Choreography | Orchestration |
| --- | --- | --- |
| Coupling | Low (event-based) | Medium (orchestrator dependency) |
| Visibility | Low (difficult to trace flow) | High (centralized control) |
| Complexity management | Suited for simple Sagas | Suited for complex Sagas |
| Single point of failure | None | Orchestrator can be a SPOF |
| Debugging | Requires event chain tracing | Easy tracing via orchestrator logs |
| Testability | Complex integration tests | Orchestrator can be unit tested |
| Suited service count | 2-4 | 4 or more |

Kafka-Based Saga Orchestrator Implementation

// OrderSagaOrchestrator.kt - Order processing Saga
@Component
class OrderSagaOrchestrator(
    private val kafkaTemplate: KafkaTemplate<String, SagaCommand>,
    private val sagaStateStore: SagaStateStore
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Saga start: Begins the Saga upon receiving an order creation event.
     */
    @KafkaListener(topics = ["order-events"], groupId = "order-saga-orchestrator")
    fun onOrderCreated(record: ConsumerRecord<String, DomainEvent>, ack: Acknowledgment) {
        val event = record.value()
        if (event !is OrderCreated) {
            ack.acknowledge() // not handled here: commit the offset and skip
            return
        }

        val sagaId = UUID.randomUUID().toString()
        val sagaState = SagaState(
            sagaId = sagaId,
            orderId = event.aggregateId,
            // Capture the order total up front; the payment step reads it later
            totalAmount = event.items.sumOf { it.price * it.quantity.toBigDecimal() },
            currentStep = SagaStep.RESERVE_INVENTORY,
            status = SagaStatus.STARTED,
            startedAt = Instant.now()
        )
        sagaStateStore.save(sagaState)

        // Step 1: Publish inventory reservation command
        val reserveCommand = ReserveInventoryCommand(
            sagaId = sagaId,
            orderId = event.aggregateId,
            items = event.items
        )
        kafkaTemplate.send("inventory-commands", event.aggregateId, reserveCommand)
        logger.info("Saga started: sagaId={}, orderId={}, step=RESERVE_INVENTORY", sagaId, event.aggregateId)
        ack.acknowledge()
    }

    /**
     * Step 2: Request payment upon successful inventory reservation
     */
    @KafkaListener(topics = ["inventory-events"], groupId = "order-saga-orchestrator")
    fun onInventoryReserved(record: ConsumerRecord<String, DomainEvent>, ack: Acknowledgment) {
        val event = record.value()
        if (event !is InventoryReserved) {
            ack.acknowledge() // not handled here: commit the offset and skip
            return
        }

        val sagaState = sagaStateStore.findBySagaId(event.sagaId)
            ?: run { ack.acknowledge(); return } // unknown saga: commit and skip
        sagaState.currentStep = SagaStep.PROCESS_PAYMENT
        sagaStateStore.save(sagaState)

        val paymentCommand = ProcessPaymentCommand(
            sagaId = event.sagaId,
            orderId = sagaState.orderId,
            amount = sagaState.totalAmount
        )
        kafkaTemplate.send("payment-commands", sagaState.orderId, paymentCommand)
        logger.info("Saga progressing: sagaId={}, step=PROCESS_PAYMENT", event.sagaId)
        ack.acknowledge()
    }

    /**
     * Compensating transaction: Cancel inventory reservation upon payment failure
     */
    @KafkaListener(topics = ["payment-events"], groupId = "order-saga-orchestrator")
    fun onPaymentFailed(record: ConsumerRecord<String, DomainEvent>, ack: Acknowledgment) {
        val event = record.value()
        if (event !is PaymentFailed) {
            ack.acknowledge() // not handled here: commit the offset and skip
            return
        }

        val sagaState = sagaStateStore.findBySagaId(event.sagaId)
            ?: run { ack.acknowledge(); return } // unknown saga: commit and skip
        sagaState.status = SagaStatus.COMPENSATING
        sagaStateStore.save(sagaState)

        // Compensation: Cancel inventory reservation
        val cancelReservation = CancelInventoryReservationCommand(
            sagaId = event.sagaId,
            orderId = sagaState.orderId
        )
        kafkaTemplate.send("inventory-commands", sagaState.orderId, cancelReservation)

        // Compensation: Cancel order
        val cancelOrder = CancelOrderCommand(
            orderId = sagaState.orderId,
            reason = "Payment failed: ${event.failureReason}",
            cancelledBy = "SAGA_ORCHESTRATOR"
        )
        kafkaTemplate.send("order-commands", sagaState.orderId, cancelOrder)

        logger.warn("Saga compensation executed: sagaId={}, reason={}", event.sagaId, event.failureReason)
        ack.acknowledge()
    }
}

Read Model Projection and Synchronization

Materialized View Pattern

The read side of CQRS consumes events to build Materialized Views. These projections are denormalized data structures optimized for specific query patterns. Multiple projections can be maintained simultaneously from a single event stream, each with its own consumer group and storage.

Examples of projections that can be created from the order event stream:

  • Order Detail View: Stores all order information denormalized in MongoDB. Returns all data in a single query on the order detail screen
  • Customer Order List View: Maintains per-customer order lists sorted by recency using Redis Sorted Sets. Optimized for the My Page order history
  • Search Index: Indexes order data in Elasticsearch. Supports compound condition searches and full-text search
  • Analytics View: Loads order statistics into an OLAP database like ClickHouse. For sales analytics dashboards
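As a concrete sketch of the customer order list projection above, the following Java example uses a `TreeMap` keyed by timestamp as a stand-in for a Redis Sorted Set (in production this would be `ZADD` on each event and `ZREVRANGE` on each query); all class and method names here are illustrative:

```java
import java.util.*;

// Per-customer order list sorted by recency, maintained from OrderCreated events.
public class CustomerOrderListProjection {
    // customerId -> (event timestamp as score -> orderId); TreeMap ~ Sorted Set
    private final Map<String, TreeMap<Long, String>> byCustomer = new HashMap<>();

    // Projection handler: score is the event timestamp (like ZADD)
    public void onOrderCreated(String customerId, String orderId, long epochMs) {
        byCustomer.computeIfAbsent(customerId, k -> new TreeMap<>())
                  .put(epochMs, orderId);
    }

    // Query: most recent N orders (like ZREVRANGE 0 n-1)
    public List<String> recentOrders(String customerId, int n) {
        TreeMap<Long, String> orders = byCustomer.getOrDefault(customerId, new TreeMap<>());
        return orders.descendingMap().values().stream().limit(n).toList();
    }

    public static void main(String[] args) {
        CustomerOrderListProjection p = new CustomerOrderListProjection();
        p.onOrderCreated("CUST-42", "ORD-001", 1000L);
        p.onOrderCreated("CUST-42", "ORD-002", 2000L);
        System.out.println(p.recentOrders("CUST-42", 2)); // [ORD-002, ORD-001]
    }
}
```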

Projection Synchronization and Eventual Consistency

In CQRS, read models maintain eventual consistency with the write model. A time gap exists between event publication and projection update. This delay is typically milliseconds to seconds, but can grow larger if consumer lag occurs.

Strategies for handling Eventual Consistency:

  • Read-your-writes guarantee: Include an event version in read requests after writing, and either wait until the read-side projection catches up to that version or directly return write-side data
  • UI Optimistic Update: After sending a command from the client, update the UI without waiting for the server response. Then synchronize the UI with the projection result
  • Polling/WebSocket: Notify the client when projection updates are complete
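The read-your-writes strategy from the list above can be sketched as a bounded wait on the projection's applied version. The names (`Projection`, `readAtLeast`) are hypothetical, and a busy-wait stands in for a real notification mechanism:

```java
public class ReadYourWrites {
    static class Projection {
        private volatile long appliedVersion = 0;
        private volatile String state = "";

        void apply(long version, String newState) { state = newState; appliedVersion = version; }
        long appliedVersion() { return appliedVersion; }
        String state() { return state; }
    }

    // Block (bounded by timeoutMs) until the projection has applied minVersion;
    // the client passes the version returned by its own write
    static String readAtLeast(Projection p, long minVersion, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (p.appliedVersion() < minVersion) {
            if (System.currentTimeMillis() > deadline)
                throw new IllegalStateException("projection still behind version " + minVersion);
            Thread.onSpinWait(); // for brevity; real code would sleep or subscribe
        }
        return p.state();
    }

    public static void main(String[] args) {
        Projection p = new Projection();
        new Thread(() -> p.apply(3, "PAID")).start(); // projection catches up asynchronously
        System.out.println(readAtLeast(p, 3, 1000)); // PAID
    }
}
```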

Architecture Comparison: CQRS vs Traditional CRUD vs Event-Driven

Here is a detailed comparison of each architecture pattern's characteristics. Choose the pattern that fits your project requirements and team capabilities.

| Comparison | Traditional CRUD | Event-Driven | CQRS + Event Sourcing |
| --- | --- | --- | --- |
| Data storage | Stores only current state | Current state + partial event logs | Complete event history (immutable log) |
| Read/write model | Single shared model | Single or separated | Fully separated |
| Audit trail | Requires separate implementation | Partial support | Native support (events are the audit log) |
| Scaling | Primarily vertical | Horizontal scaling possible | Independent horizontal scaling of read/write |
| State restoration | Not possible (only current state) | Partially possible | Full restoration at any point in time |
| Implementation complexity | Low | Medium | High |
| Learning curve | Low | Medium | High |
| Debugging | Intuitive (DB queries) | Requires event tracing | State tracking via event replay |
| Eventual consistency | N/A (immediate consistency) | Can occur | Always occurs in read models |
| Suited domains | Simple CRUD, internal admin systems | Inter-service async communication | Finance, e-commerce, logistics, audit-required domains |
| Storage cost | Low | Medium | High (all events preserved) |
| Transaction management | ACID (local transactions) | Eventually consistent | Saga pattern required |

When to Adopt CQRS + Event Sourcing

CQRS + Event Sourcing is powerful, but not necessary for every system. Consider adoption in these situations:

  • When complete state change history preservation is legally or business-critical (financial transactions, medical records, etc.)
  • When read and write traffic patterns are dramatically different (reads exceed writes by 100x or more)
  • When multiple services need to generate different views based on the same events
  • When time travel queries are needed (querying state at a specific point in time)
  • When temporal queries are a core business feature (price history, inventory change history)

Conversely, for simple CRUD-centric applications, or when the team lacks event sourcing experience and timelines are tight, traditional architecture is a more realistic choice.

Operational Considerations

Handling Eventual Consistency

Eventual Consistency is an inherent characteristic of CQRS systems. Prepare for the following situations:

  • Read model synchronization delay: The read model may not update immediately after a write completes, so per-consumer-group lag monitoring is essential
  • Preventing user confusion: If a user doesn't see their order immediately after placing it, customer complaints will arise. Read-your-writes guarantees or UI optimistic updates must be implemented
  • Data inconsistency detection: Periodically run batch jobs to verify consistency between the event store and read models

Idempotency Guarantee

Due to network failures, consumer rebalancing, and Kafka's at-least-once delivery guarantee, duplicate events can be delivered. Projection handlers must be implemented idempotently.

Idempotency guarantee strategies:

  • Event ID-based deduplication: Record processed event IDs in a separate table and ignore duplicate events
  • Upsert pattern: Use INSERT OR UPDATE (UPSERT) when updating read models so that applying the same event multiple times produces the same result
  • Offset-based validation: Record the last processed offset alongside the data store, and skip events with earlier offsets
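The first two strategies combine naturally in one handler. A minimal in-memory Java sketch (in production the processed-ID set and the view would live in the same datastore and be updated in one transaction; names are illustrative):

```java
import java.util.*;

// Idempotent projection handler: event-ID dedup plus an upsert write,
// so redelivered events leave the read model unchanged.
public class IdempotentProjection {
    private final Set<String> processedEventIds = new HashSet<>();      // dedup table
    private final Map<String, String> orderStatusView = new HashMap<>(); // read model

    public void handle(String eventId, String orderId, String newStatus) {
        if (!processedEventIds.add(eventId)) return; // duplicate delivery: skip
        orderStatusView.put(orderId, newStatus);     // upsert: replay-safe
    }

    public String statusOf(String orderId) { return orderStatusView.get(orderId); }

    public static void main(String[] args) {
        IdempotentProjection p = new IdempotentProjection();
        p.handle("evt-1", "ORD-001", "PAID");
        p.handle("evt-1", "ORD-001", "PAID"); // redelivered by Kafka: ignored
        System.out.println(p.statusOf("ORD-001")); // PAID
    }
}
```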

Event Versioning

Event schemas will inevitably change over time. To change schemas in an immutable event log, existing events must not be modified -- instead, schemas must be evolved in a compatible manner.

  • Backward compatible changes: Adding new optional fields is safe. Previous events without the field use default values
  • Upcaster pattern: Register Upcasters that transform previous event versions to the latest version. Transformations are automatically applied when loading events
  • Adding new event types: When the meaning of an existing event type changes significantly, define a new event type and handle both types in consumers
  • Mandatory version field: Include a version field in all events so consumers can branch processing based on event version
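The Upcaster pattern from the list can be sketched as a chain of per-version transformations applied when events are loaded. The version hops shown here (adding a `currency` default in v2, renaming `qty` to `quantity` in v3) are invented for illustration:

```java
import java.util.*;
import java.util.function.UnaryOperator;

public class EventUpcaster {
    static final int LATEST_VERSION = 3;

    // One transformer per version hop: v1->v2 adds "currency", v2->v3 renames "qty"
    static final Map<Integer, UnaryOperator<Map<String, Object>>> UPCASTERS = Map.of(
        1, e -> { e.putIfAbsent("currency", "KRW"); return e; },
        2, e -> { if (e.containsKey("qty")) e.put("quantity", e.remove("qty")); return e; }
    );

    // Applied at load time: walk the event up to the latest schema version
    static Map<String, Object> upcast(Map<String, Object> event) {
        int version = (int) event.getOrDefault("version", 1);
        while (version < LATEST_VERSION) {
            event = UPCASTERS.get(version).apply(event);
            event.put("version", ++version);
        }
        return event;
    }

    public static void main(String[] args) {
        Map<String, Object> old = new HashMap<>(Map.of("version", 1, "qty", 2));
        Map<String, Object> latest = upcast(old);
        System.out.println(latest.get("version") + " " + latest.get("quantity")); // 3 2
    }
}
```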

Kafka Operations Considerations

  • retention.ms setting: Event store topics must have retention.ms=-1 (indefinite retention). If the default (7 days) is applied, events will be deleted and state restoration becomes impossible
  • Partition count determination: Partition count determines maximum consumer parallelism. Once increased, partitions cannot be reduced, so decide carefully. In Aggregate ID-based partitioning, changing the partition count can distribute events for the same Aggregate across different partitions, breaking ordering guarantees
  • min.insync.replicas: Must be set to 2 or more in production. Combined with acks=all to prevent data loss
  • Consumer Lag monitoring: Continuously monitor consumer lag using kafka-consumer-groups.sh or tools like Burrow. If lag consistently increases, consumer processing capacity must be expanded
#!/bin/bash
# Consumer Lag monitoring script

BOOTSTRAP_SERVERS="kafka-broker-1:9092,kafka-broker-2:9092"
CONSUMER_GROUPS=("order-read-projection" "order-saga-orchestrator" "search-indexer")
ALERT_THRESHOLD=10000
# SLACK_WEBHOOK_URL is expected to be exported in the environment

for GROUP in "${CONSUMER_GROUPS[@]}"; do
    echo "=== Consumer Group: $GROUP ==="

    # Query consumer group status
    kafka-consumer-groups.sh \
        --bootstrap-server "$BOOTSTRAP_SERVERS" \
        --describe \
        --group "$GROUP" 2>/dev/null

    # Calculate total lag
    TOTAL_LAG=$(kafka-consumer-groups.sh \
        --bootstrap-server "$BOOTSTRAP_SERVERS" \
        --describe \
        --group "$GROUP" 2>/dev/null \
        | tail -n +3 \
        | awk '{sum += $6} END {print sum}')

    echo "Total Lag: ${TOTAL_LAG:-0}"

    # Alert when threshold exceeded (default to 0 if the group has no assigned partitions)
    if [ "${TOTAL_LAG:-0}" -gt "$ALERT_THRESHOLD" ]; then
        echo "[ALERT] Consumer group $GROUP lag ($TOTAL_LAG) exceeds threshold ($ALERT_THRESHOLD)"
        # Slack webhook or PagerDuty call
        curl -s -X POST "$SLACK_WEBHOOK_URL" \
            -H 'Content-Type: application/json' \
            -d "{\"text\":\"[ALERT] Kafka Consumer Lag threshold exceeded\\nGroup: $GROUP\\nLag: $TOTAL_LAG\\nThreshold: $ALERT_THRESHOLD\"}"
    fi

    echo ""
done
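The topic-level settings above can be applied with the standard Kafka CLI when creating the event store topic. A sketch (broker address, topic name, and partition count are illustrative):

```shell
# Create the event store topic with indefinite retention
kafka-topics.sh \
    --bootstrap-server kafka-broker-1:9092 \
    --create \
    --topic order-events \
    --partitions 12 \
    --replication-factor 3 \
    --config retention.ms=-1 \
    --config min.insync.replicas=2

# Verify the applied topic configuration
kafka-configs.sh \
    --bootstrap-server kafka-broker-1:9092 \
    --entity-type topics \
    --entity-name order-events \
    --describe
```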

Failure Cases and Recovery Procedures

Case 1: Read Model Corruption Due to Projection Bug

A projection handler bug causing incorrect read model updates is the most common failure scenario in CQRS systems. The core advantage of Event Sourcing is that read models can be completely rebuilt from the event stream in such cases.

Recovery procedure is as follows:

  1. Stop the problematic projection consumer
  2. Fix the projection handler bug and deploy
  3. Delete the corresponding index/collection in the read model store (MongoDB, Elasticsearch, etc.)
  4. Reset the consumer group offsets to earliest
  5. Restart the fixed projection consumer to replay events from the beginning
# Consumer group offset reset (WARNING: only execute when consumers are stopped)
# --to-earliest: Replay from the very beginning of the topic
kafka-consumer-groups.sh \
    --bootstrap-server kafka-broker-1:9092 \
    --group order-read-projection \
    --topic order-events \
    --reset-offsets \
    --to-earliest \
    --execute

# Verify execution result
kafka-consumer-groups.sh \
    --bootstrap-server kafka-broker-1:9092 \
    --describe \
    --group order-read-projection

If the event count is in the millions or more, a full replay can take considerable time. During this period, the read model will differ from the latest state, so monitor replay progress and inform users that data is being refreshed.

Case 2: Snapshot Corruption

If a corrupted snapshot is saved to the snapshot store, Aggregate loading starts event replay from an incorrect state, causing business logic errors.

Symptoms: Commands for a specific Aggregate ID produce unexpected invariant validation errors. For example, instead of a "Not in a payable state" error for an already-paid order, an "Order already created" error occurs.

Recovery procedure is as follows:

  1. Delete the snapshot for the affected Aggregate
  2. Replay all events for that Aggregate from the beginning of the event store to restore its state
  3. Once normal state is confirmed, generate a new snapshot
  4. Investigate and fix the snapshot creation logic bug
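The rebuild in the recovery procedure amounts to folding the Aggregate's full event history into a fresh instance, bypassing the snapshot entirely. A minimal sketch (the event and state types are illustrative):

```java
import java.util.List;

// Rebuilding an Aggregate by replaying its full event history,
// ignoring any (possibly corrupted) snapshot.
class OrderAggregate {
    String status = "NONE";
    long version = 0;

    void apply(String eventType) {
        if ("OrderCreated".equals(eventType)) {
            status = "CREATED";
        } else if ("OrderPaid".equals(eventType)) {
            status = "PAID";
        } else if ("OrderShipped".equals(eventType)) {
            status = "SHIPPED";
        }
        version++;
    }

    // Fold every stored event into a fresh instance (snapshot bypass)
    static OrderAggregate replayAll(List<String> events) {
        OrderAggregate aggregate = new OrderAggregate();
        events.forEach(aggregate::apply);
        return aggregate;
    }
}
```

Once the replayed state passes invariant checks, a new snapshot can be generated from it.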

Case 3: Duplicate Event Delivery During Kafka Partition Rebalancing

When consumer group rebalancing occurs, partition ownership changes. In manual commit mode, if offsets for processed events are not committed before rebalancing occurs, the new consumer may receive already-processed events again.

Mitigation strategies:

  • Implement projection handlers idempotently to ensure the final state is identical even with duplicate event processing
  • Implement ConsumerRebalanceListener to immediately commit offsets for in-progress events during rebalancing
  • Increase max.poll.interval.ms appropriately to prevent unnecessary rebalancing when event processing takes a long time
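A sketch of the ConsumerRebalanceListener strategy under manual commit mode; everything except the Kafka client API (class and method names, the offset map) is illustrative, and the block assumes kafka-clients on the classpath:

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Commits offsets for already-processed events before partitions are
// handed to another consumer, narrowing the redelivery window.
class CommitOnRebalanceListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;
    // Updated by the poll loop after each successfully processed event
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets =
            new ConcurrentHashMap<>();

    CommitOnRebalanceListener(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    void markProcessed(TopicPartition tp, long offset) {
        // The committed offset is the NEXT offset to consume, hence +1
        currentOffsets.put(tp, new OffsetAndMetadata(offset + 1));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Synchronously commit before ownership moves to another consumer
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // No-op: idempotent handlers absorb any remaining duplicates
    }
}
```

The listener is passed to consumer.subscribe() alongside the topic list; it reduces duplicates but cannot eliminate them, which is why idempotent handlers remain the primary defense.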

Case 4: Backward Compatibility Violation During Event Schema Changes

Removing required fields or changing types in events will cause deserialization failures for existing events. Since the event log is immutable, existing event schemas cannot be changed.

Prevention principles:

  • Field additions must always be Optional
  • Instead of removing fields, mark them as deprecated and have consumers ignore them
  • Introduce Schema Registry (Confluent Schema Registry) for automatic compatibility validation
  • Set event schema compatibility tests as mandatory gates in CI/CD pipelines
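With Confluent Schema Registry in place, compatibility can be enforced and checked through its REST API. A sketch (registry URL, subject name, and schema file are illustrative):

```shell
# Set the subject's compatibility level; BACKWARD rejects removing
# fields that have no default value
curl -s -X PUT http://schema-registry:8081/config/order-events-value \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -d '{"compatibility": "BACKWARD"}'

# Test a candidate schema against the latest registered version;
# the response contains {"is_compatible": true} or false
curl -s -X POST \
    http://schema-registry:8081/compatibility/subjects/order-events-value/versions/latest \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -d @candidate-schema.json
```

The same compatibility call is what a CI/CD gate would run before allowing a schema change to be deployed.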

Checklist

Items to review when introducing a CQRS + Event Sourcing + Kafka architecture into production.

Design Phase

  • Are Aggregate boundaries correctly set based on business invariants?
  • Do event names use past-tense verbs that clearly express business intent?
  • Do event schemas include a version field?
  • Have distributed transactions requiring Sagas been identified?
  • Are compensating transactions defined for all steps of each Saga?
  • Have read model query patterns been pre-defined?

Kafka Configuration

  • Is the event store topic's retention.ms set to -1 (indefinite)?
  • Is min.insync.replicas set to 2 or more, and is acks=all configured?
  • Is the producer's enable.idempotence=true configured?
  • Is the partition count sufficient considering future scaling?
  • Is the Aggregate ID used as the message key to guarantee ordering within partitions?
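Taken together, the producer-side checklist items amount to a configuration like the following sketch (broker addresses are illustrative; the keys are standard Kafka producer properties):

```java
import java.util.Properties;

// Producer configuration for appending to the event store topic.
class EventStoreProducerConfig {
    static Properties create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker-1:9092,kafka-broker-2:9092");
        // Wait for all in-sync replicas; with min.insync.replicas=2 on
        // the topic, a write survives the loss of one broker
        props.put("acks", "all");
        // Suppresses duplicate appends when the producer retries
        props.put("enable.idempotence", "true");
        // Unbounded retries are safe once idempotence is enabled
        props.put("retries", Integer.toString(Integer.MAX_VALUE));
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

The Aggregate ID is then passed as the record key on every send, which keeps all events for one Aggregate in a single partition and preserves their order.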

Projections and Consumers

  • Are projection handlers implemented idempotently?
  • Is manual offset commit configured (enable.auto.commit=false)?
  • Is a DLQ (Dead Letter Queue) configured to isolate failed events?
  • Are Consumer Lag monitoring and alerts configured?
  • Is the full replay procedure documented and tested?

Operations

  • Is event schema compatibility validation included in CI/CD?
  • Has the snapshot strategy (interval, storage) been determined?
  • Are user experience measures in place for Eventual Consistency (UI optimistic updates, etc.)?
  • Has disk capacity planning been established for the Kafka cluster (considering indefinite retention)?
  • Has a failure recovery runbook been written?

References