들어가며
마이크로서비스 아키텍처에서 가장 어려운 문제 중 하나가 **분산 트랜잭션**입니다. 모놀리스에서는 데이터베이스의 ACID 트랜잭션으로 해결했지만, 각 서비스가 자체 데이터베이스를 가진 마이크로서비스에서는 2PC(Two-Phase Commit)가 현실적이지 않습니다.
**Saga 패턴**은 긴 트랜잭션을 여러 로컬 트랜잭션으로 분해하고, 실패 시 보상 트랜잭션(Compensating Transaction)으로 일관성을 유지하는 패턴입니다.
왜 2PC가 아닌 Saga인가?
| 항목 | 2PC | Saga |
| --------- | ---------------- | ------------- |
| 정합성 | 강한 정합성 | 결과적 정합성 |
| 가용성 | 낮음 (동기 잠금) | 높음 (비동기) |
| 지연시간 | 높음 | 낮음 |
| 확장성 | 제한적 | 우수 |
| 장애 격리 | 전체 롤백 | 부분 보상 |
주문 시나리오
온라인 쇼핑몰의 주문 처리를 예로 듭니다:
1. **주문 서비스**: 주문 생성
2. **결제 서비스**: 결제 처리
3. **재고 서비스**: 재고 차감
4. **배송 서비스**: 배송 생성
만약 3단계(재고 차감)에서 실패하면, 2단계(결제)를 환불하고 1단계(주문)를 취소해야 합니다.
Choreography 방식 — 이벤트 기반
각 서비스가 이벤트를 발행하고, 다른 서비스가 이벤트를 구독하여 자신의 로컬 트랜잭션을 실행합니다.
이벤트 정의
events.py
from dataclasses import dataclass
from datetime import datetime
@dataclass
class OrderCreated:
order_id: str
customer_id: str
items: list
total_amount: float
timestamp: datetime
@dataclass
class PaymentProcessed:
order_id: str
payment_id: str
amount: float
@dataclass
class PaymentFailed:
order_id: str
reason: str
@dataclass
class InventoryReserved:
order_id: str
items: list
@dataclass
class InventoryInsufficient:
order_id: str
item_id: str
requested: int
available: int
@dataclass
class ShippingCreated:
order_id: str
tracking_number: str
보상 이벤트
@dataclass
class PaymentRefunded:
order_id: str
payment_id: str
amount: float
@dataclass
class InventoryReleased:
order_id: str
items: list
@dataclass
class OrderCancelled:
order_id: str
reason: str
주문 서비스
order_service.py
from kafka import KafkaProducer, KafkaConsumer
class OrderService:
def __init__(self):
self.producer = KafkaProducer(
bootstrap_servers='kafka:9092',
value_serializer=lambda v: json.dumps(v).encode()
)
self.db = OrderDB()
def create_order(self, customer_id, items, total_amount):
"""주문 생성 + OrderCreated 이벤트 발행"""
order = self.db.create(
customer_id=customer_id,
items=items,
total_amount=total_amount,
status="PENDING"
)
self.producer.send('order-events', {
'type': 'OrderCreated',
'order_id': order.id,
'customer_id': customer_id,
'items': items,
'total_amount': total_amount
})
return order
def handle_payment_failed(self, event):
"""결제 실패 시 주문 취소"""
self.db.update_status(event['order_id'], "CANCELLED")
self.producer.send('order-events', {
'type': 'OrderCancelled',
'order_id': event['order_id'],
'reason': event['reason']
})
def handle_inventory_insufficient(self, event):
"""재고 부족 시 환불 요청"""
self.db.update_status(event['order_id'], "CANCELLING")
결제 환불 이벤트 발행
self.producer.send('payment-events', {
'type': 'RefundRequested',
'order_id': event['order_id']
})
def handle_shipping_created(self, event):
"""배송 생성 완료 시 주문 확정"""
self.db.update_status(event['order_id'], "CONFIRMED")
결제 서비스
payment_service.py
class PaymentService:
def __init__(self):
self.producer = KafkaProducer(
bootstrap_servers='kafka:9092',
value_serializer=lambda v: json.dumps(v).encode()
)
self.db = PaymentDB()
def handle_order_created(self, event):
"""주문 생성 이벤트 수신 -> 결제 처리"""
try:
payment = self.db.process_payment(
order_id=event['order_id'],
customer_id=event['customer_id'],
amount=event['total_amount']
)
self.producer.send('payment-events', {
'type': 'PaymentProcessed',
'order_id': event['order_id'],
'payment_id': payment.id,
'amount': event['total_amount']
})
except InsufficientFundsError as e:
self.producer.send('payment-events', {
'type': 'PaymentFailed',
'order_id': event['order_id'],
'reason': str(e)
})
def handle_refund_requested(self, event):
"""보상 트랜잭션: 환불 처리"""
payment = self.db.get_by_order(event['order_id'])
self.db.refund(payment.id)
self.producer.send('payment-events', {
'type': 'PaymentRefunded',
'order_id': event['order_id'],
'payment_id': payment.id,
'amount': payment.amount
})
Orchestration 방식 — 중앙 코디네이터
하나의 Orchestrator(Saga Coordinator)가 전체 트랜잭션 흐름을 관리합니다.
Temporal로 구현하는 Saga Orchestrator
workflows.py
from temporalio import workflow
from temporalio.common import RetryPolicy
from datetime import timedelta
@workflow.defn
class OrderSagaWorkflow:
"""주문 Saga - Orchestration 방식"""
@workflow.run
async def run(self, order_request: dict) -> dict:
retry_policy = RetryPolicy(
maximum_attempts=3,
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(seconds=10),
)
order_id = None
payment_id = None
reservation_id = None
try:
Step 1: 주문 생성
order_id = await workflow.execute_activity(
create_order,
order_request,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=retry_policy,
)
workflow.logger.info(f"Order created: {order_id}")
Step 2: 결제 처리
payment_id = await workflow.execute_activity(
process_payment,
{"order_id": order_id, "amount": order_request["total_amount"]},
start_to_close_timeout=timedelta(seconds=60),
retry_policy=retry_policy,
)
workflow.logger.info(f"Payment processed: {payment_id}")
Step 3: 재고 예약
reservation_id = await workflow.execute_activity(
reserve_inventory,
{"order_id": order_id, "items": order_request["items"]},
start_to_close_timeout=timedelta(seconds=30),
retry_policy=retry_policy,
)
workflow.logger.info(f"Inventory reserved: {reservation_id}")
Step 4: 배송 생성
tracking_number = await workflow.execute_activity(
create_shipment,
{"order_id": order_id, "address": order_request["address"]},
start_to_close_timeout=timedelta(seconds=30),
retry_policy=retry_policy,
)
workflow.logger.info(f"Shipment created: {tracking_number}")
Step 5: 주문 확정
await workflow.execute_activity(
confirm_order,
{"order_id": order_id},
start_to_close_timeout=timedelta(seconds=10),
)
return {
"status": "SUCCESS",
"order_id": order_id,
"tracking_number": tracking_number
}
except Exception as e:
workflow.logger.error(f"Saga failed: {e}")
보상 트랜잭션 실행 (역순)
if reservation_id:
await workflow.execute_activity(
release_inventory,
{"reservation_id": reservation_id},
start_to_close_timeout=timedelta(seconds=30),
)
if payment_id:
await workflow.execute_activity(
refund_payment,
{"payment_id": payment_id},
start_to_close_timeout=timedelta(seconds=60),
)
if order_id:
await workflow.execute_activity(
cancel_order,
{"order_id": order_id, "reason": str(e)},
start_to_close_timeout=timedelta(seconds=10),
)
return {"status": "FAILED", "reason": str(e)}
Activity 구현
activities.py
from temporalio import activity
@activity.defn
async def create_order(request: dict) -> str:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://order-service:8080/orders",
json=request
)
response.raise_for_status()
return response.json()["order_id"]
@activity.defn
async def process_payment(request: dict) -> str:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://payment-service:8080/payments",
json=request
)
response.raise_for_status()
return response.json()["payment_id"]
@activity.defn
async def reserve_inventory(request: dict) -> str:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://inventory-service:8080/reservations",
json=request
)
if response.status_code == 409:
raise InsufficientInventoryError(response.json()["message"])
response.raise_for_status()
return response.json()["reservation_id"]
보상 활동
@activity.defn
async def refund_payment(request: dict) -> None:
async with httpx.AsyncClient() as client:
response = await client.post(
"http://payment-service:8080/refunds",
json=request
)
response.raise_for_status()
@activity.defn
async def release_inventory(request: dict) -> None:
async with httpx.AsyncClient() as client:
response = await client.delete(
f"http://inventory-service:8080/reservations/{request['reservation_id']}"
)
response.raise_for_status()
@activity.defn
async def cancel_order(request: dict) -> None:
async with httpx.AsyncClient() as client:
response = await client.patch(
f"http://order-service:8080/orders/{request['order_id']}/cancel",
json={"reason": request["reason"]}
)
response.raise_for_status()
Temporal Worker 실행
worker.py
from temporalio.client import Client
from temporalio.worker import Worker
async def main():
client = await Client.connect("temporal:7233")
worker = Worker(
client,
task_queue="order-saga",
workflows=[OrderSagaWorkflow],
activities=[
create_order,
process_payment,
reserve_inventory,
create_shipment,
confirm_order,
refund_payment,
release_inventory,
cancel_order,
],
)
await worker.run()
if __name__ == "__main__":
asyncio.run(main())
Choreography vs Orchestration
| 항목 | Choreography | Orchestration |
| ----------- | ---------------------- | --------------------- |
| 결합도 | 느슨함 | Orchestrator에 의존 |
| 복잡도 | 서비스 수 증가 시 급증 | 중앙에서 관리 |
| 가시성 | 이벤트 추적 어려움 | 워크플로 상태 명확 |
| 단일 장애점 | 없음 | Orchestrator |
| 적합한 경우 | 단순한 흐름 (3-4 단계) | 복잡한 흐름 (5+ 단계) |
| 테스트 | 어려움 | 상대적으로 쉬움 |
Semantic Lock 카운터 측정
Saga는 격리성이 없으므로, 동시 실행 시 데이터 이상이 발생할 수 있습니다.
Semantic Lock 패턴
class OrderDB:
def create_with_lock(self, order_data):
"""주문 생성 시 상태를 PENDING으로 설정하여 다른 Saga가 간섭 못하게"""
order = Order(
**order_data,
status="PENDING", # Semantic Lock
saga_id=generate_saga_id()
)
self.session.add(order)
self.session.commit()
return order
def confirm_order(self, order_id, saga_id):
"""Saga 완료 시 락 해제"""
order = self.session.query(Order).filter_by(
id=order_id,
saga_id=saga_id,
status="PENDING"
).first()
if not order:
raise ConcurrencyError("Order already processed by another saga")
order.status = "CONFIRMED"
self.session.commit()
멱등성 보장
보상 트랜잭션이 여러 번 실행될 수 있으므로 멱등성이 필수입니다.
멱등성 키를 활용한 결제 처리
class PaymentService:
def process_payment(self, order_id, amount, idempotency_key):
이미 처리된 요청인지 확인
existing = self.db.find_by_idempotency_key(idempotency_key)
if existing:
return existing # 동일 결과 반환
payment = self.db.create_payment(
order_id=order_id,
amount=amount,
idempotency_key=idempotency_key
)
return payment
정리
Saga 패턴은 마이크로서비스에서 분산 트랜잭션을 처리하는 핵심 패턴입니다:
- **Choreography**: 단순한 흐름에 적합, 이벤트 기반으로 느슨한 결합
- **Orchestration**: 복잡한 흐름에 적합, Temporal 등으로 중앙 관리
- **보상 트랜잭션**: 실패 시 역순으로 복구
- **멱등성**: 재시도 안전성을 위해 필수
- **Semantic Lock**: 동시 Saga 간 간섭 방지
**Q1. 2PC 대신 Saga를 사용하는 주된 이유는?**
2PC는 동기 잠금으로 가용성과 확장성이 제한되지만, Saga는 비동기로 높은 가용성과 확장성을 제공합니다.
**Q2. 보상 트랜잭션(Compensating Transaction)이란?**
Saga의 특정 단계가 실패했을 때, 이전 성공한 단계를 되돌리는 역순 트랜잭션입니다.
**Q3. Choreography 방식의 단점은?**
서비스 수가 증가하면 이벤트 흐름이 복잡해지고, 전체 Saga의 진행 상황을 파악하기 어렵습니다.
**Q4. Temporal이 Saga Orchestration에 적합한 이유는?**
워크플로 상태를 자동으로 영속화하고, 실패 시 재시도와 보상을 선언적으로 관리할 수 있습니다.
**Q5. Semantic Lock 패턴의 목적은?**
PENDING 상태로 다른 Saga가 같은 데이터를 동시에 수정하는 것을 방지합니다.
**Q6. 보상 트랜잭션에서 멱등성이 중요한 이유는?**
네트워크 오류 등으로 보상이 여러 번 실행될 수 있으므로, 같은 결과를 보장해야 합니다.
**Q7. Choreography와 Orchestration 중 어떤 상황에서 Orchestration을 선택해야 하나요?**
5단계 이상의 복잡한 흐름, 조건부 분기가 많은 경우, 전체 진행 상황의 가시성이 필요한 경우입니다.
현재 단락 (1/355)
마이크로서비스 아키텍처에서 가장 어려운 문제 중 하나가 **분산 트랜잭션**입니다. 모놀리스에서는 데이터베이스의 ACID 트랜잭션으로 해결했지만, 각 서비스가 자체 데이터베이스를 ...