Build a Distributed Key-Value Store


distributed-systems databases reliability

System Design Deep Dive

Distributed Key-Value Store

Consistency, replication, and partition tolerance - you can only tune the tradeoffs, never escape them


Imagine you need to store hundreds of billions of small records - user sessions, feature flags, shopping cart state, configuration values - and retrieve any of them in under 5ms, 24/7, across data center failures. A single machine fails that requirement on day one. You need to split the data across dozens or hundreds of machines, keep multiple copies of each record in case machines die, and still give clients the illusion of a single coherent store.

Think of a distributed key-value store like a post office with thousands of branches. When you drop a letter (a PUT), the system must decide which branch handles your street address (consistent hashing), make photocopies and store them at nearby branches (replication), and ensure that when you ask “did my letter arrive?” at any branch counter (a GET), you get a truthful answer even if the original branch is temporarily closed. The moment you have three branches handling the same letter, you need rules for what happens when they disagree about whether the letter arrived at all.

The naive approach - modulo sharding with hash(key) % N - breaks catastrophically when N changes. Add one node and nearly every key remaps to a different machine: going from 10 to 11 nodes moves about 10 of every 11 keys. You trigger a full data migration, overload every machine simultaneously, and black out the system during the move. At 100 req/s and 10 nodes, this is annoying. At 100,000 req/s and 50 nodes it is a company-ending event.
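To put a number on "nearly every key", here is a minimal sketch that counts how many keys change owners when a modulo-sharded cluster grows from 10 to 11 nodes. The key names (`user:0`, `user:1`, ...) and the use of MD5 as a stable hash are illustrative choices, not part of any particular system.

```python
import hashlib

def stable_hash(key: str) -> int:
    # MD5 gives the same value in every process; Python's built-in hash() of a str is salted per run
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

def fraction_remapped_modulo(num_keys: int, old_n: int, new_n: int) -> float:
    """Share of keys whose shard changes when hash(key) % old_n becomes hash(key) % new_n."""
    keys = [f"user:{i}" for i in range(num_keys)]
    moved = sum(stable_hash(k) % old_n != stable_hash(k) % new_n for k in keys)
    return moved / num_keys

print(f"{fraction_remapped_modulo(100_000, 10, 11):.1%} of keys change nodes going from 10 to 11")
```

A key stays put only when hash % 10 equals hash % 11, which holds for 1 in every 11 hash values, so the printed share should land close to 10/11 (about 91%).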

At scale, the hardest part isn’t storage - it’s the geometry of distributed disagreement. When you write a value to three replicas and one replica is temporarily unreachable, do you wait for it (strong consistency, higher latency) or proceed without it (eventual consistency, risk of stale reads)? When the unreachable replica comes back with a conflicting version, whose write wins? We need to solve for partition-tolerant placement, tunable consistency, and convergent conflict resolution simultaneously.

Requirements and Constraints

Functional Requirements

  • PUT(key, value) - store or update a value for a key
  • GET(key) - retrieve the value for a key, or a not-found signal
  • DELETE(key) - remove a key from the store
  • Tunable consistency: ONE, QUORUM, ALL consistency levels per operation
  • Automatic replication of each key to N replicas (configurable, default N=3)
  • Transparent routing: any node in the cluster can receive any request and forward it appropriately
  • TTL support: keys expire after a configured duration

Non-Functional Requirements

  • Latency: GET at QUORUM consistency under 5ms p99; PUT under 10ms p99
  • Throughput: 500,000 reads/second and 100,000 writes/second across the cluster
  • Durability: survive loss of any single node with zero data loss; survive simultaneous loss of two nodes with no more than a brief availability gap
  • Availability: 99.99% uptime (52 minutes downtime/year), tolerate single datacenter failure
  • Storage: 10TB usable capacity across the cluster, keys up to 256 bytes, values up to 1MB
  • Replication factor: default N=3, configurable per keyspace

Constraints and Assumptions

  • Keys are arbitrary byte strings, not structured; no range scans required
  • No cross-key transactions or multi-key atomicity
  • Clients have access to a cluster membership list or DNS-based discovery
  • Single region deployment in v1; multi-region is a future extension
  • Clocks are not perfectly synchronized; we cannot rely on wall-clock ordering

High-Level Architecture

The system has four major components that interact along two distinct paths - the write path and the read path - plus a set of background maintenance processes that keep the cluster healthy without operator intervention.

Figure: Distributed key-value store architecture overview showing client tier, coordinator layer, storage ring, and background services

The Client SDK embeds the cluster membership map and implements consistent hash lookups locally. This means clients can compute the correct coordinator node for any key without a central router, eliminating a single point of failure. The SDK periodically refreshes this map from any live node, which in turn learns membership through gossip.

The Coordinator Node receives the client request, looks up the preference list (the N nodes responsible for that key), fans out the write or read to those replicas in parallel, and waits for W (write) or R (read) acknowledgments before responding to the client. Any node in the cluster can act as coordinator for any request.

The Storage Nodes hold the actual data using an LSM-tree (Log-Structured Merge Tree) for write-optimized storage. Each physical node is responsible for many virtual nodes on the hash ring, which enables fine-grained load balancing.

The Gossip Protocol service runs continuously on every node, exchanging membership state with two random peers every second. This ensures every node has an eventually consistent view of which nodes are alive, which have failed, and which recently rejoined.
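The membership exchange can be sketched as heartbeat-style gossip: every node bumps its own counter each round, does a push-pull exchange with two random peers, keeps the higher counter for each member, and suspects any member whose counter has not advanced within a timeout. The class and function names, the fanout of 2, and the 10-second timeout are assumptions for illustration; production gossip implementations carry more state (such as restart generations) than this.

```python
import random
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class MemberState:
    heartbeat: int        # counter that only the owning node ever increments
    last_updated: float   # local time at which we last saw that counter advance

@dataclass
class GossipNode:
    node_id: str
    members: dict[str, MemberState] = field(default_factory=dict)

    def tick(self, now: float) -> None:
        """Advance our own heartbeat once per gossip round."""
        me = self.members.setdefault(self.node_id, MemberState(0, now))
        me.heartbeat += 1
        me.last_updated = now

    def merge(self, remote: dict[str, MemberState], now: float) -> None:
        """Keep the higher heartbeat for each member; refresh the timestamp only when it advanced."""
        for node_id, state in remote.items():
            local = self.members.get(node_id)
            if local is None or state.heartbeat > local.heartbeat:
                self.members[node_id] = MemberState(state.heartbeat, now)

    def suspected_failed(self, now: float, timeout: float = 10.0) -> set[str]:
        """Members whose heartbeat has not advanced for longer than `timeout` seconds."""
        return {
            node_id for node_id, state in self.members.items()
            if node_id != self.node_id and now - state.last_updated > timeout
        }

def gossip_round(nodes: dict[str, GossipNode], now: float, fanout: int = 2,
                 rng: Optional[random.Random] = None) -> None:
    """Every live node ticks, then does a push-pull exchange with `fanout` random peers."""
    rng = rng or random.Random()
    for node in nodes.values():
        node.tick(now)
    for node in nodes.values():
        peers = [p for p in nodes if p != node.node_id]
        for peer_id in rng.sample(peers, min(fanout, len(peers))):
            peer = nodes[peer_id]
            peer.merge(node.members, now)  # push our view
            node.merge(peer.members, now)  # pull theirs

cluster = {f"n{i}": GossipNode(f"n{i}") for i in range(5)}
for second in range(5):
    gossip_round(cluster, now=float(second), rng=random.Random(second))
print(sorted(cluster["n0"].members))
```

A node that stops ticking keeps its last heartbeat in everyone's table, so it drifts past the timeout on every peer without any central failure detector.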

Anti-entropy repair, hinted handoff, and read repair are background processes that detect and fix divergence between replicas without operator intervention.

Key Insight

The coordinator is not a fixed machine - it is a role that any node temporarily plays for a given request. This is what makes the system leaderless: there is no master bottleneck, and a node failure only disrupts the requests that node was actively coordinating; clients retry those against another node once their request timeout expires.

The Consistent Hash Ring

The consistent hash ring is the spine of the entire system - it answers the question “which nodes are responsible for key K?” without any central directory lookup.

Think of the hash ring like seats around a circular table numbered 0 to 4 billion. Each physical node claims several seat numbers. When a key arrives, you hash it to get its seat number, then walk clockwise until you find the first occupied seat - that’s the primary replica. Keep walking clockwise, skipping seats owned by a node you have already picked, until you have N-1 more distinct nodes - those are the other replicas. When a node leaves, only the keys in the arc between that node and its counter-clockwise neighbor need to move - every other key stays put.

Figure: Consistent hash ring component diagram showing virtual node placement, key mapping, and physical node layout

The pure consistent hash with one position per physical node has a fatal flaw: the arcs are uneven. One node might own 30% of the ring while another owns 5%, causing severe hot spots. Virtual nodes solve this by giving each physical node 150 positions on the ring. The positions are distributed using deterministic pseudorandom placement: hash("NodeA#0"), hash("NodeA#1"), …, hash("NodeA#149"). A node's share of the ring is then the sum of 150 small arcs, so the spread between nodes shrinks roughly as 1/√150, to around 8% (standard deviation relative to the mean) instead of the near-100% spread of a single position. Heterogeneous hardware is handled separately, by giving larger machines proportionally more vnodes.
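The effect of the vnode count is easy to measure directly. This sketch assumes the same MD5-based placement of `"Node#i"` labels on a 32-bit ring, computes the exact fraction of the ring each physical node owns, and reports the spread as a relative standard deviation; the helper names and the 10-node cluster are hypothetical.

```python
import hashlib
import statistics

RING_SIZE = 2**32

def ring_position(label: str) -> int:
    # MD5 folded onto a 32-bit ring, mirroring hash("NodeA#0"), hash("NodeA#1"), ...
    return int(hashlib.md5(label.encode()).hexdigest(), 16) % RING_SIZE

def ownership_shares(nodes: list[str], vnodes: int) -> dict[str, float]:
    """Fraction of the ring each physical node owns.

    A key belongs to the first vnode clockwise from its hash, so each vnode owns
    the arc between its counter-clockwise neighbour and itself.
    """
    positions = sorted((ring_position(f"{n}#{i}"), n) for n in nodes for i in range(vnodes))
    owned = {n: 0 for n in nodes}
    for idx, (pos, node) in enumerate(positions):
        prev_pos = positions[idx - 1][0]  # idx 0 wraps around to the last vnode
        owned[node] += (pos - prev_pos) % RING_SIZE
    return {n: arc / RING_SIZE for n, arc in owned.items()}

def load_spread(nodes: list[str], vnodes: int) -> float:
    """Standard deviation of ownership divided by its mean (0.0 means perfectly even)."""
    shares = list(ownership_shares(nodes, vnodes).values())
    return statistics.pstdev(shares) / statistics.mean(shares)

cluster = [f"node{i}" for i in range(10)]
for v in (1, 16, 150):
    print(f"{v:>3} vnodes/node -> spread {load_spread(cluster, v):.1%}")
```

Expect the single-vnode spread to be large and the 150-vnode spread to sit in the neighbourhood of the 1/√150 rule of thumb; the exact figures depend on which node names get hashed.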

# Consistent hashing ring implementation with virtual nodes
import bisect
import hashlib

class ConsistentHashRing:
    # Demonstrates virtual node placement and key lookup on a consistent hash ring
    def __init__(self, nodes: list[str], vnodes_per_node: int = 150):
        self.vnodes_per_node = vnodes_per_node
        self.ring: list[int] = []            # sorted vnode positions
        self.ring_map: dict[int, str] = {}   # vnode position -> physical node
        for node in nodes:
            self.add_node(node)

    def _hash(self, key: str) -> int:
        # MD5 is used only for uniform placement on a 32-bit ring, not for security
        return int(hashlib.md5(key.encode()).hexdigest(), 16) % (2**32)

    def add_node(self, node: str) -> None:
        for i in range(self.vnodes_per_node):
            position = self._hash(f"{node}#{i}")
            self.ring_map[position] = node  # a (rare) position collision overwrites the earlier owner
        self.ring = sorted(self.ring_map.keys())

    def remove_node(self, node: str) -> None:
        for i in range(self.vnodes_per_node):
            position = self._hash(f"{node}#{i}")
            # Only drop positions this node still owns, in case of a collision
            if self.ring_map.get(position) == node:
                del self.ring_map[position]
        self.ring = sorted(self.ring_map.keys())

    def get_preference_list(self, key: str, n: int = 3) -> list[str]:
        """Returns up to N distinct physical nodes responsible for this key, in clockwise order."""
        if not self.ring:
            return []
        start = bisect.bisect_right(self.ring, self._hash(key))
        preference: list[str] = []
        # Visit each vnode at most once, so the loop ends even if n exceeds the number of nodes
        for step in range(len(self.ring)):
            physical_node = self.ring_map[self.ring[(start + step) % len(self.ring)]]
            if physical_node not in preference:
                preference.append(physical_node)
                if len(preference) == n:
                    break
        return preference

When a new node joins an M-node cluster, the ring is recalculated and roughly 1/(M+1) of the keys migrate to the new node from its ring neighbors - no other keys move. This is the core advantage over modular sharding.
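That property can be checked with a few lines of code. The sketch below rebuilds a 150-vnode ring before and after a hypothetical `node10` joins a 10-node cluster and records which sample keys changed primary owner; it tracks only the primary, not the full preference list.

```python
import bisect
import hashlib

def h32(label: str) -> int:
    # Same MD5-on-a-32-bit-ring placement as ConsistentHashRing._hash
    return int(hashlib.md5(label.encode()).hexdigest(), 16) % (2**32)

def build_ring(nodes: list[str], vnodes: int = 150) -> tuple[list[int], dict[int, str]]:
    owners = {h32(f"{n}#{i}"): n for n in nodes for i in range(vnodes)}
    return sorted(owners), owners

def primary(ring: list[int], owners: dict[int, str], key: str) -> str:
    """First vnode clockwise from the key's hash, wrapping past the top of the ring."""
    idx = bisect.bisect_right(ring, h32(key)) % len(ring)
    return owners[ring[idx]]

old_nodes = [f"node{i}" for i in range(10)]
before = build_ring(old_nodes)
after = build_ring(old_nodes + ["node10"])   # hypothetical 11th node joins

keys = [f"user:{i}" for i in range(20_000)]
moved = [k for k in keys if primary(*before, k) != primary(*after, k)]
print(f"{len(moved) / len(keys):.1%} of keys moved")
print("all moved keys now on node10:", all(primary(*after, k) == "node10" for k in moved))
```

Every moved key should now belong to `node10`, and the moved share should sit near 1/11 (about 9%), versus nearly all keys under hash(key) % N.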

Real World

Amazon Dynamo (the paper that defined this architecture) uses consistent hashing with virtual nodes, and the number of vnodes a node takes on can be chosen according to its hardware capacity. Apache Cassandra originally assigned a single token per node and later added virtual nodes as a first-class configuration parameter - today you set num_tokens in cassandra.yaml (256 was the long-standing default; Cassandra 4.0 lowered the default to 16).

Quorum Reads and Writes

Once you know which N nodes hold a key, you need a protocol for reading and writing that balances consistency against availability. Quorum is the mechanism: require acknowledgment from W nodes on write and R nodes on read, where W + R > N guarantees that the read set and write set overlap by at least one node.

With N=3:

  • W=1, R=1: fastest, but stale reads possible (only one node needed)
  • W=2, R=2 (QUORUM): balanced - tolerates one node failure without availability loss
  • W=3, R=1 or W=1, R=3: strong consistency but writes or reads fail if any replica down

The intuition: if you write to 2 of 3 nodes and read from 2 of 3 nodes, at least one node must have seen the write (by pigeonhole). That node will return the latest version.
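The pigeonhole argument can be checked exhaustively for small clusters. This brute-force sketch (the function name is hypothetical) enumerates every write set of size W and every read set of size R among N replicas and confirms that they are guaranteed to intersect exactly when W + R > N.

```python
from itertools import combinations

def quorums_always_overlap(n: int, w: int, r: int) -> bool:
    """True if every W-replica write set shares at least one node with every R-replica read set."""
    replicas = range(n)
    return all(
        set(write_set) & set(read_set)
        for write_set in combinations(replicas, w)
        for read_set in combinations(replicas, r)
    )

for w, r in [(1, 1), (2, 1), (2, 2), (3, 1), (1, 3)]:
    print(f"N=3 W={w} R={r}: W+R>N is {w + r > 3}, "
          f"overlap guaranteed: {quorums_always_overlap(3, w, r)}")
```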

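# Quorum coordinator: fan out the write, respond as soon as W replicas ack
import asyncio
from dataclasses import dataclass

@dataclass
class WriteResult:
    success: bool
    acked_nodes: list[str]
    failed_nodes: list[str]

# Keeps references to straggler writes so they are not garbage-collected mid-flight
_background_writes: set[asyncio.Task] = set()

async def coordinated_put(
    key: str,
    value: bytes,
    vector_clock: dict[str, int],
    preference_list: list[str],
    W: int,
    storage_client,
    per_node_timeout: float = 0.05,  # 50ms per-node timeout
) -> WriteResult:
    # Demonstrates quorum write with parallel fan-out and partial failure handling
    acked: list[str] = []
    failed: list[str] = []

    async def write_to_node(node: str) -> None:
        try:
            await asyncio.wait_for(
                storage_client.put(node, key, value, vector_clock),
                timeout=per_node_timeout,
            )
            acked.append(node)
        except Exception:  # timeout, connection error, or replica-side failure
            failed.append(node)

    # Fan out all writes in parallel
    pending = {asyncio.create_task(write_to_node(node)) for node in preference_list}

    # Respond once W replicas have acked, or once too many have failed to ever reach W
    max_failures = len(preference_list) - W
    while pending and len(acked) < W and len(failed) <= max_failures:
        _, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)

    # Slower replicas keep writing in the background; W only controls when we reply
    for task in pending:
        _background_writes.add(task)
        task.add_done_callback(_background_writes.discard)

    return WriteResult(
        success=len(acked) >= W,
        acked_nodes=list(acked),  # snapshot: stragglers may still append later
        failed_nodes=list(failed),
    )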
Watch Out

Setting W=1 for fast writes and R=1 for fast reads gives you the worst of both worlds: you lose the read/write overlap guarantee while still paying to store and ship three copies, which buy you durability but nothing for read freshness. The most dangerous configuration is W + R less than or equal to N combined with business logic that assumes strong consistency - this causes silent stale reads and lost updates in production, not crashes.

Vector Clocks

Two clients might concurrently write to the same key on different coordinator nodes before those coordinators have synchronized. Which write wins? Wall-clock time is unreliable in distributed systems - clocks drift, NTP adjustments are not atomic, and two events one millisecond apart on different machines have no guaranteed ordering.

Vector clocks solve this by treating causality as a version number per node. Think of them like a shared calendar with one column per team member - you can only say “event A definitely happened before event B” if every team member’s entry in A’s calendar is less than or equal to their entry in B’s calendar, and at least one entry is strictly less.

# Vector clock implementation with merge and conflict detection
from typing import Optional
from copy import deepcopy

VectorClock = dict[str, int]

def vc_increment(vc: VectorClock, node_id: str) -> VectorClock:
    """Increment this node's counter when it writes."""
    updated = deepcopy(vc)
    updated[node_id] = updated.get(node_id, 0) + 1
    return updated

def vc_merge(vc1: VectorClock, vc2: VectorClock) -> VectorClock:
    """Take element-wise max - used on read to advance clock."""
    all_nodes = set(vc1) | set(vc2)
    return {n: max(vc1.get(n, 0), vc2.get(n, 0)) for n in all_nodes}

def vc_happens_before(vc1: VectorClock, vc2: VectorClock) -> bool:
    """True if vc1 causally precedes vc2 (vc1 is an ancestor of vc2)."""
    all_nodes = set(vc1) | set(vc2)
    at_least_one_less = False
    for n in all_nodes:
        if vc1.get(n, 0) > vc2.get(n, 0):
            return False
        if vc1.get(n, 0) < vc2.get(n, 0):
            at_least_one_less = True
    return at_least_one_less

def vc_concurrent(vc1: VectorClock, vc2: VectorClock) -> bool:
    """True if neither clock happened before the other - a conflict."""
    return not vc_happens_before(vc1, vc2) and not vc_happens_before(vc2, vc1) and vc1 != vc2

# Example: two concurrent writes create a conflict
vc_client_a = {"node1": 2, "node2": 1}
vc_client_b = {"node1": 1, "node2": 2}
print(vc_concurrent(vc_client_a, vc_client_b))  # True - conflict!

When a conflict is detected, the coordinator returns both versions to the client and lets application logic resolve them. This is what Amazon’s shopping cart used: adding items from two browser tabs would merge the carts rather than silently discarding one. For simple values where last-write-wins is acceptable, the coordinator can resolve automatically using the wall-clock timestamp as a tiebreaker.
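A minimal sketch of the two halves, assuming a hypothetical `Version` record that pairs a vector clock with a value and a wall-clock timestamp: the coordinator discards every version that causally precedes another, and whatever survives is a set of concurrent siblings. The client can then merge them (a set union here, in the spirit of the shopping cart) or fall back to last-write-wins.

```python
from dataclasses import dataclass
from typing import Any

VectorClock = dict[str, int]

@dataclass
class Version:
    vector_clock: VectorClock
    value: Any
    timestamp_ms: int  # wall clock, used only as the last-write-wins tiebreaker

def happens_before(a: VectorClock, b: VectorClock) -> bool:
    """True if a causally precedes b: no entry larger, at least one entry smaller."""
    nodes = set(a) | set(b)
    return (all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
            and any(a.get(n, 0) < b.get(n, 0) for n in nodes))

def reconcile(versions: list[Version]) -> list[Version]:
    """Coordinator side: drop versions that precede another one, plus exact duplicates."""
    survivors: dict[tuple, Version] = {}
    for v in versions:
        if any(happens_before(v.vector_clock, other.vector_clock) for other in versions):
            continue  # an ancestor of some other version, so nothing is lost by dropping it
        survivors.setdefault(tuple(sorted(v.vector_clock.items())), v)
    return list(survivors.values())  # more than one entry means concurrent siblings

def merge_carts(siblings: list[Version]) -> Version:
    """Client side, shopping-cart style: union the items, element-wise max of the clocks.

    The coordinator still increments its own entry when the merged cart is written back.
    """
    clock: VectorClock = {}
    items: set[str] = set()
    for s in siblings:
        for node, counter in s.vector_clock.items():
            clock[node] = max(clock.get(node, 0), counter)
        items |= s.value
    return Version(clock, items, max(s.timestamp_ms for s in siblings))

def last_write_wins(siblings: list[Version]) -> Version:
    """Automatic fallback: keep the sibling with the latest wall-clock timestamp."""
    return max(siblings, key=lambda s: s.timestamp_ms)

base = Version({"n1": 1}, {"book"}, 1_000)
tab_a = Version({"n1": 2}, {"book", "pen"}, 1_005)            # descends from base
tab_b = Version({"n1": 1, "n2": 1}, {"book", "lamp"}, 1_003)  # concurrent with tab_a
siblings = reconcile([base, tab_a, tab_b])
print(len(siblings), sorted(merge_carts(siblings).value), last_write_wins(siblings).timestamp_ms)
# -> 2 ['book', 'lamp', 'pen'] 1005
```

The union merge never drops an added item, but, as the Dynamo paper notes, it can bring back an item that was deleted in one of the branches; LWW avoids that at the cost of silently discarding the losing sibling.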

Key Insight

Vector clocks do not prevent conflicts - they detect them with certainty. The design decision is whether to resolve conflicts at write time (simpler clients, possible data loss) or at read time (clients bear merge complexity, no silent data loss). Most production KV stores choose write-time LWW (Last-Write-Wins) for simplicity and accept the rare lost update.

Data Model

The storage engine on each node uses an LSM-tree (Log-Structured Merge Tree). Writes go to an in-memory memtable first, then are flushed as immutable SSTables to disk. Reads check the memtable, then walk SSTables newest-first with a Bloom filter to skip files that definitely don’t contain the key.
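The read path can be illustrated with a toy, in-memory stand-in for an LSM-tree. Everything here is a hypothetical simplification: segments are plain dicts rather than on-disk SSTables with binary-searchable indexes, there is no compaction or write-ahead log, and the Bloom filter uses SHA-256 with a fixed size and hash count chosen for readability.

```python
import hashlib
from typing import Optional

TOMBSTONE = None  # a stored None marks a deleted key

class BloomFilter:
    """Fixed-size Bloom filter: no false negatives, occasional false positives."""
    def __init__(self, num_bits: int = 1024, num_hashes: int = 3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key: bytes):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(bytes([i]) + key).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: bytes) -> None:
        for p in self._positions(key):
            self.bits |= 1 << p

    def might_contain(self, key: bytes) -> bool:
        return all((self.bits >> p) & 1 for p in self._positions(key))

class TinyLSM:
    def __init__(self, memtable_limit: int = 4):
        self.memtable_limit = memtable_limit
        self.memtable: dict[bytes, Optional[bytes]] = {}
        # Each flushed segment is (bloom filter, key-sorted dict); list order = oldest to newest
        self.segments: list[tuple[BloomFilter, dict[bytes, Optional[bytes]]]] = []

    def put(self, key: bytes, value: Optional[bytes]) -> None:
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def delete(self, key: bytes) -> None:
        self.put(key, TOMBSTONE)  # a delete is just a write of a tombstone

    def flush(self) -> None:
        segment = dict(sorted(self.memtable.items()))  # immutable from here on
        bloom = BloomFilter()
        for k in segment:
            bloom.add(k)
        self.segments.append((bloom, segment))
        self.memtable = {}

    def get(self, key: bytes) -> Optional[bytes]:
        if key in self.memtable:
            return self.memtable[key]
        for bloom, segment in reversed(self.segments):  # newest first
            if not bloom.might_contain(key):
                continue  # definitely not in this segment, skip the lookup
            if key in segment:
                return segment[key]  # newest version wins, even if it is a tombstone
        return None

db = TinyLSM(memtable_limit=2)
db.put(b"user:1", b"alice")
db.put(b"user:2", b"bob")      # second key fills the memtable and triggers a flush
db.delete(b"user:1")           # tombstone in the memtable shadows the flushed value
print(db.get(b"user:1"), db.get(b"user:2"))  # -> None b'bob'
```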

-- Core keyspace schema stored per storage node (PostgreSQL dialect, illustrative)
-- Each row represents one version of a key; concurrent siblings share a key
-- but differ in vector clock
CREATE TABLE kv_store (
    key           BYTEA            NOT NULL CHECK (octet_length(key) <= 256),
    vector_clock  JSONB            NOT NULL,   -- {"node1": 3, "node2": 1}
    value         BYTEA,                       -- NULL means tombstone (delete)
    timestamp_ms  BIGINT           NOT NULL,   -- wall clock for TTL expiry
    ttl_ms        BIGINT           DEFAULT NULL,  -- NULL = no expiry
    checksum      BYTEA            NOT NULL,   -- CRC32 of value for corruption detection
    PRIMARY KEY   (key, vector_clock)
);

-- Bloom filter metadata per SSTable segment
CREATE TABLE sstable_metadata (
    segment_id    BIGINT           NOT NULL,
    min_key       BYTEA            NOT NULL,
    max_key       BYTEA            NOT NULL,
    bloom_filter  BYTEA            NOT NULL,   -- serialized Bloom filter
    entry_count   BIGINT           NOT NULL,
    size_bytes    BIGINT           NOT NULL,
    created_at    BIGINT           NOT NULL,
    PRIMARY KEY   (segment_id)
);

-- Hinted handoff queue: writes for temporarily unavailable nodes
CREATE TABLE hinted_handoff (
    hint_id       BIGSERIAL        PRIMARY KEY,
    target_node   VARCHAR(64)      NOT NULL,
    key           BYTEA            NOT NULL CHECK (octet_length(key) <= 256),
    value         BYTEA,
    vector_clock  JSONB            NOT NULL,
    hint_created  BIGINT           NOT NULL,
    retry_count   INT              DEFAULT 0
);

CREATE INDEX idx_hints_target ON hinted_handoff(target_node, hint_created);

The partitioning key is the raw key bytes passed to hash(). This gives even distribution but prevents range scans. For workloads that need range scans, a sorted partition scheme (like HBase’s row key ranges) would be needed, at the cost of potential hot spots for sequential writes.

Figure: Data flow diagram showing write path from client through coordinator to replicas with quorum acknowledgment and vector clock update

Deletes use tombstones - rather than removing the key from storage, a DELETE writes a special marker with value=NULL. This is critical because a genuine delete must propagate to all replicas; without a tombstone, a stale replica that missed the delete would silently resurrect the key on the next read repair.
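A small simulation shows why. It assumes a simplified replica model: a dict mapping each key to a `(version, value)` pair, where `value=None` is a tombstone and a plain integer version stands in for the vector clock, plus a hypothetical `repair` step that copies the highest version it finds to every replica.

```python
from typing import Optional

Entry = tuple[int, Optional[str]]   # (version, value); value None = tombstone
Replica = dict[str, Entry]

def repair(replicas: list[Replica], key: str) -> Optional[Entry]:
    """Read repair: find the highest-versioned entry any replica holds and copy it everywhere."""
    entries = [r[key] for r in replicas if key in r]
    if not entries:
        return None
    winner = max(entries, key=lambda e: e[0])
    for r in replicas:
        r[key] = winner
    return winner

def read(replicas: list[Replica], key: str) -> Optional[str]:
    winner = repair(replicas, key)
    return None if winner is None else winner[1]

# Hard delete: replicas A and B dropped the key, replica C missed the delete
hard = [{}, {}, {"cart:42": (1, "3 items")}]
print(read(hard, "cart:42"))   # -> 3 items (resurrected, then copied back to A and B)

# Tombstone: the delete is a newer version, so it wins and propagates to C
soft = [{"cart:42": (2, None)}, {"cart:42": (2, None)}, {"cart:42": (1, "3 items")}]
print(read(soft, "cart:42"))   # -> None
```

The flip side is that a tombstone can only be purged once every replica has seen it, which is why compaction cannot drop tombstones immediately (Cassandra bounds this window with gc_grace_seconds).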

Watch Out

Tombstones accumulate until compaction runs. In workloads with frequent deletes and slow compaction (often caused by heavy write load slowing LSM compaction threads), tombstone accumulation causes read amplification - reads scan thousands of tombstones before finding a live value or confirming absence. Cassandra’s “tombstone warnings” exist for exactly this reason.

Key Algorithms and Protocols

Anti-Entropy Repair

Anti-entropy is the process by which replicas that have diverged due to network partitions, node crashes, or slow replication eventually converge back to the same state. The naive approach - comparing every key-value pair between two nodes - is impractical at billions of keys.

The solution is a Merkle tree: build a binary hash tree where leaf nodes are hashes of key-value pairs, and each internal node is the hash of its children. In practice each leaf covers a fixed slice of the key-hash space rather than a position in a sorted key list, so two replicas with slightly different key sets still build trees of the same shape and can be compared node by node. To compare two replicas, compare the root hashes. If they match, the replicas are identical (no work needed). If they differ, descend the tree - the subtree with a differing hash contains the diverged keys. This turns an O(N) comparison into O(K log N) where K is the number of diverged keys.

# Merkle tree for anti-entropy between two replicas.
# Leaves cover fixed buckets of the key-hash space (not positions in a sorted key list),
# so replicas holding different key sets still build trees of identical shape.
import hashlib

class MerkleTree:
    # Demonstrates Merkle tree construction for efficient replica comparison
    def __init__(self, key_value_pairs: list[tuple[bytes, bytes]], depth: int = 10):
        self.depth = depth
        self.num_leaves = 2 ** depth
        buckets: list[list[tuple[bytes, bytes]]] = [[] for _ in range(self.num_leaves)]
        for k, v in key_value_pairs:
            buckets[self.bucket_of(k)].append((k, v))
        # Heap layout: index 0 is the root, children of i are 2i+1 and 2i+2,
        # and the leaves occupy the last num_leaves slots.
        self.first_leaf = self.num_leaves - 1
        self.tree: list[bytes] = [b""] * (2 * self.num_leaves - 1)
        for b, entries in enumerate(buckets):
            h = hashlib.sha256()
            for k, v in sorted(entries):  # order-independent: replicas may store keys differently
                # Length-prefix each field so (b"a:b", b"c") and (b"a", b"b:c") hash differently
                h.update(len(k).to_bytes(4, "big") + k + len(v).to_bytes(4, "big") + v)
            self.tree[self.first_leaf + b] = h.digest()
        for i in range(self.first_leaf - 1, -1, -1):
            self.tree[i] = hashlib.sha256(self.tree[2 * i + 1] + self.tree[2 * i + 2]).digest()

    def bucket_of(self, key: bytes) -> int:
        # Deterministic on every replica, so a key always lands in the same leaf
        return int.from_bytes(hashlib.sha256(key).digest()[:8], "big") % self.num_leaves

    @property
    def root(self) -> bytes:
        return self.tree[0]

    def diff_buckets(self, other: "MerkleTree") -> list[int]:
        """Return leaf buckets whose contents differ; visits O(K log B) nodes for K of B buckets."""
        if self.depth != other.depth:
            raise ValueError("both replicas must build trees with the same depth")
        diverged: list[int] = []
        self._compare(other, 0, diverged)
        return diverged

    def _compare(self, other: "MerkleTree", idx: int, diverged: list[int]) -> None:
        if self.tree[idx] == other.tree[idx]:
            return  # subtree matches - skip entirely
        if idx >= self.first_leaf:
            diverged.append(idx - self.first_leaf)  # sync only the keys in this bucket
            return
        self._compare(other, 2 * idx + 1, diverged)
        self._compare(other, 2 * idx + 2, diverged)

Anti-entropy runs on a schedule (typically once per hour per replica pair) to catch divergence that slipped through read repair or hinted handoff.

Key Insight

Merkle trees make anti-entropy sublinear: comparing two 10-billion-key replicas where only 1,000 keys differ requires examining only about 1,000 * log2(10 billion) ≈ 33,000 tree nodes, not 10 billion key pairs. The tree is rebuilt periodically from the SSTable segments, not maintained in real-time - building it is the expensive part, diffing is cheap.

Hinted Handoff

When a write request arrives but one of the N target replicas is temporarily unreachable, rather than failing the write (which would reduce availability), the coordinator writes the data to an available node along with a hint - metadata saying “this write was meant for Node X, please deliver it when Node X comes back.”

# Hinted handoff: store write on surrogate node with delivery metadata
import json
import time

HINT_PREFIX = b"__hint__"

def hint_prefix_for(node: str) -> bytes:
    return HINT_PREFIX + node.encode() + b":"

def store_with_hint(
    storage_client,
    surrogate_node: str,
    intended_node: str,
    key: bytes,
    value: bytes,
    vector_clock: dict[str, int],
    hint_ttl_seconds: int = 3600,
) -> None:
    # Demonstrates storing a hinted write to a surrogate node
    now_ms = int(time.time() * 1000)
    hint_metadata = {
        "intended_node": intended_node,
        "hint_created_ms": now_ms,
        "hint_expires_ms": now_ms + hint_ttl_seconds * 1000,
        "original_vc": vector_clock,
    }
    # Store the value under a hint-prefixed key on the surrogate
    hint_key = hint_prefix_for(intended_node) + key
    storage_client.put_local(
        surrogate_node,
        hint_key,
        value,
        json.dumps(hint_metadata).encode()
    )

def replay_hints(storage_client, recovered_node: str) -> int:
    # Runs on each surrogate once gossip reports that recovered_node is back
    prefix = hint_prefix_for(recovered_node)
    hints = storage_client.scan_hints(prefix=prefix)
    replayed = 0
    for hint_key, value, metadata in hints:
        meta = json.loads(metadata)
        if int(time.time() * 1000) > meta["hint_expires_ms"]:
            storage_client.delete_hint(hint_key)  # expired: anti-entropy will cover this write
            continue
        # Strip the known prefix instead of splitting on ":", since node names
        # like "10.0.0.7:7000" and keys can both contain colons
        original_key = hint_key[len(prefix):]
        try:
            storage_client.put(recovered_node, original_key, value, meta["original_vc"])
        except Exception:
            break  # node went away again; keep the remaining hints for the next attempt
        storage_client.delete_hint(hint_key)
        replayed += 1
    return replayed

Hinted handoff is a “best effort” availability mechanism. A hint lives only on its surrogate node, so if the surrogate is lost before the intended node recovers, the hint is lost with it. Anti-entropy repair is the fallback that catches these gaps.

Read Repair

Read repair runs opportunistically during a normal GET request. When the coordinator fans out a read to R nodes and collects responses, it compares the vector clocks of all returned values. If any replica returns a stale version (its vector clock is dominated by another replica’s version), the coordinator sends an asynchronous write to the stale replica with the fresher value.

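# Read repair: compare versions across replicas and async-update stale ones
import asyncio
from typing import Optional

# vc_happens_before() is defined in the vector clock snippet above.
_repair_tasks: set[asyncio.Task] = set()  # hold references to in-flight repairs

async def coordinated_get(
    key: str,
    preference_list: list[str],
    R: int,
    storage_client,
) -> tuple[Optional[bytes], dict]:
    # Demonstrates read repair: coordinator heals stale replicas on each read
    responses: list[tuple[str, Optional[bytes], dict]] = []

    async def read_from_node(node: str) -> None:
        try:
            value, vc = await asyncio.wait_for(
                storage_client.get(node, key),
                timeout=0.05
            )
            responses.append((node, value, vc))
        except Exception:
            pass  # a slow or failed replica simply does not count toward R

    await asyncio.gather(*[read_from_node(n) for n in preference_list])

    if len(responses) < R:
        raise Exception(f"Read quorum failed: only {len(responses)}/{R} nodes responded")

    # Find the newest version by vector clock dominance.
    # (Concurrent siblings are not handled here; a full version would return them all.)
    _, newest_value, newest_vc = responses[0]
    for _, value, vc in responses[1:]:
        if vc_happens_before(newest_vc, vc):
            newest_vc = vc
            newest_value = value

    # Async repair: send the newest version to replicas whose clock it dominates
    stale_nodes = [node for node, _, vc in responses if vc_happens_before(vc, newest_vc)]
    if stale_nodes:
        async def repair() -> None:
            await asyncio.gather(
                *[storage_client.put(node, key, newest_value, newest_vc) for node in stale_nodes],
                return_exceptions=True,  # a failed repair is left to the next read or anti-entropy
            )
        # create_task() needs a coroutine, so the gather is wrapped in repair()
        task = asyncio.create_task(repair())
        _repair_tasks.add(task)
        task.add_done_callback(_repair_tasks.discard)

    return newest_value, newest_vc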

Read repair works well for hot keys (read frequently, healed quickly) but does nothing for cold keys that are rarely read. Anti-entropy fills that gap.

Real World

Apache Cassandra uses all three repair mechanisms in concert: read repair heals actively accessed keys within the read path, hinted handoff covers short node outages (up to 3 hours by default, configurable via max_hint_window_in_ms), and nodetool repair triggers full Merkle tree anti-entropy for cold data. DynamoDB, despite its name, is built differently: each partition is replicated through a leader-based Multi-Paxos group, and the managed service hides that machinery entirely - you choose between eventually consistent reads (the default) and strongly consistent reads per request.

Scaling and Performance

The system scales horizontally by adding nodes to the ring. Each new node gets assigned 150 virtual nodes (token positions), and keys in those token ranges migrate from existing neighbors. The migration is incremental and online - existing nodes serve requests while streaming data to the new node in the background.

Figure: Scaling diagram showing node addition to consistent hash ring and Merkle tree anti-entropy repair between replicas
Capacity estimation - 500K reads/sec, 100K writes/sec:

Given:
  - 500,000 reads/second at 5ms p99 target
  - 100,000 writes/second
  - Average value size: 1KB
  - Replication factor: 3
  - Read/write ratio: 5:1

Write amplification:
  100,000 writes/sec * 3 replicas = 300,000 physical writes/sec
  300,000 * 1KB = 300 MB/sec ingest to LSM-tree memtables

Read throughput per node (10 nodes):
  500,000 / 10 nodes = 50,000 reads/node/sec
  With R=2 quorum: 100,000 read RPCs/node/sec (coordinator fans out to 2)

Storage (1 year, no TTL):
  100,000 writes/sec * 1KB * 3 replicas * 86400 sec/day * 365 days
  = 100,000 * 1000 * 3 * 31,536,000
  = ~9.5 PB raw - clearly requires aggressive TTLs or tiered storage

Practical 30-day retention:
  100,000 * 1000 * 3 * 2,592,000 = ~777 TB
  ~78 TB per node at 10 nodes with RF=3

Memory (memtable budget per node):
  100MB memtable flush threshold * 4 concurrent flushes = 400MB working set
  + Bloom filters: ~1.2 bytes per key * 1B keys/node = 1.2 GB

Recommended per-node hardware:
  32 vCPU, 128 GB RAM, 4x NVMe SSD (10 TB each), 25 Gbps NIC
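The same arithmetic as a small script, so the inputs can be varied. The `Workload` fields mirror the givens above (decimal units, 1KB = 1,000 bytes). The Bloom filter helper uses the standard optimal-sizing formula of -ln(p) / (ln 2)^2 bits per key; reading the 1.2 bytes/key figure as a roughly 1% false-positive rate is our assumption, not something the estimate states.

```python
import math
from dataclasses import dataclass

@dataclass
class Workload:
    reads_per_sec: int = 500_000
    writes_per_sec: int = 100_000
    value_bytes: int = 1_000          # 1KB, decimal
    replication_factor: int = 3
    nodes: int = 10
    read_quorum: int = 2

def estimate(w: Workload, retention_days: int) -> dict[str, float]:
    physical_writes = w.writes_per_sec * w.replication_factor
    raw_bytes = physical_writes * w.value_bytes * 86_400 * retention_days
    return {
        "physical_writes_per_sec": physical_writes,
        "ingest_mb_per_sec": physical_writes * w.value_bytes / 1e6,
        "replica_reads_per_node_per_sec": w.reads_per_sec * w.read_quorum / w.nodes,
        "raw_tb": raw_bytes / 1e12,
        "tb_per_node": raw_bytes / 1e12 / w.nodes,
    }

def bloom_bytes_per_key(false_positive_rate: float) -> float:
    """Optimal Bloom filter size: -ln(p) / (ln 2)^2 bits per key, converted to bytes."""
    return -math.log(false_positive_rate) / (math.log(2) ** 2) / 8

w = Workload()
for days in (30, 365):
    print(days, "days:", estimate(w, days))
print(f"Bloom filter at 1% false positives: {bloom_bytes_per_key(0.01):.2f} bytes/key")
nodes_needed = math.ceil(estimate(w, 30)["raw_tb"] / 40)   # 4x 10 TB NVMe per node
print("nodes for 30-day retention at 40 TB/node:", nodes_needed)
```

At the recommended 40 TB of NVMe per node, keeping every write for 30 days needs about 20 nodes rather than 10, which is one more reason the estimate leans on TTLs.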

The dominant bottleneck at high write throughput is LSM compaction. As SSTables accumulate, background compaction merges and sorts them - this process competes with writes for I/O bandwidth. If compaction falls behind, read latency grows because reads must scan more SSTable files.

Mitigation strategies:

  • Dedicate 30-40% of I/O bandwidth to compaction using rate limiting (compaction_throughput_mb_per_sec)
  • Use tiered compaction for write-heavy workloads (fewer, larger compaction runs)
  • Use leveled compaction for read-heavy workloads (bounded read amplification)
  • Add nodes before the cluster hits 60% storage utilization - migration at 90% full causes compaction starvation
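Rate-limiting compaction I/O is commonly done with a token bucket. The sketch below is a generic, hypothetical version (not Cassandra's or RocksDB's implementation): compaction asks for byte budget before writing each chunk, the budget refills at the configured rate, and an injectable clock and sleep function make it testable.

```python
import time
from typing import Callable

class CompactionRateLimiter:
    """Token bucket: refills at rate_bytes_per_sec and holds at most burst_bytes."""

    def __init__(self, rate_bytes_per_sec: float, burst_bytes: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate_bytes_per_sec
        self.burst = burst_bytes
        self.tokens = burst_bytes
        self.clock = clock
        self.last = clock()

    def _refill(self) -> None:
        now = self.clock()
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now

    def try_acquire(self, nbytes: float) -> bool:
        """Take nbytes of budget if available; otherwise leave the bucket untouched."""
        if nbytes > self.burst:
            raise ValueError("a chunk larger than the burst size can never be admitted")
        self._refill()
        if self.tokens >= nbytes:
            self.tokens -= nbytes
            return True
        return False

    def seconds_until(self, nbytes: float) -> float:
        """How long a caller should wait before nbytes can be admitted."""
        self._refill()
        return max(0.0, (nbytes - self.tokens) / self.rate)

def throttled_compaction(chunks: list[bytes], limiter: CompactionRateLimiter,
                         write: Callable[[bytes], None],
                         sleep: Callable[[float], None] = time.sleep) -> None:
    """Write merged SSTable chunks, pausing whenever the I/O budget runs out."""
    for chunk in chunks:
        while not limiter.try_acquire(len(chunk)):
            sleep(limiter.seconds_until(len(chunk)))
        write(chunk)
```

In Cassandra the equivalent knob is compaction_throughput_mb_per_sec; a 30-40% budget would mean setting the rate to that share of the disk's sustained write bandwidth.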
Real World

Facebook’s RocksDB (their LSM-tree engine powering MyRocks, ZippyDB, and others) offers dynamic level sizing (level_compaction_dynamic_level_bytes), which sets each level's target size from the actual amount of data rather than from fixed constants, and it applies backpressure through write stalls: incoming writes are slowed or stopped when the number of L0 files or the estimated pending compaction bytes cross configured thresholds. At Facebook’s scale, a compaction storm on a single node caused cascading reads to other replicas, which triggered compaction there too - a resonance failure. Throttling writes when compaction debt exceeds a threshold is what breaks that loop.

Failure Modes and Recovery

| Failure | Detection | Impact | Recovery |
|---|---|---|---|
| Single node crash | Gossip timeout (10-30 sec) | Reads/writes route to remaining replicas; W/R still satisfied at QUORUM | Hinted handoff delivers missed writes; anti-entropy repairs cold data on rejoin |
| Network partition (split brain) | Gossip detects nodes as unreachable | Minority partition may reject writes if it can’t reach W replicas | Writes to the majority partition continue; the minority side catches up via anti-entropy once the partition heals |
| Disk corruption | Checksum mismatch on read | Single key returns an error; node flags the corrupted segment | Read repair provides a clean copy from other replicas; the corrupted SSTable is rebuilt |
| Slow node (GC pause, I/O saturation) | Request timeout on coordinator | Coordinator waits up to 50ms, then falls back to other replicas | Hedged reads (send to R+1 nodes, use the first R responses) absorb slow outliers |
| Clock skew | NTP divergence metrics | LWW conflicts resolved in favor of the wrong winner (rare) | Vector clocks capture true causality; LWW is the fallback only for genuinely concurrent writes |
| Coordinator failure mid-write | Client timeout; no ACK received | Write may be partially applied (some replicas got it, others didn’t) | Client retries with the same vector clock; idempotent replicas detect the duplicate and skip it |
Watch Out

The most dangerous operational mistake is disabling anti-entropy repair to reduce background I/O during peak traffic, then never re-enabling it. Stale replicas accumulate silently. When a node crashes and is replaced, the replacement node bootstraps from peers - if those peers have months of silent divergence, the new node gets inconsistent data that read repair will never fix (because nobody reads those cold keys). Repair must run continuously at low priority, not periodically at high priority.

Comparison of Approaches

| Approach | Consistency | Latency | Complexity | Best Fit |
|---|---|---|---|---|
| Single-node Redis | Strong | Sub-ms | Low | Session state under 100K QPS, single AZ |
| Modulo sharding + replication | Strong per shard | Low | Medium | Fixed cluster size, rare resharding |
| Consistent hashing + quorum (this design) | Tunable (ONE/QUORUM/ALL) | Low-Medium (5-10ms) | High | High write volume, elastic scaling, multi-AZ |
| Raft-based KV (etcd, TiKV) | Strong (linearizable) | Medium (20-50ms) | Very High | Config store, leader election, metadata |
| DynamoDB / Cassandra (managed) | Tunable | Low | Low (ops) | General purpose, serverless scaling |

The tunable consistency model (this design) is the right choice when you are building a general-purpose cache or session store where occasional stale reads are acceptable, write throughput must scale horizontally, and you cannot afford the latency penalty of linearizable consensus. If you need transactions or linearizability (financial balances, inventory counts), use a Raft-based system like TiKV or CockroachDB - the quorum model described here does not give you those guarantees.

For most web application workloads - user profiles, session state, feature flags, rate-limit counters - QUORUM reads and writes give you the right tradeoff: a read that starts after a QUORUM write has been acknowledged sees that write (or a newer one), and you can survive any single node failure without impacting clients.

Key Takeaways

  • Consistent hashing eliminates the full data migration problem on cluster resize - when one node joins an M-node cluster, only about 1/(M+1) of keys move.
  • Virtual nodes solve the load imbalance inherent in pure consistent hashing - 150 vnodes per physical node cut the spread of per-node load to roughly 8% (relative standard deviation), versus near 100% with one position per node.
  • Quorum (W + R > N) guarantees that read and write sets overlap, providing a tunable consistency/availability tradeoff without a central coordinator.
  • Vector clocks detect concurrent writes with certainty by tracking causality per node, enabling conflict detection without relying on unreliable wall-clock ordering.
  • Hinted handoff recovers availability during short node outages by temporarily storing writes on surrogate nodes, enabling the system to meet quorum without the intended replica.
  • Read repair heals stale replicas opportunistically during normal reads - hot keys stay fresh with zero extra background load.
  • Anti-entropy with Merkle trees repairs cold data divergence efficiently: replicas exchange hashes and descend only into subtrees that differ, costing roughly O(d log n) hash comparisons for d divergent keys out of n, rather than comparing every key.
  • Tombstones for deletes prevent deleted keys from being resurrected by stale replicas, at the cost of compaction overhead if deletes are frequent.
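The first two takeaways can be made concrete with a small ring. This is a minimal sketch, assuming MD5 ring positions and 150 virtual nodes per physical node as described in this article; the `HashRing` class and the `node#vnodeI` naming of virtual nodes are hypothetical illustrations, not a specific library's API. It grows a 10-node ring to 11 nodes and counts how many of 20,000 sample keys change owner, then does the same for key % N placement.

```python
import bisect
import hashlib


def ring_hash(value: str) -> int:
    """Map a string to a position on a 128-bit ring using MD5."""
    return int.from_bytes(hashlib.md5(value.encode()).digest(), "big")


class HashRing:
    def __init__(self, nodes, vnodes_per_node=150):
        self.vnodes_per_node = vnodes_per_node
        self._points = []  # sorted (position, physical node) pairs
        self._positions = []
        for node in nodes:
            self.add_node(node)

    def add_node(self, node):
        # Each physical node owns many small arcs, one per virtual node.
        self._points.extend(
            (ring_hash(f"{node}#vnode{i}"), node) for i in range(self.vnodes_per_node)
        )
        self._points.sort()
        self._positions = [position for position, _ in self._points]

    def owner(self, key):
        # The first virtual node clockwise from the key's position owns it (wrapping at the end).
        index = bisect.bisect_right(self._positions, ring_hash(key)) % len(self._positions)
        return self._points[index][1]


keys = [f"user:{i}" for i in range(20_000)]
ring = HashRing([f"node{n}" for n in range(10)])
before = {key: ring.owner(key) for key in keys}
ring.add_node("node10")
moved = [key for key in keys if ring.owner(key) != before[key]]
print(f"consistent hashing moved {len(moved) / len(keys):.1%} of keys")

# For comparison: key % N placement when N goes from 10 to 11.
modulo_moved = sum(ring_hash(key) % 10 != ring_hash(key) % 11 for key in keys)
print(f"key % N moved {modulo_moved / len(keys):.1%} of keys")
```

Expect the first figure to land near 1/11 (about 9%), with every moved key now owned by node10, while key % N reshuffles roughly 10/11 of the keys.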

The counter-intuitive lesson from building this system is that the write path is simpler than it looks, but the repair path is what makes or breaks production reliability. Any engineer can implement consistent hashing and quorum writes in a weekend. The months of engineering go into the background systems - hinted handoff with bounded storage, Merkle tree construction that doesn’t block reads, read repair that does not amplify load on already-slow nodes. Get those repair mechanisms wrong and you have a KV store that appears healthy until a cascade of node failures reveals years of silent divergence.

Frequently Asked Questions

Q: Why not use a leader-per-shard model (like Kafka partitions) instead of leaderless replication? A: Leader-per-shard gives you stronger consistency guarantees because all writes go through a single leader, which applies them in sequence. But every write must reach the leader - if the leader is in a different availability zone, you add a cross-AZ round trip to every write. It also makes scaling harder: adding shards requires migrating data and re-electing leaders. Leaderless quorum gives up some of that consistency, and takes on conflict resolution, in exchange for lower write latency and simpler elastic scaling.

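Q: Won’t vector clocks grow unboundedly as nodes are added and removed? A: Yes, without pruning. The original Dynamo paper addresses this with clock truncation: each (node, counter) pair also records when that node last updated the item, and once the clock reaches a size threshold (the paper's example is 10 pairs), the oldest pair is dropped. After truncation, descendant relationships can no longer always be derived accurately, so a clean succession can look like a conflict, or a real conflict can be missed. The paper reports that this had not surfaced in production, because writes for a key are usually coordinated by the top nodes of its preference list, which keeps clocks small. An alternative is to drop per-node counters entirely and attach a single timestamp to every write, which is what Cassandra does with its per-cell write timestamps (settable via USING TIMESTAMP) - but that is last-write-wins and gives up conflict detection.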
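A minimal sketch of clock comparison plus the truncation rule. The helper names (`descends`, `compare`, `truncate`) are hypothetical, and the per-entry update times are kept in a separate dict for brevity. The example shows the failure mode truncation introduces: once one entry is dropped, two concurrent versions look like a clean succession.

```python
from typing import Dict

Clock = Dict[str, int]  # node id -> update counter


def descends(a: Clock, b: Clock) -> bool:
    """True if clock a has seen every update recorded in clock b."""
    return all(a.get(node, 0) >= count for node, count in b.items())


def compare(a: Clock, b: Clock) -> str:
    if descends(a, b) and descends(b, a):
        return "equal"
    if descends(a, b):
        return "a supersedes b"
    if descends(b, a):
        return "b supersedes a"
    return "concurrent"


def truncate(clock: Clock, last_updated: Dict[str, float], max_entries: int = 10) -> Clock:
    """Dynamo-style truncation: keep only the max_entries most recently updated entries."""
    newest_first = sorted(clock, key=lambda node: last_updated[node], reverse=True)
    return {node: clock[node] for node in newest_first[:max_entries]}


a = {"x": 1, "y": 5}
b = {"y": 5, "z": 1}
print(compare(a, b))  # concurrent: each has an update the other lacks

# Suppose a's "x" entry is the oldest and the threshold forces it out.
a_truncated = truncate(a, last_updated={"x": 100.0, "y": 200.0}, max_entries=1)
print(a_truncated, compare(a_truncated, b))  # {'y': 5} b supersedes a
```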

Q: Why not just use last-write-wins (LWW) based on timestamps and skip vector clocks entirely? A: LWW with timestamps silently discards one of two concurrent writes. If client A and client B both update the same shopping cart starting from the same version, LWW keeps only the write with the later timestamp (or an arbitrary one on a tie) - the user sees their item disappear with no error. Vector clocks surface the conflict so the application can merge both carts. LWW is acceptable for overwrite-style data, where the newest value fully replaces the old one (user profile photo URL, feature flag value), but dangerous for additive data (counters, sets, cart items).
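The lost-update scenario can be reproduced in a few lines. This is an illustrative sketch: `CartVersion`, `last_write_wins`, and `merge_siblings` are hypothetical names, and set union stands in for whatever merge the application chooses.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CartVersion:
    items: frozenset
    timestamp_ms: int  # wall-clock time of the write, as LWW would use it


def last_write_wins(a: CartVersion, b: CartVersion) -> CartVersion:
    # Keep the later timestamp; on a tie, an arbitrary but deterministic pick (a).
    return a if a.timestamp_ms >= b.timestamp_ms else b


def merge_siblings(a: CartVersion, b: CartVersion) -> CartVersion:
    # Application-level merge of two concurrent carts: keep every item from both.
    return CartVersion(a.items | b.items, max(a.timestamp_ms, b.timestamp_ms))


base = frozenset({"book"})
from_client_a = CartVersion(base | {"lamp"}, timestamp_ms=1_000)
from_client_b = CartVersion(base | {"pen"}, timestamp_ms=1_000)

print(sorted(last_write_wins(from_client_a, from_client_b).items))  # ['book', 'lamp']: the pen is gone
print(sorted(merge_siblings(from_client_a, from_client_b).items))   # ['book', 'lamp', 'pen']
```

A plain union has its own known weakness: an item removed in one sibling comes back when merged with a sibling that still has it, which the Dynamo paper describes as deleted cart items resurfacing.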

Q: How does the cluster handle a prolonged partition (hours or days) rather than a brief node outage? A: Hinted handoff is deliberately time-bounded to prevent unbounded hint storage: Cassandra, for example, stops storing new hints for a node once it has been down longer than its hint window (3 hours by default). For longer partitions, the only recovery path is anti-entropy repair via Merkle tree comparison after the partition heals. If the partition lasts so long that the separated replica’s data is completely stale, it may be faster to decommission and re-bootstrap the node from scratch (streaming all data from its peers).
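The Merkle tree comparison can be sketched with a fixed number of key-range buckets as leaves. Real systems differ in detail (Dynamo, for instance, keeps one Merkle tree per key range a node hosts); the 8 buckets, SHA-256, the `key=value` leaf encoding, and the function names here are illustrative assumptions.

```python
import hashlib

LEAVES = 8  # number of key-range buckets; must be a power of two in this sketch


def digest(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def bucket_of(key: str) -> int:
    return int.from_bytes(digest(key.encode())[:4], "big") % LEAVES


def build_tree(store: dict) -> list:
    """Return the tree as levels: levels[0] holds leaf hashes, levels[-1] holds [root]."""
    buckets = [[] for _ in range(LEAVES)]
    for key, value in store.items():
        buckets[bucket_of(key)].append(f"{key}={value}")
    level = [digest("\n".join(sorted(bucket)).encode()) for bucket in buckets]
    levels = [level]
    while len(level) > 1:
        # Each parent hashes the concatenation of its two children.
        level = [digest(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
        levels.append(level)
    return levels


def differing_buckets(tree_a: list, tree_b: list) -> list:
    """Walk down from the root, descending only into subtrees whose hashes differ."""
    suspects = [0]  # node indices at the current level, starting with the root
    for depth in range(len(tree_a) - 1, -1, -1):
        suspects = [i for i in suspects if tree_a[depth][i] != tree_b[depth][i]]
        if depth > 0:
            suspects = [child for i in suspects for child in (2 * i, 2 * i + 1)]
    return suspects


replica_a = {f"k{i}": f"v{i}" for i in range(100)}
replica_b = dict(replica_a)
replica_b["k42"] = "stale"  # divergent value
del replica_b["k7"]  # missing key

buckets = differing_buckets(build_tree(replica_a), build_tree(replica_b))
to_compare = sorted(k for k in replica_a.keys() | replica_b.keys() if bucket_of(k) in buckets)
print(f"buckets to repair: {buckets}; keys compared: {len(to_compare)} of {len(replica_a)}")
```

Only keys in the flagged buckets are compared one by one; identical subtrees are skipped after a single hash comparison.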

Q: How do you prevent hot spots when many keys hash to the same virtual node? A: With 150 virtual nodes per physical node and MD5 hashing, key distribution is statistically bounded: the standard deviation of per-node load is roughly 1/sqrt(V) of the mean for V virtual nodes per node (about 8% at V=150), and it keeps shrinking as V grows. For application-level hot keys (a single viral post’s view counter), the solution is at the client layer: spread the key into key#shard0 through key#shard(N-1), write each increment to one randomly chosen shard, and sum across all shards on read. This is the scatter-gather pattern used by high-throughput counters.
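A minimal sketch of the sharded counter, with a plain dict standing in for the KV store and a hypothetical `ShardedCounter` class. The read-modify-write in `increment` is only safe here because everything runs in one process; against a real replicated store, each shard needs an atomic increment or conflict resolution on its value.

```python
import random


class ShardedCounter:
    """Spread one logical hot counter over several physical keys: key#shard0 .. key#shard{n-1}."""

    def __init__(self, store, shards: int = 16):
        self.store = store  # stands in for the KV store; any dict-like mapping here
        self.shards = shards

    def _shard_key(self, key: str, shard: int) -> str:
        return f"{key}#shard{shard}"

    def increment(self, key: str, amount: int = 1) -> None:
        # Each write touches one randomly chosen shard, so the load spreads over
        # `shards` keys that hash to different replica sets.
        shard_key = self._shard_key(key, random.randrange(self.shards))
        self.store[shard_key] = self.store.get(shard_key, 0) + amount

    def read(self, key: str) -> int:
        # Scatter-gather: read every shard and sum.
        return sum(self.store.get(self._shard_key(key, i), 0) for i in range(self.shards))


store = {}
views = ShardedCounter(store, shards=16)
for _ in range(10_000):
    views.increment("post:viral:views")
print(views.read("post:viral:views"))  # 10000
print(len(store))  # number of shard keys actually written (at most 16)
```

The tradeoff is visible in `read`: one logical read becomes `shards` physical reads, so the shard count should be only as large as the write rate requires.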

Q: What happens if the coordinator crashes mid-write after sending to some but not all replicas? A: The client receives a timeout and retries with the same vector clock. The retry may reach a new coordinator (the client’s SDK routes to the next live node). Because the write is idempotent - replicas check the vector clock and skip writes they already have - the retry succeeds cleanly. Replicas that got the first attempt ignore the duplicate; replicas that missed it apply it now. This is why vector clocks on every write are non-negotiable: they are the idempotency key for the write operation.
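The replica-side check can be sketched as follows, assuming the retry carries exactly the same vector clock as the first attempt (here stamped with a hypothetical client id). `apply_write` is an illustrative name; the replica keeps a list of sibling versions so that genuinely concurrent writes are preserved rather than overwritten.

```python
from typing import Dict, List, Tuple

Clock = Dict[str, int]
Version = Tuple[str, Clock]  # (value, vector clock)


def descends(a: Clock, b: Clock) -> bool:
    """True if clock a has seen every update recorded in clock b."""
    return all(a.get(node, 0) >= count for node, count in b.items())


def apply_write(replica: Dict[str, List[Version]], key: str, value: str, clock: Clock) -> bool:
    """Apply a write unless this replica already holds the same or a newer version.

    Returns True if the write was applied, False if it was skipped as a duplicate or stale.
    """
    siblings = replica.get(key, [])
    if any(descends(existing, clock) for _, existing in siblings):
        return False
    # Drop versions the new write supersedes; keep concurrent siblings for a later merge.
    survivors = [(v, c) for v, c in siblings if not descends(clock, c)]
    replica[key] = survivors + [(value, clock)]
    return True


write_clock = {"client-42": 3}
replicas = {"r1": {}, "r2": {}, "r3": {}}

# First attempt: the coordinator reached r1, then crashed.
apply_write(replicas["r1"], "cart:9", "v-new", write_clock)

# Retry through another coordinator with the identical clock.
results = {name: apply_write(data, "cart:9", "v-new", write_clock) for name, data in replicas.items()}
print(results)  # {'r1': False, 'r2': True, 'r3': True}
```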

Interview Questions

Q: Walk me through what happens when a client sends PUT("user:123", profile_data) to a 5-node cluster with N=3, W=2.

Expected depth: Start with the client SDK hashing the key to find its ring position and deriving the 3-node preference list. Explain that any live node can act as coordinator. Describe the parallel fan-out to all 3 replicas with the current vector clock. Walk through the quorum wait: the coordinator responds to the client after 2 ACKs, and the 3rd write completes asynchronously. Discuss what happens if one replica is down: hinted handoff kicks in, and the coordinator sends that replica's copy to a surrogate (the next healthy node on the ring), which stores it with a hint naming the intended replica and delivers it when that replica recovers.
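A sequential simulation of that write path for N=3, W=2. Everything here is illustrative: the 5 node names, one ring position per node instead of virtual nodes, in-memory dicts for storage and hints, and the function names. It only shows the order of decisions: derive the preference list, write to live replicas, route a down replica's copy to the next live node outside the list with a hint, and count that hinted write toward W (a sloppy quorum).

```python
import hashlib
from typing import Dict, List, Set

NODES = ["A", "B", "C", "D", "E"]  # hypothetical 5-node cluster, one ring position per node


def ring_position(name: str) -> int:
    return int.from_bytes(hashlib.md5(name.encode()).digest()[:8], "big")


RING = sorted(NODES, key=ring_position)


def walk_ring(key: str) -> List[str]:
    """Every node in clockwise order, starting at the first node at or after the key's position."""
    key_pos = ring_position(key)
    start = next((i for i, node in enumerate(RING) if ring_position(node) >= key_pos), 0)
    return RING[start:] + RING[:start]


def coordinate_put(key, value, clock, alive: Set[str], storage: Dict[str, dict],
                   hints: Dict[str, list], n: int = 3, w: int = 2) -> bool:
    order = walk_ring(key)
    preference = order[:n]
    surrogates = [node for node in order[n:] if node in alive]
    acks = 0
    # A real coordinator sends these in parallel and replies after w ACKs;
    # this sketch sends them one after another.
    for replica in preference:
        if replica in alive:
            storage[replica][key] = (value, clock)
            acks += 1
        elif surrogates:
            # Hinted handoff: a surrogate holds the write on behalf of the down replica.
            hints[surrogates.pop(0)].append((replica, key, value, clock))
            acks += 1
    return acks >= w


def deliver_hints(alive: Set[str], storage: Dict[str, dict], hints: Dict[str, list]) -> None:
    """Surrogates hand buffered writes back to intended replicas that are alive again."""
    for surrogate, pending in list(hints.items()):
        still_pending = []
        for intended, key, value, clock in pending:
            if intended in alive:
                storage[intended][key] = (value, clock)
            else:
                still_pending.append((intended, key, value, clock))
        hints[surrogate] = still_pending


storage = {node: {} for node in NODES}
hints = {node: [] for node in NODES}
key = "user:123"
down = walk_ring(key)[0]  # take the first replica in the preference list offline
alive = set(NODES) - {down}

ok = coordinate_put(key, "profile_data", {"coord": 1}, alive, storage, hints)
print(ok, key in storage[down])  # True False: quorum met via a surrogate
deliver_hints(alive | {down}, storage, hints)
print(key in storage[down])  # True: the hint was delivered after recovery
```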

Q: How would you handle a network partition where 2 nodes can see each other but not the other 3?

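Expected depth: With N=3, W=2, quorum is counted per key over the 3 replicas in that key's preference list, not over the cluster as a whole. Whichever side of the 3/2 split holds at least 2 of a key's 3 replicas can still satisfy W=2 for that key; the other side holds at most 1 of them and must reject QUORUM writes for that key, returning an error to clients. So the 2-node side is not simply cut off: it keeps quorum for keys that have 2 replicas there, while the 3-node side keeps quorum for the rest. On partition heal, anti-entropy repair runs Merkle tree comparison to find keys that diverged; any writes the non-quorum side accepted during the split (for example if clients used W=1) must be reconciled using vector clock comparison. Discuss how W=1 on either side creates write conflicts that vector clocks surface.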
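Whether a side of the partition can accept QUORUM writes is decided per key, by how many of that key's N=3 replicas it holds. The sketch below assigns each key a 3-node preference list on a hypothetical 5-node ring (one position per node, no virtual nodes), splits the cluster into {A, B} and {C, D, E}, and tallies which side still holds at least W=2 replicas for each of 10,000 sample keys. With N=3 and two sides, exactly one side has a write quorum for any given key.

```python
import hashlib
from collections import Counter

NODES = ["A", "B", "C", "D", "E"]  # hypothetical cluster, one ring position per node


def ring_position(name: str) -> int:
    return int.from_bytes(hashlib.md5(name.encode()).digest()[:8], "big")


RING = sorted(NODES, key=ring_position)


def preference_list(key: str, n: int = 3) -> list:
    key_pos = ring_position(key)
    start = next((i for i, node in enumerate(RING) if ring_position(node) >= key_pos), 0)
    return [RING[(start + j) % len(RING)] for j in range(n)]


def sides_with_write_quorum(key: str, partitions: dict, w: int = 2) -> list:
    replicas = preference_list(key)
    return [name for name, members in partitions.items()
            if sum(node in members for node in replicas) >= w]


partitions = {"minority {A, B}": {"A", "B"}, "majority {C, D, E}": {"C", "D", "E"}}
tally = Counter()
for i in range(10_000):
    for side in sides_with_write_quorum(f"key:{i}", partitions):
        tally[side] += 1
print(tally)
```

The exact counts depend on where MD5 places the five nodes, but on a 5-node ring at least one 3-node preference list always contains both A and B, so the 2-node side keeps a write quorum for that slice of the key space.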

Q: A single key is receiving 100,000 writes/second (hot key). How does consistent hashing hurt you here, and what do you do?

Expected depth: Consistent hashing concentrates all writes for a given key on its 3 replicas - adding more nodes to the cluster does not help, because the key still maps to the same ring positions. The fix is client-side sharding: shard the key into key#0 through key#N-1, distribute writes round-robin across all shards, and sum across shards on read. Discuss the read complexity (scatter-gather) and counter semantics. Mention that DynamoDB’s adaptive capacity can rebalance and split hot partitions automatically, but a single item is still capped by one partition's throughput, so a counter this hot still needs application-level write sharding.

Q: Why does your system use LSM-trees for storage rather than B-trees?

Expected depth: KV stores have write-heavy workloads - LSM-trees amortize random writes by buffering in memory and flushing sequentially to disk, achieving near-sequential write throughput regardless of key distribution. B-trees require random in-place updates on every write, causing write amplification that saturates NVMe IOPS at scale. The tradeoff: LSM-trees have read amplification (must check multiple SSTables) mitigated by Bloom filters. Discuss when B-trees win: workloads with heavy random reads and low write rates, or when read latency must be strictly bounded.
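A toy model of the LSM write and read paths makes the tradeoff concrete. It is a sketch under stated assumptions, not a storage engine: everything lives in memory, so a "flush" stands in for one sequential disk write, and there is no write-ahead log, compaction, or delete support. The Bloom filter's size, its 3 hash probes, and the SHA-256-based probe derivation are arbitrary illustrative choices; the class names are hypothetical.

```python
import bisect
import hashlib


class BloomFilter:
    """Fixed-size Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, size_bits: int = 1024, num_hashes: int = 3):
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key: str):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, key: str) -> None:
        for position in self._positions(key):
            self.bits |= 1 << position

    def might_contain(self, key: str) -> bool:
        return all((self.bits >> position) & 1 for position in self._positions(key))


class SSTable:
    """An immutable sorted run, written once when the memtable is flushed."""

    def __init__(self, sorted_items):
        self.keys = [key for key, _ in sorted_items]
        self.values = [value for _, value in sorted_items]
        self.bloom = BloomFilter()
        for key in self.keys:
            self.bloom.add(key)

    def get(self, key: str):
        if not self.bloom.might_contain(key):
            return None  # definitely absent: skip the lookup entirely
        index = bisect.bisect_left(self.keys, key)
        if index < len(self.keys) and self.keys[index] == key:
            return self.values[index]
        return None


class TinyLSM:
    """Memtable plus newest-first SSTables."""

    def __init__(self, memtable_limit: int = 4):
        self.memtable = {}
        self.sstables = []
        self.memtable_limit = memtable_limit

    def put(self, key: str, value: str) -> None:
        self.memtable[key] = value  # buffered in memory; a real engine also appends to a WAL
        if len(self.memtable) >= self.memtable_limit:
            # Flush: one sequential write of a sorted run.
            self.sstables.append(SSTable(sorted(self.memtable.items())))
            self.memtable = {}

    def get(self, key: str):
        if key in self.memtable:
            return self.memtable[key]
        # Read amplification: may probe several SSTables, newest first so newer values win.
        for table in reversed(self.sstables):
            value = table.get(key)
            if value is not None:
                return value
        return None


db = TinyLSM()
for i in range(10):
    db.put(f"k{i}", f"v{i}")
db.put("k1", "v1-updated")
db.put("k10", "v10")  # fills the memtable again and triggers a third flush
print(len(db.sstables), db.get("k1"), db.get("k9"), db.get("missing"))  # 3 v1-updated v9 None
```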

Q: How do you test that your quorum implementation is actually providing the consistency guarantees you claim?

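Expected depth: Unit tests are insufficient - they cannot simulate real network partitions. Discuss Jepsen-style testing: inject partitions, process pauses, node crashes, and clock skew (Jepsen's own fault injectors, Toxiproxy for network faults, Chaos Monkey for killing instances); run concurrent reads and writes; then check the recorded history against the guarantee you actually claim. Because QUORUM in this design is not linearizable, the property to verify is that a QUORUM read issued after an acknowledged QUORUM write returns that write or a newer one; Jepsen's checkers (Knossos for linearizability, Elle for transactional anomalies) are the tools for stronger claims. Mention property-based testing for vector clock merging: generate random concurrent write histories and verify that the merge function always produces a clock that dominates both inputs.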
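The vector clock property check can be sketched without any test library by drawing random clocks from a seeded generator; a framework such as Hypothesis would add automatic shrinking of failing inputs. Beyond dominance, the sketch also checks commutativity, associativity, and idempotence, which matter because replicas may merge versions in any order and more than once. The function names and the 4-node universe are hypothetical.

```python
import random
from typing import Dict

Clock = Dict[str, int]


def merge(a: Clock, b: Clock) -> Clock:
    """Entrywise maximum: the smallest clock that has seen everything in a and b."""
    return {node: max(a.get(node, 0), b.get(node, 0)) for node in a.keys() | b.keys()}


def descends(a: Clock, b: Clock) -> bool:
    return all(a.get(node, 0) >= count for node, count in b.items())


def random_clock(rng: random.Random, nodes=("n1", "n2", "n3", "n4")) -> Clock:
    chosen = rng.sample(nodes, rng.randint(0, len(nodes)))
    return {node: rng.randint(0, 5) for node in chosen}


def check_merge_properties(trials: int = 10_000, seed: int = 7) -> int:
    rng = random.Random(seed)
    for _ in range(trials):
        a, b, c = random_clock(rng), random_clock(rng), random_clock(rng)
        m = merge(a, b)
        assert descends(m, a) and descends(m, b), (a, b, m)  # dominates both inputs
        assert m == merge(b, a)  # commutative
        assert merge(m, c) == merge(a, merge(b, c))  # associative
        assert merge(a, a) == a  # idempotent
    return trials


print(f"{check_merge_properties()} random cases passed")
```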
