
Distributed Systems Complete Guide: From CAP Theorem to Distributed ML Training and Kafka


Introduction

Distributed systems are the backbone of modern AI infrastructure. Training a large model like GPT-4 requires thousands of GPUs working in concert, and serving millions of inference requests per second demands distributed systems like Kafka and Cassandra.

This guide covers the distributed systems knowledge AI engineers need most — from theoretical foundations like the CAP theorem all the way to practical PyTorch distributed training code.


1. Distributed Systems Fundamentals: CAP Theorem and Consistency Models

CAP Theorem

Proposed by Eric Brewer in 2000, the CAP theorem describes a fundamental limitation of distributed systems.

┌─────────────────────────────────────────────┐
│                CAP Triangle                 │
│                                             │
│              Consistency (C)                │
│                    △                        │
│                   / \                       │
│              CA  /   \  CP                  │
│          (RDBMS)/     \(HBase)              │
│                /       \                    │
│               ▽─────────▽                   │
│  Availability (A)  Partition Tolerance (P)  │
│              AP (Cassandra)                 │
└─────────────────────────────────────────────┘

No system can simultaneously guarantee all three properties:

  • C (Consistency): All nodes read the same data at the same time
  • A (Availability): Every request receives a response
  • P (Partition Tolerance): The system continues operating despite network partitions

Since network partitions are inevitable in real distributed environments, the practical choice is always CP vs AP.

System                   Choice   Example
HBase, ZooKeeper, etcd   CP       Sacrifices availability during a partition
Cassandra, DynamoDB      AP       Sacrifices consistency during a partition
Traditional RDBMS        CA       Does not tolerate partitions

PACELC: Extending CAP

CAP only covers partition scenarios, but PACELC also captures the Latency vs Consistency tradeoff during normal operation.

P → A or C   (if a Partition occurs: choose Availability or Consistency)
E → L or C   (Else, in normal operation: choose Latency or Consistency)

Cassandra: PA/EL  (availability + low latency first)
etcd/Raft: PC/EC  (consistency first)

Consistency Models

Ordered from strongest to weakest:

Linearizability
  └── Strongest. All operations appear instantaneous
  └── Examples: etcd, Zookeeper

Sequential Consistency
  └── All nodes observe operations in the same order
  └── Examples: idealized CPU memory models

Causal Consistency
  └── Causally related operations maintain order
  └── Examples: MongoDB causally consistent sessions

Eventual Consistency
  └── All nodes converge to the same value eventually
  └── Examples: Cassandra, DNS, S3

2. Consensus Algorithms: Paxos and Raft

The Problem with Paxos

Leslie Lamport first described Paxos in 1989 (the paper, "The Part-Time Parliament," was not published until 1998). It is theoretically sound but notoriously difficult to understand. Diego Ongaro and John Ousterhout literally titled their Raft paper "In Search of an Understandable Consensus Algorithm" for this reason.

The Raft Algorithm

Raft decomposes distributed consensus into three independent sub-problems:

  1. Leader Election
  2. Log Replication
  3. Safety

Node States

┌──────────┐   timeout   ┌───────────┐   majority   ┌────────┐
│ Follower │────────────▶│ Candidate │─────────────▶│ Leader │
└──────────┘             └───────────┘              └────────┘
     ▲                                                   │
     └───────────────── receive heartbeat ───────────────┘

Leader Election Process

  1. A Follower that receives no heartbeat within an election timeout (150-300ms) transitions to Candidate
  2. It increments its term number, votes for itself, and sends RequestVote RPCs to other nodes
  3. The first Candidate to receive a majority of votes becomes the new Leader
  4. The Leader continuously sends AppendEntries (heartbeat) RPCs to prevent new elections
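
The randomized timeout is what makes election converge quickly. Below is a toy, single-process sketch of the timing argument, not a real implementation: there are no RPCs, and the node names and the 10ms network-delay figure are illustrative assumptions. The follower whose timeout fires first usually collects votes before anyone else becomes a Candidate.

# Toy simulation of Raft's randomized election timeout (illustrative only)
import random

NODES = ["node-1", "node-2", "node-3", "node-4", "node-5"]
RPC_DELAY_MS = 10  # assumed one-way network delay

def elect(term):
    # Each follower draws an election timeout uniformly from 150-300 ms
    timeouts = sorted((random.uniform(150, 300), n) for n in NODES)
    (first, candidate), (second, _) = timeouts[0], timeouts[1]
    term += 1  # the candidate increments its term and votes for itself
    if second - first < RPC_DELAY_MS:
        # The runner-up timed out before RequestVote could arrive: possible
        # split vote, so everyone times out again and retries in a new term
        print(f"term {term}: split vote, retrying")
        return elect(term)
    print(f"term {term}: {candidate} wins ({second - first:.0f} ms ahead of runner-up)")
    return candidate, term

leader, term = elect(term=1)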

Log Replication

Once elected, the Leader accepts client commands, appends each one to its log, and replicates the entry to Followers via AppendEntries RPCs. An entry is committed once a majority of nodes have stored it. Raft-backed stores like etcd expose this replicated log as a strongly consistent key-value API, which the distributed lock below builds on.

# Distributed locking with etcd
import etcd3
import time

def distributed_lock(etcd_client, lock_name, ttl=10):
    """Distributed lock using Raft-backed etcd"""
    lease = etcd_client.lease(ttl)
    lock_key = f"/locks/{lock_name}"

    # Atomic compare-and-swap: acquire only if the key does not exist yet
    success, _ = etcd_client.transaction(
        compare=[
            # create revision == 0 means the key has never been created
            etcd_client.transactions.create(lock_key) == 0
        ],
        success=[
            etcd_client.transactions.put(lock_key, 'locked', lease=lease)
        ],
        failure=[]
    )

    if success:
        print(f"Lock acquired: {lock_name}")
        return lease
    else:
        lease.revoke()  # avoid leaking the lease when the lock is already held
        print(f"Lock already held: {lock_name}")
        return None

def release_lock(etcd_client, lease):
    """Release the distributed lock"""
    if lease:
        lease.revoke()
        print("Lock released")

# Usage
client = etcd3.client(host='localhost', port=2379)

lease = distributed_lock(client, 'my-resource')
if lease:
    try:
        # Perform work in critical section
        time.sleep(5)
    finally:
        release_lock(client, lease)

How Kubernetes Uses etcd

Kubernetes stores all cluster state in etcd. Every write must be committed through Raft consensus (acknowledged by a majority of members), so write throughput is bounded by consensus latency. This is why production clusters typically run 3 or 5 etcd nodes: enough for quorum and fault tolerance, but not so many that every write slows down.

┌─────────────────────────────────────────────────────┐
│                etcd Cluster (3 nodes)               │
│                                                     │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐       │
│  │  Leader  │    │Follower 1│    │Follower 2│       │
│  │  node-1  │    │  node-2  │    │  node-3  │       │
│  └──────────┘    └──────────┘    └──────────┘       │
│       │               ▲               ▲             │
│       └───────────────┴───────────────┘             │
│           Raft consensus (log replication)          │
└─────────────────────────────────────────────────────┘

3. Distributed Transactions: 2PC, Saga, and CQRS

2PC (Two-Phase Commit)

Phase 1 (Prepare):
  Coordinator ──▶ Participant A: "Are you ready?"
  Coordinator ──▶ Participant B: "Are you ready?"
  Participant A ──▶ Coordinator: "YES"
  Participant B ──▶ Coordinator: "YES"

Phase 2 (Commit):
  Coordinator ──▶ Participant A: "Commit!"
  Coordinator ──▶ Participant B: "Commit!"

The critical weakness of 2PC: if the Coordinator crashes after Phase 1, Participants that voted YES are stuck blocking indefinitely. They hold locks but cannot unilaterally commit or abort.
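
A toy, in-process sketch makes the two phases and the blocking window concrete. There is no networking or write-ahead logging here, and the class and participant names are illustrative, not from any library.

# Toy 2PC coordinator: two local "participants", no real network or WAL
class Participant:
    def __init__(self, name):
        self.name = name
        self.staged = None

    def prepare(self, data) -> bool:
        # Phase 1: stage the change (a real participant writes a WAL entry
        # and holds locks until the coordinator's decision arrives)
        self.staged = data
        return True  # vote YES

    def commit(self):
        print(f"{self.name}: committed {self.staged}")

    def rollback(self):
        self.staged = None
        print(f"{self.name}: rolled back")

def two_phase_commit(participants, data) -> bool:
    # Phase 1 (Prepare): abort unless every participant votes YES
    if not all(p.prepare(data) for p in participants):
        for p in participants:
            p.rollback()
        return False
    # <-- If the coordinator crashes here, participants block holding locks:
    #     they voted YES and cannot unilaterally commit or abort.
    # Phase 2 (Commit)
    for p in participants:
        p.commit()
    return True

two_phase_commit([Participant("orders-db"), Participant("billing-db")],
                 {"order_id": "ORD-001"})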

Saga Pattern

The Saga pattern decomposes a long transaction into a sequence of local transactions. If any step fails, compensating transactions execute in reverse order.

Order Saga:
  1. Create Order      → compensate: Cancel Order
  2. Deduct Inventory  → compensate: Restore Inventory
  3. Process Payment   → compensate: Refund Payment
  4. Start Shipping    → compensate: Cancel Shipment

from typing import Callable, List
from dataclasses import dataclass
import logging

logging.basicConfig(level=logging.INFO)  # make logger output visible when run as a script
logger = logging.getLogger(__name__)

@dataclass
class SagaStep:
    name: str
    action: Callable
    compensating_action: Callable

class SagaOrchestrator:
    """Saga Orchestrator pattern implementation"""

    def __init__(self, steps: List[SagaStep]):
        self.steps = steps
        self.completed_steps = []

    def execute(self, context: dict) -> bool:
        for step in self.steps:
            try:
                logger.info(f"Executing step: {step.name}")
                step.action(context)
                self.completed_steps.append(step)
                logger.info(f"Step completed: {step.name}")
            except Exception as e:
                logger.error(f"Step failed: {step.name}, error: {e}")
                self._compensate(context)
                return False
        return True

    def _compensate(self, context: dict):
        """Execute compensating transactions in reverse order"""
        logger.info("Starting compensation...")
        for step in reversed(self.completed_steps):
            try:
                logger.info(f"Compensating: {step.name}")
                step.compensating_action(context)
            except Exception as e:
                logger.error(f"Compensation failed: {step.name}, error: {e}")
                raise RuntimeError(f"Saga compensation failed at {step.name}")

# Define the Order Saga
def create_order(ctx):
    ctx['order_id'] = 'ORD-001'
    print(f"Order created: {ctx['order_id']}")

def cancel_order(ctx):
    print(f"Order cancelled: {ctx['order_id']}")

def deduct_inventory(ctx):
    ctx['inventory_reserved'] = True
    print("Inventory deducted")

def restore_inventory(ctx):
    print("Inventory restored")

def process_payment(ctx):
    raise Exception("Payment gateway timeout")

def refund_payment(ctx):
    print("Payment refunded")

order_saga = SagaOrchestrator([
    SagaStep("create_order", create_order, cancel_order),
    SagaStep("deduct_inventory", deduct_inventory, restore_inventory),
    SagaStep("process_payment", process_payment, refund_payment),
])

context = {"user_id": "user-123", "amount": 50000}
result = order_saga.execute(context)
print(f"Saga result: {'success' if result else 'failed with compensation'}")

Event Sourcing and CQRS

CQRS Architecture:

  Command Side (Writes)        Query Side (Reads)
  ─────────────────            ──────────────────
  POST /orders                  GET /orders/123
  PUT  /inventory               GET /inventory/stats
       │                              ▲
       ▼                              │
  Event Store ──── Projection ────────┘
  (append-only)    (updates read models)

The key idea of event sourcing: store the sequence of events rather than the current state. This enables point-in-time state reconstruction, audit logs, and event replay.
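
A minimal event-sourcing sketch (the event types and aggregate fields are illustrative assumptions): the current state is never stored directly, only derived by replaying the append-only log, and replaying a prefix of the log yields any historical state.

from dataclasses import dataclass, field

@dataclass
class Event:
    type: str
    data: dict

@dataclass
class OrderAggregate:
    status: str = "none"
    items: list = field(default_factory=list)

    def apply(self, event: Event):
        if event.type == "OrderCreated":
            self.status = "created"
        elif event.type == "ItemAdded":
            self.items.append(event.data["sku"])
        elif event.type == "OrderShipped":
            self.status = "shipped"

event_log = [  # the append-only event store
    Event("OrderCreated", {"order_id": "ORD-001"}),
    Event("ItemAdded", {"sku": "gpu-a100"}),
    Event("OrderShipped", {}),
]

# Reconstruct state by replay; replay event_log[:k] for a point-in-time view
order = OrderAggregate()
for e in event_log:
    order.apply(e)
print(order)  # OrderAggregate(status='shipped', items=['gpu-a100'])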


4. Message Queues: Kafka Architecture

Core Kafka Concepts

┌─────────────────────────────────────────────────────────────┐
│                        Kafka Cluster                        │
│                                                             │
│   Topic: orders (4 partitions, replication factor: 3)       │
│                                                             │
│   Broker 1      Broker 2      Broker 3                      │
│  ┌──────┐      ┌──────┐      ┌──────┐                       │
│  │ P0 L │      │ P0 F │      │ P0 F │    L = Leader         │
│  │ P1 F │      │ P1 L │      │ P1 F │    F = Follower       │
│  │ P2 F │      │ P2 F │      │ P2 L │                       │
│  │ P3 L │      │ P3 F │      │ P3 F │                       │
│  └──────┘      └──────┘      └──────┘                       │
│                                                             │
│   Consumer Group A           Consumer Group B               │
│   Consumer 1: P0, P1         Consumer X: P0, P1, P2, P3     │
│   Consumer 2: P2, P3                                        │
└─────────────────────────────────────────────────────────────┘

  • Partition: The unit of ordering guarantees and parallel processing
  • Replication: Partitions replicated across brokers for high availability
  • Consumer Group: Each partition is consumed by exactly one consumer per group

Kafka Producer/Consumer in Practice

from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import json
import time

# Producer configuration
producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',           # Wait for all ISR replicas to acknowledge
    'retries': 3,
    'enable.idempotence': True,   # Exactly-once semantics
    'compression.type': 'snappy',
}

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

producer = Producer(producer_config)

# Publish order events
for i in range(10):
    order_event = {
        'order_id': f'ORD-{i:04d}',
        'user_id': f'user-{i % 5}',
        'amount': 10000 * (i + 1),
        'timestamp': time.time()
    }
    producer.produce(
        topic='orders',
        key=order_event['user_id'],   # Same user → same partition
        value=json.dumps(order_event),
        callback=delivery_report
    )
    producer.poll(0)

producer.flush()
print("All messages sent")

# Consumer configuration
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,   # Manual commit for at-least-once
}

consumer = Consumer(consumer_config)
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                raise KafkaException(msg.error())

        event = json.loads(msg.value().decode('utf-8'))
        print(f"Processing order: {event['order_id']}, amount: {event['amount']}")

        # Commit offset after successful processing
        consumer.commit(asynchronous=False)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Kafka Topic YAML Configuration

# Kafka Topic configuration (Strimzi Operator)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: orders
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 12 # Max parallelism = partition count
  replicas: 3 # Replication factor
  config:
    retention.ms: '604800000' # Retain messages for 7 days
    min.insync.replicas: '2' # Require 2 ISR replicas for ack
    compression.type: snappy
    cleanup.policy: delete

Pulsar vs Kafka

Feature                  Kafka                 Pulsar
Storage                  Coupled with broker   Separate (Apache BookKeeper)
Multi-tenancy            Limited               Native support
Geo-replication          MirrorMaker 2         Built-in
Latency                  ~5 ms                 ~1 ms
Operational complexity   Low                   Higher
Maturity                 Very high             High

5. Distributed Storage: Consistent Hashing and Cassandra

Consistent Hashing

The problem with naive hashing (node = hash(key) mod N): adding or removing a node changes N and remaps nearly every key. Consistent hashing fixes this: when the node set changes, only about K/N of the K keys need to move.

import hashlib
from sortedcontainers import SortedDict

class ConsistentHashRing:
    """Consistent hashing ring implementation"""

    def __init__(self, virtual_nodes=150):
        self.virtual_nodes = virtual_nodes
        self.ring = SortedDict()
        self.nodes = set()

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node: str):
        """Add a node to the ring with virtual nodes"""
        self.nodes.add(node)
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}#{i}"
            hash_val = self._hash(virtual_key)
            self.ring[hash_val] = node
        print(f"Added node: {node} with {self.virtual_nodes} virtual nodes")

    def remove_node(self, node: str):
        """Remove a node from the ring"""
        self.nodes.discard(node)
        keys_to_remove = [k for k, v in self.ring.items() if v == node]
        for k in keys_to_remove:
            del self.ring[k]
        print(f"Removed node: {node}")

    def get_node(self, key: str) -> str:
        """Return the node responsible for a key"""
        if not self.ring:
            raise Exception("Ring is empty")

        hash_val = self._hash(key)
        idx = self.ring.bisect_left(hash_val)
        if idx == len(self.ring):
            idx = 0  # Wrap around the ring

        return self.ring.values()[idx]

    def get_nodes_for_replication(self, key: str, n: int) -> list:
        """Return n distinct nodes for replication"""
        if len(self.nodes) < n:
            raise Exception(f"Not enough nodes: {len(self.nodes)} < {n}")

        hash_val = self._hash(key)
        idx = self.ring.bisect_left(hash_val)

        result_nodes = []
        seen_nodes = set()
        total = len(self.ring)

        for i in range(total):
            curr_idx = (idx + i) % total
            node = self.ring.values()[curr_idx]
            if node not in seen_nodes:
                seen_nodes.add(node)
                result_nodes.append(node)
                if len(result_nodes) == n:
                    break

        return result_nodes

# Example usage
ring = ConsistentHashRing(virtual_nodes=150)
for i in range(5):
    ring.add_node(f"cassandra-node-{i}")

for key in ["user:1001", "order:5555", "product:abc", "session:xyz"]:
    node = ring.get_node(key)
    replicas = ring.get_nodes_for_replication(key, 3)
    print(f"Key '{key}' -> Primary: {node}, Replicas: {replicas}")

Apache Cassandra Architecture

┌──────────────────────────────────────────────────────────────────┐
│                     Cassandra Ring (6 nodes)                     │
│                                                                  │
│                           Node 1                                 │
│                          /      \                                │
│                     Node 6      Node 2                           │
│                       |            |                             │
│                     Node 5      Node 3                           │
│                          \      /                                │
│                           Node 4                                 │
│                                                                  │
│  Write Path:                                                     │
│    Client → Coordinator → Hash partition key → Responsible nodes │
│           → CommitLog (WAL) → Memtable → SSTable                 │
│                                                                  │
│  Read Path:                                                      │
│    Client → Coordinator → Responsible nodes → Row Cache /        │
│             Bloom filter → Merge SSTables → Return latest data   │
└──────────────────────────────────────────────────────────────────┘

Key Cassandra characteristics:

  • Tunable Consistency: Choose a consistency level per query: QUORUM, ONE, ALL (see the sketch below)
  • Gossip Protocol: Nodes spread state information using an epidemic algorithm
  • Compaction: Periodically merges SSTables to improve read performance
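
A sketch of tunable consistency with the DataStax Python driver (cassandra-driver); the keyspace and table names are assumptions for illustration. With QUORUM reads and QUORUM writes (R + W greater than the replication factor), a read is guaranteed to observe the latest acknowledged write.

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

# Assumed: a local cluster with keyspace 'shop' and table 'users'
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('shop')

# QUORUM: a majority of replicas must respond (stronger, higher latency)
read_quorum = SimpleStatement(
    "SELECT name FROM users WHERE user_id = %s",
    consistency_level=ConsistencyLevel.QUORUM,
)
rows = session.execute(read_quorum, ('user-1001',))

# ONE: a single replica suffices (faster, may serve stale data)
write_one = SimpleStatement(
    "INSERT INTO users (user_id, name) VALUES (%s, %s)",
    consistency_level=ConsistencyLevel.ONE,
)
session.execute(write_one, ('user-1001', 'Ada'))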

6. Distributed ML Training: Ring-AllReduce and PyTorch Distributed

Parameter Server vs All-Reduce

Parameter Server approach:
  ───────────────────────
  Worker 1 ─┐
  Worker 2 ─┤── PS (Parameter Server) ──▶ Aggregate + broadcast gradients
  Worker 3 ─┘

  Problem: PS is a bottleneck. Bandwidth requirement is O(n).

Ring-AllReduce approach:
  ────────────────────────
  Worker 1 ──▶ Worker 2 ──▶ Worker 3
     ▲                          │
     └──────────────────────────┘

  Each worker communicates only with neighbors.
  Data transferred per worker: 2 * (n-1)/n * data_size
  (approaches 2 * data_size, roughly independent of n)

Ring-AllReduce operates in two phases:

  1. Reduce-Scatter: Each node reduces its chunk and sends it to the next node
  2. All-Gather: The reduced results are broadcast to all nodes
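
The collective itself is a single call in torch.distributed. Below is a minimal sketch using the gloo backend so it runs on CPU-only machines (the address and port are arbitrary choices); on GPUs with the nccl backend, the same call maps onto Ring-AllReduce.

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def worker(rank, world_size):
    dist.init_process_group(
        backend='gloo',  # CPU-friendly for the sketch; use 'nccl' on GPUs
        init_method='tcp://127.0.0.1:29500',
        rank=rank,
        world_size=world_size,
    )
    # Each rank contributes a tensor filled with its own rank id
    t = torch.full((4,), float(rank))
    dist.all_reduce(t, op=dist.ReduceOp.SUM)
    # Every rank now holds the sum 0+1+2+3 = 6 in every element
    print(f"rank {rank}: {t.tolist()}")
    dist.destroy_process_group()

if __name__ == '__main__':
    mp.spawn(worker, args=(4,), nprocs=4, join=True)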

PyTorch DistributedDataParallel (DDP)

import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

def setup(rank, world_size):
    """Initialize distributed training"""
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # NCCL backend: high-speed GPU communication (NVLink, InfiniBand)
    dist.init_process_group(
        backend='nccl',
        rank=rank,
        world_size=world_size
    )
    torch.cuda.set_device(rank)

def cleanup():
    dist.destroy_process_group()

class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(1024, 2048),
            nn.ReLU(),
            nn.Linear(2048, 1024),
            nn.ReLU(),
            nn.Linear(1024, 10)
        )

    def forward(self, x):
        return self.layers(x)

def train(rank, world_size, epochs=5):
    setup(rank, world_size)

    # Place model on GPU, wrap with DDP
    model = SimpleModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    # DistributedSampler ensures each process gets a different data shard
    dataset = torch.utils.data.TensorDataset(
        torch.randn(1000, 1024),
        torch.randint(0, 10, (1000,))
    )
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)

    optimizer = optim.Adam(ddp_model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        sampler.set_epoch(epoch)  # Reseed shuffle each epoch
        total_loss = 0.0

        for batch_x, batch_y in dataloader:
            batch_x = batch_x.to(rank)
            batch_y = batch_y.to(rank)

            optimizer.zero_grad()
            output = ddp_model(batch_x)
            loss = criterion(output, batch_y)
            loss.backward()  # Gradient AllReduce happens automatically
            optimizer.step()
            total_loss += loss.item()

        if rank == 0:  # Only rank 0 logs
            print(f"Epoch {epoch}: Loss = {total_loss/len(dataloader):.4f}")

    cleanup()

# Launch: python train.py (mp.spawn starts one process per GPU).
# With torchrun --nproc_per_node=4, you would instead read RANK/WORLD_SIZE
# from the environment rather than calling spawn yourself.
if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    torch.multiprocessing.spawn(
        train,
        args=(world_size,),
        nprocs=world_size,
        join=True
    )

NCCL and Communication Backends

NCCL (NVIDIA Collective Communications Library):

  Single node (NVLink):
  GPU0 ←──── NVLink ────→ GPU1
    │                       │
   600 GB/s bidirectional bandwidth

  Multi-node (InfiniBand + RDMA):
  Node 1 ←── IB (200 Gb/s) ──→ Node 2
    │                              │
  RDMA: direct GPU-to-GPU without CPU involvement

Backend selection:
  - nccl:  GPU distributed training (recommended)
  - gloo:  CPU training or debugging
  - mpi:   HPC environments

Horovod Comparison

Feature        PyTorch DDP          Horovod
Integration    Built into PyTorch   Separate install
Algorithm      Bucketed AllReduce   Ring-AllReduce
Frameworks     PyTorch only         TF/PyTorch/MXNet
Ease of use    Moderate             Simple (hvd.init())
Performance    Comparable           Comparable

7. Observability: Tracing, Logs, and Metrics

Distributed Tracing

In a microservices architecture, requests traverse multiple services. Distributed tracing lets you see the entire journey of a single request.

Request flow (traced by Jaeger/Zipkin):

  Browser ──▶ API GW ──▶ Order Svc ──▶ Inventory Svc ──▶ DB
     │           │            │               │           │
     └───────────┴─ TraceID: abc123 (shared) ─┴───────────┘
             SpanID: 001   SpanID: 002   SpanID: 003

             Total latency: 120ms
             Order Svc: 80ms (bottleneck!)

OpenTelemetry layers:
  - API: write instrumentation code
  - SDK: process and export telemetry data
  - Exporter: send to Jaeger / Zipkin / OTLP endpoint
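
A minimal manual-instrumentation sketch with the OpenTelemetry Python SDK (packages opentelemetry-api and opentelemetry-sdk). The console exporter stands in for a Jaeger/OTLP exporter, and the service and span names are illustrative.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter

# SDK layer: configure how spans are processed and exported
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

# API layer: instrumentation code only talks to the tracer
tracer = trace.get_tracer("order-service")

with tracer.start_as_current_span("handle_order") as span:
    span.set_attribute("order.id", "ORD-001")
    with tracer.start_as_current_span("check_inventory"):
        pass  # the inventory call would be traced as a child span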

ELK Stack Log Aggregation

┌──────────────────────────────────────────────────────────┐
│                         ELK Stack                        │
│                                                          │
│  App Logs ──▶ Filebeat ──┐                               │
│  K8s Logs ──▶ Fluentd ───┴─▶ Logstash ──▶ Elasticsearch  │
│                                                │         │
│                                                ▼         │
│                                             Kibana       │
│                                          (dashboards)    │
│                                                          │
│  Modern alternative:                                     │
│  App Logs ──▶ Promtail ──▶ Loki ──▶ Grafana              │
└──────────────────────────────────────────────────────────┘

Prometheus + Grafana Metrics

# Prometheus scrape configuration
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka-broker-1:9090', 'kafka-broker-2:9090']
    metrics_path: '/metrics'
    scrape_interval: 15s

  - job_name: 'pytorch-training'
    static_configs:
      - targets: ['trainer-0:8080', 'trainer-1:8080']
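
For the 'pytorch-training' job above, each trainer must expose a /metrics endpoint on port 8080. A sketch with the prometheus_client library (the metric names are illustrative):

import random
import time
from prometheus_client import start_http_server, Gauge, Counter

loss_gauge = Gauge('training_loss', 'Current training loss')
steps_total = Counter('training_steps_total', 'Optimizer steps taken')

start_http_server(8080)  # serves /metrics for Prometheus to scrape

while True:
    loss_gauge.set(random.random())  # stand-in for the real loss value
    steps_total.inc()
    time.sleep(1)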

Critical Kafka metrics for Grafana dashboards:

  • kafka_consumer_lag: Consumer lag (key SLA metric)
  • kafka_network_request_rate: Broker request throughput
  • kafka_log_size: Partition log size
  • kafka_under_replicated_partitions: Number of under-replicated partitions

Quiz

Q1. When a network partition occurs, what are the tradeoffs a CP system and an AP system each make?

Answer: A CP system sacrifices availability to maintain consistency during a partition. An AP system does the opposite — it sacrifices consistency to maintain availability.

Explanation: When a network partition occurs, nodes on opposite sides of the partition can no longer communicate. A CP system (e.g., etcd, HBase) will refuse requests or return errors rather than risk serving stale data. An AP system (e.g., Cassandra, DynamoDB) continues serving requests from partitioned nodes, meaning different nodes may return different data. Once the partition heals, the system reconciles via eventual consistency.

Q2. Describe the process by which Raft elects a new leader when the current leader fails.

Answer: When a Follower does not receive a heartbeat within its election timeout, it becomes a Candidate, sends RequestVote RPCs, and the first Candidate to receive a majority of votes becomes the new Leader.

Explanation: After the leader fails, Followers wait for their election timeout (typically 150-300ms). The first Follower to time out transitions to Candidate, increments its term, votes for itself, and sends RequestVote RPCs to all other nodes. Each node grants at most one vote per term on a first-come-first-served basis. The first Candidate to accumulate a majority wins and becomes the new Leader. If a split vote occurs, all Candidates time out and restart the election with a new, higher term.

Q3. What happens in Kafka when a consumer group has more consumers than partitions?

Answer: The excess consumers receive no partition assignment and remain idle — they process no messages.

Explanation: In Kafka's consumption model, each partition in a topic can be consumed by at most one consumer within the same consumer group at a time. If there are 4 consumers but only 3 partitions, one consumer will have no partition assigned and will sit idle, consuming resources but doing no work. For this reason, you generally design so the number of consumers does not exceed the number of partitions. To scale throughput further, you must increase the partition count first.

Q4. Why is Ring-AllReduce more communication-efficient than the Parameter Server approach?

Answer: In Ring-AllReduce, each node communicates only with its two neighbors, so total bandwidth usage is constant regardless of the number of nodes. In Parameter Server, all Workers communicate with the PS, making the PS a bandwidth bottleneck that scales linearly with worker count.

Explanation: With n workers in a Parameter Server setup, the PS must exchange gradients with all n workers. As n grows, the PS's required bandwidth grows linearly. In Ring-AllReduce, each worker sends data only to the next worker in the ring. The total data transferred per worker converges to 2 × (n-1)/n × data_size, which approaches 2 × data_size as n grows, independent of the number of workers. This allows every node's bandwidth to be fully and evenly utilized.

Q5. In the Saga pattern, when is a compensating transaction required?

Answer: When an intermediate step in a Saga fails, compensating transactions are executed for all previously completed steps (in reverse order) to undo their effects.

Explanation: The Saga pattern handles distributed transactions where cross-service ACID guarantees are unavailable. Each step commits locally and immediately. In the Order Saga above (Create Order → Deduct Inventory → Process Payment), if the payment step fails, compensation runs over the steps that already completed, in reverse order: "Restore Inventory" first, then "Cancel Order" (there is nothing to refund, since the payment never succeeded). Because compensating transactions can also fail, they must be idempotent and designed for retries. Manual intervention may be required if a compensation step fails.


Conclusion

Distributed systems mastery is not acquired overnight. Understanding CAP theorem tradeoffs, implementing Raft consensus by hand, applying Kafka partition strategies to real production services — these skills deepen through experience.

For AI engineers in particular, distributed ML training techniques like Ring-AllReduce and NCCL are becoming increasingly central. Training a model at the scale of GPT-4 requires optimizing the network communication of thousands of GPUs, which ultimately comes down to the core principles of distributed systems.

Recommended learning path: Understand CAP theorem → Read the Raft paper → Operate etcd in production → Get Kafka production experience → Train multi-GPU models with PyTorch DDP