Async Processing Patterns Mastery: CQRS, Saga, Event Sourcing — Senior Backend Developer Essential Guide

Author: Youngju Kim (@fjvbn20031)
- 1. Why Async? The Survival Strategy for Large-Scale Systems
- 2. Event-Driven Architecture Fundamentals
- 3. CQRS Deep Dive
- 4. Saga Pattern Mastery
- 5. Event Sourcing
- 6. Outbox Pattern (The Core of Reliability)
- 7. Idempotency Patterns
- 8. Dead Letter Queue Strategy
- 9. Exactly-Once Semantics
- 10. Reactive Streams vs Virtual Threads (2025)
- 11. Production Architecture: Payment System Design
- Quiz
- References
1. Why Async? The Survival Strategy for Large-Scale Systems
Modern large-scale systems handle traffic volumes that synchronous request-response models simply cannot manage.
Real-World Examples:
| Company | Scale | Async Processing Approach |
|---|---|---|
| Netflix | 260M subscribers, millions of events/sec | Apache Kafka-based event streaming |
| Uber | Petabytes of daily real-time data | Apache Kafka + Apache Flink |
| LinkedIn | 7T+ messages/day via Kafka | Self-developed Kafka infrastructure |
| Shopify | 800K requests/sec on Black Friday | Async order processing pipeline |
In synchronous architecture, when Service A calls Service B, and B calls C, the total latency is the sum of each service call. If any single service slows down, the entire chain blocks.
In asynchronous architecture, a message broker decouples services, enabling each to scale independently.
```
Synchronous:   Client -> A(50ms) -> B(100ms) -> C(200ms) = 350ms total wait

Asynchronous:  Client -> A(50ms) -> Message Broker -> (B, C process in parallel)
               Client gets a response after 50ms; B and C continue processing asynchronously
```
Key Benefits of Going Async:
- Temporal decoupling — Producers and consumers do not need to be running simultaneously
- Independent scaling — Read and write workloads can be scaled separately
- Fault isolation — A single service failure does not cascade to the entire system
- Peak traffic absorption — Message queues act as buffers
However, async is not free. Eventual Consistency, message ordering, duplicate handling, and distributed transactions are new challenges. The patterns covered in this guide are the answers to these problems.
2. Event-Driven Architecture Fundamentals
Event-Driven Architecture (EDA) is the foundation of async patterns. State changes in the system are expressed as "events," and interested services subscribe and react to them.
2-1. Domain Events vs Integration Events
| Aspect | Domain Event | Integration Event |
|---|---|---|
| Scope | Within a single bounded context | Between bounded contexts |
| Example | OrderItemAdded | OrderPlaced |
| Delivery | In-memory event bus | Message broker (Kafka, RabbitMQ) |
| Schema mgmt | Freely changeable (internal) | Contract management required |
```java
// Domain Event - within bounded context
public record OrderItemAdded(
    UUID orderId,
    UUID productId,
    int quantity,
    BigDecimal price,
    Instant occurredAt
) implements DomainEvent {}

// Integration Event - published to external services
public record OrderPlacedEvent(
    UUID orderId,
    UUID customerId,
    BigDecimal totalAmount,
    List<OrderLineItem> items,
    Instant occurredAt,
    int version // schema version management
) implements IntegrationEvent {}
```
2-2. Event Bus vs Event Store
An Event Bus (e.g., Kafka, RabbitMQ) is a pipeline that delivers events. Events are not kept forever: RabbitMQ removes a message once a consumer acknowledges it, and Kafka expires messages after the topic's retention period regardless of whether they were consumed.
An Event Store permanently persists all events. Events can be replayed to reconstruct state at any point in time.
```
Event Bus (Kafka):
Producer -> [Topic: orders] -> Consumer Group A (Order Service)
                            -> Consumer Group B (Notification Service)
                            -> Consumer Group C (Analytics Service)

Event Store:
[Event #1: OrderCreated]
[Event #2: ItemAdded]
[Event #3: PaymentReceived]
[Event #4: OrderShipped]
-> Replay events #1-#4 to reconstruct current order state
```
2-3. Understanding Eventual Consistency
Maintaining strong consistency in distributed systems requires distributed locks, 2PC (2-Phase Commit), etc., which significantly degrade performance and availability.
According to the CAP theorem, during network partitions you must choose between Consistency (C) and Availability (A). Most large-scale systems choose Availability and accept Eventual Consistency.
Eventual Consistency means "data may be inconsistent right now, but given enough time, all nodes will converge to the same state."
Practical Implications:
- Inventory might not decrease immediately after an order (milliseconds to seconds delay)
- Show users "Order is being processed" first, then update after confirmation
- Minimizing the consistency window is the key concern
3. CQRS Deep Dive
CQRS (Command Query Responsibility Segregation) completely separates the read and write models.
3-1. Core Concept
In traditional CRUD, a single data model handles both reads and writes. In reality, reads and writes have very different requirements.
```
Traditional CRUD:
Client -> [Single Model] -> Database
          (both read/write)

CQRS:
Client --Command--> [Write Model] -> Command DB (normalized)
                                          | Events
                                          v
Client <--Query---- [Read Model] <--- Read DB (denormalized, optimized)
```
Write Model (Command Side):
- Business rule validation
- Domain invariant enforcement
- Normalized schema
- Strong consistency
Read Model (Query Side):
- Denormalized views optimized for queries
- Multiple storage options (Elasticsearch, Redis, MongoDB)
- Eventual Consistency acceptable
3-2. When to Use CQRS
Good fit:
- Extreme read/write ratio (reads 100:1 or more)
- Complex domain logic + diverse query requirements
- Different scaling needs for reads and writes
- When used together with Event Sourcing
Not a good fit:
- Simple CRUD applications
- When strong consistency is absolutely required
- Simple domain with predictable query patterns
3-3. Implementation with Spring Boot + Kafka
```java
// === Command Side: Order Creation ===
@Service
@RequiredArgsConstructor
public class OrderCommandService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @Transactional
    public UUID createOrder(CreateOrderCommand command) {
        // 1. Business rule validation
        validateOrder(command);

        // 2. Process in domain model
        Order order = Order.create(
            command.customerId(),
            command.items(),
            command.shippingAddress()
        );

        // 3. Save to Write DB
        orderRepository.save(order);

        // 4. Publish event (for Read Model sync)
        OrderCreatedEvent event = new OrderCreatedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getTotalAmount(),
            order.getItems().stream()
                .map(this::toEventItem)
                .toList(),
            Instant.now()
        );
        kafkaTemplate.send("order-events", order.getId().toString(), event);

        return order.getId();
    }

    private void validateOrder(CreateOrderCommand command) {
        if (command.items().isEmpty()) {
            throw new InvalidOrderException("Order items are empty");
        }
        // Check inventory, customer status, etc.
    }
}

// === Query Side: Order Query Projection ===
@Service
@RequiredArgsConstructor
public class OrderQueryService {

    private final OrderReadRepository readRepository;

    public OrderDetailView getOrderDetail(UUID orderId) {
        return readRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));
    }

    public Page<OrderSummaryView> getCustomerOrders(
            UUID customerId, Pageable pageable) {
        return readRepository.findByCustomerId(customerId, pageable);
    }
}

// Read Model - denormalized view optimized for queries
@Document(collection = "order_views")
public record OrderDetailView(
    UUID orderId,
    UUID customerId,
    String customerName, // Denormalized: includes customer name
    String customerEmail,
    BigDecimal totalAmount,
    String status,
    List<OrderItemView> items,
    AddressView shippingAddress,
    Instant createdAt,
    Instant lastUpdatedAt
) {}

// === Event Handler: Write -> Read Sync ===
@Slf4j // provides the 'log' field used below
@Component
@RequiredArgsConstructor
public class OrderProjectionHandler {

    private final OrderReadRepository readRepository;
    private final CustomerQueryClient customerClient;

    @KafkaListener(topics = "order-events", groupId = "order-projection")
    public void handleOrderEvent(OrderEvent event) {
        switch (event) {
            case OrderCreatedEvent e -> handleOrderCreated(e);
            case OrderStatusChangedEvent e -> handleStatusChanged(e);
            case OrderCancelledEvent e -> handleOrderCancelled(e);
            default -> log.warn("Unknown event type: {}", event.getClass());
        }
    }

    private void handleOrderCreated(OrderCreatedEvent event) {
        // Fetch customer info to create denormalized view
        CustomerInfo customer = customerClient.getCustomer(event.customerId());

        OrderDetailView view = new OrderDetailView(
            event.orderId(),
            event.customerId(),
            customer.name(),
            customer.email(),
            event.totalAmount(),
            "CREATED",
            event.items().stream().map(this::toItemView).toList(),
            null,
            event.occurredAt(),
            event.occurredAt()
        );
        readRepository.save(view);
    }
}
```
3-4. Read Model Synchronization Strategies
| Strategy | Latency | Consistency | Complexity | Best For |
|---|---|---|---|---|
| Event Handler (Kafka) | ms to seconds | Eventual | Medium | Most cases |
| CDC (Debezium) | Seconds | Eventual | High | Legacy system integration |
| Polling (Scheduled) | Seconds to minutes | Eventual | Low | Simple cases |
| Synchronous update | Immediate | Strong | Low | Low performance requirements |
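Stripped of the Kafka and Spring machinery, the event-handler strategy is just a left-fold of events into a denormalized view. The sketch below is a minimal in-memory illustration of that idea; the event and view names are illustrative, not from a real codebase:

```java
import java.util.HashMap;
import java.util.Map;

public class OrderProjection {
    sealed interface OrderEvent permits Created, StatusChanged {}
    record Created(String orderId, String customer, long amountCents) implements OrderEvent {}
    record StatusChanged(String orderId, String status) implements OrderEvent {}

    // Denormalized read model row
    record OrderView(String orderId, String customer, long amountCents, String status) {}

    private final Map<String, OrderView> views = new HashMap<>();

    // Equivalent of the @KafkaListener handler: apply one event to the read model
    public void on(OrderEvent event) {
        switch (event) {
            case Created e -> views.put(e.orderId(),
                    new OrderView(e.orderId(), e.customer(), e.amountCents(), "CREATED"));
            case StatusChanged e -> views.computeIfPresent(e.orderId(),
                    (id, v) -> new OrderView(v.orderId(), v.customer(), v.amountCents(), e.status()));
        }
    }

    public OrderView get(String orderId) { return views.get(orderId); }
}
```

Because the view is rebuilt purely from events, replaying the stream from the start reproduces the same read model — which is also how a corrupted projection is repaired in practice.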
4. Saga Pattern Mastery
How do you manage business transactions that span multiple services? The single ACID transaction of a monolith is not available across service boundaries. The Saga pattern decomposes a distributed transaction into a series of local transactions.
4-1. Choreography
In the Choreography approach, there is no central coordinator. Each service publishes events, and the next service subscribes to those events to perform its local transaction.
```
E-commerce Order Flow (Choreography):

Order Service --OrderPlaced--> Payment Service
                                     |
                              PaymentCompleted
                                     v
                              Inventory Service
                                     |
                              InventoryReserved
                                     v
                              Shipping Service
                                     |
                              ShipmentCreated
                                     v
                              Order Service (status update)

On Failure (Compensating Transactions):

Inventory Service --InventoryReservationFailed--> Payment Service (refund)
                                                        |
                                                  PaymentRefunded
                                                        v
                                                  Order Service (cancel order)
```
```java
// === Choreography Saga: Order Service ===
@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Transactional
    public UUID placeOrder(PlaceOrderCommand command) {
        Order order = Order.create(command);
        order.setStatus(OrderStatus.PENDING);
        orderRepository.save(order);

        // Publish event - Payment Service subscribes
        kafkaTemplate.send("order-events", new OrderPlacedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getTotalAmount(),
            order.getItems()
        ));
        return order.getId();
    }

    // One listener per topic: two listeners in the same consumer group
    // would split the partitions between them, so each would miss events
    @KafkaListener(topics = "payment-events", groupId = "order-service")
    public void onPaymentEvent(PaymentEvent event) {
        switch (event) {
            case PaymentCompletedEvent e -> {
                Order order = orderRepository.findById(e.orderId()).orElseThrow();
                order.setStatus(OrderStatus.PAID);
                orderRepository.save(order);
            }
            case PaymentFailedEvent e -> {
                Order order = orderRepository.findById(e.orderId()).orElseThrow();
                order.setStatus(OrderStatus.CANCELLED);
                order.setCancellationReason(e.reason());
                orderRepository.save(order);
            }
            default -> { /* ignore unrelated payment events */ }
        }
    }
}

// === Choreography Saga: Payment Service ===
@Service
@RequiredArgsConstructor
public class PaymentService {

    private final PaymentRepository paymentRepository;
    private final PaymentGateway paymentGateway;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @KafkaListener(topics = "order-events", groupId = "payment-service")
    @Transactional
    public void onOrderPlaced(OrderPlacedEvent event) {
        try {
            // Process payment
            PaymentResult result = paymentGateway.charge(
                event.customerId(),
                event.totalAmount()
            );
            Payment payment = Payment.create(
                event.orderId(),
                event.customerId(),
                event.totalAmount(),
                result.transactionId()
            );
            paymentRepository.save(payment);

            // Publish success event - Inventory Service subscribes
            kafkaTemplate.send("payment-events", new PaymentCompletedEvent(
                event.orderId(),
                payment.getId(),
                result.transactionId()
            ));
        } catch (PaymentException e) {
            // Publish failure event - Order Service subscribes and cancels
            kafkaTemplate.send("payment-events", new PaymentFailedEvent(
                event.orderId(),
                e.getMessage()
            ));
        }
    }
}
```
Choreography Pros and Cons:
| Pros | Cons |
|---|---|
| Loose coupling between services | Difficult to understand full flow |
| Each service independently deployable | Complex debugging |
| No Single Point of Failure (SPOF) | Possible circular dependencies |
| Relatively simple implementation | Event tracking complexity grows with services |
4-2. Orchestration
In the Orchestration approach, a central coordinator (Orchestrator) manages the entire workflow. It sends commands to each service and decides the next step based on responses.
```
E-commerce Order Flow (Orchestration):

              Order Saga Orchestrator
               /        |         \
              v         v          v
      Payment Service  Inventory  Shipping
        (charge)       (reserve)  (create)
              v         v          v
          (result)   (result)   (result)
               \        |         /
              Order Saga Orchestrator
                (final decision)
```
```java
// === Orchestration Saga: Using Temporal Framework ===
@WorkflowInterface
public interface OrderSagaWorkflow {
    @WorkflowMethod
    OrderResult processOrder(PlaceOrderCommand command);
}

// Registered with a Temporal worker for the order-saga task queue
public class OrderSagaWorkflowImpl implements OrderSagaWorkflow {

    // Activity stubs for each service
    private final PaymentActivities paymentActivities =
        Workflow.newActivityStub(PaymentActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .setRetryOptions(RetryOptions.newBuilder()
                    .setMaximumAttempts(3)
                    .build())
                .build());

    private final InventoryActivities inventoryActivities =
        Workflow.newActivityStub(InventoryActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .build());

    private final ShippingActivities shippingActivities =
        Workflow.newActivityStub(ShippingActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(60))
                .build());

    @Override
    public OrderResult processOrder(PlaceOrderCommand command) {
        String paymentId = null;
        String reservationId = null;
        try {
            // Step 1: Payment
            paymentId = paymentActivities.processPayment(
                command.customerId(),
                command.totalAmount()
            );

            // Step 2: Inventory reservation
            reservationId = inventoryActivities.reserveInventory(
                command.items()
            );

            // Step 3: Create shipment
            String shipmentId = shippingActivities.createShipment(
                command.orderId(),
                command.shippingAddress()
            );

            return OrderResult.success(command.orderId(), shipmentId);
        } catch (Exception e) {
            // Execute compensating transactions
            compensate(paymentId, reservationId);
            return OrderResult.failed(command.orderId(), e.getMessage());
        }
    }

    private void compensate(String paymentId, String reservationId) {
        // Execute compensating transactions in reverse order
        if (reservationId != null) {
            inventoryActivities.cancelReservation(reservationId);
        }
        if (paymentId != null) {
            paymentActivities.refundPayment(paymentId);
        }
    }
}

// === Activity Implementation ===
@ActivityInterface
public interface PaymentActivities {
    String processPayment(UUID customerId, BigDecimal amount);
    void refundPayment(String paymentId);
}

@Component
@RequiredArgsConstructor
public class PaymentActivitiesImpl implements PaymentActivities {

    private final PaymentGateway paymentGateway;
    private final PaymentRepository paymentRepository;

    @Override
    public String processPayment(UUID customerId, BigDecimal amount) {
        PaymentResult result = paymentGateway.charge(customerId, amount);
        Payment payment = Payment.create(customerId, amount, result.transactionId());
        paymentRepository.save(payment);
        return payment.getId().toString();
    }

    @Override
    public void refundPayment(String paymentId) {
        Payment payment = paymentRepository.findById(UUID.fromString(paymentId))
            .orElseThrow();
        paymentGateway.refund(payment.getTransactionId());
        payment.setStatus(PaymentStatus.REFUNDED);
        paymentRepository.save(payment);
    }
}
```
Orchestration Pros and Cons:
| Pros | Cons |
|---|---|
| Entire workflow managed in one place | Orchestrator can become SPOF |
| Easy debugging and monitoring | Higher coupling than Choreography |
| Easier complex compensation logic | Risk of logic concentration in orchestrator |
| Framework support (Temporal/Cadence) | Additional infrastructure (workflow engine) needed |
4-3. Choreography vs Orchestration Selection Guide
| Criteria | Choreography | Orchestration |
|---|---|---|
| Number of services | 3-4 or fewer | 5 or more |
| Workflow complexity | Linear, simple | Conditional branching, parallel processing |
| Observability needs | Low | High |
| Team structure | Independent teams | Central platform team |
| Compensation logic | Simple | Complex (multiple paths) |
4-4. Compensating Transactions
Compensating transactions semantically undo already committed transactions. They do not physically roll back but perform reverse actions.
```java
// Compensating transaction mapping: each forward action has a reverse action
enum SagaAction {
    PAYMENT_CHARGE, PAYMENT_REFUND,
    INVENTORY_RESERVE, INVENTORY_RELEASE,
    SHIPPING_CREATE, SHIPPING_CANCEL,
    COUPON_APPLY, COUPON_RESTORE,
    POINTS_DEDUCT, POINTS_RESTORE
}

static final Map<SagaAction, SagaAction> COMPENSATION_FOR = Map.of(
    SagaAction.PAYMENT_CHARGE,    SagaAction.PAYMENT_REFUND,
    SagaAction.INVENTORY_RESERVE, SagaAction.INVENTORY_RELEASE,
    SagaAction.SHIPPING_CREATE,   SagaAction.SHIPPING_CANCEL,
    SagaAction.COUPON_APPLY,      SagaAction.COUPON_RESTORE,
    SagaAction.POINTS_DEDUCT,     SagaAction.POINTS_RESTORE
);
```
Semantic Compensation vs Exact Compensation:
- Exact Compensation: Restores precisely to original state (e.g., payment cancel -> full refund)
- Semantic Compensation: Provides a business-meaningful compensation (e.g., flight cancellation -> partial refund + credit)
In practice, Semantic Compensation is more common because exact restoration is often impossible after time passes.
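A common generic way to get the "reverse order" behavior is a stack of compensating actions: each forward step that commits pushes its undo action, and on failure the stack is drained LIFO. The sketch below is a simplified, framework-free illustration of that mechanism; the `log` list exists only so the effect is observable:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class CompensationStack {
    private final Deque<Runnable> compensations = new ArrayDeque<>();
    final List<String> log = new ArrayList<>(); // records actions for inspection

    // Run a forward step; if it succeeds, remember how to undo it
    void step(String name, Runnable forward, Runnable compensate) {
        forward.run();
        log.add(name);
        compensations.push(compensate); // LIFO: last committed step is undone first
    }

    // On failure, pop and run compensations in reverse commit order
    void compensateAll() {
        while (!compensations.isEmpty()) {
            compensations.pop().run();
        }
    }
}
```

This is essentially what workflow engines like Temporal provide as a built-in `Saga` helper, with durability and retries added on top.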
5. Event Sourcing
Event Sourcing stores all events that caused state changes in order, instead of storing only the current state of an entity.
5-1. Core Concept
```
Traditional CRUD:
Account table: id=1, balance=750

Event Sourcing:
Event #1: AccountCreated(id=1, initialBalance=1000)
Event #2: MoneyWithdrawn(id=1, amount=200)
Event #3: MoneyDeposited(id=1, amount=150)
Event #4: MoneyWithdrawn(id=1, amount=200)
-> Replay: 1000 - 200 + 150 - 200 = 750 (current balance)
```
All state changes are appended to an immutable, append-only event log. No deletions or updates.
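The replay above is literally a fold over the event list. A tiny sketch, using the event names from the example (the rest is illustrative):

```java
import java.util.List;

public class BalanceReplay {
    sealed interface AccountEvent permits AccountCreated, MoneyDeposited, MoneyWithdrawn {}
    record AccountCreated(long initialBalance) implements AccountEvent {}
    record MoneyDeposited(long amount) implements AccountEvent {}
    record MoneyWithdrawn(long amount) implements AccountEvent {}

    // Current state = fold(apply, initialState, events)
    static long replay(List<AccountEvent> events) {
        long balance = 0;
        for (AccountEvent e : events) {
            balance = switch (e) {
                case AccountCreated c -> c.initialBalance();
                case MoneyDeposited d -> balance + d.amount();
                case MoneyWithdrawn w -> balance - w.amount();
            };
        }
        return balance;
    }
}
```

Replaying a prefix of the list gives the balance at that point in time — this is the "time travel" property in the comparison table below.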
5-2. Event Store Implementation
```java
// === Event Store Interface ===
public interface EventStore {
    void saveEvents(UUID aggregateId, List<DomainEvent> events, int expectedVersion);
    List<DomainEvent> getEvents(UUID aggregateId);
    List<DomainEvent> getEvents(UUID aggregateId, int fromVersion);
}

// === PostgreSQL-based Event Store Implementation ===
@Repository
@RequiredArgsConstructor
public class PostgresEventStore implements EventStore {

    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper;

    @Override
    @Transactional
    public void saveEvents(UUID aggregateId, List<DomainEvent> events,
                           int expectedVersion) {
        // Optimistic concurrency control
        Integer currentVersion = jdbcTemplate.queryForObject(
            "SELECT COALESCE(MAX(version), 0) FROM event_store WHERE aggregate_id = ?",
            Integer.class, aggregateId
        );
        if (currentVersion != expectedVersion) {
            throw new ConcurrencyException(
                "Concurrency conflict: expected version " + expectedVersion +
                ", actual version " + currentVersion
            );
        }

        int version = expectedVersion;
        for (DomainEvent event : events) {
            version++;
            jdbcTemplate.update(
                """
                INSERT INTO event_store
                    (event_id, aggregate_id, aggregate_type, event_type,
                     event_data, version, created_at)
                VALUES (?, ?, ?, ?, ?::jsonb, ?, ?)
                """,
                UUID.randomUUID(),
                aggregateId,
                event.getAggregateType(),
                event.getClass().getSimpleName(),
                serialize(event),
                version,
                Instant.now()
            );
        }
    }

    @Override
    public List<DomainEvent> getEvents(UUID aggregateId) {
        return jdbcTemplate.query(
            "SELECT * FROM event_store WHERE aggregate_id = ? ORDER BY version",
            this::mapRowToEvent,
            aggregateId
        );
    }

    @Override
    public List<DomainEvent> getEvents(UUID aggregateId, int fromVersion) {
        return jdbcTemplate.query(
            "SELECT * FROM event_store WHERE aggregate_id = ? AND version >= ? ORDER BY version",
            this::mapRowToEvent,
            aggregateId, fromVersion
        );
    }

    private String serialize(DomainEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Failed to serialize event", e);
        }
    }

    // mapRowToEvent deserializes event_data into the class named in event_type (omitted)
}
```
```sql
-- Event Store table schema
CREATE TABLE event_store (
    event_id       UUID PRIMARY KEY,
    aggregate_id   UUID NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    event_type     VARCHAR(255) NOT NULL,
    event_data     JSONB NOT NULL,
    version        INTEGER NOT NULL,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (aggregate_id, version) -- optimistic concurrency control
);

CREATE INDEX idx_event_store_aggregate ON event_store (aggregate_id, version);
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);
```
5-3. Applying Events on Aggregates
```java
// === Event-Sourced Aggregate ===
public class OrderAggregate {

    private UUID id;
    private UUID customerId;
    private OrderStatus status;
    private BigDecimal totalAmount;
    private List<OrderItem> items = new ArrayList<>();
    private int version = 0;

    // Uncommitted events list
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    // Restore state from event history
    public static OrderAggregate fromHistory(List<DomainEvent> history) {
        OrderAggregate aggregate = new OrderAggregate();
        for (DomainEvent event : history) {
            aggregate.apply(event, false);
        }
        return aggregate;
    }

    // Handle new commands
    public void placeOrder(UUID customerId, List<OrderItem> items) {
        if (this.status != null) {
            throw new IllegalStateException("Order already created");
        }
        BigDecimal total = items.stream()
            .map(item -> item.price().multiply(BigDecimal.valueOf(item.quantity())))
            .reduce(BigDecimal.ZERO, BigDecimal::add);

        // A new aggregate has no id yet; the creation event assigns one
        apply(new OrderPlacedEvent(UUID.randomUUID(), customerId, total, items, Instant.now()), true);
    }

    public void cancelOrder(String reason) {
        if (this.status == OrderStatus.SHIPPED) {
            throw new IllegalStateException("Cannot cancel shipped orders");
        }
        apply(new OrderCancelledEvent(this.id, reason, Instant.now()), true);
    }

    // Apply event (state change); package-private so loaders can replay events
    void apply(DomainEvent event, boolean isNew) {
        when(event); // State mutation
        this.version++;
        if (isNew) {
            uncommittedEvents.add(event);
        }
    }

    // State change logic per event type
    private void when(DomainEvent event) {
        switch (event) {
            case OrderPlacedEvent e -> {
                this.id = e.orderId();
                this.customerId = e.customerId();
                this.totalAmount = e.totalAmount();
                this.items = new ArrayList<>(e.items());
                this.status = OrderStatus.PLACED;
            }
            case OrderCancelledEvent e -> this.status = OrderStatus.CANCELLED;
            case OrderShippedEvent e -> this.status = OrderStatus.SHIPPED;
            default -> throw new IllegalArgumentException("Unknown event: " + event);
        }
    }

    public UUID getId() { return id; }

    public int getVersion() { return version; }

    public List<DomainEvent> getUncommittedEvents() {
        return Collections.unmodifiableList(uncommittedEvents);
    }

    public void clearUncommittedEvents() {
        uncommittedEvents.clear();
    }
}
```
5-4. Snapshot Pattern
When tens of thousands of events accumulate, replaying all of them every time is inefficient. Snapshots save the aggregate state at specific intervals, so only events after the snapshot need to be replayed.
```java
// Snapshot save and restore
@Service
@RequiredArgsConstructor
public class SnapshotService {

    private static final int SNAPSHOT_INTERVAL = 100; // every 100 events

    private final SnapshotRepository snapshotRepository;
    private final EventStore eventStore;

    public OrderAggregate loadAggregate(UUID aggregateId) {
        // 1. Get latest snapshot
        Optional<Snapshot> snapshot = snapshotRepository.findLatest(aggregateId);

        if (snapshot.isPresent()) {
            // 2. Restore state from snapshot
            OrderAggregate aggregate = deserialize(snapshot.get().getData());
            int fromVersion = snapshot.get().getVersion();

            // 3. Replay only events after snapshot
            List<DomainEvent> newEvents = eventStore.getEvents(aggregateId, fromVersion + 1);
            for (DomainEvent event : newEvents) {
                aggregate.apply(event, false);
            }
            return aggregate;
        } else {
            // No snapshot, replay all
            List<DomainEvent> allEvents = eventStore.getEvents(aggregateId);
            return OrderAggregate.fromHistory(allEvents);
        }
    }

    public void saveIfNeeded(OrderAggregate aggregate) {
        if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
            snapshotRepository.save(new Snapshot(
                aggregate.getId(),
                aggregate.getVersion(),
                serialize(aggregate),
                Instant.now()
            ));
        }
    }

    // serialize/deserialize convert aggregate state to/from the snapshot payload (omitted)
}
```
5-5. Event Sourcing vs CRUD Comparison
| Criteria | CRUD | Event Sourcing |
|---|---|---|
| Storage | Current state only | All state change events |
| Audit trail | Requires separate implementation | Built-in (events are the audit log) |
| Time travel | Not possible | Possible (reconstruct state at any point) |
| Storage volume | Relatively small | Grows with event accumulation |
| Complexity | Low | High (event design, snapshots, etc.) |
| Concurrency | Pessimistic/optimistic locking | Event version-based optimistic control |
| Performance | Easy read optimization | Replay cost (mitigated by snapshots) |
| Schema changes | Migration required | Event upcasting |
| CQRS integration | Optional | Natural combination |
5-6. When NOT to Use Event Sourcing
- Simple CRUD domains (blogs, configuration management)
- Strong consistency is mandatory (ATMs requiring real-time balance checks)
- Unstable event design in early stages (frequent changes mean high upcasting cost)
- Team lacks experience (steep learning curve)
- Regulatory requirements for data deletion (conflicts with GDPR right to be forgotten)
6. Outbox Pattern (The Core of Reliability)
6-1. The Problem: Dual Write
One of the most common mistakes in distributed systems is performing database saves and message publishing as separate operations.
```java
// Dangerous code: Dual write problem
@Transactional
public void createOrder(Order order) {
    orderRepository.save(order);          // Step 1: DB save
    kafkaTemplate.send("orders", event);  // Step 2: Message publish

    // Step 1 success + Step 2 failure = In DB but event not published
    // Step 1 success + Step 2 success + DB commit failure = Event published but not in DB
}
```
DB transactions and message broker sends are different infrastructure, so they cannot be processed atomically.
6-2. Solution: Outbox Pattern
The Outbox Pattern stores events in an outbox table within the DB transaction, and a separate process delivers them to the message broker.
1. Business logic + outbox INSERT = single DB transaction (atomicity guaranteed)
2. Separate process (CDC/Poller) reads outbox table and publishes to Kafka
3. After successful publish, mark outbox record as completed
```sql
-- Outbox table schema
CREATE TABLE outbox (
    id             UUID PRIMARY KEY,
    aggregate_type VARCHAR(255) NOT NULL,
    aggregate_id   UUID NOT NULL,
    event_type     VARCHAR(255) NOT NULL,
    payload        JSONB NOT NULL,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at   TIMESTAMPTZ,
    status         VARCHAR(20) DEFAULT 'PENDING'
);

CREATE INDEX idx_outbox_pending ON outbox (status, created_at)
    WHERE status = 'PENDING';
```
```java
// === Outbox Pattern Implementation ===
@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    @Transactional // Guaranteed by single transaction
    public UUID createOrder(CreateOrderCommand command) {
        // 1. Execute business logic
        Order order = Order.create(command);
        orderRepository.save(order);

        // 2. Save event to outbox in same transaction
        OutboxMessage message = OutboxMessage.builder()
            .aggregateType("Order")
            .aggregateId(order.getId())
            .eventType("OrderCreated")
            .payload(toJson(new OrderCreatedEvent(order.getId(), order.getTotalAmount())))
            .build();
        outboxRepository.save(message);

        return order.getId();
        // When the transaction commits, both order and outbox are saved together
    }

    private String toJson(Object event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Failed to serialize event", e);
        }
    }
}
```
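The "separate process" in step 2 is often just a scheduled poller. Below is an in-memory sketch of the poll → publish → mark-completed loop; the list stands in for the outbox table and the consumer for the Kafka producer, and all names are illustrative:

```java
import java.util.List;
import java.util.function.Consumer;

public class OutboxPoller {
    public static final class OutboxRow {
        final String eventType;
        final String payload;
        String status = "PENDING";
        OutboxRow(String eventType, String payload) { this.eventType = eventType; this.payload = payload; }
    }

    private final List<OutboxRow> outbox;  // stands in for SELECT ... WHERE status = 'PENDING'
    private final Consumer<String> broker; // stands in for kafkaTemplate.send

    OutboxPoller(List<OutboxRow> outbox, Consumer<String> broker) {
        this.outbox = outbox;
        this.broker = broker;
    }

    // One polling pass: publish pending rows, then mark them completed.
    // In production this runs on a schedule and the mark is a DB UPDATE.
    public int pollOnce() {
        int published = 0;
        for (OutboxRow row : outbox) {
            if ("PENDING".equals(row.status)) {
                broker.accept(row.payload); // if this throws, status stays PENDING -> retried
                row.status = "PUBLISHED";   // a crash between send and mark causes a duplicate
                published++;
            }
        }
        return published;
    }
}
```

Because the row is marked only after the send succeeds, a crash in between re-publishes it on the next pass: the outbox gives at-least-once delivery, which is exactly why the consumer side still needs the idempotency patterns of Section 7.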
6-3. CDC (Debezium) Approach
Debezium monitors the DB transaction log (WAL) to capture changes and deliver them to Kafka.
```json
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.dbname": "orderdb",
    "database.server.name": "order-service",
    "table.include.list": "public.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "events.order"
  }
}
```
6-4. Polling vs CDC Comparison
| Criteria | Polling | CDC (Debezium) |
|---|---|---|
| Latency | Depends on polling interval (seconds-minutes) | Near real-time (ms) |
| DB load | Periodic queries cause load | Minimal (reads WAL) |
| Implementation complexity | Low (scheduler + query) | High (Debezium infrastructure) |
| Order guarantee | Outbox table order | WAL order preserved |
| Operational overhead | Low | Kafka Connect cluster needed |
| Scalability | Single poller bottleneck | Highly scalable |
7. Idempotency Patterns
In distributed systems, messages are typically delivered at-least-once: network failures, broker restarts, and consumer redeployments all cause duplicate delivery. Idempotency guarantees that processing the same message multiple times produces the same result as processing it once.
7-1. Producer Idempotency (Kafka)
```properties
# Kafka Producer configuration
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.properties.acks=all
spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
spring.kafka.producer.properties.retries=2147483647
```
Kafka's idempotent producer assigns sequence numbers to each message, enabling the broker to detect duplicates.
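The broker-side check can be pictured as "accept only sequence numbers higher than the last one seen per producer". This is a deliberately simplified sketch of that idea — real Kafka tracks sequences per producer id *and* partition, with batching and epoch details omitted here:

```java
import java.util.HashMap;
import java.util.Map;

public class SequenceDedup {
    // Last sequence number accepted per producer id
    private final Map<String, Long> lastSeq = new HashMap<>();

    // Returns true if the message is appended, false if it is a duplicate retry
    public boolean tryAppend(String producerId, long seq) {
        long last = lastSeq.getOrDefault(producerId, -1L);
        if (seq <= last) {
            return false; // producer retried a message the broker already acked
        }
        lastSeq.put(producerId, seq);
        return true;
    }
}
```

Note that this only removes duplicates created by producer retries; duplicates on the consumer side (reprocessing after a rebalance, for example) still require the consumer-side idempotency below.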
7-2. Consumer Idempotency
```java
// === Idempotent Consumer Implementation ===
@Slf4j
@Component
@RequiredArgsConstructor
public class IdempotentOrderEventHandler {

    private final ProcessedEventRepository processedEventRepository;
    private final OrderProjectionRepository orderProjectionRepository;

    @KafkaListener(topics = "order-events", groupId = "order-projection")
    @Transactional
    public void handleOrderEvent(
            @Payload OrderEvent event,
            @Header(KafkaHeaders.RECEIVED_KEY) String key,
            @Header("eventId") String eventId) {

        // 1. Check if event already processed
        if (processedEventRepository.existsById(eventId)) {
            log.info("Ignoring duplicate event: eventId={}", eventId);
            return;
        }

        // 2. Process business logic
        processEvent(event);

        // 3. Record as processed (same transaction)
        processedEventRepository.save(new ProcessedEvent(
            eventId,
            event.getClass().getSimpleName(),
            Instant.now()
        ));
    }

    private void processEvent(OrderEvent event) {
        switch (event) {
            case OrderCreatedEvent e -> createOrderProjection(e);
            case OrderStatusChangedEvent e -> updateOrderStatus(e);
            default -> log.warn("Unknown event type");
        }
    }
}
```
```sql
-- Processed events table
CREATE TABLE processed_events (
    event_id     VARCHAR(255) PRIMARY KEY,
    event_type   VARCHAR(255) NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
```
7-3. Unique Constraint vs Redis SET NX
| Approach | Pros | Cons |
|---|---|---|
| DB Unique Constraint | Atomic with transaction | Increases DB load |
| Redis SET NX | Very fast lookups | Not guaranteed if Redis fails |
| DB + Redis combo | Fast primary filter + reliable guarantee | Complex implementation |
```java
// Redis SET NX for fast duplicate check
@Component
@RequiredArgsConstructor
public class RedisIdempotencyGuard {

    private final StringRedisTemplate redisTemplate;
    private static final Duration TTL = Duration.ofHours(24);

    public boolean tryAcquire(String idempotencyKey) {
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(
                "idempotency:" + idempotencyKey,
                "processed",
                TTL
            );
        return Boolean.TRUE.equals(result);
    }
}
```
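The DB + Redis combination from the table can be sketched as a two-tier guard: the fast tier filters most duplicates cheaply, while the durable tier remains the source of truth. In this simplified in-memory sketch, sets stand in for Redis and the `processed_events` table; all names are illustrative:

```java
import java.util.HashSet;
import java.util.Set;

public class TwoTierIdempotencyGuard {
    private final Set<String> fastTier = new HashSet<>();    // stands in for Redis SET NX
    private final Set<String> durableTier = new HashSet<>(); // stands in for processed_events

    // Returns true if the caller should process this event
    public boolean tryAcquire(String eventId) {
        if (!fastTier.add(eventId)) {
            return false; // cheap rejection, no durable-store round-trip
        }
        // Durable check still required: the fast tier may have been flushed or restarted
        return durableTier.add(eventId);
    }
}
```

The key design point is that the fast tier is an optimization only; correctness never depends on it, which is what makes a Redis failure tolerable.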
8. Dead Letter Queue Strategy
When message processing fails, infinite retries can paralyze the system. DLQ (Dead Letter Queue) isolates unprocessable messages into a separate queue to ensure system stability.
8-1. Retry with Exponential Backoff
// === Spring Kafka Retry + DLQ Configuration ===
@Configuration
public class KafkaConsumerConfig {
@Bean
public DefaultErrorHandler errorHandler(
KafkaTemplate<String, Object> kafkaTemplate) {
// Recoverer that sends to DLQ
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(
record.topic() + ".DLQ", record.partition()));
// Exponential Backoff retry configuration
ExponentialBackOff backOff = new ExponentialBackOff();
backOff.setInitialInterval(1000L); // 1 second
backOff.setMultiplier(2.0); // 2x increase
backOff.setMaxInterval(60000L); // max 60 seconds
backOff.setMaxElapsedTime(300000L); // max 5 minutes
DefaultErrorHandler errorHandler =
new DefaultErrorHandler(recoverer, backOff);
// Non-retryable exceptions (business errors)
errorHandler.addNotRetryableExceptions(
InvalidOrderException.class,
DuplicateEventException.class
);
return errorHandler;
}
}
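To make the configuration above concrete, here is the retry schedule those settings produce, computed in plain Java. This is an approximation that ignores processing time between attempts (Spring's `ExponentialBackOff` counts real elapsed time); the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Computes the delay sequence for an exponential backoff policy:
// initial interval, multiplier, per-attempt cap, and total elapsed-time cap.
public class BackoffSchedule {
    public static List<Long> delaysMillis(long initial, double multiplier,
                                          long maxInterval, long maxElapsed) {
        List<Long> delays = new ArrayList<>();
        long next = initial;
        long elapsed = 0;
        while (elapsed + next <= maxElapsed) {
            delays.add(next);
            elapsed += next;
            // grow exponentially but never exceed the per-attempt cap
            next = Math.min((long) (next * multiplier), maxInterval);
        }
        return delays;
    }
}
```

With the values from the config (1s initial, 2.0 multiplier, 60s cap, 5 min total) the schedule is 1s, 2s, 4s, 8s, 16s, 32s, then 60s repeated until the elapsed-time budget runs out, i.e. roughly nine retries before the message lands in the DLQ.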
8-2. DLQ Monitoring and Alerting
// === DLQ Monitoring Service ===
@Component
@RequiredArgsConstructor
public class DlqMonitoringService {
private final MeterRegistry meterRegistry;
private final AlertService alertService;
@KafkaListener(topics = "order-events.DLQ", groupId = "dlq-monitor")
public void monitorDlq(
ConsumerRecord<String, Object> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String errorMessage,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic) {
// Record metrics
meterRegistry.counter("dlq.messages.received",
"originalTopic", originalTopic,
"errorType", classifyError(errorMessage)
).increment();
// Send alert
alertService.sendAlert(DlqAlert.builder()
.originalTopic(originalTopic)
.messageKey(record.key())
.errorMessage(errorMessage)
.timestamp(Instant.ofEpochMilli(record.timestamp()))
.severity(classifySeverity(errorMessage))
.build()
);
log.error("DLQ message received: topic={}, key={}, error={}",
originalTopic, record.key(), errorMessage);
}
}
8-3. DLQ Reprocessing Pipeline
// === DLQ Message Reprocessing ===
@RestController
@RequestMapping("/api/dlq")
@RequiredArgsConstructor
public class DlqReprocessController {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final DlqMessageRepository dlqRepository;
// Single message reprocessing
@PostMapping("/reprocess/single")
public ResponseEntity<String> reprocessSingle(
@RequestParam String messageId) {
DlqMessage message = dlqRepository.findById(messageId).orElseThrow();
// Republish to original topic
kafkaTemplate.send(
message.getOriginalTopic(),
message.getKey(),
message.getValue()
);
message.setStatus("REPROCESSED");
message.setReprocessedAt(Instant.now());
dlqRepository.save(message);
return ResponseEntity.ok("Reprocessing requested: " + messageId);
}
// Batch reprocessing
@PostMapping("/reprocess/batch")
public ResponseEntity<String> reprocessBatch(
@RequestParam String originalTopic,
@RequestParam(defaultValue = "100") int batchSize) {
List<DlqMessage> messages = dlqRepository
.findByOriginalTopicAndStatus(originalTopic, "PENDING",
PageRequest.of(0, batchSize));
int count = 0;
for (DlqMessage message : messages) {
kafkaTemplate.send(message.getOriginalTopic(),
message.getKey(), message.getValue());
message.setStatus("REPROCESSED");
message.setReprocessedAt(Instant.now());
count++;
}
dlqRepository.saveAll(messages);
return ResponseEntity.ok(count + " messages reprocessing requested");
}
}
9. Exactly-Once Semantics
There are three levels of message delivery guarantees:
| Guarantee Level | Description | Complexity |
|---|---|---|
| At-most-once | Delivered at most once (may be lost) | Low |
| At-least-once | Delivered at least once (may be duplicated) | Medium |
| Exactly-once | Delivered exactly once | High |
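The practical consequence of the table above: with at-least-once delivery, the broker may redeliver a message, and the consumer's side effect runs twice unless the consumer deduplicates. A minimal simulation (all names here are illustrative) shows how consumer-side dedup turns at-least-once delivery into effectively exactly-once processing:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simulates an at-least-once channel that redelivers a message, and
// compares a naive consumer with one that deduplicates by message id.
public class DeliverySemanticsDemo {
    public static int processWithoutDedup(List<String> deliveries) {
        return deliveries.size(); // every delivery runs the side effect
    }

    public static int processWithDedup(List<String> deliveries) {
        Set<String> seen = new HashSet<>();
        int processed = 0;
        for (String id : deliveries) {
            if (seen.add(id)) { // idempotency check: skip already-seen ids
                processed++;
            }
        }
        return processed;
    }
}
```

Given deliveries `["m1", "m2", "m2", "m3"]` (m2 redelivered), the naive consumer runs four side effects while the deduplicating one runs three, one per unique message.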
9-1. Kafka Exactly-Once Implementation
Kafka has supported Exactly-Once Semantics (EOS) since version 0.11.
// === Kafka Exactly-Once Configuration ===
@Configuration
public class KafkaExactlyOnceConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.ACKS_CONFIG, "all");
DefaultKafkaProducerFactory<String, Object> factory =
new DefaultKafkaProducerFactory<>(config);
// Spring appends a unique suffix to this prefix per producer instance
factory.setTransactionIdPrefix("order-tx-");
return factory;
}
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
ProducerFactory<String, Object> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(config);
}
}
9-2. End-to-End Exactly-Once
True end-to-end Exactly-Once must cover the whole path, from the Producer through the Broker and Consumer to the final state persisted in the database.
End-to-End Exactly-Once Chain:
Producer (idempotent + transactions)
-> Kafka Broker (transaction log)
-> Consumer (read_committed + manual offset)
-> Database (idempotency key + business logic)
All links in the chain must work together for Exactly-Once guarantee
// === End-to-End Exactly-Once Processing ===
@Component
@RequiredArgsConstructor
public class ExactlyOnceOrderProcessor {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final OrderRepository orderRepository;
private final ProcessedOffsetRepository offsetRepository;
@KafkaListener(topics = "payment-completed")
// Note: ChainedKafkaTransactionManager is deprecated in recent Spring Kafka;
// it is shown here to illustrate the chained DB + Kafka transaction idea
@Transactional("chainedKafkaTransactionManager")
public void processPaymentCompleted(
ConsumerRecord<String, PaymentCompletedEvent> record) {
String offsetKey = record.topic() + "-" +
record.partition() + "-" + record.offset();
// Check if offset already processed
if (offsetRepository.existsById(offsetKey)) {
return;
}
// Business logic
PaymentCompletedEvent event = record.value();
Order order = orderRepository.findById(event.orderId()).orElseThrow();
order.markAsPaid(event.paymentId());
orderRepository.save(order);
// Publish next step event (same Kafka transaction)
kafkaTemplate.send("order-paid",
new OrderPaidEvent(order.getId(), event.paymentId()));
// Record processed offset (same DB transaction)
offsetRepository.save(new ProcessedOffset(offsetKey, Instant.now()));
}
}
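The key trick in the listener above is that a `(topic, partition, offset)` triple uniquely identifies a Kafka record, so persisting it in the same transaction as the business change makes redelivery a no-op. A plain-Java sketch of that mechanism, with an in-memory map standing in for the `ProcessedOffsetRepository` and all names invented for the example:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Dedup by processed-offset key: the side effect runs at most once per
// Kafka record, even if the record is redelivered after a rebalance or crash.
public class OffsetDedupProcessor {
    private final Map<String, Boolean> processedOffsets = new ConcurrentHashMap<>();
    private int sideEffects = 0;

    public boolean process(String topic, int partition, long offset) {
        String key = topic + "-" + partition + "-" + offset;
        if (processedOffsets.putIfAbsent(key, Boolean.TRUE) != null) {
            return false; // already processed: this is a redelivery
        }
        sideEffects++; // stands in for the DB update + outgoing event
        return true;
    }

    public int sideEffects() {
        return sideEffects;
    }
}
```

In the real listener the offset record and the business update commit in one DB transaction; here the atomicity of `putIfAbsent` plays that role.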
10. Reactive Streams vs Virtual Threads (2025)
10-1. Project Reactor: Backpressure as Core Design
Project Reactor builds on the Reactive Streams specification, which makes non-blocking I/O and backpressure core design principles rather than afterthoughts.
// === Reactor-based Async Processing ===
@Service
@RequiredArgsConstructor
public class ReactiveOrderService {
private final KafkaReceiver<String, OrderEvent> kafkaReceiver;
public Flux<OrderEvent> processOrderStream() {
return kafkaReceiver.receive()
.flatMap(record -> {
OrderEvent event = record.value();
return processEvent(event)
.then(record.receiverOffset().commit())
.thenReturn(event);
}, 16) // Concurrency limit (backpressure)
// retry first; only skip the record once retries are exhausted
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
.onErrorResume(e -> {
log.error("Processing failed", e);
return Mono.empty();
});
}
private Mono<Void> processEvent(OrderEvent event) {
return Mono.fromCallable(() -> {
// Business logic
return null;
}).subscribeOn(Schedulers.boundedElastic()).then();
}
}
10-2. Java 21 Virtual Threads: Simpler Code
Virtual Threads deliver high concurrency while keeping the familiar synchronous coding style.
// === Virtual Threads-based Async Processing ===
@Configuration
public class VirtualThreadConfig {
@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandlerCustomizer() {
return protocolHandler -> {
protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
};
}
}
@Service
@RequiredArgsConstructor
public class VirtualThreadOrderService {
private final PaymentService paymentService;
private final InventoryService inventoryService;
// Synchronous code but running on Virtual Threads
// -> blocking I/O parks only the Virtual Thread, not an OS thread
public OrderResult processOrder(UUID orderId)
throws InterruptedException, ExecutionException {
// Parallel processing with StructuredTaskScope
// (preview API in Java 21, JEP 453 -- requires --enable-preview)
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Subtask<PaymentResult> paymentTask = scope.fork(() ->
paymentService.processPayment(orderId)
);
Subtask<InventoryResult> inventoryTask = scope.fork(() ->
inventoryService.checkInventory(orderId)
);
scope.join();
scope.throwIfFailed();
return new OrderResult(
paymentTask.get(),
inventoryTask.get()
);
}
}
}
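Since `StructuredTaskScope` is still a preview API in Java 21, the same fan-out can be done today with the standard executor API alone. A minimal runnable sketch (Java 21+; the task bodies and the `VirtualThreadDemo` name are invented stand-ins for the payment and inventory calls):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Two blocking tasks run in parallel, each on its own virtual thread.
// Thread.sleep parks the virtual thread; no OS thread is blocked.
public class VirtualThreadDemo {
    public static int fetchBoth() {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            Future<Integer> payment = executor.submit(() -> {
                Thread.sleep(50); // stands in for the payment service call
                return 1;
            });
            Future<Integer> inventory = executor.submit(() -> {
                Thread.sleep(50); // stands in for the inventory check
                return 2;
            });
            return payment.get() + inventory.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
```

The try-with-resources block works because `ExecutorService` is `AutoCloseable` as of Java 19: `close()` waits for submitted tasks to finish, giving a lightweight structured-concurrency feel without the preview flag.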
10-3. Selection Guide: Decision Matrix
| Criteria | Reactive (Reactor/WebFlux) | Virtual Threads |
|---|---|---|
| Code complexity | High (reactive operator learning) | Low (existing sync code) |
| Backpressure control | Built-in (Flux/Mono) | Manual implementation needed |
| Debugging | Difficult (complex stack traces) | Easy (traditional stack traces) |
| Memory efficiency | Excellent | Good (better than OS threads) |
| I/O-intensive workloads | Optimal | Very good |
| CPU-intensive workloads | Not ideal | Not ideal (use platform threads) |
| Legacy code migration | Complete rewrite needed | Configuration change only |
| Ecosystem maturity | High (5+ years) | Growing (since 2023) |
Recommendations:
- New project + simple I/O: Virtual Threads
- Streaming data processing: Reactive Streams
- Existing Spring MVC project: Virtual Threads (minimal migration cost)
- Real-time data pipeline: Reactive Streams
11. Production Architecture: Payment System Design
A complete e-commerce payment system architecture combining all patterns.
E-commerce Payment System Architecture
======================================
[Client] -> [API Gateway]
|
v
[Order Service] --- Write DB (PostgreSQL)
| |
| (Outbox Pattern) | (CDC - Debezium)
| v
| [Kafka: order-events]
| / | \
v v v v
[Saga Orchestrator] [Payment] [Inventory] [Notification]
(Temporal) Service Service Service
| | |
| commands | |
v v v
[payment-commands] [payment- [inventory-
events] events]
|
[DLQ + Monitoring]
|
[Alerting (PagerDuty)]
Read Side:
[Kafka: order-events] -> [Order Projection] -> Read DB (MongoDB)
-> [Analytics Service] -> Data Warehouse
-> [Search Service] -> Elasticsearch
Applied Patterns Summary
| Pattern | Applied At | Purpose |
|---|---|---|
| CQRS | Order Service | Separate order read/write |
| Saga (Orchestration) | Temporal Orchestrator | Payment-inventory-shipping transaction mgmt |
| Event Sourcing | Order Aggregate | Order state change history |
| Outbox Pattern | Order Service to Kafka | DB-message atomicity guarantee |
| Idempotency | All Consumers | Prevent duplicate message processing |
| DLQ | All Kafka Consumers | Isolate failed messages |
| Exactly-Once | Payment processing | Prevent double charging |
Quiz
Q1. When the Write Model and Read Model in CQRS use different databases, what is the most commonly used approach to ensure data consistency between the two models?
Answer: Event-based asynchronous synchronization (Eventual Consistency)
When the Write Model changes state, it publishes events. The Read Model's projection handler subscribes to these events and updates the Read DB. There may be temporary inconsistency (ms to seconds) between the two models, but consistency is eventually guaranteed.
If strong consistency is required, reconsider whether CQRS is the right fit.
Q2. In a Choreography Saga, when Service A publishes an event and Service B fails to process it, how does Service A become aware of the failure?
Answer: Service B publishes a failure event (e.g., PaymentFailed), and Service A subscribes to it to become aware. In Choreography, each service must publish both success and failure events, and related services subscribe to respond appropriately.
This is one of the downsides of Choreography. As the number of services grows, event flow tracking becomes increasingly complex, and timeout-based detection may be needed as an additional mechanism.
Q3. When an Aggregate in Event Sourcing has 1 million accumulated events, what is the key technique to solve the performance issue of loading it?
Answer: Snapshot Pattern
Periodically (e.g., every 100 events), save the current state of the Aggregate as a snapshot. When loading, start from the most recent snapshot and only replay events after that point.
Example: Out of 1 million events, if the last snapshot is at event 999,900, only 100 events need to be replayed.
Q4. In the Outbox Pattern, what is the main reason the CDC (Change Data Capture) approach is preferred over the Polling approach?
Answer: CDC reads the DB transaction log (WAL) directly, detecting changes in near real-time (milliseconds). Polling executes queries periodically, causing higher latency (seconds to minutes) and additional DB load from frequent queries.
Additionally, CDC delivers events in WAL order, naturally preserving ordering, and since it reads the transaction log, there is virtually no additional query overhead.
Q5. To achieve Exactly-Once Semantics in Kafka, what configuration is needed at the Producer, Broker, and Consumer levels respectively?
Answer:
- Producer: enable.idempotence=true + transactional.id configuration (idempotent producer + transactions)
- Broker: Transaction log maintenance (enabled by default)
- Consumer: isolation.level=read_committed + manual offset commit + idempotent processing
For End-to-End Exactly-Once, the Consumer must also ensure idempotency when saving results to the DB. Kafka transactions only guarantee Exactly-Once within Kafka; to include external systems (DB), consumer-side idempotency is essential.
References
- Martin Fowler — "Event Sourcing" (martinfowler.com)
- Chris Richardson — "Microservices Patterns" (Manning, 2018)
- Vaughn Vernon — "Implementing Domain-Driven Design" (Addison-Wesley, 2013)
- Greg Young — "CQRS Documents" (cqrs.files.wordpress.com)
- Apache Kafka Documentation — "Exactly-Once Semantics" (kafka.apache.org)
- Debezium Documentation — "Outbox Event Router" (debezium.io)
- Temporal.io Documentation — "Saga Pattern with Temporal" (docs.temporal.io)
- Netflix Tech Blog — "Evolution of the Netflix Data Pipeline" (netflixtechblog.com)
- Uber Engineering Blog — "Building Reliable Reprocessing and Dead Letter Queues" (eng.uber.com)
- Confluent Blog — "Exactly-Once Semantics Are Possible" (confluent.io)
- AWS Architecture Blog — "Saga Orchestration Pattern" (aws.amazon.com)
- Microsoft Azure Docs — "CQRS Pattern" (learn.microsoft.com)
- Spring for Apache Kafka Reference — "Error Handling and DLQ" (docs.spring.io)
- JEP 444 — "Virtual Threads" (openjdk.org)
- Project Reactor Reference — "Backpressure" (projectreactor.io)
- Pat Helland — "Idempotence Is Not a Medical Condition" (ACM Queue, 2012)