Event-Driven Architecture Design Patterns — Kafka, CQRS, Event Sourcing Practical Guide

Introduction

As the number of microservices grows, inter-service communication becomes complex. Services tightly coupled through synchronous calls suffer cascading failures when one of them goes down. Event-Driven Architecture (EDA) addresses this by having services communicate through asynchronous events. This article covers the core EDA patterns (CQRS, Event Sourcing, and Saga) with practical code centered on Kafka.


1. Event-Driven Architecture Overview

Synchronous vs Asynchronous Communication

graph LR
    subgraph "Synchronous (REST)"
        A[Order Service] -->|HTTP Call| B[Payment Service]
        B -->|HTTP Call| C[Inventory Service]
        C -->|HTTP Call| D[Notification Service]
    end

In the synchronous approach, if the Payment Service goes down, the order also fails. All services are tightly coupled.

graph LR
    subgraph "Asynchronous (Event-Driven)"
        A2[Order Service] -->|Publish Event| K[Kafka]
        K -->|Subscribe| B2[Payment Service]
        K -->|Subscribe| C2[Inventory Service]
        K -->|Subscribe| D2[Notification Service]
    end

In the asynchronous approach, the Order Service only needs to publish an event. Each service consumes events independently.
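
The decoupling described above can be illustrated without Kafka at all. The following is a minimal in-process sketch, not a real broker: `InMemoryBus` and the three handler functions are hypothetical names invented for this example, and delivery here is synchronous and non-durable, which a real broker is not. It only shows that the publisher is unaffected when one subscriber fails.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Handler = Callable[[dict], None]


class InMemoryBus:
    """Toy stand-in for a broker: fans each event out to every subscriber."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Handler]] = defaultdict(list)
        self.failures: List[str] = []

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, event: dict) -> None:
        # The publisher does not know or care who is listening.
        for handler in self._subscribers[topic]:
            try:
                handler(event)
            except Exception as exc:
                # A failing subscriber is recorded, not propagated to the publisher.
                self.failures.append(f"{handler.__name__}: {exc}")


received: List[str] = []


def payment_service(event: dict) -> None:
    raise RuntimeError("payment service is down")


def inventory_service(event: dict) -> None:
    received.append(f"inventory:{event['order_id']}")


def notification_service(event: dict) -> None:
    received.append(f"notification:{event['order_id']}")


bus = InMemoryBus()
for handler in (payment_service, inventory_service, notification_service):
    bus.subscribe("orders", handler)

# Publishing succeeds even though one subscriber raises.
bus.publish("orders", {"event_type": "OrderCreated", "order_id": "order-1"})
```

With a durable broker such as Kafka, the failed subscriber would additionally be able to read the event later, once it recovers.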

Core Components of EDA

| Component | Role | Example |
|-----------|------|---------|
| Producer | Creates and publishes events | Order Service |
| Event Broker | Stores and delivers events | Apache Kafka |
| Consumer | Subscribes to and processes events | Payment, Inventory, Notification Services |
| Event | A message representing a fact that occurred | OrderCreated, PaymentCompleted |

2. Apache Kafka Basics

Kafka Architecture

graph TB
    P1[Producer 1] --> T[Topic: orders]
    P2[Producer 2] --> T
    T --> PA0[Partition 0]
    T --> PA1[Partition 1]
    T --> PA2[Partition 2]
    PA0 --> C1[Consumer Group A - Consumer 1]
    PA1 --> C2[Consumer Group A - Consumer 2]
    PA2 --> C3[Consumer Group A - Consumer 3]
    PA0 --> C4[Consumer Group B - Consumer 1]
    PA1 --> C4
    PA2 --> C4

Key Concepts

  • Topic: A logical channel where events are stored (e.g., orders, payments)
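  • Partition: A unit of parallel processing within a Topic. Ordering is guaranteed only within a single partition; messages with the same partition key are routed to the same partition, so they stay in order relative to each other
  • Consumer Group: Each partition is assigned to exactly one consumer in the group, so the members of a group process a Topic in parallel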
  • Offset: The sequence number of a message within a partition. Tracks how far a consumer has read
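
The relationship between partition keys and ordering can be sketched in a few lines. This is an illustration only: `pick_partition` is a hypothetical helper, and `zlib.crc32` is a stand-in for whatever hash a real Kafka client's partitioner uses. The point is simply that a stable hash sends every event with the same key to the same partition, where append order is preserved.

```python
import zlib


def pick_partition(key: str, num_partitions: int) -> int:
    """Map a key to a partition with a stable hash (illustrative only)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


NUM_PARTITIONS = 3
events = [
    ("user-1", "OrderCreated"),
    ("user-2", "OrderCreated"),
    ("user-1", "OrderPaid"),
    ("user-1", "OrderShipped"),
]

# Each partition is an append-only list, like a partition log.
partitions = {p: [] for p in range(NUM_PARTITIONS)}
for key, event_type in events:
    partitions[pick_partition(key, NUM_PARTITIONS)].append((key, event_type))

# All of user-1's events sit in one partition, in publish order.
user1_partition = pick_partition("user-1", NUM_PARTITIONS)
user1_events = [e for k, e in partitions[user1_partition] if k == "user-1"]
```

Events for different keys may land in different partitions, and no ordering is implied between them.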

Kafka Producer Implementation (Python)

import json
import uuid
from datetime import datetime, timezone

from confluent_kafka import Producer

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'client.id': 'order-service',
    'acks': 'all',  # Wait until all in-sync replicas have acknowledged the write
    'retries': 3,
    'retry.backoff.ms': 1000,
}

producer = Producer(config)

def delivery_callback(err, msg):
    """Called once per message (from poll() or flush()) with the delivery result"""
    if err:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivery succeeded: {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

def publish_order_event(order: dict):
    """Publish an order event"""
    event = {
        'event_type': 'OrderCreated',
        'event_id': str(uuid.uuid4()),
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'data': {
            'order_id': order['id'],
            'user_id': order['user_id'],
            'items': order['items'],
            'total_amount': order['total'],
        }
    }

    # Partition key = user_id -> Orders from the same user go to the same partition
    producer.produce(
        topic='orders',
        key=str(order['user_id']).encode('utf-8'),
        value=json.dumps(event).encode('utf-8'),
        callback=delivery_callback,
    )
    # Block until the message is delivered. Simple, but flushing after every
    # message limits throughput; batch-oriented code calls poll(0) instead.
    producer.flush()

Kafka Consumer Implementation (Python)

import json

from confluent_kafka import Consumer

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'payment-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Manual commit for at-least-once guarantee
}

consumer = Consumer(config)
consumer.subscribe(['orders'])

def process_events():
    """Event consumption loop"""
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                print(f'Error: {msg.error()}')
                continue

            try:
                # Decode inside the try block so a malformed message also goes to the DLQ
                event = json.loads(msg.value().decode('utf-8'))
                if event['event_type'] == 'OrderCreated':
                    handle_order_created(event['data'])
            except Exception as e:
                print(f'Processing failed: {e}')
                # Send the raw payload to the DLQ (Dead Letter Queue);
                # send_to_dlq is assumed to be defined elsewhere
                send_to_dlq(msg.value(), str(e))

            # Commit once the message is either processed or parked in the DLQ
            consumer.commit(message=msg, asynchronous=False)
    finally:
        consumer.close()  # Leave the consumer group cleanly

def handle_order_created(data: dict):
    """Handle order created event -> initiate payment"""
    # create_payment and publish_payment_event are assumed to be defined elsewhere
    payment = create_payment(
        order_id=data['order_id'],
        amount=data['total_amount'],
    )
    # Publish payment completed event
    publish_payment_event(payment)
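
Committing only after processing gives at-least-once delivery, which means a message can be handled twice if the consumer dies between the side effect and the commit. The sketch below simulates that with a toy in-memory log; `PartitionLog` and `run_consumer` are hypothetical names for this illustration and do not use the Kafka client.

```python
from typing import Callable, List


class PartitionLog:
    """Toy single-partition log with one committed offset for a consumer group."""

    def __init__(self, messages: List[str]) -> None:
        self.messages = messages
        self.committed = 0  # next offset the group reads after a restart


def run_consumer(log: PartitionLog, handle: Callable[[str], None]) -> None:
    """Process from the committed offset; commit only after the handler succeeds."""
    offset = log.committed
    while offset < len(log.messages):
        handle(log.messages[offset])  # may raise, simulating a crash
        offset += 1
        log.committed = offset        # commit after successful processing


processed: List[str] = []
crash_once = {"armed": True}


def handler(message: str) -> None:
    processed.append(message)         # the side effect happens first
    if message == "order-2" and crash_once["armed"]:
        crash_once["armed"] = False
        raise RuntimeError("crash before commit")


log = PartitionLog(["order-1", "order-2", "order-3"])
try:
    run_consumer(log, handler)
except RuntimeError:
    pass                              # the consumer process "dies" here
run_consumer(log, handler)            # a restart resumes from the committed offset
```

After the restart, `order-2` has been handled twice, which is why the idempotency measures in section 6 matter.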

3. CQRS (Command Query Responsibility Segregation)

CQRS is a pattern that separates the write (Command) and read (Query) models.

CQRS Architecture Flow

graph TB
    Client[Client]

    Client -->|Command| CmdAPI[Command API]
    Client -->|Query| QryAPI[Query API]

    CmdAPI --> WriteDB[(Write DB - PostgreSQL)]
    CmdAPI -->|Publish Event| Kafka[Kafka]

    Kafka -->|Consume Event| Projector[Projector / Denormalizer]
    Projector --> ReadDB[(Read DB - Elasticsearch/Redis)]

    QryAPI --> ReadDB

Why CQRS?

| Traditional Approach | CQRS |
|----------------------|------|
| Single model for reads and writes | Separate read/write models |
| Read performance degraded by complex JOINs | Denormalized model optimized for reads |
| Difficult to scale | Independent scaling of reads/writes |
| Suitable for simple domains | Suitable for complex domains |

CQRS Implementation Example

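import uuid

# CreateOrderCommand, Order, OrderCreatedEvent, and InsufficientStockError
# are assumed to be defined elsewhere in the service.

# === Command Side (Write) ===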
class OrderCommandHandler:
    def __init__(self, db, event_publisher):
        self.db = db
        self.publisher = event_publisher

    def create_order(self, cmd: CreateOrderCommand):
        """Handle create order command"""
        # Business logic validation
        if not self._validate_stock(cmd.items):
            raise InsufficientStockError()

        # Save to Write DB
        order = Order(
            id=uuid.uuid4(),
            user_id=cmd.user_id,
            items=cmd.items,
            status='CREATED',
        )
        self.db.save(order)

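        # Publish event (asynchronously update Read Model).
        # Note: the DB write and the publish are not atomic; a crash between
        # them leaves the Read Model without this order unless the event is re-sent.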
        self.publisher.publish('orders', OrderCreatedEvent(
            order_id=order.id,
            user_id=order.user_id,
            items=order.items,
            total=order.total,
            created_at=order.created_at,
        ))
        return order.id


# === Projector (Event -> Read Model Transformation) ===
class OrderProjector:
    def __init__(self, read_db):
        self.read_db = read_db  # Elasticsearch or Redis

    def handle_order_created(self, event: OrderCreatedEvent):
        """Order created event -> Save denormalized data to read model"""
        # get_user_info is assumed to look up the user (e.g., from a user service or cache)
        user = self.get_user_info(event.user_id)

        # Denormalized read model (no JOINs needed!)
        order_view = {
            'order_id': str(event.order_id),
            'user_id': str(event.user_id),  # needed by the user_id filter on the query side
            'user_name': user.name,
            'user_email': user.email,
            'items': [
                {'name': item.name, 'qty': item.qty, 'price': item.price}
                for item in event.items
            ],
            'total': event.total,
            'status': 'CREATED',
            'created_at': event.created_at.isoformat(),
        }
        self.read_db.index('orders', order_view)


# === Query Side (Read) ===
class OrderQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db

    def get_user_orders(self, user_id: str, page: int = 1):
        """Retrieve user's order list (returned directly from Read Model)"""
        return self.read_db.search('orders', {
            'query': {'term': {'user_id': user_id}},
            'sort': [{'created_at': 'desc'}],
            'from': (page - 1) * 20,
            'size': 20,
        })
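
To make the timing between the two models concrete, here is a small in-memory sketch of the same flow. It assumes plain dictionaries in place of PostgreSQL and Elasticsearch and a `deque` in place of the Kafka topic; all names are hypothetical. It shows that a query issued before the projector runs does not yet see the new order.

```python
from collections import deque
from typing import Deque, Dict, List

write_db: Dict[str, dict] = {}        # normalized write model
read_db: Dict[str, List[dict]] = {}   # denormalized view keyed by user_id
event_queue: Deque[dict] = deque()    # stand-in for the Kafka topic


def create_order(order_id: str, user_id: str, total: int) -> None:
    """Command side: write, then publish an event."""
    write_db[order_id] = {"user_id": user_id, "total": total, "status": "CREATED"}
    event_queue.append({"event_type": "OrderCreated", "order_id": order_id,
                        "user_id": user_id, "total": total})


def run_projector() -> int:
    """Projector: drain pending events into the read model."""
    handled = 0
    while event_queue:
        event = event_queue.popleft()
        read_db.setdefault(event["user_id"], []).append(
            {"order_id": event["order_id"], "total": event["total"],
             "status": "CREATED"})
        handled += 1
    return handled


def get_user_orders(user_id: str) -> List[dict]:
    """Query side: read only from the read model."""
    return read_db.get(user_id, [])


create_order("order-1", "user-100", 50000)
before = get_user_orders("user-100")  # projector has not run yet
run_projector()
after = get_user_orders("user-100")   # read model has caught up
```

The gap between `before` and `after` is the eventual-consistency window that clients of a CQRS system have to tolerate.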

4. Event Sourcing -- Events Are the Data

Event Sourcing is a pattern that stores the sequence of state change events instead of storing the current state.

Event Sourcing Flow

sequenceDiagram
    participant Client
    participant OrderAggregate
    participant EventStore
    participant Projector
    participant ReadDB

    Client->>OrderAggregate: CreateOrder Command
    OrderAggregate->>EventStore: Store OrderCreated Event
    EventStore->>Projector: Propagate Event
    Projector->>ReadDB: Update Read Model

    Client->>OrderAggregate: PayOrder Command
    OrderAggregate->>EventStore: Store OrderPaid Event
    EventStore->>Projector: Propagate Event
    Projector->>ReadDB: Update Read Model

    Note over EventStore: Events are never deleted or modified!

Traditional CRUD vs Event Sourcing

# Traditional CRUD -- Only stores current state
orders table:
| id | user_id | status    | total  |
|----|---------|-----------|--------|
| 1  | 100     | SHIPPED   | 50000  |

-> "When was it ordered, when was it paid, when was it shipped?" Unknown!

# Event Sourcing -- Preserves entire change history
events table:
| seq | aggregate_id | type            | data                         | timestamp        |
|-----|--------------|-----------------|------------------------------|------------------|
| 1   | order-1      | OrderCreated    | {items: [...], total: 50000} | 2026-03-02 10:00 |
| 2   | order-1      | PaymentReceived | {method: "card"}             | 2026-03-02 10:05 |
| 3   | order-1      | OrderShipped    | {tracking: "KR123"}          | 2026-03-02 14:30 |

-> The entire history can be reconstructed!
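
Reconstructing state from the events table above amounts to a fold over the event list. The sketch below replays the three example rows; the status names assigned to each event type (for instance `PAID` for `PaymentReceived`) are assumptions made for illustration, and `replay` is a hypothetical helper.

```python
from functools import reduce

events = [
    {"seq": 1, "type": "OrderCreated", "data": {"total": 50000},
     "timestamp": "2026-03-02 10:00"},
    {"seq": 2, "type": "PaymentReceived", "data": {"method": "card"},
     "timestamp": "2026-03-02 10:05"},
    {"seq": 3, "type": "OrderShipped", "data": {"tracking": "KR123"},
     "timestamp": "2026-03-02 14:30"},
]

# Assumed mapping from event type to the resulting order status.
STATUS_AFTER = {
    "OrderCreated": "CREATED",
    "PaymentReceived": "PAID",
    "OrderShipped": "SHIPPED",
}


def apply(state: dict, event: dict) -> dict:
    """Pure fold step: return a new state with the event applied."""
    new_state = {**state, "status": STATUS_AFTER[event["type"]]}
    new_state["history"] = state["history"] + [(event["timestamp"], event["type"])]
    if event["type"] == "OrderCreated":
        new_state["total"] = event["data"]["total"]
    return new_state


def replay(stream, upto_seq=None):
    """Rebuild state from events, optionally stopping at a given sequence number."""
    selected = [e for e in stream if upto_seq is None or e["seq"] <= upto_seq]
    return reduce(apply, selected, {"status": None, "total": 0, "history": []})


current = replay(events)                    # state after all events
as_of_payment = replay(events, upto_seq=2)  # state as it was after payment
```

Stopping the replay early gives the state at any earlier point in time, which a table that only holds the current row cannot provide.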

Event Sourcing Implementation

from dataclasses import dataclass, field
from datetime import datetime
from typing import List
import json
import uuid

# === Event Definitions ===
@dataclass
class DomainEvent:
    event_id: str
    aggregate_id: str
    event_type: str
    data: dict
    timestamp: datetime
    version: int

# === Aggregate (Domain Object) ===
class OrderAggregate:
    def __init__(self, order_id: str):
        self.id = order_id
        self.status = None
        self.items = []
        self.total = 0
        self.version = 0
        self._pending_events: List[DomainEvent] = []

    # --- Command Handler ---
    def create(self, user_id: str, items: list):
        """Create order command"""
        if self.status is not None:
            raise Exception("Order already exists")

        # Record the event; the state itself changes only in the event handler
        # that _apply_event dispatches to
        self._apply_event(DomainEvent(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.id,
            event_type='OrderCreated',
            data={'user_id': user_id, 'items': items,
                  'total': sum(i['price'] * i['qty'] for i in items)},
            timestamp=datetime.utcnow(),
            version=self.version + 1,
        ))

    def pay(self, payment_method: str):
        """Payment command"""
        if self.status != 'CREATED':
            raise Exception(f"Cannot process payment in state: {self.status}")

        self._apply_event(DomainEvent(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.id,
            event_type='OrderPaid',
            data={'payment_method': payment_method},
            timestamp=datetime.utcnow(),
            version=self.version + 1,
        ))

    # --- Event Handler (state changes happen only here!) ---
    def _on_order_created(self, event: DomainEvent):
        self.status = 'CREATED'
        self.items = event.data['items']
        self.total = event.data['total']

    def _on_order_paid(self, event: DomainEvent):
        self.status = 'PAID'

    def _apply_event(self, event: DomainEvent):
        """Apply event (state change + event storage)"""
        handler = {
            'OrderCreated': self._on_order_created,
            'OrderPaid': self._on_order_paid,
        }.get(event.event_type)

        if handler:
            handler(event)
        self.version = event.version
        self._pending_events.append(event)

    @classmethod
    def from_events(cls, order_id: str, events: List[DomainEvent]):
        """Restore current state from event sequence"""
        aggregate = cls(order_id)
        for event in events:
            aggregate._apply_event(event)
        aggregate._pending_events.clear()  # Already persisted events
        return aggregate


# === Event Store ===
class EventStore:
    def __init__(self, db):
        self.db = db

    def save(self, aggregate: OrderAggregate):
        """Write unsaved events to the Event Store"""
        for event in aggregate._pending_events:
            self.db.execute(
                """INSERT INTO events
                   (event_id, aggregate_id, event_type, data, timestamp, version)
                   VALUES (%s, %s, %s, %s, %s, %s)""",
                (event.event_id, event.aggregate_id, event.event_type,
                 json.dumps(event.data), event.timestamp, event.version)
            )
        aggregate._pending_events.clear()

    def load(self, aggregate_id: str) -> OrderAggregate:
        """Read events from the Event Store and restore the Aggregate"""
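        rows = self.db.query(
            """SELECT event_id, aggregate_id, event_type, data, timestamp, version
               FROM events WHERE aggregate_id = %s ORDER BY version""",
            (aggregate_id,)
        )
        # Rows are assumed to be dict-like. save() stored data via json.dumps,
        # so decode it again unless the driver already returned a dict.
        events = [
            DomainEvent(**{
                **row,
                'data': row['data'] if isinstance(row['data'], dict)
                        else json.loads(row['data']),
            })
            for row in rows
        ]
        return OrderAggregate.from_events(aggregate_id, events)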
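
One thing the Event Store above does not show is what happens when two writers load the same aggregate and both try to append the next version. A common safeguard is a uniqueness constraint on (aggregate_id, version). The sketch below demonstrates the idea with an in-memory SQLite database instead of the article's PostgreSQL setup; `append_event`, `ConcurrencyError`, and the `OrderCancelled` event type are hypothetical names for this example.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE events (
        aggregate_id TEXT NOT NULL,
        version      INTEGER NOT NULL,
        event_type   TEXT NOT NULL,
        data         TEXT NOT NULL,
        PRIMARY KEY (aggregate_id, version)
    )
""")


class ConcurrencyError(Exception):
    """Raised when another writer already stored this version."""


def append_event(aggregate_id: str, expected_version: int,
                 event_type: str, data: dict) -> int:
    """Append at expected_version + 1; fail if that slot is already taken."""
    new_version = expected_version + 1
    try:
        with conn:  # commits on success, rolls back on exception
            conn.execute(
                "INSERT INTO events VALUES (?, ?, ?, ?)",
                (aggregate_id, new_version, event_type, json.dumps(data)),
            )
    except sqlite3.IntegrityError as exc:
        raise ConcurrencyError(
            f"{aggregate_id} already has version {new_version}") from exc
    return new_version


v1 = append_event("order-1", 0, "OrderCreated", {"total": 50000})

# Two writers both loaded the aggregate at version 1 and try to append version 2.
v2 = append_event("order-1", v1, "OrderPaid", {"payment_method": "card"})
try:
    append_event("order-1", v1, "OrderCancelled", {})
    conflict = False
except ConcurrencyError:
    conflict = True

stored = [row[0] for row in conn.execute(
    "SELECT event_type FROM events WHERE aggregate_id = ? ORDER BY version",
    ("order-1",))]
```

The losing writer would typically reload the aggregate, re-check its business rules against the new state, and retry or give up.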

5. Saga Pattern -- Distributed Transaction Management

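The Saga pattern manages a transaction that spans multiple microservices as a sequence of local transactions, each of which has a compensating action that undoes it if a later step fails.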

Choreography Saga (Event-Based)

sequenceDiagram
    participant Order as Order Service
    participant Kafka
    participant Payment as Payment Service
    participant Stock as Inventory Service
    participant Notify as Notification Service

    Order->>Kafka: OrderCreated
    Kafka->>Payment: Receive OrderCreated
    Payment->>Kafka: PaymentCompleted
    Kafka->>Stock: Receive PaymentCompleted
    Stock->>Kafka: StockReserved
    Kafka->>Notify: Receive StockReserved
    Notify->>Notify: Send notification to user

    Note over Payment,Stock: What if payment fails?
    Payment->>Kafka: PaymentFailed
    Kafka->>Order: Receive PaymentFailed
    Order->>Order: Cancel order (compensating transaction)
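
The choreography above can be sketched as a set of handlers that react to events and publish new ones, with no component that knows the whole flow. This is an in-process illustration with hypothetical function names, and the rule that amounts above 100,000 are declined exists only so the failure path can be exercised.

```python
from collections import defaultdict
from typing import Callable, Dict, List

subscribers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)
event_log: List[str] = []
orders: Dict[str, str] = {}
notifications: List[str] = []


def publish(event_type: str, data: dict) -> None:
    """Record the event and deliver it to every subscriber of that type."""
    event_log.append(event_type)
    for handler in list(subscribers[event_type]):
        handler(data)


def order_service_create(order_id: str, amount: int) -> None:
    orders[order_id] = "CREATED"
    publish("OrderCreated", {"order_id": order_id, "amount": amount})


def payment_on_order_created(data: dict) -> None:
    # Hypothetical rule: large amounts are declined.
    if data["amount"] > 100_000:
        publish("PaymentFailed", {"order_id": data["order_id"]})
    else:
        publish("PaymentCompleted", {"order_id": data["order_id"]})


def stock_on_payment_completed(data: dict) -> None:
    publish("StockReserved", {"order_id": data["order_id"]})


def notify_on_stock_reserved(data: dict) -> None:
    notifications.append(data["order_id"])


def order_on_payment_failed(data: dict) -> None:
    orders[data["order_id"]] = "CANCELLED"  # compensating transaction


subscribers["OrderCreated"].append(payment_on_order_created)
subscribers["PaymentCompleted"].append(stock_on_payment_completed)
subscribers["StockReserved"].append(notify_on_stock_reserved)
subscribers["PaymentFailed"].append(order_on_payment_failed)

order_service_create("order-1", 50_000)    # happy path
order_service_create("order-2", 500_000)   # payment fails, order is cancelled
```

Note that the overall flow is only visible by reading `event_log` or by tracing every subscription.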

Orchestration Saga (Orchestrator-Based)

graph TB
    Orch[Saga Orchestrator]

    Orch -->|1. Create Order| Order[Order Service]
    Order -->|Success| Orch

    Orch -->|2. Request Payment| Payment[Payment Service]
    Payment -->|Success| Orch

    Orch -->|3. Deduct Stock| Stock[Inventory Service]
    Stock -->|Success| Orch

    Orch -->|4. Send Notification| Notify[Notification Service]

    Stock -->|Failure| Orch
    Orch -->|Compensate: Cancel Payment| Payment
    Orch -->|Compensate: Cancel Order| Order
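
An orchestrator of the kind shown in the diagram can be reduced to a loop that runs steps in order and, on failure, runs the compensations of the completed steps in reverse. The following is a minimal sketch under that assumption; `run_saga` and the step names are hypothetical, and the service calls are replaced by local functions.

```python
from typing import Callable, List, Tuple

# (name, action, compensation)
Step = Tuple[str, Callable[[], None], Callable[[], None]]


def run_saga(steps: List[Step], log: List[str]) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed: List[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            action()
            log.append(f"done:{name}")
            completed.append(step)
        except Exception:
            log.append(f"failed:{name}")
            for done_name, _, compensate in reversed(completed):
                compensate()
                log.append(f"compensated:{done_name}")
            return False
    return True


def ok() -> None:
    pass


def out_of_stock() -> None:
    raise RuntimeError("insufficient stock")


saga_log: List[str] = []
succeeded = run_saga(
    [
        ("create_order", ok, ok),
        ("request_payment", ok, ok),
        ("deduct_stock", out_of_stock, ok),
        ("send_notification", ok, ok),
    ],
    saga_log,
)
```

In this run the stock step fails, so the payment and then the order are compensated, and the notification step is never reached.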

Comparison of Both Approaches

| Item | Choreography | Orchestration |
|------|--------------|---------------|
| Central Control | None (each service is autonomous) | Orchestrator controls |
| Coupling | Loose | Depends on orchestrator |
| Debugging | Difficult (requires event tracing) | Relatively easier |
| Complexity | Complex as services increase | Orchestrator becomes complex |
| Best Suited For | Simple flows (3-4 steps) | Complex flows (5+ steps) |

6. Practical Considerations

Idempotency

Since events can be delivered more than once, idempotency is essential:

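import logging

logger = logging.getLogger(__name__)

def handle_payment_event(event: dict):
    """Ensure idempotency -- prevent duplicate event processing"""
    event_id = event['event_id']

    # Check if event was already processed
    # (is_already_processed / mark_as_processed are assumed to use a durable store)
    if is_already_processed(event_id):
        logger.info(f'Event already processed: {event_id}')
        return

    # Process business logic
    process_payment(event['data'])

    # Record as processed. Ideally this shares a transaction with the business
    # logic; otherwise a crash between the two steps causes reprocessing.
    mark_as_processed(event_id)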
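
When the business effect and the processed-event record live in the same database, both can be written in one transaction, which closes the gap between checking and marking. The sketch below assumes that situation and uses an in-memory SQLite database; the table names and the `amount` field are made up for this example. It does not cover side effects outside the database, such as calls to an external payment gateway.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE payments (order_id TEXT, amount INTEGER)")


def handle_payment_event(event: dict) -> bool:
    """Return True if the event was applied, False if it was a duplicate."""
    try:
        with conn:  # one transaction: both inserts commit or neither does
            conn.execute("INSERT INTO processed_events VALUES (?)",
                         (event["event_id"],))
            conn.execute("INSERT INTO payments VALUES (?, ?)",
                         (event["data"]["order_id"], event["data"]["amount"]))
    except sqlite3.IntegrityError:
        # Primary-key violation: this event_id was already handled.
        return False
    return True


event = {"event_id": "evt-1", "data": {"order_id": "order-1", "amount": 50000}}
first = handle_payment_event(event)
second = handle_payment_event(event)  # redelivery of the same event
payment_count = conn.execute("SELECT COUNT(*) FROM payments").fetchone()[0]
```

The primary key does the duplicate check, so two concurrent deliveries of the same event cannot both succeed.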

Dead Letter Queue (DLQ)

Store failed events in a separate queue:

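import json
import time
from datetime import datetime, timezone

MAX_RETRIES = 3

def consume_with_retry(event: dict):
    """Retry with exponential backoff, then hand the event to the DLQ"""
    for retry_count in range(MAX_RETRIES + 1):
        try:
            handle_event(event)
            return
        except Exception as e:
            if retry_count < MAX_RETRIES:
                # Retry (exponential backoff: 1s, 2s, 4s).
                # Sleeping blocks the consumer loop, so keep total wait short.
                time.sleep(2 ** retry_count)
                continue
            # Retries exhausted: send to DLQ
            producer.produce(
                topic='orders.dlq',
                value=json.dumps({
                    'original_event': event,
                    'error': str(e),
                    'failed_at': datetime.now(timezone.utc).isoformat(),
                }).encode('utf-8'),
            )
            producer.flush()  # Make sure the DLQ record is actually sent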
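
The retry schedule itself is easy to compute and test separately from the consumer. The helper below is a hypothetical sketch that reproduces the `2 ** retry_count` schedule used above and adds two common refinements, a cap on the delay and optional random jitter; the default values are arbitrary choices for illustration.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Delay before each retry: base * 2**attempt, capped, with optional jitter."""
    delays = []
    for attempt in range(max_retries):
        delay = min(cap, base * (2 ** attempt))
        if rng is not None:
            # Jitter: pick a random delay up to the computed one so that
            # many failing consumers do not retry at the same instant.
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays


plain = backoff_delays(3)                    # same schedule as the loop above
capped = backoff_delays(6, cap=10.0)         # growth stops at the cap
jittered = backoff_delays(3, rng=random.Random(42))
```

Keeping the schedule in a pure function makes it possible to unit-test retry behavior without sleeping.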

Event Schema Evolution

# v1 event
{"event_type": "OrderCreated", "version": 1,
 "data": {"order_id": "123", "total": 50000}}

# v2 event (currency field added)
{"event_type": "OrderCreated", "version": 2,
 "data": {"order_id": "123", "total": 50000, "currency": "KRW"}}

# Upcaster to convert v1 -> v2
def upcast_order_created_v1_to_v2(event: dict) -> dict:
    if event.get('version', 1) == 1:
        event['data']['currency'] = 'KRW'  # Default value
        event['version'] = 2
    return event
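
As more versions accumulate, upcasters are often organized as a chain that is applied on read until the event reaches the latest known version. The sketch below assumes that approach; the registry `UPCASTERS` and the function `upcast_to_latest` are hypothetical names, and only the v1 to v2 step described above is registered. Unlike the upcaster shown earlier, this variant copies the event so the stored original is left untouched.

```python
import copy
from typing import Callable, Dict, Tuple

Upcaster = Callable[[dict], dict]


def upcast_v1_to_v2(event: dict) -> dict:
    """Return a v2 copy of a v1 OrderCreated event."""
    upgraded = copy.deepcopy(event)            # leave the stored event untouched
    upgraded["data"].setdefault("currency", "KRW")
    upgraded["version"] = 2
    return upgraded


# Registry keyed by (event_type, from_version). Later versions would add
# further entries here.
UPCASTERS: Dict[Tuple[str, int], Upcaster] = {
    ("OrderCreated", 1): upcast_v1_to_v2,
}


def upcast_to_latest(event: dict) -> dict:
    """Apply registered upcasters until no further step exists for the event."""
    current = event
    while True:
        step = UPCASTERS.get((current["event_type"], current.get("version", 1)))
        if step is None:
            return current
        current = step(current)


stored_v1 = {"event_type": "OrderCreated", "version": 1,
             "data": {"order_id": "123", "total": 50000}}
latest = upcast_to_latest(stored_v1)
```

Consumers then only need to understand the latest version, while the Event Store keeps events exactly as they were written.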

7. EDA Adoption Checklist

  • Are synchronous calls between services causing failure propagation?
  • Do read and write workloads differ significantly? If so, consider CQRS
  • Do you need audit logs or history tracking? If so, consider Event Sourcing
  • Do you have an event idempotency handling strategy?
  • Are DLQ monitoring and alerting configured?
  • Do you have an event schema versioning strategy?
  • Are you managing distributed transactions with the Saga pattern?
  • Have you set partition keys where event ordering is required?

Quiz

Q1: What is the biggest advantage of Event-Driven Architecture over synchronous approaches?

Loose coupling between services. A failure in one service does not cascade to the others, because consumers can catch up from the broker once they recover, and each service can scale independently.

Q2: How do consumers within the same Consumer Group behave in Kafka?

Within the same Consumer Group, each partition is assigned to only one consumer. This enables parallel processing at the partition level while guaranteeing ordering within each partition.

Q3: Why does CQRS separate the read and write models?

Because the requirements for reading and writing are different. Writes use a normalized model for data consistency, while reads use a denormalized model for optimized query performance.

Q4: How is the current state restored in Event Sourcing?

By replaying, in order and from the beginning, the event sequence stored in the Event Store. The Aggregate's from_events() method fulfills this role.

Q5: What is the main difference between Choreography Saga and Orchestration Saga?

Choreography has no central control -- each service autonomously exchanges events to progress. Orchestration has a central orchestrator that issues commands to each service in order.

Q6: Why is idempotency important in event processing?

The same event can be delivered multiple times due to network errors or consumer restarts. Without idempotency guarantees, issues like duplicate payments or duplicate inventory deductions can occur.

Q7: What is the role of a Dead Letter Queue (DLQ)?

It stores events that have failed processing after exceeding the maximum retry count in a separate queue, allowing them to be manually inspected and reprocessed later.

Q8: Why set the partition key to user_id in Kafka?

Events with the same user_id always go to the same partition, ensuring that events for a given user are processed in order.

Q9: Can events in Event Sourcing be modified or deleted?

In principle, no. Events are immutable, and to change the state, a new compensating event must be added. This is a core principle of Event Sourcing.

Q10: What are the drawbacks of CQRS?

Data consistency between read and write models becomes eventual consistency rather than immediate, and system complexity increases. It can be over-engineering for simple CRUD applications.