- Introduction
- Why Saga Instead of 2PC?
- Order Scenario
- Choreography Approach — Event-Driven
- Orchestration Approach — Central Coordinator
- Choreography vs Orchestration
- Semantic Lock Countermeasure
- Ensuring Idempotency
- Summary
Introduction
One of the hardest problems in a microservices architecture is the distributed transaction. In a monolith, the database's ACID transactions solve it, but when each service owns its own database, 2PC (Two-Phase Commit) is rarely practical: every participant must support the protocol, and locks are held until all of them have voted.
The Saga pattern decomposes a long transaction into multiple local transactions and maintains consistency through compensating transactions when failures occur.
Why Saga Instead of 2PC?
| Aspect | 2PC | Saga |
|---|---|---|
| Consistency | Strong consistency | Eventual consistency |
| Availability | Low (synchronous locks) | High (asynchronous) |
| Latency | High | Low |
| Scalability | Limited | Excellent |
| Fault isolation | Full rollback | Partial compensation |
Order Scenario
Let us use an online shopping order process as an example:
1. Order Service: Create the order
2. Payment Service: Process payment
3. Inventory Service: Deduct inventory
4. Shipping Service: Create shipment
If step 3 (inventory deduction) fails, we must refund step 2 (payment) and cancel step 1 (order).
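The failure path described above can be sketched with a small in-memory saga runner. This is only an illustration: `SagaStep`, `run_saga`, and the no-op actions are hypothetical names standing in for real service calls, and the inventory step is forced to fail so that the reverse-order compensation is visible.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class SagaStep:
    name: str
    action: Callable[[], None]        # the local transaction
    compensation: Callable[[], None]  # undoes the local transaction


def run_saga(steps: List[SagaStep], log: List[str]) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed: List[SagaStep] = []
    for step in steps:
        try:
            step.action()
            log.append(f"done:{step.name}")
            completed.append(step)
        except Exception as exc:
            log.append(f"failed:{step.name}:{exc}")
            for done in reversed(completed):
                done.compensation()
                log.append(f"compensated:{done.name}")
            return False
    return True


def noop() -> None:
    """Placeholder for a real service call."""


def fail_inventory() -> None:
    raise RuntimeError("insufficient inventory")


saga_log: List[str] = []
succeeded = run_saga(
    [
        SagaStep("create_order", noop, noop),
        SagaStep("process_payment", noop, noop),
        SagaStep("deduct_inventory", fail_inventory, noop),
        SagaStep("create_shipment", noop, noop),
    ],
    saga_log,
)
print(succeeded)
print(saga_log)
```

The failed step itself is not compensated because it never committed; only the payment and the order are undone, in that order, and the shipment step is never reached.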
Choreography Approach — Event-Driven
Each service publishes events, and other services subscribe to those events to execute their own local transactions.
Event Definitions
    # events.py
    from dataclasses import dataclass
    from datetime import datetime

    @dataclass
    class OrderCreated:
        order_id: str
        customer_id: str
        items: list
        total_amount: float
        timestamp: datetime

    @dataclass
    class PaymentProcessed:
        order_id: str
        payment_id: str
        amount: float

    @dataclass
    class PaymentFailed:
        order_id: str
        reason: str

    @dataclass
    class InventoryReserved:
        order_id: str
        items: list

    @dataclass
    class InventoryInsufficient:
        order_id: str
        item_id: str
        requested: int
        available: int

    @dataclass
    class ShippingCreated:
        order_id: str
        tracking_number: str

    # Compensation events
    @dataclass
    class PaymentRefunded:
        order_id: str
        payment_id: str
        amount: float

    @dataclass
    class InventoryReleased:
        order_id: str
        items: list

    @dataclass
    class OrderCancelled:
        order_id: str
        reason: str
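The services in this article publish plain dictionaries with a `type` field rather than the dataclasses themselves. One possible bridge between the two is sketched below; `to_message`, `from_message`, and the `registry` dictionary are hypothetical helpers, not part of the original code, and only one event class is repeated to keep the example self-contained.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class PaymentProcessed:
    order_id: str
    payment_id: str
    amount: float


def to_message(event) -> bytes:
    """Wrap a dataclass event in a dict with a 'type' field and encode it as JSON."""
    payload = {"type": type(event).__name__, **asdict(event)}
    return json.dumps(payload).encode()


def from_message(raw: bytes, registry: dict):
    """Rebuild the event object, using the 'type' field to pick the class."""
    payload = json.loads(raw.decode())
    event_cls = registry[payload.pop("type")]
    return event_cls(**payload)


# Hypothetical registry mapping event names to their classes.
registry = {"PaymentProcessed": PaymentProcessed}

raw = to_message(PaymentProcessed(order_id="o-1", payment_id="p-1", amount=42.5))
restored = from_message(raw, registry)
print(restored)
```

Events that carry a `datetime`, such as `OrderCreated.timestamp`, would need an extra conversion step (for example `isoformat()`), because `json.dumps` cannot encode `datetime` objects by default.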
Order Service
    # order_service.py
    import json
    from kafka import KafkaProducer

    # OrderDB is the service's persistence layer (not shown). The consumer loop
    # that routes incoming events to the handle_* methods is also not shown.
    class OrderService:
        def __init__(self):
            self.producer = KafkaProducer(
                bootstrap_servers='kafka:9092',
                value_serializer=lambda v: json.dumps(v).encode()
            )
            self.db = OrderDB()

        def create_order(self, customer_id, items, total_amount):
            """Create order + publish OrderCreated event"""
            order = self.db.create(
                customer_id=customer_id,
                items=items,
                total_amount=total_amount,
                status="PENDING"
            )
            self.producer.send('order-events', {
                'type': 'OrderCreated',
                'order_id': order.id,
                'customer_id': customer_id,
                'items': items,
                'total_amount': total_amount
            })
            return order

        def handle_payment_failed(self, event):
            """Cancel order on payment failure"""
            self.db.update_status(event['order_id'], "CANCELLED")
            self.producer.send('order-events', {
                'type': 'OrderCancelled',
                'order_id': event['order_id'],
                'reason': event['reason']
            })

        def handle_inventory_insufficient(self, event):
            """Request refund on insufficient inventory"""
            self.db.update_status(event['order_id'], "CANCELLING")
            # Ask the Payment Service to refund (it answers with PaymentRefunded)
            self.producer.send('payment-events', {
                'type': 'RefundRequested',
                'order_id': event['order_id']
            })

        def handle_payment_refunded(self, event):
            """Finish the compensation: cancel the order once the refund is done.

            Assumed handler: without it the order would stay in CANCELLING.
            """
            self.db.update_status(event['order_id'], "CANCELLED")
            self.producer.send('order-events', {
                'type': 'OrderCancelled',
                'order_id': event['order_id'],
                'reason': 'Payment refunded after inventory shortage'
            })

        def handle_shipping_created(self, event):
            """Confirm order when shipping is created"""
            self.db.update_status(event['order_id'], "CONFIRMED")
Payment Service
    # payment_service.py
    import json
    from kafka import KafkaProducer

    class InsufficientFundsError(Exception):
        """Assumed to be raised by PaymentDB.process_payment when a charge is declined."""

    # PaymentDB is the service's persistence layer (not shown).
    class PaymentService:
        def __init__(self):
            self.producer = KafkaProducer(
                bootstrap_servers='kafka:9092',
                value_serializer=lambda v: json.dumps(v).encode()
            )
            self.db = PaymentDB()

        def handle_order_created(self, event):
            """Receive OrderCreated event -> process payment"""
            try:
                payment = self.db.process_payment(
                    order_id=event['order_id'],
                    customer_id=event['customer_id'],
                    amount=event['total_amount']
                )
                self.producer.send('payment-events', {
                    'type': 'PaymentProcessed',
                    'order_id': event['order_id'],
                    'payment_id': payment.id,
                    'amount': event['total_amount']
                })
            except InsufficientFundsError as e:
                self.producer.send('payment-events', {
                    'type': 'PaymentFailed',
                    'order_id': event['order_id'],
                    'reason': str(e)
                })

        def handle_refund_requested(self, event):
            """Compensating transaction: process refund"""
            payment = self.db.get_by_order(event['order_id'])
            self.db.refund(payment.id)
            self.producer.send('payment-events', {
                'type': 'PaymentRefunded',
                'order_id': event['order_id'],
                'payment_id': payment.id,
                'amount': payment.amount
            })
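To see how the events chain together without running Kafka, the failure path can be traced with a toy in-memory bus. This is a sketch under assumptions: `InMemoryBus` and the handler functions are hypothetical stand-ins for the broker and the services, the Inventory Service is hard-coded to report a shortage, and the Order Service is assumed to mark the order as `CANCELLED` once it sees `PaymentRefunded`.

```python
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, List

Handler = Callable[[dict], None]


class InMemoryBus:
    """Toy stand-in for a message broker: events are queued and delivered in order."""

    def __init__(self) -> None:
        self.handlers: Dict[str, List[Handler]] = defaultdict(list)
        self.queue: Deque[dict] = deque()
        self.history: List[str] = []

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self.handlers[event_type].append(handler)

    def publish(self, event: dict) -> None:
        self.queue.append(event)

    def run(self) -> None:
        while self.queue:
            event = self.queue.popleft()
            self.history.append(event["type"])
            for handler in self.handlers[event["type"]]:
                handler(event)


bus = InMemoryBus()
order_status: Dict[str, str] = {"o-1": "PENDING"}


def payment_on_order_created(event: dict) -> None:
    bus.publish({"type": "PaymentProcessed", "order_id": event["order_id"]})


def inventory_on_payment_processed(event: dict) -> None:
    # Simulated failure: stock is short, so no InventoryReserved is published.
    bus.publish({"type": "InventoryInsufficient", "order_id": event["order_id"]})


def order_on_inventory_insufficient(event: dict) -> None:
    order_status[event["order_id"]] = "CANCELLING"
    bus.publish({"type": "RefundRequested", "order_id": event["order_id"]})


def payment_on_refund_requested(event: dict) -> None:
    bus.publish({"type": "PaymentRefunded", "order_id": event["order_id"]})


def order_on_payment_refunded(event: dict) -> None:
    order_status[event["order_id"]] = "CANCELLED"
    bus.publish({"type": "OrderCancelled", "order_id": event["order_id"]})


bus.subscribe("OrderCreated", payment_on_order_created)
bus.subscribe("PaymentProcessed", inventory_on_payment_processed)
bus.subscribe("InventoryInsufficient", order_on_inventory_insufficient)
bus.subscribe("RefundRequested", payment_on_refund_requested)
bus.subscribe("PaymentRefunded", order_on_payment_refunded)

bus.publish({"type": "OrderCreated", "order_id": "o-1"})
bus.run()
print(" -> ".join(bus.history))
```

No single component knows the whole flow; the saga only exists as the sequence of events in `bus.history`, which is why tracing gets harder as more services join.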
Orchestration Approach — Central Coordinator
A single Orchestrator (Saga Coordinator) manages the entire transaction flow.
Saga Orchestrator with Temporal
    # workflows.py
    from datetime import timedelta

    from temporalio import workflow
    from temporalio.common import RetryPolicy

    # Activities perform I/O, so import them through the sandbox pass-through.
    with workflow.unsafe.imports_passed_through():
        from activities import (
            cancel_order,
            confirm_order,
            create_order,
            create_shipment,
            process_payment,
            refund_payment,
            release_inventory,
            reserve_inventory,
        )

    @workflow.defn
    class OrderSagaWorkflow:
        """Order Saga - Orchestration approach"""

        @workflow.run
        async def run(self, order_request: dict) -> dict:
            retry_policy = RetryPolicy(
                maximum_attempts=3,
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(seconds=10),
                # A stock shortage will not succeed on retry, so fail fast.
                non_retryable_error_types=["InsufficientInventoryError"],
            )
            order_id = None
            payment_id = None
            reservation_id = None
            try:
                # Step 1: Create order
                order_id = await workflow.execute_activity(
                    create_order,
                    order_request,
                    start_to_close_timeout=timedelta(seconds=30),
                    retry_policy=retry_policy,
                )
                workflow.logger.info(f"Order created: {order_id}")

                # Step 2: Process payment
                payment_id = await workflow.execute_activity(
                    process_payment,
                    {"order_id": order_id, "amount": order_request["total_amount"]},
                    start_to_close_timeout=timedelta(seconds=60),
                    retry_policy=retry_policy,
                )
                workflow.logger.info(f"Payment processed: {payment_id}")

                # Step 3: Reserve inventory
                reservation_id = await workflow.execute_activity(
                    reserve_inventory,
                    {"order_id": order_id, "items": order_request["items"]},
                    start_to_close_timeout=timedelta(seconds=30),
                    retry_policy=retry_policy,
                )
                workflow.logger.info(f"Inventory reserved: {reservation_id}")

                # Step 4: Create shipment
                tracking_number = await workflow.execute_activity(
                    create_shipment,
                    {"order_id": order_id, "address": order_request["address"]},
                    start_to_close_timeout=timedelta(seconds=30),
                    retry_policy=retry_policy,
                )
                workflow.logger.info(f"Shipment created: {tracking_number}")

                # Step 5: Confirm order
                await workflow.execute_activity(
                    confirm_order,
                    {"order_id": order_id},
                    start_to_close_timeout=timedelta(seconds=10),
                )
                return {
                    "status": "SUCCESS",
                    "order_id": order_id,
                    "tracking_number": tracking_number
                }
            except Exception as e:
                workflow.logger.error(f"Saga failed: {e}")
                # Execute compensating transactions (reverse order).
                # No retry_policy is passed, so Temporal's default policy applies
                # and keeps retrying each compensation until it succeeds.
                if reservation_id:
                    await workflow.execute_activity(
                        release_inventory,
                        {"reservation_id": reservation_id},
                        start_to_close_timeout=timedelta(seconds=30),
                    )
                if payment_id:
                    await workflow.execute_activity(
                        refund_payment,
                        {"payment_id": payment_id},
                        start_to_close_timeout=timedelta(seconds=60),
                    )
                if order_id:
                    await workflow.execute_activity(
                        cancel_order,
                        {"order_id": order_id, "reason": str(e)},
                        start_to_close_timeout=timedelta(seconds=10),
                    )
                return {"status": "FAILED", "reason": str(e)}
Activity Implementation
    # activities.py
    import httpx
    from temporalio import activity

    class InsufficientInventoryError(Exception):
        """Raised when the inventory service answers 409 Conflict."""

    @activity.defn
    async def create_order(request: dict) -> str:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "http://order-service:8080/orders",
                json=request
            )
            response.raise_for_status()
            return response.json()["order_id"]

    @activity.defn
    async def process_payment(request: dict) -> str:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "http://payment-service:8080/payments",
                json=request
            )
            response.raise_for_status()
            return response.json()["payment_id"]

    @activity.defn
    async def reserve_inventory(request: dict) -> str:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "http://inventory-service:8080/reservations",
                json=request
            )
            if response.status_code == 409:
                raise InsufficientInventoryError(response.json()["message"])
            response.raise_for_status()
            return response.json()["reservation_id"]

    # create_shipment and confirm_order are used by the workflow and the worker;
    # they follow the same request/raise_for_status pattern and are omitted here.

    # Compensation activities
    @activity.defn
    async def refund_payment(request: dict) -> None:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "http://payment-service:8080/refunds",
                json=request
            )
            response.raise_for_status()

    @activity.defn
    async def release_inventory(request: dict) -> None:
        async with httpx.AsyncClient() as client:
            response = await client.delete(
                f"http://inventory-service:8080/reservations/{request['reservation_id']}"
            )
            response.raise_for_status()

    @activity.defn
    async def cancel_order(request: dict) -> None:
        async with httpx.AsyncClient() as client:
            response = await client.patch(
                f"http://order-service:8080/orders/{request['order_id']}/cancel",
                json={"reason": request["reason"]}
            )
            response.raise_for_status()
Running the Temporal Worker
    # worker.py
    import asyncio

    from temporalio.client import Client
    from temporalio.worker import Worker

    from activities import (
        cancel_order,
        confirm_order,
        create_order,
        create_shipment,
        process_payment,
        refund_payment,
        release_inventory,
        reserve_inventory,
    )
    from workflows import OrderSagaWorkflow

    async def main():
        client = await Client.connect("temporal:7233")
        worker = Worker(
            client,
            task_queue="order-saga",
            workflows=[OrderSagaWorkflow],
            activities=[
                create_order,
                process_payment,
                reserve_inventory,
                create_shipment,
                confirm_order,
                refund_payment,
                release_inventory,
                cancel_order,
            ],
        )
        await worker.run()

    if __name__ == "__main__":
        asyncio.run(main())
Choreography vs Orchestration
| Aspect | Choreography | Orchestration |
|---|---|---|
| Coupling | Loose | Depends on Orchestrator |
| Complexity | Surges as service count grows | Centrally managed |
| Visibility | Hard to trace events | Clear workflow state |
| Single point of failure | None | Orchestrator |
| Best for | Simple flows (3-4 steps) | Complex flows (5+ steps) |
| Testing | Difficult | Relatively easy |
Semantic Lock Countermeasure
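A Saga gives up isolation (the I in ACID): each local transaction commits immediately, so concurrent Sagas can read or overwrite each other's intermediate state, causing anomalies such as dirty reads and lost updates. A semantic lock marks a record as in progress so that other Sagas can detect it and back off.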
    # Semantic Lock pattern
    # Order, generate_saga_id and ConcurrencyError are assumed to be defined
    # elsewhere; self.session is a SQLAlchemy-style session.
    class OrderDB:
        def create_with_lock(self, order_data):
            """Set status to PENDING on creation to prevent other Sagas from interfering"""
            order = Order(
                **order_data,
                status="PENDING",  # Semantic Lock
                saga_id=generate_saga_id()
            )
            self.session.add(order)
            self.session.commit()
            return order

        def confirm_order(self, order_id, saga_id):
            """Release the lock when the Saga completes"""
            order = self.session.query(Order).filter_by(
                id=order_id,
                saga_id=saga_id,
                status="PENDING"
            ).first()
            if not order:
                raise ConcurrencyError("Order already processed by another saga")
            order.status = "CONFIRMED"
            self.session.commit()
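The read-then-write in `confirm_order` leaves a small window between the check and the update. One way to close it, sketched here with the standard-library `sqlite3` module rather than the ORM used above, is to fold the check into a single conditional `UPDATE` and inspect the affected row count. The table layout and the sample IDs are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders ("
    "id TEXT PRIMARY KEY, saga_id TEXT NOT NULL, status TEXT NOT NULL)"
)
# The order starts out PENDING, i.e. semantically locked by saga-A.
conn.execute("INSERT INTO orders VALUES ('o-1', 'saga-A', 'PENDING')")
conn.commit()


def confirm_order(conn: sqlite3.Connection, order_id: str, saga_id: str) -> bool:
    """Flip PENDING -> CONFIRMED only if this saga still holds the semantic lock."""
    cursor = conn.execute(
        "UPDATE orders SET status = 'CONFIRMED' "
        "WHERE id = ? AND saga_id = ? AND status = 'PENDING'",
        (order_id, saga_id),
    )
    conn.commit()
    # Exactly one row changes when the lock was held; zero rows otherwise.
    return cursor.rowcount == 1


wrong_saga = confirm_order(conn, "o-1", "saga-B")  # another saga: rejected
first = confirm_order(conn, "o-1", "saga-A")       # lock owner: accepted
second = confirm_order(conn, "o-1", "saga-A")      # already confirmed: rejected
final_status = conn.execute(
    "SELECT status FROM orders WHERE id = 'o-1'"
).fetchone()[0]
print(wrong_saga, first, second, final_status)
```

Because the condition and the write happen in one statement, two Sagas racing on the same order cannot both see `PENDING` and both confirm.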
Ensuring Idempotency
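A message broker or workflow engine may deliver or retry a step more than once, so both forward steps and compensating transactions must be idempotent. An idempotency key lets a service recognize a repeated request and return the original result instead of acting twice.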
    # Payment processing with idempotency keys
    class PaymentService:
        def process_payment(self, order_id, amount, idempotency_key):
            # Check if request was already processed
            existing = self.db.find_by_idempotency_key(idempotency_key)
            if existing:
                return existing  # Return the same result
            payment = self.db.create_payment(
                order_id=order_id,
                amount=amount,
                idempotency_key=idempotency_key
            )
            return payment
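The lookup-then-create sequence above can still insert twice if two retries arrive at the same moment. A common remedy is to let the database enforce uniqueness of the key. The sketch below uses the standard-library `sqlite3` module; the table layout, the `amount_cents` column, and the key format are assumptions for illustration. It also shows a refund written as a state transition, so that repeating the compensation has no further effect.

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE payments (
        id TEXT PRIMARY KEY,
        order_id TEXT NOT NULL,
        amount_cents INTEGER NOT NULL,
        idempotency_key TEXT NOT NULL UNIQUE,
        status TEXT NOT NULL DEFAULT 'CHARGED'
    )
    """
)


def process_payment(order_id: str, amount_cents: int, idempotency_key: str) -> str:
    """Insert at most one payment per key and return that payment's id."""
    conn.execute(
        "INSERT OR IGNORE INTO payments (id, order_id, amount_cents, idempotency_key) "
        "VALUES (?, ?, ?, ?)",
        (str(uuid.uuid4()), order_id, amount_cents, idempotency_key),
    )
    conn.commit()
    row = conn.execute(
        "SELECT id FROM payments WHERE idempotency_key = ?", (idempotency_key,)
    ).fetchone()
    return row[0]


def refund_payment(payment_id: str) -> bool:
    """Idempotent compensation: only a CHARGED payment moves to REFUNDED."""
    cursor = conn.execute(
        "UPDATE payments SET status = 'REFUNDED' WHERE id = ? AND status = 'CHARGED'",
        (payment_id,),
    )
    conn.commit()
    return cursor.rowcount == 1


first_id = process_payment("o-1", 4250, "saga-1:payment")
retry_id = process_payment("o-1", 4250, "saga-1:payment")  # duplicate delivery
payment_count = conn.execute("SELECT COUNT(*) FROM payments").fetchone()[0]
first_refund = refund_payment(first_id)
second_refund = refund_payment(first_id)  # repeated compensation changes nothing
print(first_id == retry_id, payment_count, first_refund, second_refund)
```

Deriving the key from the saga and the step, as in the hypothetical `saga-1:payment`, means every retry of the same step carries the same key.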
Summary
The Saga pattern is a core pattern for handling distributed transactions in microservices:
- Choreography: Suited for simple flows, loosely coupled through events
- Orchestration: Suited for complex flows, centrally managed with tools like Temporal
- Compensating Transactions: Recover in reverse order on failure
- Idempotency: Essential for retry safety
- Semantic Lock: Prevents interference between concurrent Sagas
Quiz: Saga Pattern Comprehension Check (7 Questions)
Q1. What is the main reason for using Saga instead of 2PC?
2PC limits availability and scalability through synchronous locking, while Saga provides high availability and scalability through asynchronous processing.
Q2. What is a compensating transaction?
A reverse transaction that undoes previously completed steps when a specific step in the Saga fails.
Q3. What are the downsides of the Choreography approach?
As the number of services grows, event flows become complex and it becomes difficult to understand the overall Saga's progress.
Q4. Why is Temporal well-suited for Saga Orchestration?
It automatically persists workflow state, so a Saga survives worker restarts. Retries are configured declaratively through retry policies, and compensation is written as ordinary code in the workflow.
Q5. What is the purpose of the Semantic Lock pattern?
It prevents other Sagas from concurrently modifying the same data by using a PENDING status.
Q6. Why is idempotency important in compensating transactions?
Compensation may execute multiple times due to network errors and other issues, so it must guarantee the same result each time.
Q7. In what situations should you choose Orchestration over Choreography?
When you have complex flows with 5 or more steps, many conditional branches, or need visibility into overall progress.