Event-Driven Architecture Design Patterns — Kafka, CQRS, Event Sourcing Practical Guide


Introduction

As the number of microservices grows, inter-service communication becomes complex. When services are chained through synchronous calls, one failing service can cascade into failures in every service that calls it. Event-Driven Architecture (EDA) reduces this coupling by having services communicate through asynchronous events instead of direct calls. This article covers two core patterns used alongside EDA, CQRS and Event Sourcing, plus the Saga pattern, with practical code built around Kafka.


1. Event-Driven Architecture Overview

Synchronous vs Asynchronous Communication

graph LR
    subgraph "Synchronous (REST)"
        A[Order Service] -->|HTTP Call| B[Payment Service]
        B -->|HTTP Call| C[Inventory Service]
        C -->|HTTP Call| D[Notification Service]
    end

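In the synchronous approach, if the Payment Service goes down, order creation fails as well, because the Order Service is blocked waiting for the Payment Service's response. Each service's availability depends on every service further down the call chain.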

graph LR
    subgraph "Asynchronous (Event-Driven)"
        A2[Order Service] -->|Publish Event| K[Kafka]
        K -->|Subscribe| B2[Payment Service]
        K -->|Subscribe| C2[Inventory Service]
        K -->|Subscribe| D2[Notification Service]
    end

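In the asynchronous approach, the Order Service only needs to publish an event to Kafka. Each downstream service consumes it independently; if the Payment Service is down, the event stays in the topic (within its retention period) and is processed once the service recovers.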

Core Components of EDA

| Component | Role | Example |
|-----------|------|---------|
| Producer | Creates and publishes events | Order Service |
| Event Broker | Stores and delivers events | Apache Kafka |
| Consumer | Subscribes to and processes events | Payment, Inventory, Notification Services |
| Event | A message representing a fact that occurred | OrderCreated, PaymentCompleted |

2. Apache Kafka Basics

Kafka Architecture

graph TB
    P1[Producer 1] --> T[Topic: orders]
    P2[Producer 2] --> T
    T --> PA0[Partition 0]
    T --> PA1[Partition 1]
    T --> PA2[Partition 2]
    PA0 --> C1[Consumer Group A - Consumer 1]
    PA1 --> C2[Consumer Group A - Consumer 2]
    PA2 --> C3[Consumer Group A - Consumer 3]
    PA0 --> C4[Consumer Group B - Consumer 1]
    PA1 --> C4
    PA2 --> C4

Key Concepts

  • Topic: A logical channel where events are stored (e.g., orders, payments)
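  • Partition: A unit of parallel processing within a Topic. Ordering is guaranteed only within a partition; the partition key decides which partition a message goes to, so messages with the same key stay in order
  • Consumer Group: Each partition is assigned to exactly one consumer in the group, so the group's consumers process different partitions in parallel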
  • Offset: The sequence number of a message within a partition. Tracks how far a consumer has read
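The partition and consumer-group rules above can be made concrete with a small simulation. This is a sketch, not Kafka's implementation: `partition_for` uses `zlib.crc32` as a stand-in hash (the Java client's default partitioner uses murmur2, and other clients have their own), `assign_partitions` is a simplified round-robin assignor, and the names and partition count are assumptions for illustration.

```python
import zlib
from collections import defaultdict
from typing import DefaultDict, Dict, List, Tuple

NUM_PARTITIONS = 3  # assumed partition count for the 'orders' topic


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Stand-in partitioner: the same key always maps to the same partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_partitions(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Simplified round-robin assignor: each partition goes to exactly one consumer."""
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


# Each partition is an append-only log; a message's offset is its list index.
partition_log: DefaultDict[int, List[Tuple[str, str]]] = defaultdict(list)
for user_id, event_type in [("u1", "OrderCreated"), ("u2", "OrderCreated"),
                            ("u1", "OrderPaid"), ("u1", "OrderShipped")]:
    partition_log[partition_for(user_id)].append((user_id, event_type))

# All of u1's events sit in one partition, in the order they were produced.
u1_events = [e for (u, e) in partition_log[partition_for("u1")] if u == "u1"]
print(u1_events)  # ['OrderCreated', 'OrderPaid', 'OrderShipped']

# Three partitions shared by a two-consumer group: one consumer owns two.
print(assign_partitions([0, 1, 2], ["consumer-1", "consumer-2"]))
# {'consumer-1': [0, 2], 'consumer-2': [1]}
```

With more consumers in a group than partitions, the extra consumers receive no partitions and sit idle, so the partition count caps how far one group can parallelize.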

Kafka Producer Implementation (Python)

import json
import uuid
from datetime import datetime, timezone

from confluent_kafka import Producer

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'client.id': 'order-service',
    'acks': 'all',  # Wait until all in-sync replicas have acknowledged the write
    'retries': 3,
    'retry.backoff.ms': 1000,
}

producer = Producer(config)

def publish_order_event(order: dict):
    """Publish an order event"""
    event = {
        'event_type': 'OrderCreated',
        'event_id': str(uuid.uuid4()),  # Consumers use this for deduplication
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'data': {
            'order_id': order['id'],
            'user_id': order['user_id'],
            'items': order['items'],
            'total_amount': order['total'],
        }
    }

    # Partition key = user_id -> Orders from the same user go to the same partition
    producer.produce(
        topic='orders',
        key=str(order['user_id']).encode('utf-8'),
        value=json.dumps(event).encode('utf-8'),
        callback=delivery_callback,  # Invoked from flush()/poll() once the broker responds
    )
    # flush() blocks until every queued message is delivered. That keeps this
    # example simple but costs a round trip per event; a busy service would call
    # producer.poll(0) here and flush() once at shutdown.
    producer.flush()

def delivery_callback(err, msg):
    if err:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivery succeeded: {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

Kafka Consumer Implementation (Python)

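import json

from confluent_kafka import Consumer

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'payment-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Manual commit for at-least-once guarantee
}

consumer = Consumer(config)
consumer.subscribe(['orders'])

def process_events():
    """Event consumption loop.

    create_payment, publish_payment_event and send_to_dlq are application
    helpers assumed to be defined elsewhere.
    """
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                print(f'Error: {msg.error()}')
                continue

            try:
                # Decode inside the try, so a malformed payload is routed to the DLQ
                # instead of crashing the loop
                event = json.loads(msg.value().decode('utf-8'))
                if event['event_type'] == 'OrderCreated':
                    handle_order_created(event['data'])
            except Exception as e:
                print(f'Processing failed: {e}')
                # Send the raw payload to the DLQ (Dead Letter Queue)
                send_to_dlq(msg.value(), str(e))

            # Commit only once the event was handled or parked in the DLQ.
            # A crash before this line means the message is redelivered.
            consumer.commit(message=msg, asynchronous=False)
    finally:
        consumer.close()  # Leave the group cleanly so partitions are reassigned promptly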

def handle_order_created(data: dict):
    """Handle order created event -> initiate payment"""
    payment = create_payment(
        order_id=data['order_id'],
        amount=data['total_amount'],
    )
    # Publish payment completed event
    publish_payment_event(payment)

3. CQRS (Command Query Responsibility Segregation)

CQRS is a pattern that separates the write (Command) and read (Query) models.

CQRS Architecture Flow

graph TB
    Client[Client]

    Client -->|Command| CmdAPI[Command API]
    Client -->|Query| QryAPI[Query API]

    CmdAPI --> WriteDB[(Write DB - PostgreSQL)]
    CmdAPI -->|Publish Event| Kafka[Kafka]

    Kafka -->|Consume Event| Projector[Projector / Denormalizer]
    Projector --> ReadDB[(Read DB - Elasticsearch/Redis)]

    QryAPI --> ReadDB

Why CQRS?

| Traditional Approach | CQRS |
|----------------------|------|
| Single model for reads and writes | Separate read/write models |
| Read performance degraded by complex JOINs | Denormalized model optimized for reads |
| Difficult to scale | Independent scaling of reads/writes |
| Suitable for simple domains | Suitable for complex domains |

CQRS Implementation Example

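import uuid

# CreateOrderCommand, Order, OrderCreatedEvent and InsufficientStockError are
# application-defined types assumed to exist elsewhere.

# === Command Side (Write) ===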
class OrderCommandHandler:
    def __init__(self, db, event_publisher):
        self.db = db
        self.publisher = event_publisher

    def create_order(self, cmd: CreateOrderCommand):
        """Handle create order command"""
        # Business logic validation
        if not self._validate_stock(cmd.items):
            raise InsufficientStockError()

        # Save to Write DB
        order = Order(
            id=uuid.uuid4(),
            user_id=cmd.user_id,
            items=cmd.items,
            status='CREATED',
        )
        self.db.save(order)

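        # Publish event (asynchronously update Read Model).
        # Caveat: saving and publishing are two separate writes. If the process
        # dies between them, the read model never learns about this order; the
        # transactional outbox pattern is a common way to close that gap.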
        self.publisher.publish('orders', OrderCreatedEvent(
            order_id=order.id,
            user_id=order.user_id,
            items=order.items,
            total=order.total,
            created_at=order.created_at,
        ))
        return order.id


# === Projector (Event -> Read Model Transformation) ===
class OrderProjector:
    def __init__(self, read_db):
        self.read_db = read_db  # Elasticsearch or Redis

    def handle_order_created(self, event: OrderCreatedEvent):
        """Order created event -> Save denormalized data to read model"""
        user = self.get_user_info(event.user_id)

        # Denormalized read model (no JOINs needed!)
        order_view = {
            'order_id': str(event.order_id),
            'user_id': str(event.user_id),  # Required by the user_id filter in OrderQueryHandler
            'user_name': user.name,
            'user_email': user.email,
            'items': [
                {'name': item.name, 'qty': item.qty, 'price': item.price}
                for item in event.items
            ],
            'total': event.total,
            'status': 'CREATED',
            'created_at': event.created_at.isoformat(),
        }
        self.read_db.index('orders', order_view)


# === Query Side (Read) ===
class OrderQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db

    def get_user_orders(self, user_id: str, page: int = 1):
        """Retrieve user's order list (returned directly from Read Model)"""
        return self.read_db.search('orders', {
            'query': {'term': {'user_id': user_id}},
            'sort': [{'created_at': 'desc'}],
            'from': (page - 1) * 20,
            'size': 20,
        })
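To see the moving parts of the CQRS flow in one place, here is a minimal in-memory sketch. `InMemoryBus` stands in for Kafka and plain dicts stand in for PostgreSQL and Elasticsearch/Redis; all class and function names are illustrative assumptions, not the code above. It also makes eventual consistency visible: right after the command succeeds, the query side still returns nothing until the projector has consumed the event.

```python
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List


@dataclass(frozen=True)
class OrderCreatedEvent:
    order_id: str
    user_id: str
    total: int
    created_at: datetime


class InMemoryBus:
    """Stand-in for Kafka: holds published events until the projector drains them."""

    def __init__(self) -> None:
        self.pending: List[OrderCreatedEvent] = []

    def publish(self, event: OrderCreatedEvent) -> None:
        self.pending.append(event)


class CommandSide:
    """Write side: stores the normalized order, then emits an event."""

    def __init__(self, bus: InMemoryBus) -> None:
        self.write_db: Dict[str, dict] = {}
        self.bus = bus

    def create_order(self, user_id: str, total: int) -> str:
        order_id = str(uuid.uuid4())
        self.write_db[order_id] = {"user_id": user_id, "total": total, "status": "CREATED"}
        self.bus.publish(OrderCreatedEvent(order_id, user_id, total, datetime.now(timezone.utc)))
        return order_id


class Projector:
    """Builds a denormalized view per user; the user lookup happens once, at projection time."""

    def __init__(self, users: Dict[str, str]) -> None:
        self.users = users                            # user_id -> display name
        self.read_db: Dict[str, List[dict]] = {}      # user_id -> order views

    def apply(self, event: OrderCreatedEvent) -> None:
        view = {
            "order_id": event.order_id,
            "user_name": self.users[event.user_id],
            "total": event.total,
            "status": "CREATED",
            "created_at": event.created_at.isoformat(),
        }
        self.read_db.setdefault(event.user_id, []).append(view)

    def drain(self, bus: InMemoryBus) -> None:
        while bus.pending:
            self.apply(bus.pending.pop(0))


def get_user_orders(projector: Projector, user_id: str) -> List[dict]:
    """Query side: reads only the projection, newest first (UTC ISO-8601 strings sort by time)."""
    views = projector.read_db.get(user_id, [])
    return sorted(views, key=lambda v: v["created_at"], reverse=True)


bus = InMemoryBus()
commands = CommandSide(bus)
projector = Projector(users={"u1": "Alice"})

order_id = commands.create_order("u1", 50000)
print(get_user_orders(projector, "u1"))  # [] : the read model has not caught up yet
projector.drain(bus)
print(get_user_orders(projector, "u1")[0]["user_name"])  # Alice
```

In a real deployment the gap between the two `get_user_orders` calls is the consumer lag of the projector, which is why CQRS read models are eventually consistent.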

4. Event Sourcing -- Events Are the Data

Event Sourcing is a pattern that stores the sequence of state change events instead of storing the current state.

Event Sourcing Flow

sequenceDiagram
    participant Client
    participant OrderAggregate
    participant EventStore
    participant Projector
    participant ReadDB

    Client->>OrderAggregate: CreateOrder Command
    OrderAggregate->>EventStore: Store OrderCreated Event
    EventStore->>Projector: Propagate Event
    Projector->>ReadDB: Update Read Model

    Client->>OrderAggregate: PayOrder Command
    OrderAggregate->>EventStore: Store OrderPaid Event
    EventStore->>Projector: Propagate Event
    Projector->>ReadDB: Update Read Model

    Note over EventStore: Events are never deleted or modified!

Traditional CRUD vs Event Sourcing

# Traditional CRUD -- stores only the current state
orders table:
| id | user_id | status  | total |
|----|---------|---------|-------|
| 1  | 100     | SHIPPED | 50000 |

-> "When was it ordered, paid, and shipped?" This table cannot answer that.

# Event Sourcing -- preserves the entire change history
events table:
| seq | aggregate_id | type         | data                         | timestamp        |
|-----|--------------|--------------|------------------------------|------------------|
| 1   | order-1      | OrderCreated | {items: [...], total: 50000} | 2026-03-02 10:00 |
| 2   | order-1      | OrderPaid    | {payment_method: "card"}     | 2026-03-02 10:05 |
| 3   | order-1      | OrderShipped | {tracking: "KR123"}          | 2026-03-02 14:30 |

-> The full history can be reconstructed by replaying the events in seq order.

Event Sourcing Implementation

import json
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import List

# === Event Definitions ===
@dataclass
class DomainEvent:
    event_id: str
    aggregate_id: str
    event_type: str
    data: dict
    timestamp: datetime
    version: int

# === Aggregate (Domain Object) ===
class OrderAggregate:
    def __init__(self, order_id: str):
        self.id = order_id
        self.status = None
        self.items = []
        self.total = 0
        self.version = 0
        self._pending_events: List[DomainEvent] = []

    # --- Command Handler ---
    def create(self, user_id: str, items: list):
        """Create order command"""
        if self.status is not None:
            raise Exception("Order already exists")

        # Build the event and pass it to _apply_event, which updates state through
        # the matching _on_* handler and queues the event for saving
        self._apply_event(DomainEvent(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.id,
            event_type='OrderCreated',
            data={'user_id': user_id, 'items': items,
                  'total': sum(i['price'] * i['qty'] for i in items)},
            timestamp=datetime.utcnow(),
            version=self.version + 1,
        ))

    def pay(self, payment_method: str):
        """Payment command"""
        if self.status != 'CREATED':
            raise Exception(f"Cannot process payment in state: {self.status}")

        self._apply_event(DomainEvent(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.id,
            event_type='OrderPaid',
            data={'payment_method': payment_method},
            timestamp=datetime.utcnow(),
            version=self.version + 1,
        ))

    # --- Event Handler (state changes happen only here!) ---
    def _on_order_created(self, event: DomainEvent):
        self.status = 'CREATED'
        self.items = event.data['items']
        self.total = event.data['total']

    def _on_order_paid(self, event: DomainEvent):
        self.status = 'PAID'

    def _apply_event(self, event: DomainEvent):
        """Apply event (state change + event storage)"""
        handler = {
            'OrderCreated': self._on_order_created,
            'OrderPaid': self._on_order_paid,
        }.get(event.event_type)

        if handler:
            handler(event)
        self.version = event.version
        self._pending_events.append(event)

    @classmethod
    def from_events(cls, order_id: str, events: List[DomainEvent]):
        """Restore current state from event sequence"""
        aggregate = cls(order_id)
        for event in events:
            aggregate._apply_event(event)
        aggregate._pending_events.clear()  # Already persisted events
        return aggregate


# === Event Store ===
class EventStore:
    def __init__(self, db):
        self.db = db

    def save(self, aggregate: OrderAggregate):
        """Write unsaved events to the Event Store"""
        for event in aggregate._pending_events:
            self.db.execute(
                """INSERT INTO events
                   (event_id, aggregate_id, event_type, data, timestamp, version)
                   VALUES (%s, %s, %s, %s, %s, %s)""",
                (event.event_id, event.aggregate_id, event.event_type,
                 json.dumps(event.data), event.timestamp, event.version)
            )
        aggregate._pending_events.clear()

    def load(self, aggregate_id: str) -> OrderAggregate:
        """Read events from the Event Store and restore the Aggregate"""
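        rows = self.db.query(
            """SELECT event_id, aggregate_id, event_type, data, timestamp, version
               FROM events WHERE aggregate_id = %s ORDER BY version""",
            (aggregate_id,)
        )
        # Assumes rows come back as dicts and `data` as the JSON string written by save()
        events = [DomainEvent(**{**row, 'data': json.loads(row['data'])}) for row in rows]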
        return OrderAggregate.from_events(aggregate_id, events)
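The `EventStore` above relies on the database for concurrency control. The two underlying ideas, optimistic concurrency on append and rebuilding state by replaying events, can be shown without a database in a minimal sketch. `InMemoryEventStore`, `Event`, `ConcurrencyError`, and `replay` are hypothetical names; the version check assumes versions start at 1 with no gaps, so a stream's length equals its current version.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Event:
    aggregate_id: str
    event_type: str
    data: dict
    version: int


class ConcurrencyError(Exception):
    """Raised when another writer appended to the stream since it was loaded."""


class InMemoryEventStore:
    """Append-only streams keyed by aggregate id (a stand-in for the events table)."""

    def __init__(self) -> None:
        self._streams: Dict[str, List[Event]] = {}

    def append(self, aggregate_id: str, expected_version: int, new_events: List[Event]) -> None:
        stream = self._streams.setdefault(aggregate_id, [])
        # Optimistic concurrency: the caller states which version it loaded
        if len(stream) != expected_version:
            raise ConcurrencyError(f"expected v{expected_version}, stream is at v{len(stream)}")
        stream.extend(new_events)

    def load(self, aggregate_id: str) -> List[Event]:
        return list(self._streams.get(aggregate_id, []))


def replay(events: List[Event]) -> dict:
    """Fold events, oldest first, into the current order state."""
    state = {"status": None, "total": 0, "version": 0}
    for e in events:
        if e.event_type == "OrderCreated":
            state.update(status="CREATED", total=e.data["total"])
        elif e.event_type == "OrderPaid":
            state["status"] = "PAID"
        elif e.event_type == "OrderShipped":
            state["status"] = "SHIPPED"
        state["version"] = e.version
    return state


store = InMemoryEventStore()
store.append("order-1", 0, [Event("order-1", "OrderCreated", {"total": 50000}, 1)])
store.append("order-1", 1, [Event("order-1", "OrderPaid", {"payment_method": "card"}, 2)])

print(replay(store.load("order-1")))      # {'status': 'PAID', 'total': 50000, 'version': 2}
print(replay(store.load("order-1")[:1]))  # {'status': 'CREATED', 'total': 50000, 'version': 1}

# A second writer that loaded the stream at version 1 is now stale
try:
    store.append("order-1", 1, [Event("order-1", "OrderPaid", {"payment_method": "cash"}, 2)])
except ConcurrencyError as err:
    print("rejected:", err)  # rejected: expected v1, stream is at v2
```

Replaying only a prefix of the stream (`[:1]`) yields the state at that point in time, which is what makes the audit-history use case possible.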

5. Saga Pattern -- Distributed Transaction Management

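A pattern for managing transactions that span multiple services. Because each microservice owns its own database, a single ACID transaction cannot cover all of them. A saga instead runs a sequence of local transactions, one per service, and if a step fails it runs compensating transactions that undo the steps already completed.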

Choreography Saga (Event-Based)

sequenceDiagram
    participant Order as Order Service
    participant Kafka
    participant Payment as Payment Service
    participant Stock as Inventory Service
    participant Notify as Notification Service

    Order->>Kafka: OrderCreated
    Kafka->>Payment: Receive OrderCreated
    Payment->>Kafka: PaymentCompleted
    Kafka->>Stock: Receive PaymentCompleted
    Stock->>Kafka: StockReserved
    Kafka->>Notify: Receive StockReserved
    Notify->>Notify: Send notification to user

    Note over Payment,Stock: What if payment fails?
    Payment->>Kafka: PaymentFailed
    Kafka->>Order: Receive PaymentFailed
    Order->>Order: Cancel order (compensating transaction)
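The choreography flow can be simulated in-process to show that no service calls another directly; each one only subscribes to event types and publishes new ones. This is a sketch under simplifying assumptions: `EventBus` delivers synchronously (Kafka is asynchronous and at-least-once), the notification service is omitted, and `CARD_LIMIT` is a made-up rule that forces the failure path.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, Dict, List

Handler = Callable[[dict], None]


class EventBus:
    """Synchronous in-memory stand-in for Kafka topics: publish() calls every subscriber."""

    def __init__(self) -> None:
        self.handlers: DefaultDict[str, List[Handler]] = defaultdict(list)
        self.log: List[str] = []

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self.handlers[event_type].append(handler)

    def publish(self, event_type: str, data: dict) -> None:
        self.log.append(event_type)
        for handler in self.handlers[event_type]:
            handler(data)


bus = EventBus()
orders: Dict[str, str] = {}  # Order service's own state: order_id -> status
CARD_LIMIT = 100_000         # Hypothetical rule that makes a payment fail


# --- Order service: starts the saga and reacts to its outcome ---
def place_order(order_id: str, total: int) -> None:
    orders[order_id] = "CREATED"
    bus.publish("OrderCreated", {"order_id": order_id, "total": total})

def on_payment_failed(data: dict) -> None:
    orders[data["order_id"]] = "CANCELLED"  # Compensating transaction

def on_stock_reserved(data: dict) -> None:
    orders[data["order_id"]] = "CONFIRMED"


# --- Payment service ---
def on_order_created(data: dict) -> None:
    outcome = "PaymentFailed" if data["total"] > CARD_LIMIT else "PaymentCompleted"
    bus.publish(outcome, {"order_id": data["order_id"]})


# --- Inventory service ---
def on_payment_completed(data: dict) -> None:
    bus.publish("StockReserved", {"order_id": data["order_id"]})


# Wiring: services only know event types, never each other
bus.subscribe("OrderCreated", on_order_created)
bus.subscribe("PaymentCompleted", on_payment_completed)
bus.subscribe("PaymentFailed", on_payment_failed)
bus.subscribe("StockReserved", on_stock_reserved)

place_order("o-1", 50_000)
place_order("o-2", 200_000)
print(orders)   # {'o-1': 'CONFIRMED', 'o-2': 'CANCELLED'}
print(bus.log)  # ['OrderCreated', 'PaymentCompleted', 'StockReserved', 'OrderCreated', 'PaymentFailed']
```

The flow of each saga exists only implicitly, in the subscriptions; `bus.log` is the kind of trace you would need to reconstruct it, which is why choreography is harder to debug.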

Orchestration Saga (Orchestrator-Based)

graph TB
    Orch[Saga Orchestrator]

    Orch -->|1. Create Order| Order[Order Service]
    Order -->|Success| Orch

    Orch -->|2. Request Payment| Payment[Payment Service]
    Payment -->|Success| Orch

    Orch -->|3. Deduct Stock| Stock[Inventory Service]
    Stock -->|Success| Orch

    Orch -->|4. Send Notification| Notify[Notification Service]

    Stock -->|Failure| Orch
    Orch -->|Compensate: Cancel Payment| Payment
    Orch -->|Compensate: Cancel Order| Order
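The orchestrator's core job is to run steps in order and, when one fails, run the compensations of the steps that already succeeded in reverse order. A minimal sketch of that logic follows; `SagaStep` and `SagaOrchestrator` are hypothetical names and the service calls are stubs. A production orchestrator would also persist its progress so it can resume after a crash, and would need to retry compensations that themselves fail.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class SagaStep:
    name: str
    action: Callable[[], None]                  # Local transaction in one service
    compensation: Optional[Callable[[], None]]  # Semantic undo, or None if nothing to undo


class SagaOrchestrator:
    def __init__(self, steps: List[SagaStep]) -> None:
        self.steps = steps
        self.history: List[str] = []

    def run(self) -> bool:
        completed: List[SagaStep] = []
        for step in self.steps:
            try:
                step.action()
            except Exception as exc:
                self.history.append(f"{step.name} failed: {exc}")
                # Compensate the steps that already succeeded, newest first
                for done in reversed(completed):
                    if done.compensation is not None:
                        done.compensation()
                        self.history.append(f"compensated {done.name}")
                return False
            completed.append(step)
            self.history.append(f"{step.name} ok")
        return True


def reserve_stock() -> None:
    raise RuntimeError("out of stock")  # Simulate the inventory step failing

def noop() -> None:
    pass

saga = SagaOrchestrator([
    SagaStep("create_order", noop, compensation=noop),    # Undo: cancel order
    SagaStep("charge_payment", noop, compensation=noop),  # Undo: refund payment
    SagaStep("reserve_stock", reserve_stock, compensation=None),
    SagaStep("send_notification", noop, compensation=None),
])
ok = saga.run()
print(ok)  # False
print(saga.history)
# ['create_order ok', 'charge_payment ok', 'reserve_stock failed: out of stock',
#  'compensated charge_payment', 'compensated create_order']
```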

Comparison of Both Approaches

| Item | Choreography | Orchestration |
|------|--------------|---------------|
| Central Control | None (each service is autonomous) | Orchestrator controls the flow |
| Coupling | Loose | Services depend on the orchestrator |
| Debugging | Difficult (requires event tracing) | Relatively easier |
| Complexity | Grows as the number of services increases | Concentrated in the orchestrator |
| Best Suited For | Simple flows (3-4 steps) | Complex flows (5+ steps) |

6. Practical Considerations

Idempotency

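With at-least-once delivery (committing offsets only after processing, as in the consumer above), the same event can be delivered more than once, so every handler must be idempotent: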

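import logging

logger = logging.getLogger(__name__)

def handle_payment_event(event: dict):
    """Ensure idempotency -- prevent duplicate event processing"""
    event_id = event['event_id']

    # Check if event was already processed
    if is_already_processed(event_id):
        logger.info(f'Event already processed: {event_id}')
        return

    # Process business logic
    process_payment(event['data'])

    # Record as processed. If the process crashes between process_payment()
    # and this call, the event is processed again on redelivery, and two
    # concurrent consumers can both pass the check above; storing the marker in
    # the same transaction as the business write closes both gaps.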
    mark_as_processed(event_id)
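When the side effect lives in the same database as the processed-event marker, one way to close the gaps noted in the comment is to write both in one transaction and let a primary key reject duplicates. A minimal sketch with Python's built-in `sqlite3`; the table names and `handle_payment_event_once` are assumptions. If the side effect is an external call such as a payment gateway, this alone does not help, and that system would need its own deduplication (for example an idempotency key, if it supports one).

```python
import sqlite3


def init_db(conn: sqlite3.Connection) -> None:
    conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE payments (order_id TEXT, amount INTEGER)")


def handle_payment_event_once(conn: sqlite3.Connection, event: dict) -> bool:
    """Apply a payment event at most once. Returns False for a duplicate delivery."""
    try:
        with conn:  # One transaction: commit on success, roll back on any exception
            # The PRIMARY KEY rejects an event_id that was already recorded
            conn.execute("INSERT INTO processed_events (event_id) VALUES (?)",
                         (event["event_id"],))
            # The business write shares the transaction with the marker above
            conn.execute("INSERT INTO payments (order_id, amount) VALUES (?, ?)",
                         (event["data"]["order_id"], event["data"]["amount"]))
    except sqlite3.IntegrityError:
        return False  # Duplicate: already processed, nothing changed
    return True


conn = sqlite3.connect(":memory:")
init_db(conn)
event = {"event_id": "evt-1", "data": {"order_id": "o-1", "amount": 50000}}
print(handle_payment_event_once(conn, event))  # True
print(handle_payment_event_once(conn, event))  # False (redelivered duplicate)
print(conn.execute("SELECT COUNT(*) FROM payments").fetchone()[0])  # 1
```

Because the marker and the payment row commit together, a handler that fails halfway leaves neither behind, so a later redelivery can still apply the event.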

Dead Letter Queue (DLQ)

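Events that still fail after a bounded number of retries are moved to a separate dead-letter topic (orders.dlq), so one bad event does not hold up the rest of the partition: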

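import json
import time
from datetime import datetime, timezone

MAX_RETRIES = 3

def consume_with_retry(event: dict) -> None:
    """Retry handle_event with exponential backoff, then route the event to the DLQ.

    handle_event and producer are assumed to be defined elsewhere (e.g. the
    Producer from the producer example). Sleeping here blocks the poll loop,
    so keep the total backoff well below max.poll.interval.ms.
    """
    last_error = None
    for attempt in range(MAX_RETRIES + 1):
        try:
            handle_event(event)
            return
        except Exception as e:
            last_error = e
            if attempt < MAX_RETRIES:
                time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s

    # All retries exhausted: send to DLQ with the failure context
    producer.produce(
        topic='orders.dlq',
        value=json.dumps({
            'original_event': event,
            'error': str(last_error),
            'failed_at': datetime.now(timezone.utc).isoformat(),
        }).encode('utf-8'),
    )
    producer.flush()  # Make sure the DLQ write lands before the offset is committed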

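Event Schema Evolution

Stored events are immutable, so when an event's schema changes, old versions stay in the log and consumers must still be able to read them. A common approach is an upcaster that converts an old version to the new shape at read time: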

# v1 event
{"event_type": "OrderCreated", "version": 1,
 "data": {"order_id": "123", "total": 50000}}

# v2 event (currency field added)
{"event_type": "OrderCreated", "version": 2,
 "data": {"order_id": "123", "total": 50000, "currency": "KRW"}}

# Upcaster to convert v1 -> v2
def upcast_order_created_v1_to_v2(event: dict) -> dict:
    if event.get('version', 1) == 1:
        event['data']['currency'] = 'KRW'  # Default value
        event['version'] = 2
    return event
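Once there are more than two versions, upcasters are usually chained so that each function only knows about one version step. A minimal sketch: the `v2_to_v3` rename is hypothetical and exists only to show the chaining, and `UPCASTERS`/`upcast` are illustrative names. Unlike `upcast_order_created_v1_to_v2` above, it works on a copy, so the stored event is left untouched.

```python
import copy
from typing import Callable, Dict

Upcaster = Callable[[dict], dict]


def v1_to_v2(event: dict) -> dict:
    # v2 added `currency`; v1 events are assumed to have been KRW
    event["data"]["currency"] = "KRW"
    return event


def v2_to_v3(event: dict) -> dict:
    # Hypothetical v3 change, for illustration only: `total` renamed to `total_amount`
    event["data"]["total_amount"] = event["data"].pop("total")
    return event


# Key = version the upcaster reads; each one produces version + 1
UPCASTERS: Dict[int, Upcaster] = {1: v1_to_v2, 2: v2_to_v3}
LATEST_VERSION = 3


def upcast(event: dict) -> dict:
    """Upgrade an event to LATEST_VERSION one step at a time, on a deep copy."""
    result = copy.deepcopy(event)
    version = result.get("version", 1)
    while version < LATEST_VERSION:
        result = UPCASTERS[version](result)
        version += 1
        result["version"] = version
    return result


stored = {"event_type": "OrderCreated", "version": 1,
          "data": {"order_id": "123", "total": 50000}}
print(upcast(stored))
# {'event_type': 'OrderCreated', 'version': 3, 'data': {'order_id': '123', 'currency': 'KRW', 'total_amount': 50000}}
print(stored["version"])  # 1
```

Consumers then only ever handle the latest shape, and adding a v4 means writing one more upcaster rather than touching existing ones.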

7. EDA Adoption Checklist

  • Are synchronous calls between services causing failure propagation?
  • Is the read/write ratio significantly skewed? -> Consider CQRS
  • Do you need audit logs or a full change history? -> Consider Event Sourcing
  • Do you have an event idempotency handling strategy?
  • Are a DLQ and monitoring/alerting for it configured?
  • Do you have an event schema versioning strategy?
  • Are you managing distributed transactions with the Saga pattern?
  • Have you set partition keys where event ordering is required?

Quiz

Q1: What is the biggest advantage of Event-Driven Architecture over synchronous approaches? Loose coupling between services. A failing consumer no longer blocks the producer (its events wait in the broker until it recovers), so failures propagate far less, and each service can scale independently.

Q2: How do consumers within the same Consumer Group behave in Kafka? Within the same Consumer Group, each partition is assigned to only one consumer. This enables parallel processing at the partition level while guaranteeing ordering within each partition.

Q3: Why does CQRS separate the read and write models? Because the requirements for reading and writing are different. Writes use a normalized model for data consistency, while reads use a denormalized model for optimized query performance.

Q4: How is the current state restored in Event Sourcing? By replaying the event sequence stored in the Event Store from the beginning in order to reconstruct the current state. The Aggregate's from_events() method fulfills this role.

Q5: What is the main difference between Choreography Saga and Orchestration Saga? Choreography has no central control: each service autonomously reacts to events and publishes new ones to move the saga forward. Orchestration has a central orchestrator that issues commands to each service in order.

Q6: Why is idempotency important in event processing? The same event can be delivered multiple times due to network errors or consumer restarts. Without idempotency guarantees, issues like duplicate payments or duplicate inventory deductions can occur.

Q7: What is the role of a Dead Letter Queue (DLQ)? It stores events that have failed processing after exceeding the maximum retry count in a separate queue, allowing them to be manually inspected and reprocessed later.

Q8: Why set the partition key to user_id in Kafka? Events with the same user_id always go to the same partition, ensuring that events for a given user are processed in order.

Q9: Can events in Event Sourcing be modified or deleted? In principle, no. Events are immutable, and to change the state, a new compensating event must be added. This is a core principle of Event Sourcing.

Q10: What are the drawbacks of CQRS? Data consistency between read and write models becomes eventual consistency rather than immediate, and system complexity increases. It can be over-engineering for simple CRUD applications.