Skip to content
Published on

이벤트 드리븐 아키텍처 설계 패턴 — Kafka, CQRS, Event Sourcing 실전 가이드

Authors
  • Name
    Twitter
이벤트 드리븐 아키텍처

들어가며

마이크로서비스가 늘어나면 서비스 간 통신이 복잡해집니다. 동기(Sync) 호출로 묶인 서비스들은 하나가 죽으면 연쇄 장애가 발생하죠. **이벤트 드리븐 아키텍처(EDA)**는 이 문제를 비동기 이벤트 기반으로 해결합니다. 이 글에서는 Kafka를 중심으로 EDA의 핵심 패턴인 CQRS와 Event Sourcing을 실전 코드와 함께 다룹니다.


1. 이벤트 드리븐 아키텍처 개요

동기 vs 비동기 통신

graph LR
    subgraph "동기 방식 (REST)"
        A[주문 서비스] -->|HTTP 호출| B[결제 서비스]
        B -->|HTTP 호출| C[재고 서비스]
        C -->|HTTP 호출| D[알림 서비스]
    end

동기 방식에서는 결제 서비스가 죽으면 주문도 실패합니다. 모든 서비스가 강하게 결합되어 있습니다.

graph LR
    subgraph "비동기 방식 (Event-Driven)"
        A2[주문 서비스] -->|이벤트 발행| K[Kafka]
        K -->|구독| B2[결제 서비스]
        K -->|구독| C2[재고 서비스]
        K -->|구독| D2[알림 서비스]
    end

비동기 방식에서는 주문 서비스가 이벤트만 발행하면 됩니다. 각 서비스가 독립적으로 이벤트를 소비합니다.

EDA의 핵심 구성요소

구성요소역할예시
Producer이벤트를 생성하고 발행주문 서비스
Event Broker이벤트를 저장하고 전달Apache Kafka
Consumer이벤트를 구독하고 처리결제, 재고, 알림 서비스
Event발생한 사실을 표현하는 메시지OrderCreated, PaymentCompleted

2. Apache Kafka 기초

Kafka 아키텍처

graph TB
    P1[Producer 1] --> T[Topic: orders]
    P2[Producer 2] --> T
    T --> PA0[Partition 0]
    T --> PA1[Partition 1]
    T --> PA2[Partition 2]
    PA0 --> C1[Consumer Group A - Consumer 1]
    PA1 --> C2[Consumer Group A - Consumer 2]
    PA2 --> C3[Consumer Group A - Consumer 3]
    PA0 --> C4[Consumer Group B - Consumer 1]
    PA1 --> C4
    PA2 --> C4

핵심 개념

  • Topic: 이벤트가 저장되는 논리적 채널 (예: orders, payments)
  • Partition: Topic 내 병렬 처리 단위. 파티션 키로 순서 보장
  • Consumer Group: 같은 그룹의 컨슈머는 파티션을 분배받아 병렬 처리
  • Offset: 각 파티션 내 메시지의 순번. 컨슈머가 어디까지 읽었는지 추적

Kafka Producer 구현 (Python)

from confluent_kafka import Producer
import json

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'client.id': 'order-service',
    'acks': 'all',  # 모든 replica에 기록 확인
    'retries': 3,
    'retry.backoff.ms': 1000,
}

producer = Producer(config)

def publish_order_event(order: dict):
    """주문 이벤트 발행"""
    event = {
        'event_type': 'OrderCreated',
        'event_id': str(uuid.uuid4()),
        'timestamp': datetime.utcnow().isoformat(),
        'data': {
            'order_id': order['id'],
            'user_id': order['user_id'],
            'items': order['items'],
            'total_amount': order['total'],
        }
    }

    # 파티션 키 = user_id → 같은 유저의 주문은 같은 파티션
    producer.produce(
        topic='orders',
        key=str(order['user_id']).encode('utf-8'),
        value=json.dumps(event).encode('utf-8'),
        callback=delivery_callback,
    )
    producer.flush()

def delivery_callback(err, msg):
    if err:
        print(f'❌ 전송 실패: {err}')
    else:
        print(f'✅ 전송 성공: {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

Kafka Consumer 구현 (Python)

from confluent_kafka import Consumer

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'payment-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # 수동 커밋으로 at-least-once 보장
}

