Build a Distributed Key-Value Store
System Design Deep Dive
Distributed Key-Value Store
Consistency, replication, and partition tolerance - you can only tune the tradeoffs, never escape them
Imagine you need to store hundreds of billions of small records - user sessions, feature flags, shopping cart state, configuration values - and retrieve any of them in under 5ms, 24/7, across data center failures. A single machine fails that requirement on day one. You need to split the data across dozens or hundreds of machines, keep multiple copies of each record in case machines die, and still give clients the illusion of a single coherent store.
Think of a distributed key-value store like a post office with thousands of branches. When you drop a letter (a PUT), the system must decide which branch handles your street address (consistent hashing), make photocopies and store them at nearby branches (replication), and ensure that when you ask “did my letter arrive?” at any branch counter (a GET), you get a truthful answer even if the original branch is temporarily closed. The moment you have three branches handling the same letter, you need rules for what happens when they disagree about whether the letter arrived at all.
The naive approach - modulo sharding with hash(key) % N - breaks catastrophically when N changes. Add one node and nearly every key remaps to a different machine. You trigger a full data migration, overload every machine simultaneously, and black out the system during the move. At 100 req/s and 10 nodes, this is annoying. At 100,000 req/s and 50 nodes it is a company-ending event.
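To see the scale of the problem, the sketch below hashes a sample of keys with MD5 and counts how many land on a different shard when a 10-node cluster grows to 11. The key names and sample size are arbitrary illustrations. A key stays put only when hash % 10 equals hash % 11, which happens for about 1 key in 11, so roughly 91% of keys move.

```python
import hashlib


def shard_for(key: str, num_nodes: int) -> int:
    # A stable hash: Python's built-in hash() of a str changes between processes
    digest = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return digest % num_nodes


def fraction_remapped(num_keys: int, old_nodes: int, new_nodes: int) -> float:
    """Fraction of sample keys whose shard changes when the cluster is resized."""
    keys = [f"user:{i}" for i in range(num_keys)]
    moved = sum(1 for k in keys if shard_for(k, old_nodes) != shard_for(k, new_nodes))
    return moved / num_keys


print(f"10 -> 11 nodes: {fraction_remapped(20_000, 10, 11):.1%} of keys move")
```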
At scale, the hardest part isn’t storage - it’s the geometry of distributed disagreement. When you write a value to three replicas and one replica is temporarily unreachable, do you wait for it (strong consistency, higher latency) or proceed without it (eventual consistency, risk of stale reads)? When the unreachable replica comes back with a conflicting version, whose write wins? We need to solve for partition-tolerant placement, tunable consistency, and convergent conflict resolution simultaneously.
Requirements and Constraints
Functional Requirements
- PUT(key, value) - store or update a value for a key
- GET(key) - retrieve the value for a key, or a not-found signal
- DELETE(key) - remove a key from the store
- Tunable consistency: ONE, QUORUM, ALL consistency levels per operation
- Automatic replication of each key to N replicas (configurable, default N=3)
- Transparent routing: any node in the cluster can receive any request and forward it appropriately
- TTL support: keys expire after a configured duration
Non-Functional Requirements
- Latency: GET at QUORUM consistency under 5ms p99; PUT under 10ms p99
- Throughput: 500,000 reads/second and 100,000 writes/second across the cluster
- Durability: survive loss of any single node with zero data loss; survive simultaneous loss of two nodes with no more than a brief availability gap
- Availability: 99.99% uptime (52 minutes downtime/year), tolerate single datacenter failure
- Storage: 10TB usable capacity across the cluster, keys up to 256 bytes, values up to 1MB
- Replication factor: default N=3, configurable per keyspace
Constraints and Assumptions
- Keys are arbitrary byte strings, not structured; no range scans required
- No cross-key transactions or multi-key atomicity
- Clients have access to a cluster membership list or DNS-based discovery
- Single region deployment in v1; multi-region is a future extension
- Clocks are not perfectly synchronized; we cannot rely on wall-clock ordering
High-Level Architecture
The system has five major components that interact along two distinct paths - the write path and the read path - plus a set of background maintenance processes that keep the cluster healthy without operator intervention.
The Client SDK embeds the cluster membership map and implements consistent hash lookups locally. This means clients can compute the correct coordinator node for any key without a central router, eliminating a single point of failure. The SDK refreshes membership from any live node via gossip.
The Coordinator Node receives the client request, looks up the preference list (the N nodes responsible for that key), fans out the write or read to those replicas in parallel, and waits for W (write) or R (read) acknowledgments before responding to the client. Any node in the cluster can act as coordinator for any request.
The Storage Nodes hold the actual data using an LSM-tree (Log-Structured Merge Tree) for write-optimized storage. Each physical node is responsible for many virtual nodes on the hash ring, which enables fine-grained load balancing.
The Gossip Protocol service runs continuously on every node, exchanging membership state with two random peers every second. This ensures every node has an eventually consistent view of which nodes are alive, which have failed, and which recently rejoined.
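A minimal sketch of heartbeat-style gossip. It assumes a push-only exchange and takes an explicit `now` argument so it can run as a deterministic simulation. `GossipNode`, `gossip_round`, and the 10-second `fail_after` threshold are illustrative names and values, not part of any particular system. Each node bumps its own heartbeat, pushes its table to two random peers, and treats a peer as failed once that peer's heartbeat has not increased for `fail_after` seconds.

```python
import random
from dataclasses import dataclass, field


@dataclass
class GossipNode:
    node_id: str
    fail_after: float = 10.0  # seconds without a heartbeat increase before a peer is presumed dead
    # peer id -> (highest heartbeat seen, local time at which it last increased)
    table: dict[str, tuple[int, float]] = field(default_factory=dict)

    def tick(self, now: float) -> None:
        """Bump our own heartbeat once per gossip interval."""
        heartbeat = self.table.get(self.node_id, (0, now))[0]
        self.table[self.node_id] = (heartbeat + 1, now)

    def digest(self) -> dict[str, int]:
        return {peer: hb for peer, (hb, _) in self.table.items()}

    def merge(self, remote: dict[str, int], now: float) -> None:
        """Adopt any heartbeat newer than ours; only increases refresh the timestamp."""
        for peer, remote_hb in remote.items():
            if remote_hb > self.table.get(peer, (-1, now))[0]:
                self.table[peer] = (remote_hb, now)

    def live_peers(self, now: float) -> set[str]:
        return {p for p, (_, seen) in self.table.items() if now - seen < self.fail_after}


def gossip_round(nodes: list[GossipNode], now: float, rng: random.Random, fanout: int = 2) -> None:
    for node in nodes:
        node.tick(now)
    for node in nodes:
        peers = [p for p in nodes if p is not node]
        for peer in rng.sample(peers, min(fanout, len(peers))):
            peer.merge(node.digest(), now)  # push our view to a random peer
```

Because information is relayed peer to peer, a membership change reaches all nodes in a number of rounds that grows roughly with the logarithm of the cluster size. That is why a 1-second interval with a fanout of 2 is enough for the 10-30 second detection window in the failure table below.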
Anti-entropy repair, hinted handoff, and read repair are background processes that detect and fix divergence between replicas without operator intervention.
The coordinator is not a fixed machine - it is a role that any node temporarily plays for a given request. This is what makes the system leaderless: there is no master bottleneck, and a node failure only affects the requests that node was actively coordinating, which clients can retry against another node after a single request timeout.
The Consistent Hash Ring
The consistent hash ring is the spine of the entire system - it answers the question “which nodes are responsible for key K?” without any central directory lookup.
Think of the hash ring like seats around a circular table numbered 0 to 4 billion. Each physical node claims several seat numbers. When a key arrives, you hash it to get its seat number, then walk clockwise until you find the first occupied seat - that’s the primary replica. The next N-1 occupied seats (still going clockwise) are the other replicas. When a node leaves, only the keys in the arc between that node and its counter-clockwise neighbor need to move - every other key stays put.
The pure consistent hash with one position per physical node has a fatal flaw: the arcs are uneven. One node might own 30% of the ring while another owns 5%, causing severe hot spots. Virtual nodes solve this by giving each physical node 150 positions on the ring. The positions are distributed using deterministic pseudorandom placement: hash("NodeA#0"), hash("NodeA#1"), …, hash("NodeA#149"). The standard deviation of per-node load shrinks roughly as 1/sqrt(vnodes per node), so 150 vnodes bring it to about 8% of the mean, versus close to 100% with a single position per node. Heterogeneous hardware is handled separately, by giving larger machines proportionally more vnodes.
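The sketch below measures how evenly the ring is divided with 1 versus 150 vnodes per node. It reports the coefficient of variation (standard deviation divided by mean) of each node's share. It reuses the MD5-based placement and `NodeA#i`-style labels described above. The node names are placeholders, and the exact numbers depend on them.

```python
import hashlib
import statistics

RING_SIZE = 2**32


def ring_position(label: str) -> int:
    return int(hashlib.md5(label.encode()).hexdigest(), 16) % RING_SIZE


def ownership(nodes: list[str], vnodes: int) -> dict[str, float]:
    """Fraction of the ring owned by each physical node.

    A key maps to the first vnode clockwise from its hash, so each vnode owns
    the arc from the previous vnode's position up to its own.
    """
    points = sorted((ring_position(f"{n}#{i}"), n) for n in nodes for i in range(vnodes))
    if len(points) == 1:
        return {points[0][1]: 1.0}
    owned = {n: 0 for n in nodes}
    for idx, (pos, node) in enumerate(points):
        prev_pos = points[idx - 1][0]  # idx 0 wraps around to the last vnode
        owned[node] += (pos - prev_pos) % RING_SIZE
    return {n: arc / RING_SIZE for n, arc in owned.items()}


def load_spread(nodes: list[str], vnodes: int) -> float:
    """Coefficient of variation (stdev / mean) of per-node ownership."""
    shares = list(ownership(nodes, vnodes).values())
    return statistics.pstdev(shares) / statistics.mean(shares)


cluster = [f"Node{c}" for c in "ABCDEFGHIJ"]
for v in (1, 150):
    print(f"{v:>3} vnodes per node: load spread {load_spread(cluster, v):.1%}")
```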
```python
# Consistent hashing ring implementation with virtual nodes
import bisect
import hashlib


class ConsistentHashRing:
    # Demonstrates virtual node placement and key lookup on a consistent hash ring
    def __init__(self, nodes: list[str], vnodes_per_node: int = 150):
        self.vnodes_per_node = vnodes_per_node
        self.ring: list[int] = []           # sorted vnode positions
        self.ring_map: dict[int, str] = {}  # vnode position -> physical node
        for node in nodes:
            self.add_node(node)

    def _hash(self, key: str) -> int:
        # MD5 is used for uniform placement, not security; folded into a 32-bit ring
        return int(hashlib.md5(key.encode()).hexdigest(), 16) % (2**32)

    def add_node(self, node: str) -> None:
        for i in range(self.vnodes_per_node):
            position = self._hash(f"{node}#{i}")
            self.ring_map[position] = node
        self.ring = sorted(self.ring_map)

    def remove_node(self, node: str) -> None:
        for i in range(self.vnodes_per_node):
            position = self._hash(f"{node}#{i}")
            # Only delete positions this node still owns (guards against hash collisions)
            if self.ring_map.get(position) == node:
                del self.ring_map[position]
        self.ring = sorted(self.ring_map)

    def get_preference_list(self, key: str, n: int = 3) -> list[str]:
        """Returns up to N distinct physical nodes responsible for this key."""
        if not self.ring:
            return []
        idx = bisect.bisect_right(self.ring, self._hash(key))
        preference: list[str] = []
        # Walk clockwise at most once around the ring, skipping vnodes whose
        # physical node is already listed. Bounding the walk by the ring size
        # avoids looping forever when the cluster has fewer than n physical nodes.
        for step in range(len(self.ring)):
            physical_node = self.ring_map[self.ring[(idx + step) % len(self.ring)]]
            if physical_node not in preference:
                preference.append(physical_node)
                if len(preference) == n:
                    break
        return preference
```
When a new node joins, the ring is recalculated and roughly 1/N of keys migrate from existing nodes to the new one - no other keys move. This is the core advantage over modular sharding.
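The claim is easy to check empirically. This self-contained sketch uses a stripped-down ring with the same MD5 placement and 150 vnodes, plus placeholder node and key names. It compares each sampled key's primary owner before and after an 11th node joins. Roughly 1/11 of the keys should move, and every key that moves should move to the new node.

```python
import bisect
import hashlib


def h32(label: str) -> int:
    return int(hashlib.md5(label.encode()).hexdigest(), 16) % 2**32


def build_ring(nodes: list[str], vnodes: int = 150) -> tuple[list[int], dict[int, str]]:
    owner = {h32(f"{n}#{i}"): n for n in nodes for i in range(vnodes)}
    return sorted(owner), owner


def primary(ring: tuple[list[int], dict[int, str]], key: str) -> str:
    positions, owner = ring
    idx = bisect.bisect_right(positions, h32(key)) % len(positions)
    return owner[positions[idx]]


def join_impact(nodes: list[str], new_node: str, num_keys: int = 20_000) -> tuple[float, set[str]]:
    """Return (fraction of keys whose primary changes, set of nodes they moved to)."""
    before, after = build_ring(nodes), build_ring(nodes + [new_node])
    destinations: set[str] = set()
    moved = 0
    for i in range(num_keys):
        key = f"user:{i}"
        old, new = primary(before, key), primary(after, key)
        if old != new:
            moved += 1
            destinations.add(new)
    return moved / num_keys, destinations


fraction, dest = join_impact([f"node{i}" for i in range(10)], "node10")
print(f"{fraction:.1%} of keys moved, all to {dest}")
```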
Amazon Dynamo (the paper that defined this architecture) uses consistent hashing with virtual nodes, where the number of vnodes per node is chosen so that nodes with more hardware capacity get more of them. Apache Cassandra originally assigned a single token per node and added virtual nodes in version 1.2. Today the num_tokens setting in cassandra.yaml controls how many tokens each node gets (the default was 256 before Cassandra 4.0 and is 16 since), chosen partly by how homogeneous the hardware is.
Quorum Reads and Writes
Once you know which N nodes hold a key, you need a protocol for reading and writing that balances consistency against availability. Quorum is the mechanism: require acknowledgment from W nodes on write and R nodes on read, where W + R > N guarantees that the read set and write set overlap by at least one node.
With N=3:
- W=1, R=1: fastest, but stale reads possible (only one node needed)
- W=2, R=2 (QUORUM): balanced - tolerates one node failure without availability loss
- W=3, R=1 or W=1, R=3: strong consistency, but writes (W=3) or reads (R=3) fail if any replica is down
The intuition: if you write to 2 of 3 nodes and read from 2 of 3 nodes, at least one node must have seen the write (by pigeonhole). That node will return the latest version.
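The pigeonhole argument can be checked exhaustively for small clusters. This sketch enumerates every possible write set of size W and read set of size R and confirms that they always intersect exactly when W + R > N.

```python
from itertools import combinations


def quorums_always_overlap(n: int, w: int, r: int) -> bool:
    """True if every W-replica write set intersects every R-replica read set."""
    replicas = range(n)
    return all(
        set(write_set) & set(read_set)
        for write_set in combinations(replicas, w)
        for read_set in combinations(replicas, r)
    )


for w, r in [(1, 1), (2, 2), (3, 1), (1, 3), (2, 1)]:
    print(f"N=3 W={w} R={r}: W+R>N={w + r > 3}, always overlap={quorums_always_overlap(3, w, r)}")
```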
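```python
# Quorum coordinator: fan-out write, return as soon as W replicas acknowledge
import asyncio
from dataclasses import dataclass


@dataclass
class WriteResult:
    success: bool
    acked_nodes: list[str]
    failed_nodes: list[str]


# Strong references to replica writes still in flight after the coordinator has
# replied. The event loop keeps only weak references to tasks, so without this
# set they could be garbage-collected mid-write.
_background_writes: set[asyncio.Task] = set()


async def coordinated_put(
    key: str,
    value: bytes,
    vector_clock: dict[str, int],
    preference_list: list[str],
    W: int,
    storage_client,
    per_node_timeout: float = 0.05,  # 50ms per-node timeout
) -> WriteResult:
    # Demonstrates quorum write with parallel fan-out and partial failure handling
    acked: list[str] = []
    failed: list[str] = []

    async def write_to_node(node: str) -> tuple[str, bool]:
        try:
            await asyncio.wait_for(
                storage_client.put(node, key, value, vector_clock),
                timeout=per_node_timeout,
            )
            return node, True
        except Exception:
            return node, False

    # Fan out all writes in parallel
    tasks = [asyncio.ensure_future(write_to_node(node)) for node in preference_list]
    try:
        # Handle replies in completion order instead of waiting for the slowest replica
        for next_reply in asyncio.as_completed(tasks):
            node, ok = await next_reply
            (acked if ok else failed).append(node)
            if len(acked) >= W:
                return WriteResult(success=True, acked_nodes=acked, failed_nodes=failed)
            if len(failed) > len(preference_list) - W:
                break  # W acks are no longer reachable
        return WriteResult(success=False, acked_nodes=acked, failed_nodes=failed)
    finally:
        # Remaining replica writes keep running; they still help replicas converge
        for task in tasks:
            if not task.done():
                _background_writes.add(task)
                task.add_done_callback(_background_writes.discard)
```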
Setting W=1 for fast writes and R=1 for fast reads gives you the worst of both worlds: you still pay to store three replicas, but a read can land on a replica the write never reached, so you get no read-your-writes guarantee. The most dangerous configuration is W + R ≤ N combined with business logic that assumes strong consistency - this causes silent stale reads and lost updates in production, not crashes.
Vector Clocks
Two clients might concurrently write to the same key on different coordinator nodes before those coordinators have synchronized. Which write wins? Wall-clock time is unreliable in distributed systems - clocks drift, NTP adjustments are not atomic, and two events one millisecond apart on different machines have no guaranteed ordering.
Vector clocks solve this by tracking causality with one counter per node. Think of them like a shared calendar with one column per team member - you can only say “event A definitely happened before event B” if every team member’s entry in A’s calendar is less than or equal to their entry in B’s calendar, and at least one entry is strictly less.
```python
# Vector clock implementation with merge and conflict detection
VectorClock = dict[str, int]


def vc_increment(vc: VectorClock, node_id: str) -> VectorClock:
    """Increment this node's counter when it writes."""
    updated = dict(vc)  # counters are ints, so a shallow copy is enough
    updated[node_id] = updated.get(node_id, 0) + 1
    return updated


def vc_merge(vc1: VectorClock, vc2: VectorClock) -> VectorClock:
    """Take element-wise max - used on read to advance clock."""
    all_nodes = set(vc1) | set(vc2)
    return {n: max(vc1.get(n, 0), vc2.get(n, 0)) for n in all_nodes}


def vc_happens_before(vc1: VectorClock, vc2: VectorClock) -> bool:
    """True if vc1 causally precedes vc2 (vc1 is an ancestor of vc2)."""
    all_nodes = set(vc1) | set(vc2)
    at_least_one_less = False
    for n in all_nodes:
        if vc1.get(n, 0) > vc2.get(n, 0):
            return False
        if vc1.get(n, 0) < vc2.get(n, 0):
            at_least_one_less = True
    return at_least_one_less


def vc_concurrent(vc1: VectorClock, vc2: VectorClock) -> bool:
    """True if neither clock happened before the other - a conflict."""
    return not vc_happens_before(vc1, vc2) and not vc_happens_before(vc2, vc1) and vc1 != vc2


# Example: two concurrent writes create a conflict
vc_client_a = {"node1": 2, "node2": 1}
vc_client_b = {"node1": 1, "node2": 2}
print(vc_concurrent(vc_client_a, vc_client_b))  # True - conflict!
```
When a conflict is detected, the coordinator returns both versions to the client and lets application logic resolve them. This is what Amazon’s shopping cart used: adding items from two browser tabs would merge the carts rather than silently discarding one. For simple values where last-write-wins is acceptable, the coordinator can resolve automatically using the wall-clock timestamp as a tiebreaker.
Vector clocks do not prevent conflicts - they detect them with certainty. The design decision is whether to resolve conflicts at write time (simpler clients, possible data loss) or at read time (clients bear merge complexity, no silent data loss). Many production KV stores, Cassandra among them, choose write-time LWW (Last-Write-Wins) for simplicity and accept the rare lost update.
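The two resolution strategies can be contrasted on the two-tab shopping cart conflict. This sketch assumes each sibling carries its items, its vector clock, and a coordinator wall-clock timestamp. `Version`, `resolve_lww`, and `resolve_cart_merge` are illustrative names. LWW keeps the newer sibling and silently drops the other tab's item. A union merge keeps both and produces a clock that dominates every sibling, so the resolved write supersedes them.

```python
from dataclasses import dataclass

VectorClock = dict[str, int]


@dataclass
class Version:
    items: frozenset[str]  # cart contents
    clock: VectorClock
    wall_ms: int  # coordinator wall-clock time, consulted only by LWW


def resolve_lww(siblings: list[Version]) -> Version:
    # Keep the sibling with the largest timestamp; every other sibling is discarded
    return max(siblings, key=lambda v: v.wall_ms)


def resolve_cart_merge(siblings: list[Version], node_id: str) -> Version:
    merged_items = frozenset().union(*(v.items for v in siblings))
    merged_clock: VectorClock = {}
    for v in siblings:
        for node, counter in v.clock.items():
            merged_clock[node] = max(merged_clock.get(node, 0), counter)
    # The resolving write descends from every sibling, so bump this node's entry
    merged_clock[node_id] = merged_clock.get(node_id, 0) + 1
    return Version(merged_items, merged_clock, max(v.wall_ms for v in siblings))


tab_a = Version(frozenset({"book", "lamp"}), {"node1": 2, "node2": 1}, 1000)
tab_b = Version(frozenset({"book", "mug"}), {"node1": 1, "node2": 2}, 1001)
print(sorted(resolve_lww([tab_a, tab_b]).items))                  # ['book', 'mug']
print(sorted(resolve_cart_merge([tab_a, tab_b], "node1").items))  # ['book', 'lamp', 'mug']
```

A plain union cannot express removals, which is why the Dynamo paper notes that deleted cart items can resurface after a merge.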
Data Model
The storage engine on each node uses an LSM-tree (Log-Structured Merge Tree). Writes go to an in-memory memtable first, then are flushed as immutable SSTables to disk. Reads check the memtable, then walk SSTables newest-first with a Bloom filter to skip files that definitely don’t contain the key.
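Here is a toy version of that read and write path. It assumes an in-memory dict as the memtable and Python lists as stand-ins for on-disk SSTables. `TinyLSM`, `SSTable`, and the small `BloomFilter` (3 SHA-256-derived hash functions over 1,024 bits) are illustrative. A real engine also appends to a write-ahead log before touching the memtable and compacts SSTables in the background. Both are omitted here.

```python
import bisect
import hashlib
from typing import Iterator, Optional


class BloomFilter:
    """k hash functions derived from SHA-256 with a one-byte salt."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3):
        self.num_bits, self.num_hashes = num_bits, num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key: bytes) -> Iterator[int]:
        for i in range(self.num_hashes):
            digest = hashlib.sha256(bytes([i]) + key).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: bytes) -> None:
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key: bytes) -> bool:
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(key))


class SSTable:
    """Immutable, sorted segment with a Bloom filter over its keys."""

    def __init__(self, entries: dict[bytes, Optional[bytes]]):
        self.keys = sorted(entries)
        self.values = [entries[k] for k in self.keys]
        self.bloom = BloomFilter()
        for k in self.keys:
            self.bloom.add(k)

    def lookup(self, key: bytes) -> tuple[bool, Optional[bytes]]:
        if not self.bloom.might_contain(key):
            return False, None  # definitely absent: skip the binary search
        i = bisect.bisect_left(self.keys, key)
        if i < len(self.keys) and self.keys[i] == key:
            return True, self.values[i]
        return False, None  # Bloom filter false positive


class TinyLSM:
    def __init__(self, memtable_limit: int = 4):
        self.memtable: dict[bytes, Optional[bytes]] = {}
        self.sstables: list[SSTable] = []  # oldest first
        self.memtable_limit = memtable_limit

    def put(self, key: bytes, value: Optional[bytes]) -> None:
        """value=None records a tombstone."""
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.sstables.append(SSTable(self.memtable))  # flush as an immutable segment
            self.memtable = {}

    def get(self, key: bytes) -> Optional[bytes]:
        if key in self.memtable:
            return self.memtable[key]
        for table in reversed(self.sstables):  # newest segment first
            found, value = table.lookup(key)
            if found:
                return value  # None if the newest entry is a tombstone
        return None
```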
```sql
-- Core keyspace schema stored per storage node (PostgreSQL dialect)
-- Each row holds the current version of a key; keeping concurrent siblings
-- would require a version column in the primary key
CREATE TABLE kv_store (
    key          BYTEA  NOT NULL CHECK (octet_length(key) <= 256),
    vector_clock JSONB  NOT NULL,      -- {"node1": 3, "node2": 1}
    value        BYTEA CHECK (value IS NULL OR octet_length(value) <= 1048576),  -- NULL means tombstone (delete)
    timestamp_ms BIGINT NOT NULL,      -- wall clock for TTL expiry
    ttl_ms       BIGINT DEFAULT NULL,  -- NULL = no expiry
    checksum     BYTEA  NOT NULL,      -- CRC32 of value for corruption detection
    PRIMARY KEY (key)
);

-- Bloom filter metadata per SSTable segment
CREATE TABLE sstable_metadata (
    segment_id   BIGINT NOT NULL,
    min_key      BYTEA  NOT NULL,
    max_key      BYTEA  NOT NULL,
    bloom_filter BYTEA  NOT NULL,  -- serialized Bloom filter
    entry_count  BIGINT NOT NULL,
    size_bytes   BIGINT NOT NULL,
    created_at   BIGINT NOT NULL,
    PRIMARY KEY (segment_id)
);

-- Hinted handoff queue: writes for temporarily unavailable nodes
CREATE TABLE hinted_handoff (
    hint_id      BIGSERIAL PRIMARY KEY,
    target_node  VARCHAR(64) NOT NULL,
    key          BYTEA       NOT NULL,
    value        BYTEA,               -- NULL = hinted delete (tombstone)
    vector_clock JSONB       NOT NULL,
    hint_created BIGINT      NOT NULL,
    retry_count  INT         DEFAULT 0
);
CREATE INDEX idx_hints_target ON hinted_handoff (target_node, hint_created);
```
The partitioning key is the raw key bytes passed to hash(). This gives even distribution but prevents range scans. For workloads that need range scans, a sorted partition scheme (like HBase’s row key ranges) would be needed, at the cost of potential hot spots for sequential writes.
Deletes use tombstones - rather than removing the key from storage, a DELETE writes a special marker with value=NULL. This is critical because a genuine delete must propagate to all replicas; without a tombstone, a stale replica that missed the delete would silently resurrect the key on the next read repair.
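The resurrection scenario can be reproduced in a few lines. This sketch simplifies each version to a single integer per write (an assumption for brevity; the design above uses vector clocks). It uses a hypothetical `sync` function that makes two replicas adopt the higher version of every key, which is roughly what anti-entropy or read repair does.

```python
from typing import Optional

# Each replica maps key -> (version, value); a value of None is a tombstone.
Replica = dict[str, tuple[int, Optional[str]]]


def sync(a: Replica, b: Replica) -> None:
    """Both replicas keep the higher-versioned entry for every key either one has."""
    for key in set(a) | set(b):
        newest = max((r[key] for r in (a, b) if key in r), key=lambda entry: entry[0])
        a[key] = b[key] = newest


def read(replica: Replica, key: str) -> Optional[str]:
    entry = replica.get(key)
    return None if entry is None else entry[1]


def delete_then_repair(use_tombstone: bool) -> Optional[str]:
    up_to_date: Replica = {"cart:42": (1, "lamp")}
    stale: Replica = {"cart:42": (1, "lamp")}  # this replica misses the delete
    if use_tombstone:
        up_to_date["cart:42"] = (2, None)  # DELETE writes a newer tombstone
    else:
        del up_to_date["cart:42"]  # DELETE physically removes the key
    sync(up_to_date, stale)
    return read(up_to_date, "cart:42")


print(delete_then_repair(use_tombstone=False))  # lamp - the deleted key is back
print(delete_then_repair(use_tombstone=True))   # None - the tombstone wins
```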
Tombstones accumulate until compaction runs. In workloads with frequent deletes and slow compaction (often caused by heavy write load slowing LSM compaction threads), tombstone accumulation causes read amplification - reads scan thousands of tombstones before finding a live value or confirming absence. Cassandra’s “tombstone warnings” exist for exactly this reason.
Key Algorithms and Protocols
Anti-Entropy Repair
Anti-entropy is the process by which replicas that have diverged due to network partitions, node crashes, or slow replication eventually converge back to the same state. The naive approach - comparing every key-value pair between two nodes - is impractical at billions of keys.
The solution is a Merkle tree: build a binary hash tree where each leaf is a hash of the key-value pairs in one small slice of the key space (at the finest granularity, a single key), and each internal node is the hash of its children. To compare two replicas, compare the root hashes. If they match, the replicas are identical (no work needed). If they differ, descend the tree - only subtrees whose hashes differ contain diverged keys. This turns an O(N) comparison into O(K log N) where K is the number of diverged keys.
```python
# Merkle tree for anti-entropy between two replicas.
# Keys are hashed into a fixed number of leaf buckets so that replicas holding
# different key sets still build trees of identical shape and can be compared
# node by node.
import hashlib


class MerkleTree:
    # Demonstrates Merkle tree construction for efficient replica comparison
    def __init__(self, key_value_pairs: list[tuple[bytes, bytes]], depth: int = 10):
        self.num_leaves = 2**depth
        buckets: list[list[tuple[bytes, bytes]]] = [[] for _ in range(self.num_leaves)]
        for k, v in key_value_pairs:
            buckets[self.bucket_of(k)].append((k, v))
        # Array (heap) layout: node i has children 2i+1 and 2i+2;
        # the leaves occupy the last num_leaves slots
        self.first_leaf = self.num_leaves - 1
        self.tree: list[bytes] = [b""] * (2 * self.num_leaves - 1)
        for b, pairs in enumerate(buckets):
            leaf = hashlib.sha256()
            for k, v in sorted(pairs):
                # Length-prefix each field so different (key, value) splits never collide
                leaf.update(len(k).to_bytes(4, "big") + k + len(v).to_bytes(4, "big") + v)
            self.tree[self.first_leaf + b] = leaf.digest()
        for i in range(self.first_leaf - 1, -1, -1):
            self.tree[i] = hashlib.sha256(self.tree[2 * i + 1] + self.tree[2 * i + 2]).digest()

    def bucket_of(self, key: bytes) -> int:
        return int.from_bytes(hashlib.sha256(key).digest()[:8], "big") % self.num_leaves

    @property
    def root(self) -> bytes:
        return self.tree[0]

    def diff_buckets(self, other: "MerkleTree") -> list[int]:
        """Return the leaf buckets whose contents differ. Visits O(K log B) tree nodes."""
        if self.num_leaves != other.num_leaves:
            raise ValueError("replicas must build trees of the same depth")
        diverged: list[int] = []
        stack = [0]
        while stack:
            i = stack.pop()
            if self.tree[i] == other.tree[i]:
                continue  # subtree matches - skip entirely
            if i >= self.first_leaf:
                diverged.append(i - self.first_leaf)
            else:
                stack.extend((2 * i + 1, 2 * i + 2))
        return sorted(diverged)
```
Anti-entropy runs on a schedule (typically once per hour per replica pair) to catch divergence that slipped through read repair or hinted handoff.
Merkle trees make anti-entropy sublinear: comparing two 10-billion-key replicas where only 1,000 keys differ requires examining only about 1,000 * log2(10 billion) ≈ 33,000 tree nodes, not 10 billion key pairs. The tree is rebuilt periodically from the SSTable segments, not maintained in real-time - building it is the expensive part, diffing is cheap.
Hinted Handoff
When a write request arrives but one of the N target replicas is temporarily unreachable, rather than failing the write (which would reduce availability), the coordinator writes the data to an available node along with a hint - metadata saying “this write was meant for Node X, please deliver it when Node X comes back.”
```python
# Hinted handoff: store write on surrogate node with delivery metadata
import json
import time

HINT_PREFIX = b"__hint__"


def hint_prefix(intended_node: str) -> bytes:
    return HINT_PREFIX + intended_node.encode() + b":"


def store_with_hint(
    storage_client,
    surrogate_node: str,
    intended_node: str,
    key: bytes,
    value: bytes,
    vector_clock: dict[str, int],
    hint_ttl_seconds: int = 3600,
) -> None:
    # Demonstrates storing a hinted write to a surrogate node
    now = time.time()
    hint_metadata = {
        "intended_node": intended_node,
        "hint_created_ms": int(now * 1000),
        "hint_expires_ms": int((now + hint_ttl_seconds) * 1000),
        "original_vc": vector_clock,
    }
    # Store the value under a hint-prefixed key on the surrogate
    storage_client.put_local(
        surrogate_node,
        hint_prefix(intended_node) + key,
        value,
        json.dumps(hint_metadata).encode(),
    )


def replay_hints(storage_client, recovered_node: str) -> int:
    # When Node X recovers, replay all hints stored on surrogates
    prefix = hint_prefix(recovered_node)  # bytes, matching how hint keys were written
    replayed = 0
    for hint_key, value, metadata in storage_client.scan_hints(prefix=prefix):
        meta = json.loads(metadata)
        if int(time.time() * 1000) > meta["hint_expires_ms"]:
            storage_client.delete_hint(hint_key)  # expired: anti-entropy must cover it
            continue
        # Strip the known prefix; splitting on ":" would break for node ids like "10.0.0.1:7000"
        original_key = hint_key[len(prefix):]
        storage_client.put(recovered_node, original_key, value, meta["original_vc"])
        storage_client.delete_hint(hint_key)  # delete only after successful delivery
        replayed += 1
    return replayed
```
Hinted handoff is a “best effort” availability mechanism. Each hint lives on a single surrogate node, so if that surrogate also fails before the intended node recovers, the hint is lost. Anti-entropy repair is the fallback that catches these gaps.
Read Repair
Read repair runs opportunistically during a normal GET request. When the coordinator fans out a read to R nodes and collects responses, it compares the vector clocks of all returned values. If any replica returns a stale version (its vector clock is dominated by another replica’s version), the coordinator sends an asynchronous write to the stale replica with the fresher value.
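```python
# Read repair: compare versions across replicas and async-update stale ones
import asyncio
from typing import Optional

# Uses vc_happens_before from the vector clock example above.
_repair_tasks: set[asyncio.Task] = set()  # keep references so repairs aren't garbage-collected


async def coordinated_get(
    key: str,
    preference_list: list[str],
    R: int,
    storage_client,
) -> list[tuple[Optional[bytes], dict]]:
    # Demonstrates read repair: coordinator heals stale replicas on each read.
    # Returns every version not superseded by another; more than one entry means
    # concurrent siblings that the client must resolve.
    responses: list[tuple[str, Optional[bytes], dict]] = []

    async def read_from_node(node: str) -> None:
        try:
            value, vc = await asyncio.wait_for(storage_client.get(node, key), timeout=0.05)
            responses.append((node, value, vc))
        except Exception:
            pass  # unreachable or slow replica: it simply doesn't count toward R

    # Wait for every reply (bounded by the timeout) so stale replicas can be spotted
    await asyncio.gather(*[read_from_node(n) for n in preference_list])
    if len(responses) < R:
        raise RuntimeError(f"Read quorum failed: only {len(responses)}/{R} nodes responded")

    # Keep versions that no other response causally supersedes
    siblings: list[tuple[Optional[bytes], dict]] = []
    for _, value, vc in responses:
        superseded = any(vc_happens_before(vc, other) for _, _, other in responses)
        if not superseded and all(vc != seen for _, seen in siblings):
            siblings.append((value, vc))

    # Async repair: push the surviving version(s) to replicas holding an older one
    stale_nodes = [
        node for node, _, vc in responses
        if any(vc_happens_before(vc, s_vc) for _, s_vc in siblings)
    ]
    if stale_nodes:
        async def repair() -> None:
            await asyncio.gather(
                *[storage_client.put(node, key, v, s_vc) for node in stale_nodes for v, s_vc in siblings],
                return_exceptions=True,
            )

        task = asyncio.create_task(repair())  # create_task needs a coroutine, not a gather future
        _repair_tasks.add(task)
        task.add_done_callback(_repair_tasks.discard)
    return siblings
```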
Read repair works well for hot keys (read frequently, healed quickly) but does nothing for cold keys that are rarely read. Anti-entropy fills that gap.
Apache Cassandra uses all three repair mechanisms in concert: read repair heals actively accessed keys within the read path, hinted handoff covers short node outages (hints are generated for up to 3 hours of downtime by default, configurable via max_hint_window_in_ms), and nodetool repair triggers full Merkle tree anti-entropy for cold data. Amazon DynamoDB, despite the shared name, does not expose any of this: internally it replicates each partition through a leader-based Multi-Paxos group, and at the application layer you simply choose eventually consistent or strongly consistent reads.
Scaling and Performance
The system scales horizontally by adding nodes to the ring. Each new node gets assigned 150 virtual nodes (token positions), and keys in those token ranges migrate from existing neighbors. The migration is incremental and online - existing nodes serve requests while streaming data to the new node in the background.
Capacity estimation - 500K reads/sec, 100K writes/sec:
Given:
- 500,000 reads/second at 5ms p99 target
- 100,000 writes/second
- Average value size: 1KB
- Replication factor: 3
- Read/write ratio: 5:1
Write amplification:
100,000 writes/sec * 3 replicas = 300,000 physical writes/sec
300,000 * 1KB = 300 MB/sec ingest to LSM-tree memtables
Read throughput per node (10 nodes):
500,000 / 10 nodes = 50,000 reads/node/sec
With R=2 quorum: 100,000 read RPCs/node/sec (coordinator fans out to 2)
Storage (1 year, no TTL):
100,000 writes/sec * 1KB * 3 replicas * 86400 sec/day * 365 days
= 100,000 * 1000 * 3 * 31,536,000
= ~9.5 PB raw - clearly requires aggressive TTLs or tiered storage
Practical 30-day retention:
100,000 * 1000 * 3 * 2,592,000 = ~777 TB
~78 TB per node at 10 nodes with RF=3
Memory (memtable budget per node):
100MB memtable flush threshold * 4 concurrent flushes = 400MB working set
+ Bloom filters: ~1.2 bytes per key * 1B keys/node = 1.2 GB
Recommended per-node hardware:
32 vCPU, 128 GB RAM, 4x NVMe SSD (10 TB each), 25 Gbps NIC
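The arithmetic above can be re-derived in a few lines, taking 1KB as 1,000 bytes as the figures do. The final line also checks the 30-day figure against the recommended hardware (4 x 10 TB per node) and the 60% utilization ceiling from the mitigation list below. The constant names are illustrative.

```python
import math

WRITES_PER_SEC = 100_000
READS_PER_SEC = 500_000
VALUE_BYTES = 1_000  # "1KB" taken as 1,000 bytes, matching the figures above
RF = 3               # replication factor
R = 2                # QUORUM read fan-out
NODES = 10
SECONDS_PER_DAY = 86_400


def raw_storage_bytes(days: int) -> int:
    return WRITES_PER_SEC * VALUE_BYTES * RF * SECONDS_PER_DAY * days


ingest_mb_per_sec = WRITES_PER_SEC * RF * VALUE_BYTES / 1e6
read_rpcs_per_node = READS_PER_SEC * R / NODES
one_year_pb = raw_storage_bytes(365) / 1e15
thirty_day_tb = raw_storage_bytes(30) / 1e12
per_node_tb = thirty_day_tb / NODES
# Nodes needed for 30 days if each has 4 x 10 TB and stays under 60% full
nodes_for_30_days = math.ceil(thirty_day_tb / (4 * 10 * 0.60))

print(f"ingest {ingest_mb_per_sec:.0f} MB/s, {read_rpcs_per_node:,.0f} read RPCs/node/s")
# ingest 300 MB/s, 100,000 read RPCs/node/s
print(f"1 year {one_year_pb:.2f} PB; 30 days {thirty_day_tb:.1f} TB, {per_node_tb:.1f} TB/node")
# 1 year 9.46 PB; 30 days 777.6 TB, 77.8 TB/node
print(f"nodes needed for 30-day retention: {nodes_for_30_days}")
# nodes needed for 30-day retention: 33
```

By this arithmetic, 10 nodes with 40 TB each fit 30-day retention at this write rate only with smaller values, shorter TTLs, or more storage per node.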
The dominant bottleneck at high write throughput is LSM compaction. As SSTables accumulate, background compaction merges and sorts them - this process competes with writes for I/O bandwidth. If compaction falls behind, read latency grows because reads must scan more SSTable files.
Mitigation strategies:
- Dedicate 30-40% of I/O bandwidth to compaction using rate limiting (compaction_throughput_mb_per_sec)
- Use tiered compaction for write-heavy workloads (fewer, larger compaction runs)
- Use leveled compaction for read-heavy workloads (bounded read amplification)
- Add nodes before the cluster hits 60% storage utilization - migration at 90% full causes compaction starvation
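Compaction rate limiting is commonly built as a token bucket. The sketch below is a minimal version that assumes compaction calls `acquire(nbytes)` before writing each chunk. It illustrates the idea behind settings such as compaction_throughput_mb_per_sec but is not Cassandra's implementation. A request may overdraw the bucket, so `acquire` never spins: it sleeps exactly long enough for the refill to cover the debt. The injectable `clock` and `sleep` exist only so the behavior can be tested with a fake clock.

```python
import time
from typing import Callable


class CompactionThrottle:
    """Token bucket limiting compaction writes to rate_bytes_per_sec on average."""

    def __init__(
        self,
        rate_bytes_per_sec: float,
        burst_bytes: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ):
        self.rate = rate_bytes_per_sec
        self.capacity = burst_bytes
        self.tokens = burst_bytes
        self.clock, self.sleep = clock, sleep
        self.last = clock()

    def acquire(self, nbytes: int) -> float:
        """Account for nbytes of compaction I/O; returns seconds slept."""
        now = self.clock()
        # Refill for the elapsed time, capped at the burst size
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        self.tokens -= nbytes  # may go negative: the debt is paid by sleeping
        wait = -self.tokens / self.rate if self.tokens < 0 else 0.0
        if wait > 0:
            self.sleep(wait)
        return wait
```

With a 100 MB/s rate and a 10 MB burst, writing 200 MB of compaction output in 10 MB chunks takes about 1.9 seconds: the first chunk rides on the burst and each later chunk waits 0.1 seconds.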
Facebook’s RocksDB (the LSM-tree engine behind MyRocks, ZippyDB, and others) exposes knobs for exactly this problem. Write stalls throttle or stop incoming writes when compaction falls behind, triggered by thresholds such as level0_slowdown_writes_trigger and soft_pending_compaction_bytes_limit. Dynamic level sizing (level_compaction_dynamic_level_bytes) sets each level’s target size from the actual size of the last level rather than from fixed constants. The write-stall backpressure guards against a resonance failure: a compaction storm on one node slows its reads, requests shift to the other replicas, and the extra load pushes their compaction behind too.
Failure Modes and Recovery
| Failure | Detection | Impact | Recovery |
|---|---|---|---|
| Single node crash | Gossip timeout (10-30 sec) | Reads/writes route to remaining replicas, W/R still satisfied at QUORUM | Hinted handoff delivers missed writes; anti-entropy repairs cold data on rejoin |
| Network partition (split brain) | Gossip detects node as unreachable | Minority partition may reject writes if it can’t reach W nodes | Writes to majority partition continue; minority heals via anti-entropy once the partition heals |
| Disk corruption | Checksum mismatch on read | Single key returns error; node flags corrupted segment | Read repair provides clean copy from other replicas; corrupted SSTable rebuilt |
| Slow node (GC pause, I/O saturation) | Request timeout on coordinator | Coordinator waits up to 50ms then falls back to other replicas | Hedged reads (send to R+1 nodes, use first R responses) absorb slow outliers |
| Clock skew | NTP divergence metrics | LWW conflicts resolved by wrong winner (rare) | Vector clocks provide true causality; LWW is fallback only for genuine conflicts |
| Coordinator failure mid-write | Client timeout; no ACK received | Write may be partially applied (some replicas got it, others didn’t) | Client retries with same vector clock; idempotent replicas detect duplicate and skip |
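The hedged-read mitigation from the table can be sketched with asyncio. It assumes a caller-supplied `read_one(node)` coroutine, a hypothetical stand-in for the storage client. The sketch sends the read to R+1 replicas, returns as soon as R of them succeed, and cancels the straggler.

```python
import asyncio
from typing import Awaitable, Callable


async def hedged_read(
    replicas: list[str],
    r: int,
    read_one: Callable[[str], Awaitable[bytes]],
    timeout: float = 0.05,
) -> list[tuple[str, bytes]]:
    """Send the read to R+1 replicas and return the first R successful replies."""
    tasks = {asyncio.ensure_future(read_one(node)): node for node in replicas[: r + 1]}
    pending = set(tasks)
    results: list[tuple[str, bytes]] = []
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    try:
        while pending and len(results) < r:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            done, pending = await asyncio.wait(
                pending, timeout=remaining, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                if not task.cancelled() and task.exception() is None:
                    results.append((tasks[task], task.result()))
    finally:
        for task in pending:
            task.cancel()  # the slow straggler's answer is no longer needed
    if len(results) < r:
        raise TimeoutError(f"only {len(results)}/{r} replicas answered in time")
    return results[:r]
```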
The most dangerous operational mistake is disabling anti-entropy repair to reduce background I/O during peak traffic, then never re-enabling it. Stale replicas accumulate silently. When a node crashes and is replaced, the replacement node bootstraps from peers - if those peers have months of silent divergence, the new node gets inconsistent data that read repair will never fix (because nobody reads those cold keys). Repair must run continuously at low priority, not periodically at high priority.
Comparison of Approaches
| Approach | Consistency | Latency | Complexity | Best Fit |
|---|---|---|---|---|
| Single-node Redis | Strong | Sub-ms | Low | Session state under 100K QPS, single AZ |
| Modulo sharding + replication | Strong per shard | Low | Medium | Fixed cluster size, rare resharding |
| Consistent hashing + quorum (this design) | Tunable (ONE/QUORUM/ALL) | Low-Medium (5-10ms) | High | High write volume, elastic scaling, multi-AZ |
| Raft-based KV (etcd, TiKV) | Strong (linearizable) | Medium (20-50ms) | Very High | Config store, leader election, metadata |
| DynamoDB / Cassandra (managed) | Tunable | Low | Low (ops) | General purpose, serverless scaling |
The tunable consistency model (this design) is the right choice when you are building a general-purpose cache or session store where occasional stale reads are acceptable, write throughput must scale horizontally, and you cannot afford the latency penalty of linearizable consensus. If you need transactions or linearizability (financial balances, inventory counts), use a Raft-based system like TiKV or CockroachDB - the quorum model described here does not give you those guarantees.
For most web application workloads - user profiles, session state, feature flags, rate-limit counters - QUORUM reads and writes give you the right tradeoff. A QUORUM read overlaps every acknowledged QUORUM write, so it returns the latest acknowledged value as long as the write landed on the key’s regular N replicas rather than on hinted surrogates. You can also survive any single node failure without impacting clients.
Key Takeaways
- Consistent hashing eliminates the full data migration problem on cluster resize - only 1/N of keys move when adding one node to an N-node cluster.
- Virtual nodes solve the load imbalance inherent in pure consistent hashing - with 150 vnodes per physical node, the standard deviation of per-node load drops to roughly 8% of the mean.
- Quorum (W + R > N) guarantees that read and write sets overlap, providing a tunable consistency/availability tradeoff without a central coordinator.
- Vector clocks detect concurrent writes with certainty by tracking causality per node, enabling conflict detection without relying on unreliable wall-clock ordering.
- Hinted handoff recovers availability during short node outages by temporarily storing writes on surrogate nodes, enabling the system to meet quorum without the intended replica.
- Read repair heals stale replicas opportunistically during normal reads - hot keys stay fresh with zero extra background load.
- Anti-entropy with Merkle trees repairs cold data divergence efficiently by comparing replica contents in O(K log N) time rather than comparing every key.
- Tombstones for deletes prevent deleted keys from being resurrected by stale replicas, at the cost of compaction overhead if deletes are frequent.
The counter-intuitive lesson from building this system is that the write path is simpler than it looks, but the repair path is what makes or breaks production reliability. Any engineer can implement consistent hashing and quorum writes in a weekend. The months of engineering go into the background systems - hinted handoff with bounded storage, Merkle tree construction that doesn’t block reads, read repair that does not amplify load on already-slow nodes. Get those repair mechanisms wrong and you have a KV store that appears healthy until a cascade of node failures reveals years of silent divergence.
Frequently Asked Questions
Q: Why not use a leader-per-shard model (like Kafka partitions) instead of leaderless replication? A: Leader-per-shard gives you stronger consistency guarantees because all writes go through a single leader who applies them in sequence. But it means every write must reach the leader - if the leader is in a different availability zone, you add a cross-AZ round trip to every write. It also makes scaling harder: adding shards requires migrating data and re-electing leaders. Leaderless quorum accepts weaker consistency guarantees (and the conflict handling that comes with them) in exchange for lower write latency and simpler elastic scaling.
Q: Won’t vector clocks grow unboundedly as nodes are added and removed?
A: Yes, without pruning. The original Dynamo paper addressed this with a clock truncation scheme. Each (node, counter) entry also records when that node last updated it, and once the clock exceeds a size threshold (10 entries in the paper’s example), the oldest entry is dropped. This introduces a small risk of missing a conflict (treating a conflict as a clean succession), but in practice node churn is slow enough that the clock size stays bounded. An alternative is to drop per-node counters entirely and attach a single timestamp to each write, resolving conflicts by last-write-wins. This is how Cassandra’s per-cell write timestamps work (set with USING TIMESTAMP, readable with WRITETIME()).
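Here is a sketch of that truncation scheme. It assumes each clock entry stores a wall-clock timestamp next to its counter; `increment_and_truncate` and the threshold of 10 are illustrative. The timestamp only decides which entry to evict. Causality is still judged on the counters alone.

```python
TimedClock = dict[str, tuple[int, float]]  # node -> (counter, wall-clock time of last update)


def increment_and_truncate(
    vc: TimedClock, node_id: str, now: float, max_entries: int = 10
) -> TimedClock:
    """Bump node_id's counter, then evict the least recently updated entries past the threshold."""
    counter = vc.get(node_id, (0, now))[0]
    updated = dict(vc)
    updated[node_id] = (counter + 1, now)
    while len(updated) > max_entries:
        oldest = min(updated, key=lambda n: updated[n][1])
        del updated[oldest]
    return updated


clock: TimedClock = {}
for t in range(12):  # 12 different coordinators write in turn
    clock = increment_and_truncate(clock, f"n{t}", now=float(t))
print(sorted(clock))  # n0 and n1 have been evicted
```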
Q: Why not just use last-write-wins (LWW) based on timestamps and skip vector clocks entirely?
A: LWW with timestamps silently discards one of two concurrent writes. If client A and client B both update the same shopping cart starting from the same version, LWW keeps whichever write carries the later timestamp and throws the other away - the user sees their item disappear with no error. Vector clocks surface the conflict so the application can merge both carts. LWW is acceptable for overwrite-style data, where the newest value fully replaces the old one (user profile photo URL, feature flag value), but dangerous for additive data (counters, sets, cart items).
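The lost-update problem shows up in a few lines. This is a minimal sketch, assuming a hypothetical `Version` record that carries both a timestamp (used only by LWW) and the cart contents; `lww_resolve` and `merge_carts` are illustrative names, not a real library API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Version:
    items: frozenset[str]
    timestamp_ms: int  # only consulted by LWW
    writer: str        # tie-breaker when timestamps are equal


def lww_resolve(a: Version, b: Version) -> Version:
    """Last-write-wins: keep the larger (timestamp, writer) pair; the other write vanishes."""
    return max(a, b, key=lambda v: (v.timestamp_ms, v.writer))


def merge_carts(siblings: list[Version]) -> frozenset[str]:
    """Application-level merge of concurrent siblings: union of items, so no add is lost."""
    merged: frozenset[str] = frozenset()
    for version in siblings:
        merged |= version.items
    return merged


# Both clients start from {"book"} and write concurrently in the same millisecond.
a = Version(frozenset({"book", "pen"}), 1000, "A")
b = Version(frozenset({"book", "mug"}), 1000, "B")
print(sorted(lww_resolve(a, b).items))  # the pen is gone
print(sorted(merge_carts([a, b])))      # book, mug, pen
```

Union is the simplest merge; as the Dynamo paper notes for its shopping cart, it never loses an add but can resurrect an item the user removed, so production carts usually track removals as well.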
Q: How does the cluster handle a prolonged partition (hours or days) rather than a brief node outage?
A: Hinted handoff is bounded by a hint window (3 hours by default in Cassandra): once a replica has been unreachable for longer than that, coordinators stop storing new hints for it, which keeps hint storage from growing without bound. For longer partitions, recovery therefore falls to anti-entropy repair via Merkle tree comparison after the partition heals (read repair also fixes the keys that happen to be read). If the partition lasts so long that the separated replica's data is thoroughly stale, it may be faster to decommission the node and re-bootstrap it from scratch, streaming all of its data from its peers.
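A bounded hint store can be sketched in a few dozen lines. This Python model is hypothetical: the `HintStore` class, its per-node cap, and its method names are illustrative assumptions, and only the idea of a time window is taken from Cassandra's `max_hint_window_in_ms` setting (3 hours by default). Time is passed in explicitly so the behavior is deterministic.

```python
from collections import defaultdict, deque


class HintStore:
    """Hypothetical bounded store of writes destined for unreachable replicas."""

    def __init__(self, max_window_s: float = 3 * 3600, max_hints_per_node: int = 100_000) -> None:
        self.max_window_s = max_window_s
        self.max_hints_per_node = max_hints_per_node
        self.down_since: dict[str, float] = {}
        self.hints: dict[str, deque] = defaultdict(deque)

    def mark_down(self, node: str, now: float) -> None:
        self.down_since.setdefault(node, now)  # remember when the outage started

    def record(self, node: str, key: str, value: str, now: float) -> bool:
        """Store a hint; refuse once the node has been down too long or its queue is full."""
        started = self.down_since.get(node, now)
        if now - started > self.max_window_s:
            return False  # outside the window: leave this write to anti-entropy repair
        queue = self.hints[node]
        if len(queue) >= self.max_hints_per_node:
            return False  # storage bound reached
        queue.append((key, value))
        return True

    def replay(self, node: str) -> list[tuple[str, str]]:
        """The node is back: hand over its hints in arrival order and forget the outage."""
        self.down_since.pop(node, None)
        return list(self.hints.pop(node, deque()))


store = HintStore(max_window_s=10, max_hints_per_node=2)
store.mark_down("n3", now=0)
print(store.record("n3", "k1", "v1", now=5))   # accepted
print(store.record("n3", "k9", "v9", now=11))  # rejected: past the window
```

Both limits exist for the same reason: a hint queue that never stops growing turns one dead node into a disk-space incident on every healthy node that coordinates writes for it.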
Q: How do you prevent hot spots when many keys hash to the same virtual node?
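A: Virtual nodes handle ring-level imbalance: with 150 virtual nodes per physical node and MD5 hashing, each node's share is the sum of 150 randomly sized ring arcs, so the relative spread of per-node load shrinks roughly as 1/sqrt(150), about an 8% standard deviation. That is modest, but it does nothing for application-level hot keys (a single viral post's view counter), because one key always lands on one ring position. The fix there is at the client layer: split the key into key#0 through key#N-1, write each update to one shard, and sum across all shards on read. This is the scatter-gather pattern used by high-throughput counters.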
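The effect of virtual nodes on balance can be measured directly. The sketch below builds an MD5-based ring in Python and reports the worst node's deviation from an even share; the vnode label format (`node#vnI`), the synthetic `key:I` keys, and the function names are assumptions for illustration only.

```python
import bisect
import hashlib
from collections import Counter


def ring_position(label: str) -> int:
    """Map a string to a point on a 2^128 ring using MD5."""
    return int.from_bytes(hashlib.md5(label.encode()).digest(), "big")


def build_ring(nodes: list[str], vnodes: int) -> tuple[list[int], list[str]]:
    """Place `vnodes` points per physical node; return sorted positions and their owners."""
    points = sorted((ring_position(f"{n}#vn{i}"), n) for n in nodes for i in range(vnodes))
    return [p for p, _ in points], [n for _, n in points]


def owner(positions: list[int], owners: list[str], key: str) -> str:
    """First virtual node clockwise from the key's position, wrapping past the end."""
    idx = bisect.bisect_right(positions, ring_position(key)) % len(positions)
    return owners[idx]


def max_imbalance(nodes: list[str], vnodes: int, num_keys: int = 50_000) -> float:
    """Largest relative deviation of any node's key count from the ideal even share."""
    positions, owners = build_ring(nodes, vnodes)
    counts = Counter(owner(positions, owners, f"key:{i}") for i in range(num_keys))
    ideal = num_keys / len(nodes)
    return max(abs(counts[n] - ideal) / ideal for n in nodes)


cluster = [f"node{i}" for i in range(10)]
for v in (1, 10, 150):
    print(v, round(max_imbalance(cluster, v), 3))
```

The numbers printed depend on the node names and key set, so treat them as an experiment rather than a guarantee; the trend (imbalance falling as vnodes rise) is what matters.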
Q: What happens if the coordinator crashes mid-write after sending to some but not all replicas?
A: The client receives a timeout and retries with the same vector clock. The retry may reach a new coordinator (the client's SDK routes to the next live node). Because the write is idempotent - replicas check the vector clock and skip writes they already have - the retry succeeds cleanly: replicas that got the first attempt ignore the duplicate, and replicas that missed it apply it now. This holds only if the retry carries exactly the version stamped on the first attempt; if the new coordinator minted a fresh clock entry instead, replicas would keep the two copies as concurrent siblings for the application to merge. This is why a vector clock on every write is non-negotiable: it is the idempotency key for the write operation.
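Replica-side deduplication can be sketched as follows, assuming the retried write carries exactly the clock minted for the first attempt. The `Replica` class and its sibling list are hypothetical simplifications of what a Dynamo-style replica stores per key.

```python
Clock = dict[str, int]


def descends(a: Clock, b: Clock) -> bool:
    """True if `a` has seen everything `b` has (entry-wise a >= b)."""
    return all(a.get(node, 0) >= count for node, count in b.items())


class Replica:
    """Hypothetical replica that keeps concurrent (sibling) versions per key."""

    def __init__(self) -> None:
        self.data: dict[str, list[tuple[Clock, str]]] = {}

    def apply(self, key: str, clock: Clock, value: str) -> str:
        siblings = self.data.get(key, [])
        if any(descends(existing, clock) for existing, _ in siblings):
            return "skipped"  # same or newer version already stored: duplicate retry is a no-op
        # Drop versions the incoming write supersedes; keep truly concurrent ones as siblings.
        kept = [(c, v) for c, v in siblings if not descends(clock, c)]
        self.data[key] = kept + [(clock, value)]
        return "applied"


r1, r2 = Replica(), Replica()
write = ({"A": 5}, "profile-v5")
print(r1.apply("user:123", *write))  # first attempt reached r1 before the coordinator died
print(r1.apply("user:123", *write))  # retry: r1 skips it
print(r2.apply("user:123", *write))  # retry: r2 finally applies it
```

If the retry instead arrived as `{"A": 4, "B": 1}` (a fresh entry minted by a new coordinator), neither clock would descend from the other and the replica would store two siblings.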
Interview Questions
Q: Walk me through what happens when a client sends PUT("user:123", profile_data) to a 5-node cluster with N=3, W=2.
Expected depth: Start with the client SDK hashing the key to find its ring position and deriving the 3-node preference list. Explain that any live node can receive the request and act as coordinator (typically the first healthy node in the preference list). Describe the coordinator incrementing the vector clock and fanning the write out to all 3 replicas in parallel. Walk through the quorum wait - the coordinator responds to the client after 2 ACKs, and the 3rd write completes asynchronously. Discuss what happens if one replica is down: hinted handoff kicks in, and the coordinator sends that replica's copy to a surrogate (the next healthy node on the ring) along with a hint naming the intended owner, to be delivered when the owner recovers.
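The coordinator's side of that walkthrough can be sketched in Python. This is a simplified model, not any real system's code: the ring has one position per node, `plan_targets` and `quorum_write` are hypothetical names, and the `send` callback stands in for the replica RPC.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Optional

Target = tuple[str, Optional[str]]  # (node to write to, intended owner if this is a hinted write)


def plan_targets(ring: list[str], home: int, n: int, down: set[str]) -> list[Target]:
    """Walk the ring clockwise from the key's home slot; replace down replicas with spares."""
    size = len(ring)
    intended = [ring[(home + i) % size] for i in range(n)]
    spares = [ring[(home + i) % size] for i in range(n, size) if ring[(home + i) % size] not in down]
    targets: list[Target] = []
    for node in intended:
        if node not in down:
            targets.append((node, None))
        elif spares:
            targets.append((spares.pop(0), node))  # surrogate stores the write plus a hint for `node`
    return targets


def quorum_write(targets: list[Target], w: int, send: Callable[[str, Optional[str]], bool]) -> bool:
    """Fan out in parallel and report success as soon as w replicas ACK."""
    if len(targets) < w:
        return False  # too few reachable replicas to ever reach quorum
    pool = ThreadPoolExecutor(max_workers=len(targets))
    futures = [pool.submit(send, node, hint_for) for node, hint_for in targets]
    acks = 0
    try:
        for fut in as_completed(futures):
            if fut.exception() is None and fut.result():
                acks += 1
                if acks >= w:
                    return True  # answer the client now; stragglers finish in the background
        return False
    finally:
        pool.shutdown(wait=False)  # do not block the client on the slowest replica


ring = ["n1", "n2", "n3", "n4", "n5"]
targets = plan_targets(ring, home=0, n=3, down={"n2"})
print(targets)  # n4 stands in for n2 with a hint
print(quorum_write(targets, w=2, send=lambda node, hint_for: True))
```

Returning after W ACKs without cancelling the remaining futures is what lets the third replica's write complete asynchronously.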
Q: How would you handle a network partition where 2 nodes can see each other but not the other 3?
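Expected depth: Quorums are counted per key against that key's N=3 replicas, not against the cluster. With W=2, a key stays writable on whichever side holds at least 2 of its 3 replicas; the other side holds at most 1 and, under strict quorums, must reject writes for that key and return an error to clients. So the 3-node side keeps serving the keys it holds a replica majority for, the 2-node side keeps serving the keys it does, and because 2 of 3 replicas can only sit on one side, no key accepts strict-quorum writes on both sides. On partition heal, anti-entropy repair runs Merkle tree comparison to find keys that diverged. Writes accepted on both sides (with W=1, or with sloppy quorums and hinted handoff) must then be reconciled by vector clock comparison - discuss how those modes create exactly the concurrent versions that vector clocks surface.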
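The per-key quorum arithmetic is easy to check mechanically. This minimal sketch assumes a 5-node ring where each key's preference list is 3 consecutive nodes and a strict (non-sloppy) quorum; `sides_accepting_writes` is a hypothetical helper.

```python
def sides_accepting_writes(replicas: list[str], sides: dict[str, set[str]], w: int) -> list[str]:
    """Strict quorum: a side can accept a write only if it holds at least w of the key's replicas."""
    return [name for name, members in sides.items() if sum(r in members for r in replicas) >= w]


sides = {"majority": {"n1", "n2", "n3"}, "minority": {"n4", "n5"}}
ring = ["n1", "n2", "n3", "n4", "n5"]
for home in range(len(ring)):
    replicas = [ring[(home + i) % len(ring)] for i in range(3)]  # N=3 consecutive nodes
    print(replicas, sides_accepting_writes(replicas, sides, w=2))
```

Here three of the five key ranges stay writable on the 3-node side and two on the 2-node side, and none on both. Rerunning with `w=1` makes some keys writable on both sides, which is where conflicting versions come from.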
Q: A single key is receiving 100,000 writes/second (hot key). How does consistent hashing hurt you here, and what do you do?
Expected depth: Consistent hashing concentrates all writes for a given key on its 3 replicas - adding more nodes to the cluster does not help, because the key still maps to the same ring position. The fix is client-side sharding: split the key into key#0 through key#N-1, distribute writes round-robin across the shards, and sum across shards on read. Discuss the read cost (scatter-gather over N keys) and counter semantics. Mention that DynamoDB's adaptive capacity can isolate hot items and split partitions by traffic, but it cannot spread writes to a single item across partitions, which is why AWS still recommends write sharding for hot partition keys.
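Client-side write sharding can be sketched with a plain dict standing in for the KV store. The `ShardedCounter` wrapper and its `#i` suffix scheme are hypothetical; against a real store, each shard increment would still need that store's own atomic-counter or conflict handling.

```python
class ShardedCounter:
    """Hypothetical client-side wrapper: one logical counter spread over num_shards physical keys."""

    def __init__(self, store: dict[str, int], key: str, num_shards: int) -> None:
        self.store, self.key, self.num_shards = store, key, num_shards
        self._next = 0

    def shard_keys(self) -> list[str]:
        return [f"{self.key}#{i}" for i in range(self.num_shards)]

    def increment(self, by: int = 1) -> None:
        # Round-robin: consecutive writes hit different keys, hence (usually) different replica sets.
        shard = f"{self.key}#{self._next}"
        self._next = (self._next + 1) % self.num_shards
        self.store[shard] = self.store.get(shard, 0) + by

    def read(self) -> int:
        # Scatter-gather: fetch every shard key and sum.
        return sum(self.store.get(k, 0) for k in self.shard_keys())


kv: dict[str, int] = {}
views = ShardedCounter(kv, "post:42:views", num_shards=8)
for _ in range(1000):
    views.increment()
print(views.read())
```

The tradeoff is visible in `read`: one logical read becomes `num_shards` physical reads, so the shard count should be just large enough to keep each shard under a single replica set's write ceiling.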
Q: Why does your system use LSM-trees for storage rather than B-trees?
Expected depth: KV stores often face write-heavy workloads - LSM-trees amortize random writes by buffering them in an in-memory memtable and flushing sorted runs (SSTables) to disk sequentially, achieving near-sequential write throughput regardless of key distribution. B-trees update pages in place, so each small write can rewrite a whole page at a random location, and that write amplification saturates NVMe IOPS at scale. The tradeoffs: LSM-trees have read amplification (a read may check several SSTables), mitigated by Bloom filters, and background compaction adds write amplification of its own. Discuss when B-trees win: read-heavy workloads with low write rates, or when read latency must be strictly bounded.
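A toy LSM store makes the write and read paths concrete. This Python sketch is deliberately minimal and hypothetical: it keeps "SSTables" as in-memory sorted lists, uses a simple SHA-256-based Bloom filter, and omits the write-ahead log, deletes, and compaction that a real engine needs.

```python
import bisect
import hashlib
from typing import Iterator, Optional


class BloomFilter:
    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits, self.num_hashes = num_bits, num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key: str) -> Iterator[int]:
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for p in self._positions(key):
            self.bits |= 1 << p

    def might_contain(self, key: str) -> bool:
        return all((self.bits >> p) & 1 for p in self._positions(key))


class TinyLSM:
    def __init__(self, memtable_limit: int = 4) -> None:
        self.memtable: dict[str, str] = {}
        self.memtable_limit = memtable_limit
        self.runs: list[tuple[list[str], list[str], BloomFilter]] = []  # oldest first

    def put(self, key: str, value: str) -> None:
        self.memtable[key] = value  # writes touch memory only
        if len(self.memtable) >= self.memtable_limit:
            self._flush()

    def _flush(self) -> None:
        keys = sorted(self.memtable)  # one sequential, sorted write per flush
        bloom = BloomFilter()
        for k in keys:
            bloom.add(k)
        self.runs.append((keys, [self.memtable[k] for k in keys], bloom))
        self.memtable = {}

    def get(self, key: str) -> Optional[str]:
        if key in self.memtable:
            return self.memtable[key]
        for keys, values, bloom in reversed(self.runs):  # newest run wins
            if not bloom.might_contain(key):
                continue  # skip this run without searching it
            i = bisect.bisect_left(keys, key)
            if i < len(keys) and keys[i] == key:
                return values[i]
        return None


db = TinyLSM(memtable_limit=4)
for i in range(10):
    db.put(f"k{i}", f"v{i}")
db.put("k1", "new")  # newer value shadows the flushed one
print(db.get("k1"), db.get("k0"), len(db.runs))
```

Every lookup for an older key may probe several runs; the Bloom filters cut that cost for absent keys, and compaction (not shown) is what keeps the number of runs bounded.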
Q: How do you test that your quorum implementation is actually providing the consistency guarantees you claim?
Expected depth: Unit tests are insufficient - they cannot simulate real network partitions. Discuss Jepsen-style testing: inject partitions, process pauses, and clock skew (with Jepsen's own nemeses, node kills from tools like Chaos Monkey, or network faults from Toxiproxy); run concurrent reads and writes; then check the recorded history against the guarantee you claim at QUORUM, using a checker such as Knossos for single-key linearizability or Elle for transactional anomalies. Mention property-based testing for vector clock merging: generate random concurrent write histories and verify that the merge function always produces a clock that dominates both inputs.
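The vector-clock property can be tested without any framework. This sketch uses only the standard library's `random` module (a library such as Hypothesis would add automatic shrinking of failing cases); `random_history` is a hypothetical generator that lets two replicas diverge through random writes and occasionally sync.

```python
import random

Clock = dict[str, int]


def merge(a: Clock, b: Clock) -> Clock:
    """Entry-wise maximum: the smallest clock that has seen everything a and b have."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in a.keys() | b.keys()}


def descends(a: Clock, b: Clock) -> bool:
    return all(a.get(n, 0) >= c for n, c in b.items())


def random_history(rng: random.Random, nodes: list[str], steps: int) -> tuple[Clock, Clock]:
    """Two replicas take random writes (coordinated by random nodes) and occasionally sync."""
    a: Clock = {}
    b: Clock = {}
    for _ in range(steps):
        roll = rng.random()
        node = rng.choice(nodes)
        if roll < 0.45:
            a = {**a, node: a.get(node, 0) + 1}
        elif roll < 0.9:
            b = {**b, node: b.get(node, 0) + 1}
        else:
            a = b = merge(a, b)  # anti-entropy sync
    return a, b


def check_merge_property(trials: int = 1000, seed: int = 0) -> None:
    rng = random.Random(seed)
    for _ in range(trials):
        a, b = random_history(rng, ["n1", "n2", "n3"], rng.randint(0, 20))
        m = merge(a, b)
        assert descends(m, a) and descends(m, b), (a, b, m)  # dominates both inputs
        assert merge(a, b) == merge(b, a)                    # commutative
        assert merge(m, a) == m                              # absorbing an input changes nothing


check_merge_property()  # raises AssertionError with the failing (a, b, m) on a violation
```

Fixing the seed makes any failure reproducible, which matters more here than raw trial count.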