Distributed Systems Fundamentals Complete Guide 2025: Consensus Algorithms, Replication, Consistency Models, Fault Tolerance
Author: Youngju Kim (@fjvbn20031)
1. Why Distributed Systems
1.1 Why We Need Distributed Systems
There are three fundamental needs that a single server cannot satisfy.
- Scalability: A single machine has physical limits on CPU, memory, and disk
- Availability: Eliminate single points of failure (SPOF) to ensure service continuity
- Geographic Distribution: Serve users from nearby locations to reduce latency
Core Challenges of Distributed Systems
┌─────────────────────────────────────────────┐
│ │
│ The network is unreliable │
│ ├── Packet loss │
│ ├── Latency variability │
│ └── Network partitions │
│ │
│ Nodes can fail │
│ ├── Crashes │
│ ├── Slowdowns │
│ └── Byzantine faults (malicious behavior) │
│ │
│ No global time exists │
│ ├── Clock drift │
│ ├── NTP errors │
│ └── Difficulty determining event ordering │
│ │
└─────────────────────────────────────────────┘
1.2 The 8 Fallacies of Distributed Computing
Eight false assumptions about networks, originally formulated by L. Peter Deutsch and colleagues at Sun Microsystems (James Gosling added the eighth).
| No. | Fallacy | Reality |
|---|---|---|
| 1 | The network is reliable | Packet loss, delays, and partitions occur |
| 2 | Latency is zero | Latency is proportional to physical distance |
| 3 | Bandwidth is infinite | Network bandwidth has limits |
| 4 | The network is secure | Security threats always exist |
| 5 | Topology never changes | Network configuration changes constantly |
| 6 | There is one administrator | Multiple organizations manage the network |
| 7 | Transport cost is zero | Data transfer has costs |
| 8 | The network is homogeneous | Various equipment and protocols coexist |
2. CAP Theorem and PACELC
2.1 CAP Theorem
The CAP theorem, proposed by Eric Brewer in 2000, is a fundamental principle of distributed system design.
CAP Theorem
┌─────────────────────────────────────────────┐
│ │
│ Consistency (C) │
│ /\ │
│ / \ │
│ / \ │
│ / CP \ CA │
│ / systems\ systems │
│ / \ │
│ /____________\ │
│ Availability (A) ─── Partition │
│ Tolerance (P) │
│ │
│ CP: HBase, MongoDB, etcd, ZooKeeper │
│ AP: Cassandra, DynamoDB, CouchDB │
│ CA: Single-node RDBMS (theoretical) │
│ │
│ During partition: must choose C or A │
└─────────────────────────────────────────────┘
Three properties:
- C (Consistency): All nodes see the same data
- A (Availability): Every request receives a response (from non-failing nodes)
- P (Partition Tolerance): System operates despite network partitions
Key insight: Network partitions are inevitable, so P is mandatory. The real choice is CP vs AP.
2.2 PACELC Extension
PACELC, proposed by Daniel Abadi, extends CAP to address trade-offs in normal operation.
PACELC Theorem
if (Partition) then
Availability vs Consistency
else
Latency vs Consistency
┌─────────────────────────────────────────────┐
│ System │ On P │ Normal (E) │
│───────────────│─────────│────────────────│
│ DynamoDB │ PA │ EL (low latency)│
│ Cassandra │ PA │ EL │
│ MongoDB │ PC │ EC (strong) │
│ Google Spanner│ PC │ EC │
│ CockroachDB │ PC │ EL (read opt.) │
│ PNUTS (Yahoo) │ PC │ EL │
└─────────────────────────────────────────────┘
3. Consistency Models
3.1 The Consistency Spectrum
Strong Consistency ◄─────────────────────────► Weak Consistency
Linearizable      Sequential      Causal      Eventual

Strong end: lower performance, complex implementation, strong guarantees
Weak end:   higher performance, simple implementation, weak guarantees
3.2 Linearizability
The strongest single-object consistency guarantee: every operation appears to take effect atomically at some instant between its invocation and completion, so the order clients observe matches real time.
Linearizable
Client A: ──write(x=1)──────────────────────
Client B: ────────────read(x)→1─────────────
Client C: ──────────────────read(x)→1───────
All clients always read the latest value after a write completes
NOT Linearizable
Client A: ──write(x=1)──────────────────────
Client B: ────────────read(x)→1─────────────
Client C: ──────────────────read(x)→0─────── ← stale!
3.3 Eventual Consistency
If no new updates are made to a data item, eventually all replicas will converge to the same state.
# Read pattern assuming Eventual Consistency
class EventuallyConsistentStore:
    def __init__(self, replicas: list):
        self.replicas = replicas

    def write(self, key: str, value: str, timestamp: float):
        """Asynchronously write to all replicas"""
        for replica in self.replicas:
            replica.async_put(key, value, timestamp)

    def read(self, key: str) -> str:
        """Read - can read from any replica"""
        # Last-write-wins: select the value with the latest timestamp
        values = [r.get(key) for r in self.replicas]
        return max(values, key=lambda v: v.timestamp).value

    def read_your_writes(self, key: str, session_token: str) -> str:
        """Guarantee reading your own writes by pinning the session
        to the replica that received its last write"""
        last_write_replica = self.get_replica_for_session(session_token)
        return last_write_replica.get(key).value
3.4 Causal Consistency
Only guarantees the ordering of causally related operations.
Causal Consistency Example:
A: write(x=1)
B: read(x)→1, then write(y=2)   (y=2 causally depends on x=1)
C: read(y)→2, read(x)→?
With causal consistency guaranteed:
If C can see y=2, it must also see x=1.
(x=1, the causal predecessor of y=2, must be visible first)
3.5 Read-Your-Writes Consistency
You can always read data that you yourself wrote.
Read-Your-Writes
Client A: write(x=1) ───→ read(x)→1 (always guaranteed)
|
reads own write
Session Consistency: guaranteed within the same session
Monotonic Reads: once you see a new value, you never revert to an old one
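Monotonic reads can be enforced on the client side by remembering the newest timestamp observed. A minimal sketch, assuming replicas return `(value, timestamp)` pairs; the `Replica` and `MonotonicReadsClient` names are illustrative, not from any particular system:

```python
class Replica:
    """Minimal in-memory replica returning (value, timestamp) pairs."""
    def __init__(self, data=None):
        self.data = data or {}

    def get(self, key):
        return self.data.get(key)  # (value, timestamp) or None

class MonotonicReadsClient:
    """Track the newest timestamp seen; refuse anything older,
    so reads never go backward in time."""
    def __init__(self, replicas):
        self.replicas = replicas
        self.last_seen_ts = 0.0

    def read(self, key):
        for replica in self.replicas:
            item = replica.get(key)
            if item is not None and item[1] >= self.last_seen_ts:
                self.last_seen_ts = item[1]  # advance the watermark
                return item[0]
        return None  # every reachable replica is staler than what we saw
```

A real client would retry or redirect rather than return `None`, but the watermark idea is the same.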
4. Consensus Algorithms
4.1 Paxos
The foundational consensus algorithm, proposed by Leslie Lamport.
Basic Paxos (single value consensus)
┌─────────────────────────────────────────────┐
│ │
│ Phase 1a: Prepare │
│ Proposer → Acceptor: prepare(n) │
│ "Please prepare for proposal number n" │
│ │
│ Phase 1b: Promise │
│ Acceptor → Proposer: promise(n, prev_val) │
│ "I will reject anything below n" │
│ │
│ Phase 2a: Accept │
│ Proposer → Acceptor: accept(n, value) │
│ "Please accept this value" │
│ │
│ Phase 2b: Accepted │
│ Acceptor → Learner: accepted(n, value) │
│ "If majority accepts, consensus reached" │
│ │
└─────────────────────────────────────────────┘
4.2 Multi-Paxos
In real systems, Multi-Paxos is used for repeated consensus: a stable leader is elected once, after which Phase 1 can be skipped and each value needs only Phase 2.
4.3 Raft: An Understandable Consensus Algorithm
Raft, proposed by Diego Ongaro and John Ousterhout in 2014, provides safety equivalent to Paxos while being designed for understandability.
Raft State Transitions
┌────────────────────────────────────────────────┐
│ │
│ ┌──────────┐ timeout ┌──────────┐ │
│ │ Follower │──────────→│ Candidate│ │
│ │ │←──────────│ │ │
│ └──────────┘ fail/discover└────┬─────┘ │
│ ↑ │ │
│ │ majority votes won │ │
│ │ ↓ │
│ │ ┌──────────┐ │
│ └──────────────│ Leader │ │
│ new leader │ │ │
│ └──────────┘ │
│ │
│ Term: │
│ ┌───┬───┬───┬───┬───┐ │
│ │ 1 │ 2 │ 3 │ 4 │ 5 │ ... │
│ └───┴───┴───┴───┴───┘ │
│ At most one Leader per Term │
└────────────────────────────────────────────────┘
Raft Leader Election
Leader Election Process
┌────────────────────────────────────────────────┐
│ 1. Follower heartbeat timeout │
│ 2. Increment Term, vote for self │
│ 3. Send RequestVote RPC to other nodes │
│ 4. Become Leader if majority votes received │
│ │
│ [Node A: Follower] [Node B: Follower] │
│ [Node C: Follower] [Node D: Follower] │
│ [Node E: Follower] │
│ │
│ Node A timeout → Candidate (Term 2) │
│ A→B: RequestVote(Term 2) → B: Grant │
│ A→C: RequestVote(Term 2) → C: Grant │
│ A→D: RequestVote(Term 2) → D: Grant │
│ │
│ A wins majority (3/5) → Leader! │
│ A→all: AppendEntries(heartbeat) │
└────────────────────────────────────────────────┘
Raft Log Replication
Log Replication
┌────────────────────────────────────────────────┐
│ │
│ Client → Leader: write(x=5) │
│ │
│ Leader Log: [1:x=3] [2:y=7] [3:x=5] │
│ Follower A: [1:x=3] [2:y=7] │
│ Follower B: [1:x=3] [2:y=7] │
│ Follower C: [1:x=3] │
│ Follower D: [1:x=3] [2:y=7] │
│ │
│ 1. Leader appends entry to log │
│ 2. Replicates via AppendEntries RPC │
│ 3. Commits when majority confirms replication │
│ 4. Responds to client │
│ 5. Notifies commit in next heartbeat │
│ │
│ Safety: Leader contains all committed entries │
│ (Election Restriction) │
└────────────────────────────────────────────────┘
4.4 Raft Implementation Example (Pseudocode)
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional

class NodeState(Enum):
    FOLLOWER = "follower"
    CANDIDATE = "candidate"
    LEADER = "leader"

@dataclass
class LogEntry:
    term: int
    index: int
    command: str

@dataclass
class RaftNode:
    node_id: str
    peers: list = field(default_factory=list)  # other cluster members
    state: NodeState = NodeState.FOLLOWER
    current_term: int = 0
    voted_for: Optional[str] = None
    log: list = field(default_factory=list)
    commit_index: int = 0
    last_applied: int = 0
    # Leader-only state
    next_index: dict = field(default_factory=dict)
    match_index: dict = field(default_factory=dict)

    def on_election_timeout(self):
        """Election timeout: transition to Candidate"""
        self.state = NodeState.CANDIDATE
        self.current_term += 1
        self.voted_for = self.node_id
        votes_received = 1  # vote for self
        # Send RequestVote to all peers
        for peer in self.peers:
            response = self.send_request_vote(peer)
            if response and response.vote_granted:
                votes_received += 1
        # Majority of the full cluster (len(peers) + 1 nodes)
        if votes_received > len(self.peers) // 2:
            self.become_leader()

    def become_leader(self):
        """Transition to Leader"""
        self.state = NodeState.LEADER
        for peer in self.peers:
            self.next_index[peer] = len(self.log)
            self.match_index[peer] = 0
        self.send_heartbeats()

    def on_append_entries(self, term, leader_id, entries, leader_commit):
        """Handle AppendEntries RPC"""
        if term < self.current_term:
            return False
        self.state = NodeState.FOLLOWER
        self.current_term = term
        self.reset_election_timer()
        # Append entries (the prev_log_index/prev_log_term
        # consistency check is elided in this sketch)
        self.log.extend(entries)
        if leader_commit > self.commit_index:
            self.commit_index = min(leader_commit, len(self.log) - 1)
            self.apply_committed_entries()
        return True

    def replicate_log(self, command: str):
        """Handle client command (Leader only)"""
        if self.state != NodeState.LEADER:
            return False
        entry = LogEntry(
            term=self.current_term,
            index=len(self.log),
            command=command,
        )
        self.log.append(entry)
        # Replicate to all Followers
        ack_count = 1  # self
        for peer in self.peers:
            success = self.send_append_entries(peer, [entry])
            if success:
                ack_count += 1
        # Check majority
        if ack_count > len(self.peers) // 2:
            self.commit_index = entry.index
            self.apply_committed_entries()
            return True
        return False
5. Replication Strategies
5.1 Leader-Follower
Leader-Follower Replication
┌─────────────────────────────────────────────┐
│ │
│ Client ──write──→ ┌────────┐ │
│ │ Leader │ │
│ Client ──read──→ │ (R/W) │ │
│ └───┬────┘ │
│ replicate │ replicate │
│ ┌─────────┼─────────┐ │
│ ↓ ↓ ↓ │
│ ┌────────┐┌────────┐┌────────┐ │
│ │Follower││Follower││Follower│ │
│ │ (Read) ││ (Read) ││ (Read) │ │
│ └────────┘└────────┘└────────┘ │
│ │
│ Synchronous: slow but no data loss │
│ Asynchronous: fast but potential data loss │
│ Semi-synchronous: sync to at least 1 │
└─────────────────────────────────────────────┘
5.2 Multi-Leader
Multi-Leader Replication
┌─────────────────────────────────────────────┐
│ │
│ DC-US DC-EU DC-Asia │
│ ┌────────┐ ┌────────┐ ┌────────┐│
│ │Leader 1│◄─────►│Leader 2│◄────►│Leader 3││
│ │ (R/W) │ │ (R/W) │ │ (R/W) ││
│ └───┬────┘ └───┬────┘ └───┬────┘│
│ │ │ │ │
│ ┌───┴───┐ ┌───┴───┐ ┌───┴───┐ │
│ │Follower│ │Follower│ │Follower│ │
│ └───────┘ └───────┘ └───────┘ │
│ │
│ Pros: reduced write latency, offline ops │
│ Cons: conflict resolution needed (LWW, CRDT)│
└─────────────────────────────────────────────┘
Conflict Resolution Strategies
# Last-Writer-Wins (LWW) - simplest
def lww_resolve(conflict_a, conflict_b):
    """Larger timestamp wins"""
    if conflict_a.timestamp > conflict_b.timestamp:
        return conflict_a
    return conflict_b

# Custom merge function
def custom_merge(conflict_a, conflict_b):
    """Domain-specific merge logic"""
    if conflict_a.type == "counter":
        return Value(conflict_a.value + conflict_b.value)
    elif conflict_a.type == "set":
        return Value(conflict_a.value.union(conflict_b.value))
    else:
        return lww_resolve(conflict_a, conflict_b)
5.3 Leaderless / Quorum
Leaderless Replication (Dynamo Style)
┌─────────────────────────────────────────────┐
│ │
│ N=3 (total replicas), W=2 (write quorum), │
│ R=2 (read quorum) │
│ │
│ Write: Client → Node1 (ack) │
│ → Node2 (ack) ← W=2 met │
│ → Node3 (timeout) │
│ │
│ Read: Client → Node1 (v=5, ts=100) │
│ → Node2 (v=5, ts=100) ← cur │
│ → Node3 (v=3, ts=90) ← old │
│ │
│ Read repair: send latest to stale Node3 │
│ │
│ Quorum condition: W + R > N → at least │
│ one intersection guarantees latest value │
│ │
│ Sloppy Quorum: substitute nodes on failure │
│ (Hinted Handoff) │
└─────────────────────────────────────────────┘
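The quorum interaction above can be sketched in a few lines. This is a toy model, not any particular database's API; the `Replica`, `quorum_write`, and `quorum_read` names are illustrative:

```python
import time

class Replica:
    """In-memory replica storing (value, timestamp) per key."""
    def __init__(self):
        self.data = {}

    def put(self, key, value, ts):
        self.data[key] = (value, ts)
        return True  # ack

    def get(self, key):
        return self.data.get(key)  # (value, timestamp) or None

def quorum_write(replicas, key, value, w):
    """Write until w acks are collected; replicas beyond the quorum
    stay stale, mimicking a write that never reached every node."""
    ts = time.time()
    acks = 0
    for replica in replicas:
        if replica.put(key, value, ts):
            acks += 1
        if acks >= w:
            return True  # W quorum met
    return False

def quorum_read(replicas, key):
    """Read a set of replicas and return the value with the newest
    timestamp (last-write-wins resolution)."""
    responses = [r.get(key) for r in replicas]
    responses = [v for v in responses if v is not None]
    if not responses:
        return None
    return max(responses, key=lambda v: v[1])[0]
```

With N=3, W=2, R=2, a read of any two replicas must intersect the two that acked the write, so the newest value always appears in the read set.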
5.4 Chain Replication
Chain Replication
┌─────────────────────────────────────────────┐
│ │
│ Write → [Head] → [Middle] → [Tail] → Read │
│ │
│ Pros: │
│ - Strong consistency guarantee │
│ - Load distribution for reads and writes │
│ - Relatively simple implementation │
│ │
│ Cons: │
│ - Write latency proportional to chain len │
│ - Head/Tail failure requires reconfiguration│
│ │
│ Used in: Azure Storage, HDFS │
└─────────────────────────────────────────────┘
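The write path above (head applies, forwards to its successor, the tail's apply commits) can be sketched as a toy chain; this ignores failure handling and reconfiguration, and the class names are illustrative:

```python
class ChainNode:
    """A node in the chain; forwards writes to its successor."""
    def __init__(self, name):
        self.name = name
        self.store = {}
        self.next = None  # successor in the chain

    def write(self, key, value):
        self.store[key] = value  # apply locally
        if self.next is not None:
            return self.next.write(key, value)  # forward down the chain
        return True  # reached the tail: write is committed

class Chain:
    def __init__(self, names):
        self.nodes = [ChainNode(n) for n in names]
        for a, b in zip(self.nodes, self.nodes[1:]):
            a.next = b

    def write(self, key, value):
        return self.nodes[0].write(key, value)  # writes enter at the head

    def read(self, key):
        return self.nodes[-1].store.get(key)  # reads served by the tail
```

Because reads go to the tail, they only ever observe writes that every node in the chain has already applied, which is what gives chain replication its strong consistency.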
6. Distributed Clocks
6.1 Physical Clocks and NTP
Physical Clock Problems
┌─────────────────────────────────────────────┐
│ │
│ Node A clock: 10:00:00.000 │
│ Node B clock: 10:00:00.150 (150ms off) │
│ Node C clock: 09:59:59.800 (200ms off) │
│ │
│ NTP sync accuracy: typically tens of ms │
│ Google Spanner TrueTime: ~7ms error │
│ │
│ Problem: cannot determine if event A │
│ happened before B using physical clocks │
└─────────────────────────────────────────────┘
6.2 Lamport Timestamps
A logical clock proposed by Leslie Lamport. It guarantees that if event a causally precedes event b, then L(a) < L(b).
Lamport Timestamp
Rules:
1. Increment local counter on event
2. Include counter when sending message
3. On receive: max(local, received) + 1
Node A: [1] ──msg──→ [2] ────→ [3]
|
Node B: [1] ────→ [3] (max(1,2)+1) → [4]
|
Node C: [1] ──→ [2] ───────────→ [5] (max(2,4)+1)
Limitation: Lamport time alone cannot
distinguish causal relationships
(L(a) < L(b) doesn't mean a caused b)
class LamportClock:
    def __init__(self):
        self.time = 0

    def tick(self) -> int:
        """Local event"""
        self.time += 1
        return self.time

    def send(self) -> int:
        """Send message"""
        self.time += 1
        return self.time

    def receive(self, sender_time: int) -> int:
        """Receive message"""
        self.time = max(self.time, sender_time) + 1
        return self.time
6.3 Vector Clocks
Vector Clocks
Each node maintains a vector of counters for all nodes
Node A: [A:1, B:0, C:0]
──msg──→
Node B: [A:0, B:1, C:0] → after receive [A:1, B:2, C:0]
──msg──→
Node C: [A:0, B:0, C:1] → after receive [A:1, B:2, C:2]
Causality determination:
V1 < V2 iff for all i: V1[i] <= V2[i] and
for at least one j: V1[j] < V2[j]
V1 || V2 (concurrent) iff neither V1 < V2 nor V2 < V1
class VectorClock:
    def __init__(self, node_id: str, nodes: list):
        self.node_id = node_id
        self.clock = {n: 0 for n in nodes}

    def tick(self):
        """Local event"""
        self.clock[self.node_id] += 1

    def send(self) -> dict:
        """Send message"""
        self.clock[self.node_id] += 1
        return self.clock.copy()

    def receive(self, sender_clock: dict):
        """Receive message"""
        for node in self.clock:
            self.clock[node] = max(self.clock[node], sender_clock.get(node, 0))
        self.clock[self.node_id] += 1

    def is_before(self, other_clock: dict) -> bool:
        """Did self happen before other?"""
        all_leq = all(self.clock[n] <= other_clock.get(n, 0) for n in self.clock)
        any_lt = any(self.clock[n] < other_clock.get(n, 0) for n in self.clock)
        return all_leq and any_lt

    def is_after(self, other_clock: dict) -> bool:
        """Did self happen after other?"""
        all_geq = all(self.clock[n] >= other_clock.get(n, 0) for n in self.clock)
        any_gt = any(self.clock[n] > other_clock.get(n, 0) for n in self.clock)
        return all_geq and any_gt

    def is_concurrent(self, other_clock: dict) -> bool:
        """Are the events concurrent (no causal order either way)?"""
        return not self.is_before(other_clock) and not self.is_after(other_clock)
6.4 Hybrid Logical Clocks (HLC)
HLC (Hybrid Logical Clock)
= Combines strengths of physical + logical clocks
Structure: (physical_time, logical_counter)
Advantages:
- Values close to physical time (human-readable)
- Can track causal relationships
- Compensates for NTP errors
Used in: CockroachDB, YugabyteDB
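The (physical_time, logical_counter) structure can be sketched directly from the HLC update rules. In this sketch the physical clock is injected via the `now` parameter (an assumption made here so the example is deterministic; a real node would use wall time):

```python
import time

class HybridLogicalClock:
    """HLC sketch: a (physical, logical) pair that never runs
    behind either the local wall clock or any received timestamp."""
    def __init__(self, now=time.time):
        self.now = now  # injectable physical clock source
        self.pt = 0     # physical component of the last timestamp
        self.lc = 0     # logical counter

    def tick(self):
        """Local or send event."""
        wall = self.now()
        if wall > self.pt:
            self.pt, self.lc = wall, 0  # physical clock moved forward
        else:
            self.lc += 1                # same instant: bump the counter
        return (self.pt, self.lc)

    def receive(self, remote):
        """Merge a remote (pt, lc) timestamp on message receipt."""
        r_pt, r_lc = remote
        wall = self.now()
        new_pt = max(self.pt, r_pt, wall)
        if new_pt == self.pt == r_pt:
            self.lc = max(self.lc, r_lc) + 1
        elif new_pt == self.pt:
            self.lc += 1
        elif new_pt == r_pt:
            self.lc = r_lc + 1
        else:
            self.lc = 0  # wall clock alone is ahead
        self.pt = new_pt
        return (self.pt, self.lc)
```

Comparing timestamps lexicographically, (pt, lc), preserves causal order while keeping pt close to real time.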
7. Fault Models
7.1 Fault Types
Fault Model Spectrum (by tolerance difficulty)
┌─────────────────────────────────────────────┐
│ │
│ Crash-Stop (simplest) │
│ ├── Node stops and never recovers │
│ ├── Relatively easy to detect │
│ └── Assumed by most consensus algorithms │
│ │
│ Crash-Recovery │
│ ├── Node stops and recovers later │
│ ├── State restored from persistent storage │
│ └── Requires WAL (Write-Ahead Log) │
│ │
│ Omission │
│ ├── Fails to send or receive messages │
│ ├── Includes network partitions │
│ └── Detected by timeouts │
│ │
│ Byzantine (most complex) │
│ ├── Node exhibits arbitrary/malicious behavior│
│ ├── Can send incorrect data │
│ ├── Requires 3f+1 nodes (f: faulty nodes) │
│ └── Primarily used in blockchain │
│ │
└─────────────────────────────────────────────┘
7.2 Failure Detection
class PhiAccrualFailureDetector:
    """Phi Accrual Failure Detector (used in Akka)"""

    def __init__(self, threshold: float = 8.0):
        self.threshold = threshold
        self.heartbeat_intervals = []  # real implementations bound this window
        self.last_heartbeat = None

    def heartbeat(self, timestamp: float):
        """Record heartbeat"""
        if self.last_heartbeat is not None:
            interval = timestamp - self.last_heartbeat
            self.heartbeat_intervals.append(interval)
        self.last_heartbeat = timestamp

    def phi(self, current_time: float) -> float:
        """Calculate phi value - suspicion level"""
        if not self.heartbeat_intervals:
            return 0.0
        time_since_last = current_time - self.last_heartbeat
        mean = sum(self.heartbeat_intervals) / len(self.heartbeat_intervals)
        variance = sum((x - mean) ** 2
                       for x in self.heartbeat_intervals) / len(self.heartbeat_intervals)
        std_dev = variance ** 0.5
        if std_dev == 0:
            return float('inf') if time_since_last > mean else 0.0
        y = (time_since_last - mean) / std_dev
        return max(0.0, y * 1.5)  # simplified approximation of the true phi

    def is_alive(self, current_time: float) -> bool:
        """Determine if node is alive"""
        return self.phi(current_time) < self.threshold
8. Distributed Transactions
8.1 2PC (Two-Phase Commit)
2PC (Two-Phase Commit)
┌────────────────────────────────────────────────┐
│ │
│ Phase 1: Prepare (Vote) │
│ Coordinator → Participant A: prepare? │
│ Coordinator → Participant B: prepare? │
│ Coordinator → Participant C: prepare? │
│ │
│ A → Coordinator: YES (ready) │
│ B → Coordinator: YES │
│ C → Coordinator: YES │
│ │
│ Phase 2: Commit (Execute) │
│ Coordinator → A: commit │
│ Coordinator → B: commit │
│ Coordinator → C: commit │
│ │
│ Problems: │
│ - Coordinator failure blocks participants │
│ - Single point of failure (SPOF) │
│ - Synchronous, performance degradation │
│ │
│ Any NO → Abort │
│ Coordinator failure → uncertain (in-doubt) │
└────────────────────────────────────────────────┘
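The two phases condense into a toy coordinator loop. Names here are illustrative, and a real implementation would also persist a decision log so the coordinator can recover, which is exactly the part this sketch omits:

```python
class Participant:
    """A resource manager that votes in Phase 1 and obeys in Phase 2."""
    def __init__(self, name, will_commit=True):
        self.name = name
        self.will_commit = will_commit
        self.state = "init"

    def prepare(self):
        # Phase 1: vote YES only if locally able to commit
        self.state = "prepared" if self.will_commit else "aborted"
        return self.will_commit

    def commit(self):
        self.state = "committed"

    def abort(self):
        self.state = "aborted"

def two_phase_commit(participants):
    """Coordinator: global commit only if every participant votes YES."""
    if all(p.prepare() for p in participants):  # Phase 1: Prepare / Vote
        for p in participants:
            p.commit()                           # Phase 2: Commit
        return True
    for p in participants:
        p.abort()                                # any NO -> global abort
    return False
```

The blocking problem is visible in the gap between the two phases: a participant that answered YES is stuck in "prepared" until the coordinator tells it what was decided.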
8.2 3PC (Three-Phase Commit)
3PC: Attempts to solve 2PC blocking problem
Phase 1: CanCommit (Vote)
Phase 2: PreCommit (Pre-commit)
Phase 3: DoCommit (Final commit)
The additional PreCommit phase enables:
- Timeout-based recovery
- But still problematic with network partitions
In practice, the Saga pattern is used more often than 3PC.
8.3 Saga Pattern
Saga Pattern: Based on compensating transactions
┌────────────────────────────────────────────────┐
│ │
│ Normal flow: │
│ T1 → T2 → T3 → T4 → Done │
│ (Order)(Pay)(Stock)(Ship) │
│ │
│ T3 failure compensation: │
│ T1 → T2 → T3(fail) → C2 → C1 │
│ (Order)(Pay)(StockFail)(Refund)(CancelOrder) │
│ │
│ Choreography (event-driven): │
│ Each service publishes and subscribes events │
│ │
│ Orchestration (centralized): │
│ Saga Orchestrator coordinates each step │
│ │
└────────────────────────────────────────────────┘
# Saga Orchestrator Example
from dataclasses import dataclass
from typing import Callable
from enum import Enum

class SagaStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    FAILED = "failed"

@dataclass
class SagaStep:
    name: str
    action: Callable
    compensation: Callable

class SagaOrchestrator:
    def __init__(self, steps: list):
        self.steps = steps
        self.completed_steps = []
        self.status = SagaStatus.PENDING

    async def execute(self):
        """Execute Saga"""
        self.status = SagaStatus.RUNNING
        for step in self.steps:
            try:
                await step.action()
                self.completed_steps.append(step)
            except Exception as e:
                print(f"Step '{step.name}' failed: {e}")
                await self.compensate()
                return False
        self.status = SagaStatus.COMPLETED
        return True

    async def compensate(self):
        """Execute compensating transactions (reverse order)"""
        self.status = SagaStatus.COMPENSATING
        for step in reversed(self.completed_steps):
            try:
                await step.compensation()
                print(f"Compensated: {step.name}")
            except Exception as e:
                print(f"Compensation failed for '{step.name}': {e}")
                self.status = SagaStatus.FAILED
                return
        self.status = SagaStatus.FAILED

# Usage (create_order, cancel_order, etc. are application-defined coroutines)
saga = SagaOrchestrator([
    SagaStep("create_order", create_order, cancel_order),
    SagaStep("process_payment", charge_payment, refund_payment),
    SagaStep("reserve_inventory", reserve_stock, release_stock),
    SagaStep("arrange_shipping", book_delivery, cancel_delivery),
])
9. Partitioning and Sharding
9.1 Partitioning Strategies
Range Partitioning
┌─────────────────────────────────────────────┐
│ Key: A-F → Shard 1 │
│ Key: G-N → Shard 2 │
│ Key: O-Z → Shard 3 │
│ │
│ Pros: efficient range queries │
│ Cons: potential hotspots │
└─────────────────────────────────────────────┘
Hash Partitioning
┌─────────────────────────────────────────────┐
│ hash(key) % 3 == 0 → Shard 1 │
│ hash(key) % 3 == 1 → Shard 2 │
│ hash(key) % 3 == 2 → Shard 3 │
│ │
│ Pros: even distribution │
│ Cons: no range queries, costly resharding │
└─────────────────────────────────────────────┘
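The resharding cost of hash partitioning is easy to quantify: when `hash(key) % N` changes N, most keys change shards. A quick experiment (the `shard` helper is illustrative, using MD5 only for a stable hash):

```python
import hashlib

def shard(key: str, n: int) -> int:
    """Naive hash partitioning: shard = hash(key) mod n."""
    digest = hashlib.md5(key.encode()).hexdigest()
    return int(digest, 16) % n

# How many keys move when the cluster grows from 3 to 4 shards?
keys = [f"user:{i}" for i in range(10_000)]
moved = sum(shard(k, 3) != shard(k, 4) for k in keys)
print(f"{moved / len(keys):.0%} of keys move when growing 3 -> 4 shards")
```

A key stays put only when its hash gives the same remainder mod 3 and mod 4 (hash mod 12 in {0, 1, 2}), so roughly three quarters of all keys move; consistent hashing (next section) reduces this to about K/N.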
9.2 Consistent Hashing
Consistent Hashing
┌─────────────────────────────────────────────┐
│ │
│ Node A (0) │
│ / \ │
│ / \ │
│ Node D Node B │
│ (270) (90) │
│ \ / │
│ \ / │
│ Node C (180) │
│ │
│ Key hash → position on ring │
│ Assigned to nearest node clockwise │
│ │
│ On node add/remove: │
│ - Only adjacent nodes affected │
│ - Average K/N keys move (K: total, N: nodes)│
│ │
│ Virtual Nodes: │
│ - Each physical node occupies multiple │
│ positions on the ring │
│ - Even load distribution │
│ │
└─────────────────────────────────────────────┘
import hashlib
from bisect import bisect_right

class ConsistentHashRing:
    def __init__(self, virtual_nodes: int = 150):
        self.virtual_nodes = virtual_nodes
        self.ring = {}          # hash -> node
        self.sorted_keys = []   # sorted hash values
        self.nodes = set()

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node: str):
        """Add a node"""
        self.nodes.add(node)
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:vn{i}"
            h = self._hash(virtual_key)
            self.ring[h] = node
            self.sorted_keys.append(h)
        self.sorted_keys.sort()

    def remove_node(self, node: str):
        """Remove a node"""
        self.nodes.discard(node)
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:vn{i}"
            h = self._hash(virtual_key)
            del self.ring[h]
            self.sorted_keys.remove(h)

    def get_node(self, key: str) -> str:
        """Look up the node responsible for a key"""
        if not self.ring:
            return None
        h = self._hash(key)
        idx = bisect_right(self.sorted_keys, h)
        if idx >= len(self.sorted_keys):
            idx = 0  # wrap around the ring
        return self.ring[self.sorted_keys[idx]]

# Usage
ring = ConsistentHashRing(virtual_nodes=150)
ring.add_node("node-1")
ring.add_node("node-2")
ring.add_node("node-3")
print(ring.get_node("user:123"))   # assignment depends on the key's hash
print(ring.get_node("order:456"))
10. Gossip Protocol and Membership
10.1 Gossip Protocol
Gossip (Epidemic) Protocol
┌─────────────────────────────────────────────┐
│ │
│ Periodically propagate state to random node│
│ │
│ Round 1: A → B (A's info) │
│ Round 2: A → C, B → D (A,B info) │
│ Round 3: C → E, D → F (spreading) │
│ ... │
│ Full propagation in O(log N) rounds │
│ │
│ Push: send own info │
│ Pull: request peer's info │
│ Push-Pull: bidirectional (most efficient) │
│ │
│ Use cases: │
│ - Membership management │
│ - Failure detection │
│ - Distributed aggregation │
│ - Amazon DynamoDB, Apache Cassandra │
│ │
└─────────────────────────────────────────────┘
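The O(log N) propagation claim can be checked with a toy push-gossip simulation. This is an illustrative sketch under simplifying assumptions (one random push per informed node per round, a push to oneself is simply wasted); real implementations batch digests and use push-pull:

```python
import random

def gossip_rounds(n: int, seed: int = 1) -> int:
    """Simulate push gossip: each round, every informed node
    pushes the update to one uniformly random peer. Returns the
    number of rounds until all n nodes are informed."""
    rng = random.Random(seed)
    informed = {0}  # node 0 holds the initial update
    rounds = 0
    while len(informed) < n:
        for node in list(informed):
            informed.add(rng.randrange(n))  # push to a random peer
        rounds += 1
    return rounds

print(gossip_rounds(1000))  # small: full spread in roughly O(log N) rounds
```

The informed set roughly doubles each early round, then slows as pushes increasingly hit already-informed peers, which is why the tail of dissemination dominates the constant.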
10.2 SWIM Protocol
SWIM (Scalable Weakly-consistent Infection-style Membership)
┌────────────────────────────────────────────────┐
│ │
│ 1. Ping: direct ping to random node │
│ A ──ping──→ B │
│ │
│ 2. Ack: response │
│ A ←──ack── B │
│ │
│ 3. No response: indirect ping (Ping-Req) │
│ A ──ping-req──→ C ──ping──→ B │
│ A ←──ack────── C ←──ack── B │
│ │
│ 4. Still no response: suspect B │
│ 5. After timeout: declare B dead │
│ │
│ Used in: HashiCorp Serf, Consul │
└────────────────────────────────────────────────┘
11. Real-World System Analysis
11.1 Google Spanner
Google Spanner Key Technologies
┌─────────────────────────────────────────────┐
│ │
│ TrueTime API │
│ ├── GPS + atomic clock based │
│ ├── Returns uncertainty interval │
│ ├── Error: within ~7ms │
│ └── Guarantees external consistency │
│ │
│ Paxos-based Replication │
│ ├── Each Spanner server group runs Paxos │
│ ├── Synchronous replication for strong consistency│
│ └── Globally distributable │
│ │
│ Read-Write Transactions │
│ ├── 2PL (Two-Phase Locking) + 2PC │
│ ├── TrueTime assigns commit timestamps │
│ └── Read-only: lock-free snapshot reads │
│ │
└─────────────────────────────────────────────┘
11.2 Amazon DynamoDB (Dynamo Paper)
DynamoDB Key Design Decisions
┌─────────────────────────────────────────────┐
│ │
│ Consistent Hashing + Virtual Nodes │
│ ├── Partition key-based data distribution │
│ └── Automatic rebalancing │
│ │
│ Sloppy Quorum + Hinted Handoff │
│ ├── N=3, W=2, R=2 │
│ ├── Substitute nodes store on failure │
│ └── Forward to original after recovery │
│ │
│ Vector Clocks for conflict detection │
│ ├── Causality tracking │
│ └── Client resolves concurrent writes │
│ │
│ Anti-Entropy (Merkle Tree) │
│ ├── Detect replica inconsistencies │
│ └── Efficient synchronization │
│ │
│ Gossip-based Membership │
│ └── Detect node additions/removals │
│ │
└─────────────────────────────────────────────┘
11.3 Apache Kafka Internals
Kafka Replication Model
┌─────────────────────────────────────────────┐
│ │
│ Topic: orders (Partition 3, RF=3) │
│ │
│ Partition 0: │
│ Leader: Broker 1 │
│ ISR: [Broker 1, Broker 2, Broker 3] │
│ │
│ Partition 1: │
│ Leader: Broker 2 │
│ ISR: [Broker 2, Broker 3, Broker 1] │
│ │
│ Partition 2: │
│ Leader: Broker 3 │
│ ISR: [Broker 3, Broker 1, Broker 2] │
│ │
│ ISR (In-Sync Replicas): │
│ ├── Replicas caught up with leader's log │
│ ├── Removed from ISR if falling behind │
│ └── acks=all → all ISR must confirm │
│ │
│ Controller: │
│ ├── Elected via ZooKeeper/KRaft │
│ ├── Assigns partition leaders │
│ └── Re-elects leaders on broker failure │
│ │
└─────────────────────────────────────────────┘
12. Design Pattern Collection
12.1 Circuit Breaker
import time
from enum import Enum
from threading import Lock

class CircuitState(Enum):
    CLOSED = "closed"        # Normal
    OPEN = "open"            # Blocking
    HALF_OPEN = "half_open"  # Testing

class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout: float = 30.0,
                 half_open_max: int = 3):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max = half_open_max
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0
        self.half_open_count = 0
        self.lock = Lock()

    def call(self, func, *args, **kwargs):
        """Protected function call"""
        with self.lock:
            if self.state == CircuitState.OPEN:
                if time.time() - self.last_failure_time > self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.half_open_count = 0
                else:
                    raise CircuitBreakerOpenError("Circuit breaker is OPEN")
        # Invoke outside the lock: Lock is non-reentrant and the
        # callee may be slow
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        with self.lock:
            if self.state == CircuitState.HALF_OPEN:
                self.half_open_count += 1
                if self.half_open_count >= self.half_open_max:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
            elif self.state == CircuitState.CLOSED:
                self.failure_count = 0

    def _on_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.OPEN
            elif self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
12.2 Retry with Exponential Backoff
import random
import time
from functools import wraps

def retry_with_backoff(max_retries: int = 3,
                       base_delay: float = 1.0,
                       max_delay: float = 60.0,
                       jitter: bool = True):
    """Retry with exponential backoff and jitter"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    if jitter:
                        delay *= random.uniform(0.5, 1.5)
                    print(f"Attempt {attempt + 1} failed, "
                          f"retrying in {delay:.1f}s...")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=5, base_delay=0.5)
def call_remote_service():
    """Remote service call"""
    ...
13. Quiz
Q1. In the CAP theorem, what is the actual choice you must make?
Since network partitions (P) are inevitable in distributed systems, the real choice is CP vs AP.
- CP choice: Prioritize consistency during partition (reject some requests). Examples: etcd, ZooKeeper, HBase
- AP choice: Prioritize availability during partition (may return stale data). Examples: Cassandra, DynamoDB
The PACELC extension also considers the Latency (L) vs Consistency (C) trade-off during normal operation.
Q2. When is Leader Election needed in Raft?
- Initial cluster start: When no leader exists yet
- Leader failure: When a Follower does not receive the leader's heartbeat within the election timeout
- Network partition: When the leader is separated from a majority of nodes
Process: A Follower transitions to Candidate, increments its Term, and requests votes from other nodes. It becomes the new Leader if it wins a majority. If two Candidates start elections simultaneously (split vote), randomized timeouts resolve it.
Q3. What advantage do Vector Clocks have over Lamport Timestamps?
Lamport timestamps guarantee only that if a happened before b then L(a) < L(b); from L(a) < L(b) alone you cannot tell whether a caused b or the two events are merely concurrent. Vector clocks can determine causal relationships precisely.
With vector clocks, you can accurately determine whether two events are causally related or concurrent. This enables detection of concurrent write conflicts and application of appropriate resolution strategies. The downside is that vector size grows proportionally with the number of nodes.
Q4. Why is the Saga pattern used instead of 2PC?
2PC problems:
- Coordinator is a single point of failure (SPOF)
- Participants can be blocked after prepare
- Synchronous, causing performance degradation
- Creates tight coupling in microservice environments
Saga advantages:
- Each service independently executes local transactions
- Asynchronous for high availability
- Compensating transactions for recovery on failure
- Maintains loose coupling
However, Saga only guarantees eventual consistency, and intermediate states may be visible.
Q5. Why are virtual nodes needed in consistent hashing?
Using only physical nodes leads to uneven distribution on the hash ring, causing load imbalance.
Virtual nodes provide:
- Each physical node occupies multiple positions on the ring for even load distribution
- Gradual rebalancing when adding/removing nodes
- Different virtual node counts for heterogeneous hardware to allocate load based on capacity
For example, a server with 2x performance can be assigned 2x virtual nodes to handle more keys.
14. References
- "Designing Data-Intensive Applications" - Martin Kleppmann (essential reading)
- Raft Paper - "In Search of an Understandable Consensus Algorithm" (Diego Ongaro, 2014)
- Paxos Paper - "The Part-Time Parliament" (Leslie Lamport, 1998)
- Dynamo Paper - "Dynamo: Amazon's Highly Available Key-value Store" (2007)
- Google Spanner Paper - "Spanner: Google's Globally-Distributed Database" (2012)
- CAP Theorem Proof - "Brewer's Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services" (2002)
- PACELC - Daniel Abadi, "Consistency Tradeoffs in Modern Distributed Database System Design"
- Lamport Clocks - "Time, Clocks, and the Ordering of Events in a Distributed System" (1978)
- Vector Clocks - "Timestamps in Message-Passing Systems That Preserve the Partial Ordering" (1988)
- Kafka Documentation - Apache Kafka Official Documentation
- etcd Documentation - etcd.io
- "Distributed Systems for Fun and Profit" - Mikito Takada (free online)
- Jepsen - Kyle Kingsbury's distributed systems verification project