consumer = Consumer(config)
consumer.subscribe(['orders'])

def process_events():
    """이벤트 소비 루프"""
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Error: {msg.error()}')
            continue

        event = json.loads(msg.value().decode('utf-8'))

        try:
            if event['event_type'] == 'OrderCreated':
                handle_order_created(event['data'])

            # 처리 성공 후 수동 커밋
            consumer.commit(msg)

        except Exception as e:
            print(f'처리 실패: {e}')
            # DLQ(Dead Letter Queue)로 전송
            send_to_dlq(event, str(e))

def handle_order_created(data: dict):
    """주문 생성 이벤트 처리 → 결제 시작"""
    payment = create_payment(
        order_id=data['order_id'],
        amount=data['total_amount'],
    )
    # 결제 완료 이벤트 발행
    publish_payment_event(payment)

3. CQRS (Command Query Responsibility Segregation)

CQRS는 **쓰기(Command)**와 읽기(Query) 모델을 분리하는 패턴입니다.

CQRS 아키텍처 흐름

graph TB
    Client[클라이언트]

    Client -->|Command| CmdAPI[Command API]
    Client -->|Query| QryAPI[Query API]

    CmdAPI --> WriteDB[(Write DB - PostgreSQL)]
    CmdAPI -->|이벤트 발행| Kafka[Kafka]

    Kafka -->|이벤트 소비| Projector[Projector / Denormalizer]
    Projector --> ReadDB[(Read DB - Elasticsearch/Redis)]

    QryAPI --> ReadDB

왜 CQRS인가?

기존 방식CQRS
하나의 모델로 읽기/쓰기읽기/쓰기 모델 분리
복잡한 JOIN으로 읽기 성능 저하읽기에 최적화된 비정규화 모델
스케일링 어려움읽기/쓰기 독립적 스케일링
단순한 도메인에 적합복잡한 도메인에 적합

CQRS 구현 예제

# === Command Side (쓰기) ===
class OrderCommandHandler:
    def __init__(self, db, event_publisher):
        self.db = db
        self.publisher = event_publisher

    def create_order(self, cmd: CreateOrderCommand):
        """주문 생성 커맨드 처리"""
        # 비즈니스 로직 검증
        if not self._validate_stock(cmd.items):
            raise InsufficientStockError()

        # Write DB에 저장
        order = Order(
            id=uuid.uuid4(),
            user_id=cmd.user_id,
            items=cmd.items,
            status='CREATED',
        )
        self.db.save(order)

        # 이벤트 발행 (비동기로 Read Model 업데이트)
        self.publisher.publish('orders', OrderCreatedEvent(
            order_id=order.id,
            user_id=order.user_id,
            items=order.items,
            total=order.total,
            created_at=order.created_at,
        ))
        return order.id


# === Projector (이벤트 → Read Model 변환) ===
class OrderProjector:
    def __init__(self, read_db):
        self.read_db = read_db  # Elasticsearch or Redis

    def handle_order_created(self, event: OrderCreatedEvent):
        """주문 생성 이벤트 → 읽기 모델에 비정규화하여 저장"""
        user = self.get_user_info(event.user_id)

        # 비정규화된 읽기 모델 (JOIN 불필요!)
        order_view = {
            'order_id': str(event.order_id),
            'user_name': user.name,
            'user_email': user.email,
            'items': [
                {'name': item.name, 'qty': item.qty, 'price': item.price}
                for item in event.items
            ],
            'total': event.total,
            'status': 'CREATED',
            'created_at': event.created_at.isoformat(),
        }
        self.read_db.index('orders', order_view)


# === Query Side (읽기) ===
class OrderQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db

    def get_user_orders(self, user_id: str, page: int = 1):
        """사용자 주문 목록 조회 (Read Model에서 바로 반환)"""
        return self.read_db.search('orders', {
            'query': {'term': {'user_id': user_id}},
            'sort': [{'created_at': 'desc'}],
            'from': (page - 1) * 20,
            'size': 20,
        })

4. Event Sourcing — 이벤트가 곧 데이터

Event Sourcing은 현재 상태를 저장하는 대신, 상태 변경 이벤트의 시퀀스를 저장하는 패턴입니다.

