1. 왜 이벤트 드리븐 아키텍처인가
모놀리식에서 MSA로 전환하면 **분산 트랜잭션** 문제가 발생한다:
graph LR
A[주문 서비스] -->|HTTP 동기 호출| B[결제 서비스]
B -->|HTTP 동기 호출| C[재고 서비스]
C -->|HTTP 동기 호출| D[배송 서비스]
style A fill:#f96,stroke:#333
style D fill:#6f9,stroke:#333
**동기 방식의 문제:**
- 하나라도 실패하면 전체 실패 (cascading failure)
- 서비스 간 강결합
- 호출 체인이 길어질수록 지연 시간 증가
- 부분 실패 시 롤백 어려움
graph LR
A[주문 서비스] -->|이벤트 발행| K[Kafka]
K -->|이벤트 소비| B[결제 서비스]
K -->|이벤트 소비| C[재고 서비스]
K -->|이벤트 소비| D[배송 서비스]
style K fill:#ff9,stroke:#333
**이벤트 드리븐의 장점:**
- 서비스 간 느슨한 결합 (loose coupling)
- 비동기 처리로 응답 시간 단축
- 독립적 확장 가능
- 이벤트 리플레이로 상태 복구 가능
2. CQRS (Command Query Responsibility Segregation)
2.1 핵심 개념
**명령(Command)**과 **조회(Query)**를 분리하는 패턴:
graph TB
Client -->|Command: 주문 생성| CmdAPI[Command API]
Client -->|Query: 주문 조회| QryAPI[Query API]
CmdAPI --> WriteDB[(Write DB<br/>PostgreSQL)]
WriteDB -->|이벤트 발행| EventBus[Kafka]
EventBus -->|이벤트 소비| Projector[Projector]
Projector --> ReadDB[(Read DB<br/>Elasticsearch)]
QryAPI --> ReadDB
style CmdAPI fill:#f96,stroke:#333
style QryAPI fill:#6f9,stroke:#333
style EventBus fill:#ff9,stroke:#333
2.2 왜 분리하는가?
| 측면 | Command (쓰기) | Query (읽기) |
| ------ | ------------------- | ----------------------------- |
| 모델 | 정규화, 무결성 중심 | 비정규화, 성능 중심 |
| 확장 | 수직 확장 | 수평 확장 (복제) |
| DB | PostgreSQL, MySQL | Elasticsearch, Redis, MongoDB |
| 최적화 | 트랜잭션 안전성 | 읽기 성능 |
| 비율 | 전체의 10~20% | 전체의 80~90% |
2.3 구현 예제
Command Side
from dataclasses import dataclass
from datetime import datetime
@dataclass
class CreateOrderCommand:
customer_id: str
items: list[dict]
total_amount: float
class OrderCommandHandler:
def __init__(self, repo, event_publisher):
self.repo = repo
self.event_publisher = event_publisher
def handle_create_order(self, cmd: CreateOrderCommand):
order = Order(
id=str(uuid.uuid4()),
customer_id=cmd.customer_id,
items=cmd.items,
total_amount=cmd.total_amount,
status="PENDING",
created_at=datetime.utcnow()
)
Write DB에 저장
self.repo.save(order)
이벤트 발행
self.event_publisher.publish("order.created", {
"order_id": order.id,
"customer_id": order.customer_id,
"total_amount": order.total_amount,
"items": order.items,
})
return order.id
Query Side
class OrderQueryService:
def __init__(self, read_repo):
self.read_repo = read_repo # Elasticsearch
def get_order(self, order_id: str):
return self.read_repo.find_by_id(order_id)
def search_orders(self, customer_id: str, status: str = None):
return self.read_repo.search(customer_id=customer_id, status=status)
Projector (이벤트 → Read Model 동기화)
class OrderProjector:
def __init__(self, read_repo):
self.read_repo = read_repo
def on_order_created(self, event):
self.read_repo.upsert({
"id": event["order_id"],
"customer_id": event["customer_id"],
"total_amount": event["total_amount"],
"status": "PENDING",
"items": event["items"],
})
def on_order_paid(self, event):
self.read_repo.update(event["order_id"], {"status": "PAID"})
3. Saga 패턴: 분산 트랜잭션 관리
3.1 Choreography vs Orchestration
graph TB
subgraph "Choreography (이벤트 기반)"
O1[주문] -->|OrderCreated| P1[결제]
P1 -->|PaymentCompleted| S1[재고]
S1 -->|StockReserved| D1[배송]
D1 -->|DeliveryStarted| O1
end
graph TB
subgraph "Orchestration (중앙 조율)"
Orch[Saga Orchestrator]
Orch -->|1. 결제 요청| P2[결제]
P2 -->|결제 완료| Orch
Orch -->|2. 재고 예약| S2[재고]
S2 -->|예약 완료| Orch
Orch -->|3. 배송 요청| D2[배송]
D2 -->|배송 시작| Orch
end
| 방식 | 장점 | 단점 |
| ------------- | -------------------- | ----------------------------------- |
| Choreography | 단순, 느슨한 결합 | 흐름 파악 어려움, 순환 의존성 위험 |
| Orchestration | 흐름 명확, 관리 용이 | 중앙 집중, Orchestrator 단일 장애점 |
3.2 Orchestration Saga 구현
from enum import Enum
from kafka import KafkaProducer, KafkaConsumer
class SagaStep(Enum):
PAYMENT = "payment"
INVENTORY = "inventory"
DELIVERY = "delivery"
class OrderSagaOrchestrator:
STEPS = [SagaStep.PAYMENT, SagaStep.INVENTORY, SagaStep.DELIVERY]
def __init__(self, producer: KafkaProducer):
self.producer = producer
self.saga_state = {} # order_id -> {step, status}
def start_saga(self, order_id: str, order_data: dict):
self.saga_state[order_id] = {
"current_step": 0,
"data": order_data,
"completed_steps": [],
}
self._execute_step(order_id)
def _execute_step(self, order_id: str):
state = self.saga_state[order_id]
step_idx = state["current_step"]
if step_idx >= len(self.STEPS):
모든 단계 완료
self._publish(f"order.completed", {"order_id": order_id})
return
step = self.STEPS[step_idx]
self._publish(f"{step.value}.request", {
"order_id": order_id,
"saga_id": order_id,
**state["data"]
})
def handle_step_success(self, order_id: str, step: SagaStep):
state = self.saga_state[order_id]
state["completed_steps"].append(step)
state["current_step"] += 1
self._execute_step(order_id)
def handle_step_failure(self, order_id: str, failed_step: SagaStep):
"""보상 트랜잭션 실행 (역순)"""
state = self.saga_state[order_id]
for step in reversed(state["completed_steps"]):
self._publish(f"{step.value}.compensate", {
"order_id": order_id,
})
self._publish("order.failed", {
"order_id": order_id,
"failed_at": failed_step.value,
})
def _publish(self, topic: str, data: dict):
self.producer.send(topic, json.dumps(data).encode())
3.3 보상 트랜잭션 예시
sequenceDiagram
participant O as Orchestrator
participant P as 결제 서비스
participant I as 재고 서비스
participant D as 배송 서비스
O->>P: 1. 결제 요청
P-->>O: 결제 성공
O->>I: 2. 재고 예약
I-->>O: 재고 예약 성공
O->>D: 3. 배송 요청
D-->>O: ❌ 배송 실패 (주소 오류)
Note over O: 보상 트랜잭션 시작 (역순)
O->>I: 재고 예약 취소
I-->>O: 취소 완료
O->>P: 결제 환불
P-->>O: 환불 완료
O->>O: 주문 실패 처리
4. Event Sourcing
4.1 개념
상태를 직접 저장하는 대신, **상태 변경 이벤트**를 저장:
전통적 방식:
orders 테이블: {id: 1, status: "SHIPPED", amount: 50000}
Event Sourcing:
events 테이블:
1. OrderCreated {amount: 50000}
2. PaymentReceived {payment_id: "pay_123"}
3. StockReserved {warehouse: "seoul"}
4. OrderShipped {tracking: "KR12345"}
→ 이벤트를 순서대로 리플레이하면 현재 상태 복원
4.2 구현
class EventStore:
def __init__(self, db):
self.db = db
def append(self, aggregate_id: str, event_type: str, data: dict, version: int):
self.db.execute("""
INSERT INTO events (aggregate_id, event_type, data, version, created_at)
VALUES (%s, %s, %s, %s, NOW())
""", (aggregate_id, event_type, json.dumps(data), version))
def get_events(self, aggregate_id: str, after_version: int = 0):
return self.db.query("""
SELECT event_type, data, version
FROM events
WHERE aggregate_id = %s AND version > %s
ORDER BY version
""", (aggregate_id, after_version))
class Order:
def __init__(self):
self.id = None
self.status = None
self.version = 0
self._pending_events = []
def apply_event(self, event_type: str, data: dict):
if event_type == "OrderCreated":
self.id = data["order_id"]
self.status = "PENDING"
elif event_type == "PaymentReceived":
self.status = "PAID"
elif event_type == "OrderShipped":
self.status = "SHIPPED"
self.version += 1
@classmethod
def from_events(cls, events):
order = cls()
for e in events:
order.apply_event(e["event_type"], e["data"])
return order
5. Kafka 기반 전체 아키텍처
graph TB
Client[클라이언트] --> Gateway[API Gateway]
Gateway --> OrderCmd[주문 Command API]
Gateway --> OrderQry[주문 Query API]
OrderCmd --> WriteDB[(PostgreSQL)]
WriteDB --> CDC[Debezium CDC]
CDC --> Kafka[Apache Kafka]
Kafka --> Saga[Saga Orchestrator]
Kafka --> Projector[CQRS Projector]
Saga --> PaymentSvc[결제 서비스]
Saga --> InventorySvc[재고 서비스]
Saga --> DeliverySvc[배송 서비스]
Projector --> ReadDB[(Elasticsearch)]
OrderQry --> ReadDB
PaymentSvc --> Kafka
InventorySvc --> Kafka
DeliverySvc --> Kafka
style Kafka fill:#ff9,stroke:#333
style Saga fill:#f96,stroke:#333
6. 실전 고려사항
6.1 멱등성 (Idempotency)
이벤트 중복 처리 방지
class IdempotentConsumer:
def __init__(self, redis_client):
self.redis = redis_client
def process_event(self, event_id: str, handler):
key = f"processed:{event_id}"
if self.redis.setnx(key, "1"):
self.redis.expire(key, 86400) # 24시간 TTL
handler()
else:
pass # 이미 처리됨, 스킵
6.2 이벤트 순서 보장
Kafka 파티션 키로 순서 보장
producer.send(
"order.events",
key=order_id.encode(), # 같은 주문 → 같은 파티션 → 순서 보장
value=json.dumps(event).encode()
)
7. 퀴즈
읽기/쓰기의 **요구사항이 근본적으로 다르기 때문**. 쓰기는 정규화+트랜잭션, 읽기는 비정규화+성능 최적화. 각각에 최적화된 저장소와 모델 사용 가능.
**Choreography**: 각 서비스가 이벤트를 발행/구독하여 자율적으로 동작. **Orchestration**: 중앙 Orchestrator가 각 서비스에 명시적으로 명령. Orchestration이 흐름 파악과 관리가 용이.
Saga에서 한 단계 실패 시, 이미 완료된 단계를 **역순으로 취소**하는 트랜잭션. 예: 배송 실패 → 재고 예약 취소 → 결제 환불.
(1) 완전한 **감사 추적(Audit Trail)** (2) 이벤트 리플레이로 **시점 복구** 가능 (3) **새로운 Read Model을 과거 이벤트로 구축** 가능.
네트워크 장애, 재시도, 리밸런싱 등으로 **같은 이벤트가 여러 번 전달**될 수 있음. 멱등하지 않으면 중복 결제, 중복 재고 차감 등 심각한 문제 발생.
**같은 파티션 키**를 사용. 동일 키의 메시지는 같은 파티션으로 전달되어 순서가 보장됨. 예: order_id를 파티션 키로 사용.
**Eventually Consistent** — 최종적 일관성. 조회 시 최신 데이터가 아닐 수 있음. 해결: (1) 쓰기 직후 Write DB에서 직접 조회 (2) 이벤트 처리 완료 확인 후 응답.
현재 단락 (1/263)
모놀리식에서 MSA로 전환하면 **분산 트랜잭션** 문제가 발생한다: