Distributed Systems Complete Guide: From CAP Theorem to Distributed ML Training and Kafka
Author: Youngju Kim (@fjvbn20031)
- Introduction
- 1. Distributed Systems Fundamentals: CAP Theorem and Consistency Models
- 2. Consensus Algorithms: Paxos and Raft
- 3. Distributed Transactions: 2PC, Saga, and CQRS
- 4. Message Queues: Kafka Architecture
- 5. Distributed Storage: Consistent Hashing and Cassandra
- 6. Distributed ML Training: Ring-AllReduce and PyTorch Distributed
- 7. Observability: Tracing, Logs, and Metrics
- Quiz
- Conclusion
Introduction
Distributed systems are the backbone of modern AI infrastructure. Training a large model like GPT-4 requires thousands of GPUs working in concert, and serving millions of inference requests per second demands distributed systems like Kafka and Cassandra.
This guide covers the distributed systems knowledge AI engineers need most — from theoretical foundations like the CAP theorem all the way to practical PyTorch distributed training code.
1. Distributed Systems Fundamentals: CAP Theorem and Consistency Models
CAP Theorem
Proposed by Eric Brewer in 2000, the CAP theorem describes a fundamental limitation of distributed systems.
              Consistency (C)
                    △
                   / \
                  /   \
                 / CA  \
                /(RDBMS)\
               ───────────
              / CP  │  AP \
             /(HBase│Cassandra)
            ▽───────┼───────▽
  Availability (A)     Partition Tolerance (P)
No system can simultaneously guarantee all three properties:
- C (Consistency): All nodes read the same data at the same time
- A (Availability): Every request receives a response
- P (Partition Tolerance): The system continues operating despite network partitions
Since network partitions are inevitable in real distributed environments, the practical choice is always CP vs AP.
| System | Choice | Example |
|---|---|---|
| HBase, Zookeeper, etcd | CP | Sacrifice availability during partition |
| Cassandra, DynamoDB | AP | Sacrifice consistency during partition |
| Traditional RDBMS | CA | Does not tolerate partitions |
PACELC: Extending CAP
CAP only covers partition scenarios, but PACELC also captures the Latency vs Consistency tradeoff during normal operation.
P → A or C (during partition)
E → L or C (during normal operation)
Cassandra: PA/EL (availability + low latency first)
etcd/Raft: PC/EC (consistency first)
Consistency Models
Ordered from strongest to weakest:
Linearizability
└── Strongest. All operations appear instantaneous
└── Examples: etcd, Zookeeper
Sequential Consistency
└── All nodes observe operations in the same order
└── Examples: CPU memory model
Causal Consistency
└── Causally related operations maintain order
└── Examples: MongoDB causally consistent sessions
Eventual Consistency
└── All nodes converge to the same value eventually
└── Examples: Cassandra, DNS, S3
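To make eventual consistency concrete, here is a minimal, illustrative sketch (not any real database's API) of a last-write-wins register, the per-cell conflict-resolution strategy Cassandra uses: replicas accept writes independently during a partition and converge once they exchange state.

```python
class LWWRegister:
    """Last-write-wins register: each replica keeps (timestamp, value)
    and merging always keeps the newest write, so replicas converge."""
    def __init__(self):
        self.timestamp = 0.0
        self.value = None

    def write(self, value, timestamp):
        if timestamp > self.timestamp:  # older (or tied) writes are ignored
            self.timestamp, self.value = timestamp, value

    def merge(self, other):
        """Anti-entropy: pull the other replica's state into this one."""
        self.write(other.value, other.timestamp)

# Two replicas accept conflicting writes during a partition
a, b = LWWRegister(), LWWRegister()
a.write("v1", timestamp=1.0)
b.write("v2", timestamp=2.0)   # later write

# After the partition heals, replicas exchange state and converge
a.merge(b)
b.merge(a)
assert a.value == b.value == "v2"
```

Real systems add details this sketch skips (vector clocks or hybrid logical clocks to avoid wall-clock skew, tombstones for deletes), but the convergence argument is the same.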
2. Consensus Algorithms: Paxos and Raft
The Problem with Paxos
Leslie Lamport first described Paxos in 1989, although the paper ("The Part-Time Parliament") was not published until 1998. It is theoretically sound but notoriously difficult to understand. Diego Ongaro titled the Raft paper "In Search of an Understandable Consensus Algorithm" for exactly this reason.
The Raft Algorithm
Raft decomposes distributed consensus into three independent sub-problems:
- Leader Election
- Log Replication
- Safety
Node States
               timeout                  majority votes
 ┌──────────┐ ─────────▶ ┌───────────┐ ─────────────▶ ┌────────┐
 │ Follower │            │ Candidate │                │ Leader │
 └──────────┘ ◀───────── └───────────┘                └────────┘
       ▲      discovers current leader / new term          │
       └──────────────────── heartbeat ────────────────────┘
Leader Election Process
- A Follower that receives no heartbeat within an election timeout (150-300ms) transitions to Candidate
- It increments its term number, votes for itself, and sends RequestVote RPCs to other nodes
- The first Candidate to receive a majority of votes becomes the new Leader
- The Leader continuously sends AppendEntries (heartbeat) RPCs to prevent new elections
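Raft randomizes each node's election timeout precisely so that, in most rounds, one node times out first and collects votes before any rival becomes a Candidate. A toy simulation (illustrative only, no real RPCs) of that first step:

```python
import random

def first_candidate(node_ids, seed=42):
    """Simulate one election round: each Follower draws a random election
    timeout in [150, 300) ms; the node that times out first transitions to
    Candidate and requests votes before the others even notice."""
    rng = random.Random(seed)
    timeouts = {n: rng.uniform(150, 300) for n in node_ids}
    return min(timeouts, key=timeouts.get), timeouts

candidate, timeouts = first_candidate(["node-1", "node-2", "node-3"])
print({n: round(t) for n, t in timeouts.items()})
print(f"First to time out (becomes Candidate): {candidate}")
```

If two nodes did draw nearly identical timeouts and split the vote, neither reaches a majority; both time out again with fresh random values and a higher term, which makes repeated collisions exponentially unlikely.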
Log Replication
Once a leader is elected, every write goes through it: the leader appends the entry to its own log, replicates it to followers via AppendEntries, and commits it once a majority have persisted it. Applications see this machinery indirectly through systems built on Raft, such as etcd:

```python
# Distributed locking with etcd (requires the python-etcd3 package)
import etcd3
import time

def distributed_lock(etcd_client, lock_name, ttl=10):
    """Distributed lock using Raft-backed etcd"""
    lease = etcd_client.lease(ttl)
    lock_key = f"/locks/{lock_name}"
    # Atomic compare-and-swap: succeed only if the key has never been
    # written (version == 0 means the key does not exist yet)
    success, _ = etcd_client.transaction(
        compare=[
            etcd_client.transactions.version(lock_key) == 0
        ],
        success=[
            etcd_client.transactions.put(lock_key, 'locked', lease=lease)
        ],
        failure=[]
    )
    if success:
        print(f"Lock acquired: {lock_name}")
        return lease
    print(f"Lock already held: {lock_name}")
    lease.revoke()  # don't keep the unused lease alive
    return None

def release_lock(lease):
    """Release the distributed lock by revoking its lease"""
    if lease:
        lease.revoke()
        print("Lock released")

# Usage
client = etcd3.client(host='localhost', port=2379)
lease = distributed_lock(client, 'my-resource')
if lease:
    try:
        # Perform work in the critical section
        time.sleep(5)
    finally:
        release_lock(lease)
```
How Kubernetes Uses etcd
Kubernetes stores all cluster state in etcd. Write throughput is bounded by Raft consensus, which is why 3 to 5 etcd nodes are recommended for production clusters.
┌─────────────────────────────────────────────────────┐
│ etcd Cluster (3 nodes) │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Leader │───▶│Follower 1│ │Follower 2│ │
│ │ node-1 │───▶│ node-2 │ │ node-3 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ ▲ │ │ │
│ └───────────────┴───────────────┘ │
│ Raft Consensus │
└─────────────────────────────────────────────────────┘
3. Distributed Transactions: 2PC, Saga, and CQRS
2PC (Two-Phase Commit)
Phase 1 (Prepare):
Coordinator ──▶ Participant A: "Are you ready?"
Coordinator ──▶ Participant B: "Are you ready?"
Participant A ──▶ Coordinator: "YES"
Participant B ──▶ Coordinator: "YES"
Phase 2 (Commit):
Coordinator ──▶ Participant A: "Commit!"
Coordinator ──▶ Participant B: "Commit!"
The critical weakness of 2PC: if the Coordinator crashes after Phase 1, Participants are stuck in a blocking state indefinitely.
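The protocol above fits in a few lines. This is a toy, single-process illustration (the `Participant` class and its flags are made up here), including the window between the two phases where a coordinator crash would leave participants blocked in PREPARED:

```python
class Participant:
    """A participant votes in the prepare phase, then commits or aborts."""
    def __init__(self, name, healthy=True):
        self.name, self.healthy = name, healthy
        self.state = "INIT"

    def prepare(self):
        # Vote YES only if this participant can guarantee a later commit
        self.state = "PREPARED" if self.healthy else "ABORTED"
        return self.healthy

    def commit(self):
        self.state = "COMMITTED"

    def abort(self):
        self.state = "ABORTED"

def two_phase_commit(participants):
    # Phase 1 (Prepare): collect votes; a single NO aborts everyone
    if all(p.prepare() for p in participants):
        # If the coordinator crashed right here, participants would be
        # stuck in PREPARED indefinitely -- the blocking weakness above
        for p in participants:          # Phase 2 (Commit)
            p.commit()
        return "COMMITTED"
    for p in participants:
        p.abort()
    return "ABORTED"

print(two_phase_commit([Participant("A"), Participant("B")]))                   # COMMITTED
print(two_phase_commit([Participant("A"), Participant("B", healthy=False)]))    # ABORTED
```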
Saga Pattern
The Saga pattern decomposes a long transaction into a sequence of local transactions. If any step fails, compensating transactions execute in reverse order.
Order Saga:
1. Create Order → compensate: Cancel Order
2. Deduct Inventory → compensate: Restore Inventory
3. Process Payment → compensate: Refund Payment
4. Start Shipping → compensate: Cancel Shipment
```python
from typing import Callable, List
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)

@dataclass
class SagaStep:
    name: str
    action: Callable
    compensating_action: Callable

class SagaOrchestrator:
    """Saga Orchestrator pattern implementation"""

    def __init__(self, steps: List[SagaStep]):
        self.steps = steps
        self.completed_steps = []

    def execute(self, context: dict) -> bool:
        for step in self.steps:
            try:
                logger.info(f"Executing step: {step.name}")
                step.action(context)
                self.completed_steps.append(step)
                logger.info(f"Step completed: {step.name}")
            except Exception as e:
                logger.error(f"Step failed: {step.name}, error: {e}")
                self._compensate(context)
                return False
        return True

    def _compensate(self, context: dict):
        """Execute compensating transactions in reverse order"""
        logger.info("Starting compensation...")
        for step in reversed(self.completed_steps):
            try:
                logger.info(f"Compensating: {step.name}")
                step.compensating_action(context)
            except Exception as e:
                logger.error(f"Compensation failed: {step.name}, error: {e}")
                raise RuntimeError(f"Saga compensation failed at {step.name}")

# Define the Order Saga
def create_order(ctx):
    ctx['order_id'] = 'ORD-001'
    print(f"Order created: {ctx['order_id']}")

def cancel_order(ctx):
    print(f"Order cancelled: {ctx['order_id']}")

def deduct_inventory(ctx):
    ctx['inventory_reserved'] = True
    print("Inventory deducted")

def restore_inventory(ctx):
    print("Inventory restored")

def process_payment(ctx):
    raise Exception("Payment gateway timeout")

def refund_payment(ctx):
    print("Payment refunded")

order_saga = SagaOrchestrator([
    SagaStep("create_order", create_order, cancel_order),
    SagaStep("deduct_inventory", deduct_inventory, restore_inventory),
    SagaStep("process_payment", process_payment, refund_payment),
])

context = {"user_id": "user-123", "amount": 50000}
result = order_saga.execute(context)
print(f"Saga result: {'success' if result else 'failed with compensation'}")
```
Event Sourcing and CQRS
CQRS Architecture:
Command Side (Writes) Query Side (Reads)
───────────────── ──────────────────
POST /orders ──▶ GET /orders/123
PUT /inventory ──▶ GET /inventory/stats
│ ▲
▼ │
Event Store ──── Projection ────────┘
(append-only) (updates read models)
The key idea of event sourcing: store the sequence of events rather than the current state. This enables point-in-time state reconstruction, audit logs, and event replay.
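A minimal sketch of that idea, assuming a toy account aggregate (the event names and helper are illustrative): state is never stored directly; it is recomputed by replaying the append-only log, which is exactly what enables point-in-time reconstruction.

```python
class EventStore:
    """Append-only event log: the command side's source of truth."""
    def __init__(self):
        self.events = []

    def append(self, event: dict):
        self.events.append(event)

def project_balance(events, upto=None):
    """Projection: rebuild the read model by replaying events.
    `upto` limits the replay for point-in-time reconstruction."""
    balance = 0
    for e in events[:upto]:
        if e["type"] == "Deposited":
            balance += e["amount"]
        elif e["type"] == "Withdrawn":
            balance -= e["amount"]
    return balance

store = EventStore()
store.append({"type": "Deposited", "amount": 100})
store.append({"type": "Withdrawn", "amount": 30})
store.append({"type": "Deposited", "amount": 50})

print(project_balance(store.events))          # current state: 120
print(project_balance(store.events, upto=2))  # state after 2 events: 70
```

In a real CQRS deployment the projection would run asynchronously off the event stream and update a separate read store, so reads never touch the write path.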
4. Message Queues: Kafka Architecture
Core Kafka Concepts
┌─────────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ Topic: orders (4 partitions, replication factor: 3) │
│ │
│ Broker 1 Broker 2 Broker 3 │
│ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ P0 L │ │ P0 F │ │ P0 F │ L=Leader, F=Follower │
│ │ P1 F │ │ P1 L │ │ P1 F │ │
│ │ P2 F │ │ P2 F │ │ P2 L │ │
│ │ P3 L │ │ P3 F │ │ P3 F │ │
│ └──────┘ └──────┘ └──────┘ │
│ │
│ Consumer Group A Consumer Group B │
│ Consumer 1: P0, P1 Consumer X: P0, P1, P2, P3 │
│ Consumer 2: P2, P3 │
└─────────────────────────────────────────────────────────────┘
- Partition: The unit of ordering guarantees and parallel processing
- Replication: Partitions replicated across brokers for high availability
- Consumer Group: Each partition is consumed by exactly one consumer per group
Kafka Producer/Consumer in Practice
```python
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import json
import time

# Producer configuration
producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',                  # Wait for all ISR replicas to acknowledge
    'retries': 3,
    'enable.idempotence': True,     # No duplicates from producer retries
    'compression.type': 'snappy',
}

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

producer = Producer(producer_config)

# Publish order events
for i in range(10):
    order_event = {
        'order_id': f'ORD-{i:04d}',
        'user_id': f'user-{i % 5}',
        'amount': 10000 * (i + 1),
        'timestamp': time.time()
    }
    producer.produce(
        topic='orders',
        key=order_event['user_id'],  # Same user -> same partition
        value=json.dumps(order_event),
        callback=delivery_report
    )
    producer.poll(0)

producer.flush()
print("All messages sent")

# Consumer configuration
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,     # Manual commit for at-least-once
}

consumer = Consumer(consumer_config)
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            raise KafkaException(msg.error())
        event = json.loads(msg.value().decode('utf-8'))
        print(f"Processing order: {event['order_id']}, amount: {event['amount']}")
        # Commit offset only after successful processing
        consumer.commit(asynchronous=False)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```
Kafka Topic YAML Configuration
```yaml
# Kafka Topic configuration (Strimzi Operator)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: orders
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 12                # Max consumer parallelism = partition count
  replicas: 3                   # Replication factor
  config:
    retention.ms: '604800000'   # Retain messages for 7 days
    min.insync.replicas: '2'    # Require 2 ISR replicas for acks=all
    compression.type: snappy
    cleanup.policy: delete
```
Pulsar vs Kafka
| Feature | Kafka | Pulsar |
|---|---|---|
| Storage | Coupled with broker | Separate (Apache BookKeeper) |
| Multi-tenancy | Limited | Native support |
| Geo-replication | MirrorMaker2 | Built-in |
| Latency | ~5ms | ~1ms |
| Operational complexity | Low | Higher |
| Maturity | Very high | High |
5. Distributed Storage: Consistent Hashing and Cassandra
Consistent Hashing
The problem with naive hashing (node = hash(key) % N): adding or removing a node changes N and remaps nearly every key. With consistent hashing, only about K/N of the K keys move to a different node when the node set changes.
```python
import hashlib
from sortedcontainers import SortedDict

class ConsistentHashRing:
    """Consistent hashing ring implementation"""

    def __init__(self, virtual_nodes=150):
        self.virtual_nodes = virtual_nodes
        self.ring = SortedDict()
        self.nodes = set()

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node: str):
        """Add a node to the ring with virtual nodes"""
        self.nodes.add(node)
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}#{i}"
            hash_val = self._hash(virtual_key)
            self.ring[hash_val] = node
        print(f"Added node: {node} with {self.virtual_nodes} virtual nodes")

    def remove_node(self, node: str):
        """Remove a node from the ring"""
        self.nodes.discard(node)
        keys_to_remove = [k for k, v in self.ring.items() if v == node]
        for k in keys_to_remove:
            del self.ring[k]
        print(f"Removed node: {node}")

    def get_node(self, key: str) -> str:
        """Return the node responsible for a key"""
        if not self.ring:
            raise Exception("Ring is empty")
        hash_val = self._hash(key)
        idx = self.ring.bisect_left(hash_val)
        if idx == len(self.ring):
            idx = 0  # Wrap around the ring
        return self.ring.values()[idx]

    def get_nodes_for_replication(self, key: str, n: int) -> list:
        """Return n distinct nodes for replication"""
        if len(self.nodes) < n:
            raise Exception(f"Not enough nodes: {len(self.nodes)} < {n}")
        hash_val = self._hash(key)
        idx = self.ring.bisect_left(hash_val)
        result_nodes = []
        seen_nodes = set()
        total = len(self.ring)
        for i in range(total):
            curr_idx = (idx + i) % total
            node = self.ring.values()[curr_idx]
            if node not in seen_nodes:
                seen_nodes.add(node)
                result_nodes.append(node)
                if len(result_nodes) == n:
                    break
        return result_nodes

# Example usage
ring = ConsistentHashRing(virtual_nodes=150)
for i in range(5):
    ring.add_node(f"cassandra-node-{i}")

for key in ["user:1001", "order:5555", "product:abc", "session:xyz"]:
    node = ring.get_node(key)
    replicas = ring.get_nodes_for_replication(key, 3)
    print(f"Key '{key}' -> Primary: {node}, Replicas: {replicas}")
```
Apache Cassandra Architecture
┌──────────────────────────────────────────────────────────────────┐
│ Cassandra Ring (6 nodes) │
│ │
│ Node 1 │
│ / \ │
│ Node 6 Node 2 │
│ | | │
│ Node 5 Node 3 │
│ \ / │
│ Node 4 │
│ │
│ Write Path: │
│ Client → Coordinator → Hash partition key → Responsible nodes │
│ → CommitLog (WAL) → Memtable → SSTable │
│ │
│ Read Path: │
│ Client → Coordinator → Responsible nodes → Row Cache / Bloom │
│ → Merge SSTables → Return latest data │
└──────────────────────────────────────────────────────────────────┘
Key Cassandra characteristics:
- Tunable Consistency: choose a consistency level per query (ONE, QUORUM, ALL)
- Gossip Protocol: nodes spread cluster state using an epidemic-style algorithm
- Compaction: periodically merges SSTables to improve read performance
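The practical rule behind tunable consistency is the quorum inequality R + W > N: if the read and write quorums overlap, every read touches at least one replica that holds the latest write. A tiny helper (illustrative, not a driver API) to check combinations:

```python
def is_strongly_consistent(n: int, r: int, w: int) -> bool:
    """With replication factor n, write consistency w and read consistency r,
    reads are guaranteed to overlap the latest write when r + w > n."""
    return r + w > n

# RF=3 with QUORUM reads and QUORUM writes (2 + 2 > 3): strongly consistent
print(is_strongly_consistent(3, 2, 2))   # True
# RF=3 with ONE/ONE (1 + 1 <= 3): reads may return stale data
print(is_strongly_consistent(3, 1, 1))   # False
```

This is why QUORUM/QUORUM is the common default for read-your-writes behavior, while ONE/ONE trades that guarantee for lower latency and higher availability.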
6. Distributed ML Training: Ring-AllReduce and PyTorch Distributed
Parameter Server vs All-Reduce
Parameter Server approach:
───────────────────────
Worker 1 ─┐
Worker 2 ─┤── PS (Parameter Server) ──▶ Aggregate + broadcast gradients
Worker 3 ─┘
Problem: PS is a bottleneck. Bandwidth requirement is O(n).
Ring-AllReduce approach:
────────────────────────
Worker 1 ──▶ Worker 2 ──▶ Worker 3
▲ │
└──────────────────────────┘
Each worker communicates only with neighbors.
Per-worker data transferred: 2 * (n-1)/n * data_size, which approaches 2 * data_size as n grows (nearly independent of n)
Ring-AllReduce operates in two phases:
- Reduce-Scatter: Each node reduces its chunk and sends it to the next node
- All-Gather: The reduced results are broadcast to all nodes
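The two phases are easiest to verify with a toy simulation (pure Python, no real networking): n workers each hold a gradient split into n chunks, and after 2(n-1) neighbor-to-neighbor steps every worker holds the full sum.

```python
def ring_allreduce(worker_grads):
    """Simulate Ring-AllReduce: n workers, each holding a gradient vector
    split into n chunks (here: one number per chunk). Returns each
    worker's final copy, which should equal the elementwise sum."""
    n = len(worker_grads)
    chunks = [list(g) for g in worker_grads]  # chunks[worker][chunk]

    # Phase 1: Reduce-Scatter. At step t, worker i sends chunk (i - t) % n
    # to its right neighbour, which adds it into its own copy.
    for t in range(n - 1):
        sends = [(i, (i - t) % n, chunks[i][(i - t) % n]) for i in range(n)]
        for i, c, value in sends:             # all sends happen "in parallel"
            chunks[(i + 1) % n][c] += value

    # Now worker i owns the fully reduced chunk (i + 1) % n.
    # Phase 2: All-Gather. At step t, worker i forwards chunk (i + 1 - t) % n;
    # the receiver simply overwrites its stale copy.
    for t in range(n - 1):
        sends = [(i, (i + 1 - t) % n, chunks[i][(i + 1 - t) % n]) for i in range(n)]
        for i, c, value in sends:
            chunks[(i + 1) % n][c] = value

    return chunks

result = ring_allreduce([[1, 1, 1], [2, 2, 2], [3, 3, 3]])
print(result)  # every worker ends with [6, 6, 6]
```

Each worker sends one chunk (1/n of the data) per step for 2(n-1) steps, which is where the 2 * (n-1)/n * data_size figure comes from.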
PyTorch DistributedDataParallel (DDP)
```python
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

def setup(rank, world_size):
    """Initialize distributed training"""
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    # NCCL backend: high-speed GPU communication (NVLink, InfiniBand)
    dist.init_process_group(
        backend='nccl',
        rank=rank,
        world_size=world_size
    )
    torch.cuda.set_device(rank)

def cleanup():
    dist.destroy_process_group()

class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(1024, 2048),
            nn.ReLU(),
            nn.Linear(2048, 1024),
            nn.ReLU(),
            nn.Linear(1024, 10)
        )

    def forward(self, x):
        return self.layers(x)

def train(rank, world_size, epochs=5):
    setup(rank, world_size)
    # Place the model on this process's GPU, wrap with DDP
    model = SimpleModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    # DistributedSampler ensures each process gets a different data shard
    dataset = torch.utils.data.TensorDataset(
        torch.randn(1000, 1024),
        torch.randint(0, 10, (1000,))
    )
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)
    optimizer = optim.Adam(ddp_model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        sampler.set_epoch(epoch)  # Reseed the shuffle each epoch
        total_loss = 0.0
        for batch_x, batch_y in dataloader:
            batch_x = batch_x.to(rank)
            batch_y = batch_y.to(rank)
            optimizer.zero_grad()
            output = ddp_model(batch_x)
            loss = criterion(output, batch_y)
            loss.backward()  # Gradient AllReduce happens automatically
            optimizer.step()
            total_loss += loss.item()
        if rank == 0:  # Only rank 0 logs
            print(f"Epoch {epoch}: Loss = {total_loss/len(dataloader):.4f}")

    cleanup()

# Launch with `python train.py` (spawn creates one process per GPU).
# Alternatively use `torchrun --nproc_per_node=4 train.py` and read
# RANK/WORLD_SIZE from the environment instead of spawning.
if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    torch.multiprocessing.spawn(
        train,
        args=(world_size,),
        nprocs=world_size,
        join=True
    )
```
NCCL and Communication Backends
NCCL (NVIDIA Collective Communications Library):
Single node (NVLink):
GPU0 ←──── NVLink ────→ GPU1
│ │
600 GB/s bidirectional bandwidth
Multi-node (InfiniBand + RDMA):
Node 1 ←── IB (200 Gb/s) ──→ Node 2
│ │
RDMA: direct GPU-to-GPU without CPU involvement
Backend selection:
- nccl: GPU distributed training (recommended)
- gloo: CPU training or debugging
- mpi: HPC environments
Horovod Comparison
| Feature | PyTorch DDP | Horovod |
|---|---|---|
| Integration | Built into PyTorch | Separate install |
| Algorithm | Bucketed AllReduce | Ring-AllReduce |
| Frameworks | PyTorch only | TF/PyTorch/MXNet |
| Ease of use | Moderate | Simple (hvd.init()) |
| Performance | Comparable | Comparable |
7. Observability: Tracing, Logs, and Metrics
Distributed Tracing
In a microservices architecture, requests traverse multiple services. Distributed tracing lets you see the entire journey of a single request.
Request flow (traced by Jaeger/Zipkin):
Browser → API GW → Order Svc → Inventory Svc → DB
│ │ │ │ │
│ TraceID: abc123 │ │ │
│ SpanID: 001 SpanID: 002 SpanID: 003 │
│ │ │ │
└──────────────────────┴────────────┴──────────┘
Total latency: 120ms
Order Svc: 80ms (bottleneck!)
OpenTelemetry layers:
- API: write instrumentation code
- SDK: process and export telemetry data
- Exporter: send to Jaeger / Zipkin / OTLP endpoint
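The core mechanism underneath all of these layers is context propagation: every downstream service reuses the caller's trace_id and records the caller's span_id as its parent. A stripped-down sketch (not the real OpenTelemetry API; the field names loosely follow the W3C traceparent idea):

```python
import uuid
import time

def new_span(trace_id=None, parent_id=None, name=""):
    """A span records one unit of work; all spans of a request share a
    trace_id, and parent_id links them into a tree."""
    return {
        "trace_id": trace_id or uuid.uuid4().hex,  # new trace if none given
        "span_id": uuid.uuid4().hex[:16],
        "parent_id": parent_id,
        "name": name,
        "start": time.time(),
    }

# The API gateway starts the trace...
gw = new_span(name="api-gateway")
# ...and propagates trace_id/span_id downstream (e.g. via a `traceparent`
# HTTP header), so each service parents its span correctly
order = new_span(trace_id=gw["trace_id"], parent_id=gw["span_id"],
                 name="order-service")
inventory = new_span(trace_id=order["trace_id"], parent_id=order["span_id"],
                     name="inventory-service")

# All three spans share one trace_id, so a backend like Jaeger can
# reassemble the full request tree and find the slowest span
assert gw["trace_id"] == order["trace_id"] == inventory["trace_id"]
print("gateway -> order -> inventory, trace", gw["trace_id"][:8])
```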
ELK Stack Log Aggregation
┌─────────────────────────────────────────────────────┐
│ ELK Stack │
│ │
│ App Logs ──▶ Filebeat ──▶ Logstash ──▶ Elasticsearch│
│ K8s Logs ──▶ Fluentd ──/ ──▶ Kibana │
│ (dashboards)│
│ │
│ Modern alternative: │
│ App Logs ──▶ Promtail ──▶ Loki ──▶ Grafana │
└─────────────────────────────────────────────────────┘
Prometheus + Grafana Metrics
```yaml
# Prometheus scrape configuration
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka-broker-1:9090', 'kafka-broker-2:9090']
    metrics_path: '/metrics'
    scrape_interval: 15s
  - job_name: 'pytorch-training'
    static_configs:
      - targets: ['trainer-0:8080', 'trainer-1:8080']
```
Critical Kafka metrics for Grafana dashboards:
- kafka_consumer_lag: consumer lag (key SLA metric)
- kafka_network_request_rate: broker request throughput
- kafka_log_size: partition log size
- kafka_under_replicated_partitions: number of under-replicated partitions
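Consumer lag in particular is just the per-partition gap between the broker's log-end offset and the consumer group's committed offset. A tiny illustrative helper (the offsets below are made up):

```python
def consumer_lag(log_end_offsets, committed_offsets):
    """Per-partition consumer lag: how far the group's committed offset
    trails the partition's latest (log-end) offset."""
    return {
        p: log_end_offsets[p] - committed_offsets.get(p, 0)
        for p in log_end_offsets
    }

# Hypothetical snapshot of a 3-partition topic
end = {"orders-0": 1500, "orders-1": 980, "orders-2": 2100}
committed = {"orders-0": 1500, "orders-1": 950, "orders-2": 1800}

lag = consumer_lag(end, committed)
print(lag)                            # {'orders-0': 0, 'orders-1': 30, 'orders-2': 300}
print("total lag:", sum(lag.values()))
```

A steadily growing total is the classic alert condition: consumers are falling behind producers, and either processing must speed up or partitions (and consumers) must be added.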
Quiz
Q1. When a network partition occurs, what are the tradeoffs a CP system and an AP system each make?
Answer: A CP system sacrifices availability to maintain consistency during a partition. An AP system does the opposite — it sacrifices consistency to maintain availability.
Explanation: When a network partition occurs, nodes on opposite sides of the partition can no longer communicate. A CP system (e.g., etcd, HBase) will refuse requests or return errors rather than risk serving stale data. An AP system (e.g., Cassandra, DynamoDB) continues serving requests from partitioned nodes, meaning different nodes may return different data. Once the partition heals, the system reconciles via eventual consistency.
Q2. Describe the process by which Raft elects a new leader when the current leader fails.
Answer: When a Follower does not receive a heartbeat within its election timeout, it becomes a Candidate, sends RequestVote RPCs, and the first Candidate to receive a majority of votes becomes the new Leader.
Explanation: After the leader fails, Followers wait for their election timeout (typically 150-300ms). The first Follower to time out transitions to Candidate, increments its term, votes for itself, and sends RequestVote RPCs to all other nodes. Each node grants at most one vote per term on a first-come-first-served basis. The first Candidate to accumulate a majority wins and becomes the new Leader. If a split vote occurs, all Candidates time out and restart the election with a new, higher term.
Q3. What happens in Kafka when a consumer group has more consumers than partitions?
Answer: The excess consumers receive no partition assignment and remain idle — they process no messages.
Explanation: In Kafka's consumption model, each partition in a topic can be consumed by at most one consumer within the same consumer group at a time. If there are 4 consumers but only 3 partitions, one consumer will have no partition assigned and will sit idle, consuming resources but doing no work. For this reason, you generally design so the number of consumers does not exceed the number of partitions. To scale throughput further, you must increase the partition count first.
Q4. Why is Ring-AllReduce more communication-efficient than the Parameter Server approach?
Answer: In Ring-AllReduce, each node communicates only with its two neighbors, so total bandwidth usage is constant regardless of the number of nodes. In Parameter Server, all Workers communicate with the PS, making the PS a bandwidth bottleneck that scales linearly with worker count.
Explanation: With n workers in a Parameter Server setup, the PS must exchange gradients with all n workers. As n grows, the PS's required bandwidth grows linearly. In Ring-AllReduce, each worker sends data only to the next worker in the ring. The total data transferred per worker converges to 2 * (n-1)/n * data_size, which approaches 2x data_size as n grows, effectively independent of the number of workers. This allows every node's bandwidth to be fully and evenly utilized.
Q5. In the Saga pattern, when is a compensating transaction required?
Answer: When an intermediate step in a Saga fails, compensating transactions are executed for all previously completed steps (in reverse order) to undo their effects.
Explanation: The Saga pattern handles distributed transactions when ACID guarantees are unavailable. Each step commits locally and immediately. For example, in an Order Saga — Create Order → Deduct Inventory → Process Payment — if the payment step fails, a "Refund Payment" compensating transaction runs, followed by a "Restore Inventory" transaction, and finally a "Cancel Order" transaction. Because compensating transactions can also fail, they must be idempotent and designed for retries. Manual intervention may be required if a compensation step fails.
Conclusion
Distributed systems mastery is not acquired overnight. Understanding CAP theorem tradeoffs, implementing Raft consensus by hand, applying Kafka partition strategies to real production services — these skills deepen through experience.
For AI engineers in particular, distributed ML training techniques like Ring-AllReduce and NCCL are becoming increasingly central. Training a model at the scale of GPT-4 requires optimizing the network communication of thousands of GPUs, which ultimately comes down to the core principles of distributed systems.
Recommended learning path: Understand CAP theorem → Read the Raft paper → Operate etcd in production → Get Kafka production experience → Train multi-GPU models with PyTorch DDP