Event Sourcing 흐름

sequenceDiagram
    participant Client
    participant OrderAggregate
    participant EventStore
    participant Projector
    participant ReadDB

    Client->>OrderAggregate: CreateOrder 커맨드
    OrderAggregate->>EventStore: OrderCreated 이벤트 저장
    EventStore->>Projector: 이벤트 전파
    Projector->>ReadDB: 읽기 모델 업데이트

    Client->>OrderAggregate: PayOrder 커맨드
    OrderAggregate->>EventStore: OrderPaid 이벤트 저장
    EventStore->>Projector: 이벤트 전파
    Projector->>ReadDB: 읽기 모델 업데이트

    Note over EventStore: 이벤트는 절대 삭제/수정되지 않음!

기존 CRUD vs Event Sourcing

# 기존 CRUD — 현재 상태만 저장
orders 테이블:
| id | user_id | status    | total  |
|----|---------|-----------|--------|
| 1  | 100     | SHIPPED   | 50000  |

→ "언제 주문했고, 언제 결제되고, 언제 배송됐는지?" 알 수 없음!

# Event Sourcing — 모든 변경 이력 보존
events 테이블:
| seq | aggregate_id | type           | data                    | timestamp           |
|-----|-------------|----------------|-------------------------|---------------------|
| 1   | order-1     | OrderCreated   | {items: [...], total: 50000} | 2026-03-02 10:00 |
| 2   | order-1     | PaymentReceived| {method: "card"}        | 2026-03-02 10:05    |
| 3   | order-1     | OrderShipped   | {tracking: "KR123"}     | 2026-03-02 14:30    |

→ 전체 히스토리를 재구성 가능!

Event Sourcing 구현

from dataclasses import dataclass, field
from datetime import datetime
from typing import List
import json

# === 이벤트 정의 ===
@dataclass
class DomainEvent:
    event_id: str
    aggregate_id: str
    event_type: str
    data: dict
    timestamp: datetime
    version: int

# === Aggregate (도메인 객체) ===
class OrderAggregate:
    def __init__(self, order_id: str):
        self.id = order_id
        self.status = None
        self.items = []
        self.total = 0
        self.version = 0
        self._pending_events: List[DomainEvent] = []

    # --- Command Handler ---
    def create(self, user_id: str, items: list):
        """주문 생성 커맨드"""
        if self.status is not None:
            raise Exception("이미 생성된 주문입니다")

        # 이벤트 생성 (상태 변경 X, 이벤트만 기록)
        self._apply_event(DomainEvent(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.id,
            event_type='OrderCreated',
            data={'user_id': user_id, 'items': items,
                  'total': sum(i['price'] * i['qty'] for i in items)},
            timestamp=datetime.utcnow(),
            version=self.version + 1,
        ))

    def pay(self, payment_method: str):
        """결제 커맨드"""
        if self.status != 'CREATED':
            raise Exception(f"결제할 수 없는 상태: {self.status}")

        self._apply_event(DomainEvent(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.id,
            event_type='OrderPaid',
            data={'payment_method': payment_method},
            timestamp=datetime.utcnow(),
            version=self.version + 1,
        ))

    # --- Event Handler (상태 변경은 여기서만!) ---
    def _on_order_created(self, event: DomainEvent):
        self.status = 'CREATED'
        self.items = event.data['items']
        self.total = event.data['total']

    def _on_order_paid(self, event: DomainEvent):
        self.status = 'PAID'

    def _apply_event(self, event: DomainEvent):
        """이벤트 적용 (상태 변경 + 이벤트 저장)"""
        handler = {
            'OrderCreated': self._on_order_created,
            'OrderPaid': self._on_order_paid,
        }.get(event.event_type)

        if handler:
            handler(event)
        self.version = event.version
        self._pending_events.append(event)

    @classmethod
    def from_events(cls, order_id: str, events: List[DomainEvent]):
        """이벤트 시퀀스에서 현재 상태 복원"""
        aggregate = cls(order_id)
        for event in events:
            aggregate._apply_event(event)
        aggregate._pending_events.clear()  # 이미 저장된 이벤트
        return aggregate


# === Event Store ===
class EventStore:
    def __init__(self, db):
        self.db = db

    def save(self, aggregate: OrderAggregate):
        """미저장 이벤트를 Event Store에 기록"""
        for event in aggregate._pending_events:
            self.db.execute(
                """INSERT INTO events
                   (event_id, aggregate_id, event_type, data, timestamp, version)
                   VALUES (%s, %s, %s, %s, %s, %s)""",
                (event.event_id, event.aggregate_id, event.event_type,
                 json.dumps(event.data), event.timestamp, event.version)
            )
        aggregate._pending_events.clear()

    def load(self, aggregate_id: str) -> OrderAggregate:
        """Event Store에서 이벤트를 읽어 Aggregate 복원"""
        rows = self.db.query(
            "SELECT * FROM events WHERE aggregate_id = %s ORDER BY version",
            (aggregate_id,)
        )
        events = [DomainEvent(**row) for row in rows]
        return OrderAggregate.from_events(aggregate_id, events)

5. Saga 패턴 — 분산 트랜잭션 관리

마이크로서비스에서 여러 서비스에 걸친 트랜잭션을 관리하는 패턴입니다.

Choreography Saga (이벤트 기반)

sequenceDiagram
    participant Order as 주문 서비스
    participant Kafka
    participant Payment as 결제 서비스
    participant Stock as 재고 서비스
    participant Notify as 알림 서비스

    Order->>Kafka: OrderCreated
    Kafka->>Payment: OrderCreated 수신
    Payment->>Kafka: PaymentCompleted
    Kafka->>Stock: PaymentCompleted 수신
    Stock->>Kafka: StockReserved
    Kafka->>Notify: StockReserved 수신
    Notify->>Notify: 사용자에게 알림 전송

    Note over Payment,Stock: 결제 실패 시?
    Payment->>Kafka: PaymentFailed
    Kafka->>Order: PaymentFailed 수신
    Order->>Order: 주문 취소 (보상 트랜잭션)

Orchestration Saga (오케스트레이터 기반)

graph TB
    Orch[Saga Orchestrator]

    Orch -->|1. 주문 생성| Order[주문 서비스]
    Order -->|성공| Orch

    Orch -->|2. 결제 요청| Payment[결제 서비스]
    Payment -->|성공| Orch

    Orch -->|3. 재고 차감| Stock[재고 서비스]
    Stock -->|성공| Orch

    Orch -->|4. 알림 전송| Notify[알림 서비스]

    Stock -->|실패| Orch
    Orch -->|보상: 결제 취소| Payment
    Orch -->|보상: 주문 취소| Order

두 방식 비교

항목ChoreographyOrchestration
중앙 제어없음 (각 서비스가 자율)오케스트레이터가 제어
결합도느슨오케스트레이터에 의존
디버깅어려움 (이벤트 추적 필요)상대적으로 쉬움
복잡도서비스 수 증가 시 복잡오케스트레이터가 복잡
적합한 경우단순한 흐름 (3-4 단계)복잡한 흐름 (5+ 단계)

6. 실전 고려사항

멱등성 (Idempotency)

이벤트가 중복 전달될 수 있으므로 멱등성 보장이 필수입니다:

def handle_payment_event(event: dict):
    """멱등성 보장 — 같은 이벤트 중복 처리 방지"""
    event_id = event['event_id']

    # 이미 처리된 이벤트인지 확인
    if is_already_processed(event_id):
        logger.info(f'이미 처리된 이벤트: {event_id}')
        return

    # 비즈니스 로직 처리
    process_payment(event['data'])

    # 처리 완료 기록
    mark_as_processed(event_id)

Dead Letter Queue (DLQ)

처리 실패한 이벤트를 별도 큐에 보관:

MAX_RETRIES = 3

def consume_with_retry(event: dict, retry_count: int = 0):
    try:
        handle_event(event)
    except Exception as e:
        if retry_count < MAX_RETRIES:
            # 재시도 (exponential backoff)
            time.sleep(2 ** retry_count)
            consume_with_retry(event, retry_count + 1)
        else:
            # DLQ로 전송
            producer.produce(
                topic='orders.dlq',
                value=json.dumps({
                    'original_event': event,
                    'error': str(e),
                    'failed_at': datetime.utcnow().isoformat(),
                }).encode('utf-8'),
            )

이벤트 스키마 진화

# v1 이벤트
{"event_type": "OrderCreated", "version": 1,
 "data": {"order_id": "123", "total": 50000}}

# v2 이벤트 (currency 필드 추가)
{"event_type": "OrderCreated", "version": 2,
 "data": {"order_id": "123", "total": 50000, "currency": "KRW"}}

# 업캐스터로 v1 → v2 변환
def upcast_order_created_v1_to_v2(event: dict) -> dict:
    if event.get('version', 1) == 1:
        event['data']['currency'] = 'KRW'  # 기본값
        event['version'] = 2
    return event

7. EDA 도입 체크리스트

  • 서비스 간 동기 호출이 장애 전파를 일으키는가?
  • 읽기/쓰기 비율이 크게 다른가? → CQRS 고려
  • 감사 로그/이력 추적이 필요한가? → Event Sourcing 고려
  • 이벤트 멱등성 처리 전략이 있는가?
  • DLQ와 모니터링/알림이 설정되어 있는가?
  • 이벤트 스키마 버전 관리 전략이 있는가?
  • Saga 패턴으로 분산 트랜잭션을 관리하고 있는가?
  • 이벤트 순서 보장이 필요한 곳에 파티션 키를 설정했는가?

퀴즈

Q1: 이벤트 드리븐 아키텍처에서 동기 방식 대비 가장 큰 장점은? 서비스 간 느슨한 결합(loose coupling)입니다. 한 서비스의 장애가 다른 서비스로 전파되지 않고, 각 서비스가 독립적으로 스케일링할 수 있습니다.

Q2: Kafka에서 같은 Consumer Group 내의 컨슈머들은 어떻게 동작하는가? 같은 Consumer Group 내에서는 각 파티션이 하나의 컨슈머에게만 할당됩니다. 이를 통해 파티션 단위로 병렬 처리하면서도 파티션 내 순서를 보장합니다.

Q3: CQRS에서 읽기/쓰기 모델을 분리하는 이유는? 읽기와 쓰기의 요구사항이 다르기 때문입니다. 쓰기는 정규화된 모델로 데이터 정합성을, 읽기는 비정규화된 모델로 조회 성능을 각각 최적화할 수 있습니다.

Q4: Event Sourcing에서 현재 상태를 어떻게 복원하는가? Event Store에 저장된 이벤트 시퀀스를 처음부터 순서대로 재생(replay)하여 현재 상태를 구성합니다. Aggregate의 from_events() 메서드가 이 역할을 합니다.

Q5: Choreography Saga와 Orchestration Saga의 주요 차이점은? Choreography는 중앙 제어 없이 각 서비스가 이벤트를 주고받으며 자율적으로 진행하고, Orchestration은 중앙 오케스트레이터가 각 서비스에 순서대로 명령을 내립니다.

Q6: 이벤트 처리에서 멱등성(idempotency)이 중요한 이유는? 네트워크 오류나 컨슈머 재시작으로 같은 이벤트가 여러 번 전달될 수 있습니다. 멱등성을 보장하지 않으면 중복 결제, 중복 재고 차감 등의 문제가 발생합니다.

Q7: Dead Letter Queue(DLQ)의 역할은? 최대 재시도 횟수를 초과하여 처리에 실패한 이벤트를 별도 큐에 보관하여, 나중에 수동으로 확인하고 재처리할 수 있도록 합니다.

Q8: Kafka에서 파티션 키를 user_id로 설정하는 이유는? 같은 user_id를 가진 이벤트가 항상 같은 파티션에 들어가므로, 해당 사용자의 이벤트 순서가 보장됩니다.

Q9: Event Sourcing의 이벤트를 수정하거나 삭제할 수 있는가? 원칙적으로 불가능합니다. 이벤트는 불변(immutable)이며, 상태를 변경하려면 새로운 보상 이벤트를 추가해야 합니다. 이것이 Event Sourcing의 핵심 원칙입니다.

Q10: CQRS의 단점은 무엇인가? 읽기/쓰기 모델 간의 데이터 일관성이 즉각적이지 않은 최종 일관성(eventual consistency) 모델이 되며, 시스템 복잡도가 증가합니다. 단순한 CRUD 애플리케이션에는 과도한 설계가 될 수 있습니다.