Build an Inverted Index for Full-Text Search at 10B Documents


databases scalability performance

System Design Deep Dive

Inverted Index for Full-Text Search

Building a search engine over 10 billion documents in under 100ms means choosing the right compression, sharding, and scoring strategy from day one.

14 min read · Advanced · Databases

Think of a library where every book has been torn apart, and each word from every page has been pinned to a giant corkboard with a note saying which book and page it came from. When you ask for books about “distributed consensus”, the librarian goes directly to the “distributed” pin and the “consensus” pin, takes the intersection of their page-lists, and hands you the relevant books - all in seconds, without scanning a single full text. This is the inverted index: a mapping from terms to the documents that contain them, built offline so that query time is a lookup, not a scan.
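The corkboard analogy can be sketched in a few lines. This is a toy, single-machine illustration that assumes whitespace tokenization and an in-memory dictionary; the function names `build_inverted_index` and `search_all_terms` are hypothetical and not part of any library.

```python
from collections import defaultdict

def build_inverted_index(docs: dict[int, str]) -> dict[str, list[int]]:
    """Map each lowercased whitespace token to a sorted list of doc IDs."""
    postings: dict[str, set[int]] = defaultdict(set)
    for doc_id, text in docs.items():
        for token in text.lower().split():
            postings[token].add(doc_id)
    return {term: sorted(ids) for term, ids in postings.items()}

def search_all_terms(index: dict[str, list[int]], query: str) -> list[int]:
    """Return doc IDs containing every query term (boolean AND)."""
    term_sets = [set(index.get(term, [])) for term in query.lower().split()]
    if not term_sets:
        return []
    return sorted(set.intersection(*term_sets))

docs = {
    1: "distributed consensus with Raft",
    2: "distributed caching layers",
    3: "consensus in distributed databases",
}
index = build_inverted_index(docs)
print(search_all_terms(index, "distributed consensus"))  # [1, 3]
```

The lookup touches only the two posting lists named in the query; document 2 is never read in full, which is the property the rest of the design tries to preserve at scale.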

The engineering challenge at 10 billion documents is that no single machine can hold this corkboard and serve it at the required query rate. A typical English web document averages 500 unique tokens. At 10B documents that is roughly 5 trillion term-document pairs. Even with aggressive compression, the index for a corpus of this size runs to several terabytes before replication. A naive full-text scan of the roughly 5TB of raw text at 1GB/s throughput would take about 5,000 seconds per query - well over an hour. We need to make query time a function of the result set size, not the corpus size.

Three forces pull in opposite directions:

  • Write latency versus index freshness: building a high-quality compressed index takes time - Lucene-style segment merging can lag by seconds to minutes. But users expect documents they just published to be searchable immediately.
  • Compression ratio versus decode speed: delta-encoded variable-byte posting lists are roughly 3x smaller than raw 4-byte docID arrays, but decoding them requires sequential iteration - exactly the bottleneck at query time when intersecting thousands of candidates.
  • Sharding for parallelism versus ranking quality: splitting documents across shards parallelizes index lookup, but BM25 ranking needs global statistics (document frequency across the entire corpus) that are spread across different shards.

We need to solve for all of these at once: inverted index structure, posting list compression, near-real-time indexing with segment merging, BM25 scoring with global statistics, shard-aware query execution, and sub-100ms p99 latency at 10B documents.

Requirements and Constraints

Functional Requirements

  • Index 10 billion documents averaging 500 tokens each
  • Support full-text keyword queries with relevance ranking
  • Return top-K results (K = 1 to 100) with BM25 scores
  • Support phrase queries (“exact phrase”) and boolean operators (AND, OR, NOT)
  • Near-real-time indexing: documents searchable within 5 seconds of ingestion
  • Support document updates and deletions (mark deleted, filter at query time)
  • Field-aware search: title, body, tags with per-field boost weights

Non-Functional Requirements

  • Query latency: p50 < 20ms, p99 < 100ms at 50,000 QPS
  • Index ingestion throughput: 100,000 documents per second sustained
  • Total corpus: 10 billion documents, approximately 5TB of raw text
  • Index storage: target under 750GB per shard with 10 shards (roughly 7TB total index before replication)
  • Availability: 99.9% uptime, zero data loss on shard failure
  • Replication factor: 3 copies of each shard (one primary plus two replicas) for read scaling and fault tolerance

Constraints and Assumptions

  • Documents are in English; multilingual support is out of scope
  • Maximum document size: 1MB of raw text
  • Query terms are pre-tokenized to match the index vocabulary
  • We do not support vector/semantic search - keyword relevance only
  • The document store (raw document retrieval) is a separate system; the index stores only postings metadata

High-Level Architecture

The system has six major components working across two logical paths: the write path that builds and maintains the index, and the read path that executes queries against it.

The Document Ingestor receives raw documents via Kafka and applies the text analysis pipeline - tokenization, lowercasing, stopword removal, and stemming. It writes tokenized posting data into an In-Memory Write Buffer on the target shard node, where a Segment Flusher periodically persists buffered data to immutable on-disk segments. A background Segment Merger runs the LSM-style compaction process, consolidating small segments into larger ones to keep per-query segment count bounded.
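The analysis pipeline in the ingestor can be sketched as below. This is a minimal illustration, assuming a tiny stopword list and a toy suffix-stripping stemmer; a production system would use a real algorithm such as Porter stemming, and the names `STOPWORDS`, `naive_stem`, and `analyze` are hypothetical.

```python
import re

# Illustrative subset only; real stopword lists are much longer.
STOPWORDS = frozenset({"a", "an", "and", "the", "of", "to", "in", "is"})
# Toy suffix list, NOT the Porter algorithm.
SUFFIXES = ("ing", "ed", "es", "s")

def naive_stem(token: str) -> str:
    """Strip the first matching suffix, keeping at least three characters."""
    for suffix in SUFFIXES:
        if token.endswith(suffix) and len(token) - len(suffix) >= 3:
            return token[: -len(suffix)]
    return token

def analyze(text: str) -> list[str]:
    """Tokenize, lowercase, drop stopwords, and stem, in that order."""
    tokens = re.findall(r"[a-z0-9]+", text.lower())
    return [naive_stem(token) for token in tokens if token not in STOPWORDS]

print(analyze("The replicas are electing a leader"))
# ['replica', 'are', 'elect', 'leader']
```

The same `analyze` function has to run on queries as well as documents; otherwise a query for "electing" would never match the indexed term "elect".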

On the read path, queries arrive at the Query Router, which uses a consistent shard map to fan out to the relevant shards. Each shard runs a Shard Query Executor that parses the query, looks up posting lists for each term, intersects them using DAAT (Document-at-a-Time) traversal, scores candidates with BM25 using locally cached global statistics, and returns the top-K results. The Query Router merges partial results from all shards, applies global re-ranking, and returns the final ordered list to the client.

High-level architecture showing write path and read path across 10 shards
Key Insight

The most important architectural decision is separating the write path from the read path: writes append to an in-memory buffer that is flushed to immutable segments, and on-disk data is never modified in place. Queries read immutable segments plus the append-only buffer, so they never contend with in-place updates. Lucene, Elasticsearch, and Solr rely on the same immutable-segment design for their read performance.

The Inverted Index Structure

The inverted index maps each unique term in the corpus to a posting list: an ordered list of document IDs (docIDs) that contain the term, along with the term frequency within each document.

Think of a phone book but reversed: instead of name-to-number, you have word-to-document. The phone book analogy breaks down at scale because a common English word like “the” appears in virtually every document - its posting list would have 10 billion entries. This is why we separate the index into two tiers: a term dictionary (the vocabulary, stored in a B-tree for O(log n) lookup or in an FST for lookup in time proportional to the term length, both of which support prefix scans) and a posting store (the actual docID lists, stored in compressed binary format on disk).

Each posting list entry stores:

  • doc_id (4 bytes uncompressed; stored as a variable-byte delta against the previous entry)
  • term_freq (variable-byte encoded, typically 1-2 bytes)
  • Optional: positions list for phrase queries (variable-byte, per-position delta)

Delta encoding is the key compression insight. Because posting lists are sorted by docID, consecutive entries are numerically close. Instead of storing [100235, 100289, 100301], we store [100235, 54, 12]. These small deltas compress much better with variable-byte encoding: values under 128 take 1 byte, under 16384 take 2 bytes. A raw 4-byte docID average compresses to roughly 1.3 bytes after delta + VByte encoding - a 3x reduction.

# Variable-byte encoding for a delta-encoded posting list
def encode_posting_list(doc_ids: list[int]) -> bytes:
    """Encode sorted doc_id list using delta + variable-byte compression."""
    result = bytearray()
    prev = 0
    for doc_id in doc_ids:
        delta = doc_id - prev
        prev = doc_id
        # Variable-byte encode the delta
        while delta >= 128:
            result.append((delta & 0x7F) | 0x80)
            delta >>= 7
        result.append(delta & 0x7F)
    return bytes(result)

def decode_posting_list(data: bytes) -> list[int]:
    """Decode variable-byte encoded delta-compressed posting list."""
    doc_ids = []
    current = 0
    shift = 0
    prev = 0
    for byte in data:
        current |= (byte & 0x7F) << shift
        if not (byte & 0x80):
            prev += current
            doc_ids.append(prev)
            current = 0
            shift = 0
        else:
            shift += 7
    return doc_ids

For very high-frequency terms (document frequency > 10,000), we switch to SIMD-accelerated PFOR (Patched Frame of Reference) encoding. PFOR packs 128 docID deltas into a fixed-size block, uses bit-packing at a width that fits most deltas, and “patches” the outliers separately. This enables SIMD bulk-decode at 4-8x faster throughput than scalar VByte.
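The patching idea can be illustrated with a scalar sketch. This is not a SIMD implementation and not the layout of any particular library: it assumes a block is packed into a single Python integer, picks the bit width that covers 90% of the deltas, and stores outliers in a side list. The names `pfor_encode_block` and `pfor_decode_block` are hypothetical.

```python
def pfor_encode_block(deltas: list[int], coverage_pct: int = 90):
    """Bit-pack deltas at a width covering coverage_pct of them; patch the rest."""
    if not deltas:
        return 1, 0, [], 0
    ranked = sorted(deltas)
    # Index of the value at the coverage percentile (integer math, rounds up).
    cutoff_index = (len(ranked) * coverage_pct + 99) // 100 - 1
    width = max(1, ranked[max(0, cutoff_index)].bit_length())
    limit = (1 << width) - 1
    packed = 0
    exceptions: list[tuple[int, int]] = []
    for slot, delta in enumerate(deltas):
        if delta > limit:
            exceptions.append((slot, delta))  # outlier stored separately
            delta = 0                         # placeholder in the packed frame
        packed |= delta << (slot * width)
    return width, packed, exceptions, len(deltas)

def pfor_decode_block(width: int, packed: int, exceptions, count: int) -> list[int]:
    """Unpack fixed-width slots, then patch the outliers back in."""
    mask = (1 << width) - 1
    deltas = [(packed >> (slot * width)) & mask for slot in range(count)]
    for slot, value in exceptions:
        deltas[slot] = value
    return deltas

block = [3, 5, 2, 7, 1, 900, 4, 6, 2, 3]
width, packed, exceptions, count = pfor_encode_block(block)
print(width, exceptions)  # 3 [(5, 900)]
```

Because every slot has the same width, a real implementation can decode a whole block with a handful of vector instructions; the single outlier (900) costs one patch entry instead of forcing all ten slots to 10 bits.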

Key Insight

Delta encoding plus variable-byte compression reduces the average bytes-per-posting from 4 bytes to roughly 1.3 bytes - a 3x index size reduction that translates directly to 3x more index in cache, 3x better I/O throughput, and sub-100ms query latency becoming achievable without exotic hardware.

Near-Real-Time Indexing and Segment Merging

The naive approach to making documents searchable is to rebuild the entire index from scratch whenever new documents arrive. At 10B documents and 100K docs/second ingestion, a full rebuild would take roughly 28 hours (10B / 100K per second = 100,000 seconds). We need something fundamentally different.

Near-real-time (NRT) indexing works by treating the index as a log-structured merge tree. New documents land in an in-memory buffer called an in-memory segment. This segment is unsorted and uncompressed but immediately visible to queries via an in-memory hash table. When the buffer reaches a threshold (typically 16-64MB of raw posting data, or after 1-5 seconds), it is flushed to disk as an immutable, fully compressed disk segment. Queries must check all live segments - both the in-memory buffer and all disk segments.
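A toy version of this buffer-plus-segments layout is sketched below. It assumes a document-count flush threshold instead of a byte or time threshold, keeps "disk" segments in memory as frozen dictionaries, and uses a hypothetical class name `NRTShardIndex`.

```python
from collections import defaultdict

class NRTShardIndex:
    """Toy NRT index: a mutable in-memory buffer plus immutable flushed segments."""

    def __init__(self, flush_threshold_docs: int = 2):
        self.flush_threshold_docs = flush_threshold_docs
        self.buffer: dict[str, list[int]] = defaultdict(list)  # unsorted hash table
        self.buffered_docs = 0
        self.segments: list[dict[str, tuple[int, ...]]] = []   # stand-in for disk segments

    def add_document(self, doc_id: int, terms: list[str]) -> None:
        for term in set(terms):
            self.buffer[term].append(doc_id)
        self.buffered_docs += 1
        if self.buffered_docs >= self.flush_threshold_docs:
            self.flush()

    def flush(self) -> None:
        """Freeze the buffer into a sorted, immutable segment and start a new buffer."""
        if not self.buffered_docs:
            return
        segment = {term: tuple(sorted(ids)) for term, ids in self.buffer.items()}
        self.segments.append(segment)
        self.buffer = defaultdict(list)
        self.buffered_docs = 0

    def lookup(self, term: str) -> list[int]:
        """A query must consult the live buffer and every flushed segment."""
        hits = set(self.buffer.get(term, ()))
        for segment in self.segments:
            hits.update(segment.get(term, ()))
        return sorted(hits)

index = NRTShardIndex(flush_threshold_docs=2)
index.add_document(1, ["raft", "quorum"])
index.add_document(2, ["raft"])        # second document triggers a flush
index.add_document(3, ["quorum"])      # stays in the in-memory buffer
print(index.lookup("quorum"))          # [1, 3]
```

Document 3 is visible to `lookup` before it is ever flushed, which is what makes the index near-real-time; the cost is that every lookup fans out over one more structure per segment.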

The problem with accumulating small disk segments is that query time degrades. Answering a query requires fetching the posting list for each query term from every segment. If there are 1,000 small segments, that is 1,000 random disk reads per term per query. Segment merging solves this: a background thread continuously merges small segments into larger ones, reducing segment count. The merge policy follows tiers - segments within the same size tier get merged together, similar to size-tiered compaction in LSM stores (RocksDB calls its variant universal compaction) rather than leveled compaction.

# Tiered merge policy - decide which segments to merge
import math
from dataclasses import dataclass
from typing import Optional

@dataclass
class Segment:
    segment_id: str
    size_bytes: int
    doc_count: int
    deleted_doc_count: int

class TieredMergePolicy:
    """Merge segments within the same size tier."""
    TIER_FLOOR_SIZE = 10 * 1024 * 1024        # 10MB minimum segment
    MAX_SEGMENTS_PER_TIER = 10
    MAX_MERGED_SEGMENT_SIZE = 5 * 1024 * 1024 * 1024  # 5GB max merged

    def select_merge_candidates(self, segments: list[Segment]) -> Optional[list[Segment]]:
        """Return a list of segments to merge, or None if no merge needed."""
        # Sort by size ascending
        sorted_segs = sorted(segments, key=lambda s: s.size_bytes)
        # Group into tiers by log scale: tier 0 holds segments below 10x the floor size,
        # tier 1 below 100x, and so on
        tiers: dict[int, list[Segment]] = {}
        for seg in sorted_segs:
            size_ratio = max(seg.size_bytes, self.TIER_FLOOR_SIZE) / self.TIER_FLOOR_SIZE
            # log10 avoids the rounding error of math.log(x, 10) at exact powers of ten
            tier = int(math.log10(size_ratio))
            tiers.setdefault(tier, []).append(seg)
        # Find a tier with too many segments
        for tier_idx, tier_segs in sorted(tiers.items()):
            if len(tier_segs) > self.MAX_SEGMENTS_PER_TIER:
                # Merge the smallest segments in this tier first
                candidates = tier_segs[:self.MAX_SEGMENTS_PER_TIER]
                merged_size = sum(s.size_bytes for s in candidates)
                if merged_size <= self.MAX_MERGED_SEGMENT_SIZE:
                    return candidates
        return None

    def merge_segments(self, candidates: list[Segment]) -> Segment:
        """Merge candidates into a new segment (simplified - real merge reads posting lists)."""
        total_docs = sum(s.doc_count - s.deleted_doc_count for s in candidates)
        total_size = sum(s.size_bytes * 0.85 for s in candidates)  # ~15% compression gain
        new_id = f"merged_{candidates[0].segment_id}_{candidates[-1].segment_id}"
        return Segment(segment_id=new_id, size_bytes=int(total_size),
                       doc_count=total_docs, deleted_doc_count=0)

Document deletions are handled lazily. When a document is deleted or updated, its docID is added to a delete bitmap (a Roaring Bitmap for memory efficiency). At query time, results matching deleted docIDs are filtered out. At merge time, deleted documents are physically removed from posting lists, reclaiming space. This means updates are implemented as delete-then-reindex.

Real World

Elasticsearch uses exactly this NRT segment model inherited from Apache Lucene. By default it refreshes the in-memory segment every 1 second (configurable via index.refresh_interval), making new documents searchable within 1-2 seconds. Lucene’s IndexWriter flushes to disk segments, and IndexReader can be refreshed to see new segments without closing, which is the mechanism behind Elasticsearch’s NRT search.

Query Execution Plan

A query execution plan is the search engine’s equivalent of a SQL query plan - it defines the order of operations that transforms a query string into a ranked result list. Getting this plan wrong means scanning orders of magnitude more data than necessary.

The query pipeline has five stages:

Stage 1 - Query parsing: The query string "distributed consensus algorithm" is tokenized and analyzed identically to how documents are indexed. Stop words are removed and terms are stemmed (for example, a Porter-style stemmer maps distributed -> distribut). Boolean structure is parsed: phrase queries are detected by quotes, AND/OR/NOT operators by keywords or proximity.

Stage 2 - Posting list retrieval: For each query term, retrieve the compressed posting list from the segment store. High-frequency terms (appearing in > 10% of the corpus) are candidates for query rewriting - they may be skipped or given low weight via IDF dampening.

Stage 3 - DAAT intersection: Document-at-a-Time traversal intersects posting lists by advancing all iterators in lock-step toward the next candidate document. The smallest posting list drives the iteration; all other iterators use gallop (exponential search) to skip ahead. This avoids scanning all entries in large lists when the smaller list constrains candidates.

# Document-at-a-Time intersection with gallop skip
import bisect

def intersect_posting_lists(lists: list[list[int]]) -> list[int]:
    """
    DAAT intersection: find docIDs present in ALL posting lists.
    Uses gallop search to skip over non-matching docIDs efficiently.
    """
    if not lists:
        return []
    # Sort by length ascending - smallest drives iteration
    lists = sorted(lists, key=len)
    result = []
    # Start with first docID from the shortest list
    pointers = [0] * len(lists)

    while pointers[0] < len(lists[0]):
        candidate = lists[0][pointers[0]]
        matched = True
        for i in range(1, len(lists)):
            lst = lists[i]
            ptr = pointers[i]
            # Gallop search: jump ahead exponentially
            step = 1
            while ptr + step < len(lst) and lst[ptr + step] < candidate:
                step *= 2
            # Binary search in the identified range
            lo = ptr + step // 2
            hi = min(ptr + step, len(lst))
            new_ptr = bisect.bisect_left(lst, candidate, lo, hi)
            pointers[i] = new_ptr
            if new_ptr >= len(lst):
                # This list is exhausted, so no later candidate can match every list
                return result
            if lst[new_ptr] != candidate:
                matched = False
                # Skip the shortest list ahead to the first docID >= the one we landed on
                pointers[0] = bisect.bisect_left(lists[0], lst[new_ptr], pointers[0])
                break
        if matched:
            result.append(candidate)
            pointers[0] += 1
    return result

Stage 4 - BM25 scoring: For each candidate document from the intersection, compute its BM25 relevance score. BM25 is the industry-standard term weighting function used by virtually every production search engine.

Stage 5 - Top-K selection: Use a min-heap of size K to efficiently track the top K scores without sorting the entire candidate set. For 10B documents across 10 shards, each shard returns its top K, and the coordinator merges 10 * K results into the final top K.
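The bounded min-heap from Stage 5 can be sketched as follows. The function names `top_k` and `merge_shard_results` are hypothetical, and results are assumed to be `(score, doc_id)` tuples so that ties on score fall back to comparing doc IDs.

```python
import heapq
from typing import Iterable

def top_k(scored_docs: Iterable[tuple[float, int]], k: int) -> list[tuple[float, int]]:
    """Keep the k best (score, doc_id) pairs using a min-heap of size k."""
    if k <= 0:
        return []
    heap: list[tuple[float, int]] = []
    for item in scored_docs:
        if len(heap) < k:
            heapq.heappush(heap, item)
        elif item > heap[0]:
            # Beats the current k-th best: evict the minimum in O(log k)
            heapq.heapreplace(heap, item)
    return sorted(heap, reverse=True)

def merge_shard_results(per_shard: list[list[tuple[float, int]]], k: int) -> list[tuple[float, int]]:
    """Coordinator-side merge of each shard's partial top-K into a global top-K."""
    return top_k((item for shard in per_shard for item in shard), k)

shard_a = [(9.1, 11), (4.2, 17)]
shard_b = [(7.5, 23), (6.8, 29)]
print(merge_shard_results([shard_a, shard_b], k=3))
# [(9.1, 11), (7.5, 23), (6.8, 29)]
```

The heap never holds more than K entries, so memory stays constant no matter how many candidates a shard scores, and each candidate costs at most O(log K).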

Animated data flow from document ingestion through indexing to query execution
Key Insight

The gallop (exponential) search in DAAT intersection is the non-obvious performance trick: when posting lists have very different lengths (e.g., “the” with 10B entries vs “quorum” with 50K entries), galloping over the large list skips 99.9% of candidates in O(log n) time per skip instead of O(n) linear scan.

Index Sharding Strategy

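A single machine cannot comfortably store and serve the index for 10B documents. At roughly 1.3 bytes per docID after compression, 5 trillion postings take about 6.5TB before term frequencies, positions, and replicas are counted, and sustaining 50,000 QPS requires the hot part of that index to sit in RAM - far beyond a single node. We shard the index horizontally.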

The two canonical sharding strategies are document sharding (each shard holds a subset of documents) and term sharding (each shard holds a subset of the vocabulary). Document sharding wins for most production systems: term sharding routes each query term to the shard that owns it, so a query with 5 terms can touch 5 different shards and must ship posting lists between nodes before they can be intersected, and shards that own very common terms become hotspots. Document sharding allows all shards to execute the full query in parallel on local data.

With 10 shards, each shard holds 1 billion documents. Routing uses a shard assignment function: shard_id = hash(doc_id) % num_shards. This gives uniform distribution but makes re-sharding expensive (almost all documents move when num_shards changes). Consistent hashing with virtual nodes reduces the re-shard cost to roughly 1/(N+1) of documents moving when adding the (N+1)th shard.
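The difference between the two routing schemes can be measured with a small experiment. This sketch assumes MD5 as a stable hash and 100 virtual nodes per shard; `stable_hash`, `modulo_shard`, and `HashRing` are hypothetical names rather than an existing library.

```python
import bisect
import hashlib

def stable_hash(key: str) -> int:
    """Process-independent 64-bit hash (Python's built-in hash() is salted per run)."""
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")

def modulo_shard(doc_id: int, num_shards: int) -> int:
    return stable_hash(str(doc_id)) % num_shards

class HashRing:
    """Consistent-hash ring where each shard owns several virtual nodes."""

    def __init__(self, shard_ids, vnodes: int = 100):
        points = sorted(
            (stable_hash(f"shard-{shard}#vnode-{v}"), shard)
            for shard in shard_ids
            for v in range(vnodes)
        )
        self._hashes = [h for h, _ in points]
        self._owners = [shard for _, shard in points]

    def shard_for(self, doc_id: int) -> int:
        # First virtual node clockwise from the key's position, wrapping at the end
        i = bisect.bisect_right(self._hashes, stable_hash(str(doc_id))) % len(self._hashes)
        return self._owners[i]

doc_ids = range(10_000)
old_ring, new_ring = HashRing(range(10)), HashRing(range(11))
ring_moved = sum(old_ring.shard_for(d) != new_ring.shard_for(d) for d in doc_ids) / len(doc_ids)
mod_moved = sum(modulo_shard(d, 10) != modulo_shard(d, 11) for d in doc_ids) / len(doc_ids)
print(f"modulo moved {mod_moved:.0%} of documents, ring moved {ring_moved:.0%}")
```

With modulo routing, going from 10 to 11 shards relocates the large majority of documents, while the ring relocates only the slice claimed by the new shard's virtual nodes, and every relocated document lands on the new shard.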

Each shard has one primary and two replicas. Writes go to the primary, which synchronously replicates to replicas before acknowledging. Reads distribute across replicas using round-robin. If a primary fails, replica election uses Raft consensus to promote a replica.

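The shard coordinator receives partial results from one copy of each of the 10 shards (the primary or any replica). To keep scores comparable across shards, it caches cross-shard document frequency statistics and sends the resulting global IDF values along with the query, so each shard scores with global rather than local statistics. The coordinator then only has to merge the per-shard top-K lists.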

# Shard coordinator: fan-out query and merge results
import asyncio
from heapq import nlargest

async def scatter_gather_query(
    query_terms: list[str],
    top_k: int,
    shard_clients: list,  # gRPC stubs to each shard
    global_idf: dict[str, float],  # pre-fetched from stats service
) -> list[tuple[float, int]]:
    """
    Fan out query to all shards, collect partial top-K, merge globally.
    Returns list of (score, doc_id) sorted descending.
    """
    # Fire all shard queries concurrently
    shard_tasks = [
        client.execute_query(terms=query_terms, top_k=top_k, global_idf=global_idf)
        for client in shard_clients
    ]
    shard_results = await asyncio.gather(*shard_tasks, return_exceptions=True)

    # Collect all (score, doc_id) pairs, skip failed shards
    all_candidates = []
    for result in shard_results:
        if isinstance(result, Exception):
            continue  # Partial results acceptable; degraded mode
        all_candidates.extend(result.scored_docs)

    # Global top-K merge using heap selection
    return nlargest(top_k, all_candidates, key=lambda x: x[0])
Real World

Elasticsearch calls this pattern “scatter-gather” and implements it with a two-phase approach: the “query phase” fetches doc IDs and scores from all shards, the coordinator picks the global top-K IDs, and the “fetch phase” retrieves the actual documents only for those top-K IDs. This two-phase design avoids transferring full document payloads for candidates that won’t make the final result set.

Sharding diagram showing 10 document shards with primary-replica replication and query fan-out

Data Model

The data model has four layers: the term dictionary, the posting store, the segment metadata catalog, and the global statistics store.

-- Term dictionary: maps term string to its posting list location
-- One row per unique term per shard
CREATE TABLE term_dictionary (
    shard_id        SMALLINT        NOT NULL,
    term            VARCHAR(256)    NOT NULL,
    doc_freq        INT             NOT NULL,     -- # docs containing this term
    posting_offset  BIGINT          NOT NULL,     -- byte offset in posting store file
    posting_length  INT             NOT NULL,     -- compressed byte length of posting list
    flags           SMALLINT        NOT NULL DEFAULT 0,  -- 0=normal, 1=high-freq-PFOR
    PRIMARY KEY (shard_id, term)
) PARTITION BY LIST (shard_id);

-- Segment catalog: tracks all segments (in-memory + on-disk) per shard
CREATE TABLE segment_catalog (
    shard_id        SMALLINT        NOT NULL,
    segment_id      VARCHAR(64)     NOT NULL,
    status          VARCHAR(16)     NOT NULL,     -- 'memory', 'flushing', 'active', 'merging', 'deleted'
    size_bytes      BIGINT          NOT NULL,
    doc_count       INT             NOT NULL,
    deleted_count   INT             NOT NULL DEFAULT 0,
    min_doc_id      BIGINT          NOT NULL,
    max_doc_id      BIGINT          NOT NULL,
    created_at      TIMESTAMPTZ     NOT NULL DEFAULT NOW(),
    merged_into     VARCHAR(64),                  -- parent segment ID if this was merged
    PRIMARY KEY (shard_id, segment_id)
);
-- PostgreSQL has no inline INDEX clause, so secondary indexes are created separately
CREATE INDEX idx_segment_shard_status ON segment_catalog (shard_id, status);

-- Global statistics: document frequencies for BM25 cross-shard ranking
CREATE TABLE global_term_stats (
    term            VARCHAR(256)    NOT NULL PRIMARY KEY,
    global_doc_freq BIGINT          NOT NULL,     -- total docs containing term across all shards
    updated_at      TIMESTAMPTZ     NOT NULL DEFAULT NOW()
);

-- Document metadata: maps doc_id to external metadata (not the raw doc)
CREATE TABLE document_metadata (
    doc_id          BIGINT          NOT NULL,
    shard_id        SMALLINT        NOT NULL,
    external_id     VARCHAR(512)    NOT NULL,     -- original document identifier
    indexed_at      TIMESTAMPTZ     NOT NULL,
    field_lengths   JSONB           NOT NULL,     -- {"title": 8, "body": 423, "tags": 3}
    is_deleted      BOOLEAN         NOT NULL DEFAULT FALSE,
    PRIMARY KEY (doc_id)
) PARTITION BY HASH (doc_id);
-- PostgreSQL has no inline INDEX clause, so secondary indexes are created separately
CREATE INDEX idx_docmeta_external ON document_metadata (external_id);
CREATE INDEX idx_docmeta_shard ON document_metadata (shard_id, is_deleted);

Partitioning strategy: term_dictionary partitions by shard_id so each shard node owns its partition entirely - no cross-node joins. document_metadata partitions by hash of doc_id for uniform distribution. global_term_stats is small relative to the postings (vocabulary grows far more slowly than the corpus) and can live on a dedicated statistics service with in-memory caching.

The posting store itself is not a SQL table - it is a collection of binary files, one per segment, with byte-addressed posting lists. The term_dictionary.posting_offset and posting_length columns are the pointers into those files.
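The offset-and-length pointer scheme can be sketched with an ordinary file. For brevity this example assumes uncompressed little-endian 32-bit docIDs rather than the compressed format, and the helper names `write_segment` and `read_postings` as well as the file name are hypothetical.

```python
import os
import struct
import tempfile

def write_segment(postings: dict[str, list[int]], path: str) -> dict[str, tuple[int, int]]:
    """Write posting lists back to back; return {term: (posting_offset, posting_length)}."""
    dictionary: dict[str, tuple[int, int]] = {}
    with open(path, "wb") as f:
        for term in sorted(postings):
            doc_ids = sorted(postings[term])
            blob = struct.pack(f"<{len(doc_ids)}I", *doc_ids)
            dictionary[term] = (f.tell(), len(blob))
            f.write(blob)
    return dictionary

def read_postings(path: str, dictionary: dict[str, tuple[int, int]], term: str) -> list[int]:
    """Seek directly to one term's bytes; no other posting list is read."""
    if term not in dictionary:
        return []
    offset, length = dictionary[term]
    with open(path, "rb") as f:
        f.seek(offset)
        blob = f.read(length)
    return list(struct.unpack(f"<{length // 4}I", blob))

with tempfile.TemporaryDirectory() as tmp:
    segment_path = os.path.join(tmp, "segment_0001.postings")
    term_dict = write_segment({"raft": [3, 9, 27], "quorum": [9]}, segment_path)
    raft_docs = read_postings(segment_path, term_dict, "raft")
    print(term_dict)   # {'quorum': (0, 4), 'raft': (4, 12)}
    print(raft_docs)   # [3, 9, 27]
```

The dictionary entry plays the role of the posting_offset and posting_length columns: once the term is resolved, fetching its postings is a single positioned read.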

Key Insight

Storing posting lists as raw binary files rather than database blobs is critical: SQL databases add serialization overhead, page headers, and MVCC metadata that waste I/O budget. A binary file lets us use pread() for O(1) random access and mmap() for OS-managed page caching - the same approach used by Lucene’s MMapDirectory.

Key Algorithms and Protocols

BM25 Scoring

BM25 (Best Match 25) is the relevance scoring function that computes how well a document matches a query. It improves on TF-IDF by adding document length normalization and saturation: a term appearing 100 times in a document is not 100x more relevant than one appearing once.

The BM25 score for document D and query Q with terms t1…tn is:

score(D, Q) = sum over terms t of: IDF(t) * (tf(t,D) * (k1+1)) / (tf(t,D) + k1 * (1 - b + b * len(D)/avgdl))

Where:

  • IDF(t) = log((N - df(t) + 0.5) / (df(t) + 0.5) + 1): inverse document frequency, where N is the total number of documents and df(t) is the number of documents containing t
  • tf(t,D): term frequency in document D
  • len(D): document length in tokens
  • avgdl: average document length across the corpus
  • k1 = 1.2: term frequency saturation parameter
  • b = 0.75: length normalization parameter
# BM25 scorer with pre-computed per-document field norms
import math
from dataclasses import dataclass

@dataclass
class BM25Params:
    k1: float = 1.2
    b: float = 0.75

@dataclass
class GlobalStats:
    total_docs: int
    avg_doc_length: float  # in tokens

def compute_idf(doc_freq: int, total_docs: int) -> float:
    """Robertson-Sparck Jones IDF formula (BM25 variant)."""
    return math.log((total_docs - doc_freq + 0.5) / (doc_freq + 0.5) + 1.0)

def bm25_score(
    query_terms: list[str],
    term_freqs: dict[str, int],       # {term: tf in this doc}
    doc_length: int,                  # total tokens in this doc
    doc_freqs: dict[str, int],        # {term: df across corpus}
    stats: GlobalStats,
    params: BM25Params = BM25Params(),
) -> float:
    """
    Compute BM25 relevance score for a single document.
    query_terms: terms from the user query
    term_freqs: term frequencies in the candidate document
    doc_freqs: corpus-wide document frequency per term (from global stats service)
    """
    score = 0.0
    length_norm = 1.0 - params.b + params.b * (doc_length / stats.avg_doc_length)

    for term in query_terms:
        tf = term_freqs.get(term, 0)
        if tf == 0:
            continue  # Term not in this doc (possible in OR queries)
        df = doc_freqs.get(term, 1)
        idf = compute_idf(df, stats.total_docs)
        # Saturated TF with length normalization
        tf_normalized = (tf * (params.k1 + 1)) / (tf + params.k1 * length_norm)
        score += idf * tf_normalized

    return score

# Example: score a document for query "distributed consensus"
stats = GlobalStats(total_docs=10_000_000_000, avg_doc_length=500)
score = bm25_score(
    query_terms=["distributed", "consensus"],
    term_freqs={"distributed": 5, "consensus": 2},
    doc_length=420,
    doc_freqs={"distributed": 2_000_000, "consensus": 150_000},
    stats=stats,
)
print(f"BM25 score: {score:.4f}")  # about 31.46 for these inputs

Roaring Bitmap for Delete Tracking

Deleted document IDs are tracked per segment using Roaring Bitmaps - a compressed bitset that uses three container types depending on density: sorted arrays for sparse sets, bitsets for dense sets, and run-length encoding for long runs. The key property is that a membership test touches only one small container, while union and intersection proceed container by container and use word-at-a-time (or SIMD) bitwise operations on the dense ones.

# Delete bitmap operations using the pyroaring library, whose BitMap type
# provides the serialize()/deserialize() calls used below
from pyroaring import BitMap as RoaringBitmap

class DeleteTracker:
    """Per-segment delete bitmap with efficient merge and query."""

    def __init__(self):
        self.deleted: RoaringBitmap = RoaringBitmap()

    def mark_deleted(self, doc_id: int) -> None:
        self.deleted.add(doc_id)

    def is_deleted(self, doc_id: int) -> bool:
        return doc_id in self.deleted

    def filter_results(self, doc_ids: list[int]) -> list[int]:
        """Remove deleted doc_ids from a result list efficiently."""
        result_bitmap = RoaringBitmap(doc_ids)
        result_bitmap -= self.deleted  # Set difference in O(n) time
        return sorted(result_bitmap)

    def merge(self, other: 'DeleteTracker') -> 'DeleteTracker':
        """Merge two delete trackers when merging segments."""
        merged = DeleteTracker()
        merged.deleted = self.deleted | other.deleted
        return merged

    def serialize(self) -> bytes:
        return self.deleted.serialize()

    @classmethod
    def deserialize(cls, data: bytes) -> 'DeleteTracker':
        tracker = cls()
        tracker.deleted = RoaringBitmap.deserialize(data)
        return tracker
Key Insight

Roaring Bitmaps use 10-100x less memory than naive bitsets for sparse delete sets (which is typical - most documents are not deleted) while keeping membership tests cheap (one container lookup followed by a bit test or a short binary search) and offering SIMD-accelerated intersection. This is why both Lucene and Druid use Roaring Bitmaps internally for docID filtering.

Scaling and Performance

Full-text search has two dominant bottlenecks: I/O for posting list reads and CPU for BM25 scoring. At 10B documents and 50K QPS, let us estimate the resource budget.

Capacity Estimation:

Corpus:
  10B documents * 500 tokens avg = 5 trillion token occurrences
  Raw posting data: 5T * 4 bytes (doc_id) = 20TB
  After delta + VByte compression (~3x): ~6.7TB
  With 10 shards: ~670GB per shard

Query load:
  50,000 QPS total; document sharding fans every query out to all 10 shards,
  so each shard serves 50,000 QPS, or ~16,700 QPS per copy across 3 copies
  Avg query: 3 terms, each term posting list = 500K entries avg
  DAAT intersection candidate docs per query: ~50K after initial filter
  BM25 scoring: 50K docs * 3 terms = 150K float ops per query
  Total: 16,700 * 150K = ~2.5B float ops/second per node (1 CPU core = ~2B FLOPS)

Memory requirements per shard:
  Term dictionary (10M unique terms * 100 bytes): 1GB
  Hot posting list cache (LRU, sized for the hottest ~20% of posting lists): 32GB
  In-memory write buffer: 2GB
  OS page cache for segment files: 64GB (target)
  Itemized subtotal: 99GB; provision 128GB RAM per shard node for headroom

Storage per shard:
  Index files: 670GB
  Document metadata: 50GB
  WAL + segment catalog: 10GB
  Total: ~730GB per shard SSD
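The storage and scoring arithmetic above can be reproduced with a short script. The constants are the estimate's own inputs; the variable names are illustrative, and decimal units (1TB = 10^12 bytes) are assumed.

```python
NUM_DOCS = 10_000_000_000
TOKENS_PER_DOC = 500
RAW_BYTES_PER_POSTING = 4      # uncompressed doc_id
COMPRESSION_RATIO = 3          # delta + VByte, approximate
NUM_SHARDS = 10
TB, GB = 10**12, 10**9         # decimal units assumed

postings = NUM_DOCS * TOKENS_PER_DOC                      # 5 trillion
raw_tb = postings * RAW_BYTES_PER_POSTING / TB            # 20 TB
compressed_tb = raw_tb / COMPRESSION_RATIO                # ~6.7 TB
per_shard_gb = compressed_tb * TB / NUM_SHARDS / GB       # ~667 GB

shard_storage_gb = 670 + 50 + 10                          # index + metadata + WAL/catalog
itemized_ram_gb = 1 + 32 + 2 + 64                         # dictionary + cache + buffer + page cache
scoring_ops_per_query = 50_000 * 3                        # candidate docs * query terms

print(f"postings: {postings:.1e}")
print(f"raw: {raw_tb:.0f}TB, compressed: {compressed_tb:.1f}TB, per shard: {per_shard_gb:.0f}GB")
print(f"storage per shard: {shard_storage_gb}GB, itemized RAM: {itemized_ram_gb}GB")
print(f"scoring ops per query: {scoring_ops_per_query:,}")
```

Keeping the estimate as code makes it easy to see which inputs dominate: halving TOKENS_PER_DOC or improving COMPRESSION_RATIO moves the per-shard index size directly, whereas the itemized RAM lines are independent of corpus size.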

The primary read bottleneck is I/O for low-frequency terms: terms that appear in fewer than 10K documents have tiny posting lists that are not cached, requiring a random SSD read per term per query. The mitigation is a posting list prefetch cache keyed by (shard_id, term) with LRU eviction, sized at 32GB to capture the hot tail of the query term distribution.

The primary write bottleneck is segment flush latency: when the in-memory buffer fills and the flusher is blocked, new documents queue up and indexing latency spikes. Mitigation: run two write buffers in parallel (double-buffering), so one can flush to disk while the other accepts new documents.
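Double-buffering can be sketched with one background flush thread. This is a simplified illustration that assumes a document-count threshold and a caller-supplied `flush_fn`; the class name `DoubleBufferedWriter` is hypothetical, and durability concerns such as the WAL are left out.

```python
import threading
from typing import Callable, Optional

class DoubleBufferedWriter:
    """Accept writes into one buffer while the previously filled buffer is flushed."""

    def __init__(self, flush_fn: Callable[[list], None], max_buffered_docs: int = 1000):
        self._flush_fn = flush_fn
        self._max_buffered_docs = max_buffered_docs
        self._active: list = []
        self._swap_lock = threading.Lock()
        self._flush_lock = threading.Lock()
        self._flush_thread: Optional[threading.Thread] = None

    def add(self, doc) -> None:
        with self._swap_lock:
            self._active.append(doc)
            if len(self._active) < self._max_buffered_docs:
                return
            # Buffer is full: swap in an empty one so other writers are not blocked
            full, self._active = self._active, []
        self._flush_in_background(full)

    def _flush_in_background(self, full: list) -> None:
        with self._flush_lock:
            if self._flush_thread is not None:
                # Only two buffers exist: wait for the previous flush (backpressure)
                self._flush_thread.join()
            self._flush_thread = threading.Thread(target=self._flush_fn, args=(full,))
            self._flush_thread.start()

    def close(self) -> None:
        """Wait for the in-flight flush, then flush whatever is left."""
        with self._swap_lock:
            remaining, self._active = self._active, []
        with self._flush_lock:
            if self._flush_thread is not None:
                self._flush_thread.join()
            if remaining:
                self._flush_fn(remaining)

flushed_segments: list[list[int]] = []
writer = DoubleBufferedWriter(flushed_segments.append, max_buffered_docs=2)
for doc_id in range(5):
    writer.add(doc_id)
writer.close()
print(flushed_segments)  # [[0, 1], [2, 3], [4]]
```

Writers only stall when both buffers are occupied, that is, when a second buffer fills before the first flush has finished; that stall is the natural backpressure signal to propagate upstream to the ingestor.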

For hot shards - shards that receive disproportionate query load because their documents are more popular - we add read replicas on demand. A shard can have 2 replicas under normal load and scale to 5+ replicas if query QPS to that shard exceeds a threshold.

Real World

Elasticsearch handles posting list caching via the OS page cache backed by SSDs, rather than an application-level cache. The practical consequence is that random disk reads cannot carry the whole query load: the figures here are illustrative rather than benchmarked, but if a node's SSD serves roughly 40K random posting-list reads per second, the hot 20% of posting lists must stay in the OS page cache to hit the 50K QPS target. Elasticsearch’s indices.memory.index_buffer_size (default 10% of heap) controls the in-memory write buffer size.

Segment merging internals showing tiered compaction from in-memory buffer to large immutable segments

Failure Modes and Recovery

Failure | Detection | Impact | Recovery
--- | --- | --- | ---
Shard primary node crash | Replica heartbeat timeout (3s), Raft leader election | Queries to that shard fail; writes buffer in ingestor queue | Raft elects new primary from replicas in under 10s; shard coordinator routes to new primary
Segment file corruption | Checksum mismatch on read; per-segment CRC64 | Affected segment returns corrupt results; other segments unaffected | Restore segment from replica; if all replicas corrupt, rebuild from WAL or document store
In-memory buffer OOM | JVM/process heap exhaustion | Write ingestion stops; in-memory documents lost if not flushed | Double-buffer with disk spill; WAL replay on restart; docs re-ingested from Kafka at-least-once
Segment merge storm | CPU > 80%, I/O saturation, merge queue depth > 100 | Query latency spikes; segment count grows unchecked | Throttle merge I/O rate; pause low-priority merges; add read replicas to absorb query load
Shard map staleness | Coordinator routes to wrong shard; queries return empty results | Missing documents in results; false negatives | Shard map versioning with monotonic epoch; coordinator validates epoch on every response
Replica lag under write surge | Replication lag monitor > 5s | Queries to lagging replica return stale results | Circuit-break lagging replicas; route queries only to primary until lag clears
Watch Out

The most common operational failure mode is the “segment merge storm”: when a system falls behind on merging (due to a traffic spike or a maintenance window), it accumulates hundreds of small segments. When the system returns to normal load, the merge policy tries to catch up by running aggressive parallel merges, consuming all I/O bandwidth and causing query latency to spike from 20ms to 2-3 seconds. Always set a maximum merge concurrency limit and a write throttle to prevent this positive feedback loop.
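One way to implement the write throttle is a token bucket that charges merge threads for the bytes they write. The sketch below assumes a byte-rate budget and an injectable clock for testing; `MergeIOThrottle` and `delay_for` are hypothetical names, and a real deployment would pair this with a cap on concurrent merges.

```python
import time

class MergeIOThrottle:
    """Token bucket that caps merge write bandwidth in bytes per second."""

    def __init__(self, bytes_per_sec: float, burst_bytes: float, clock=time.monotonic):
        self.rate = bytes_per_sec
        self.capacity = burst_bytes
        self.tokens = burst_bytes      # start with a full burst allowance
        self.clock = clock
        self.last = clock()

    def delay_for(self, nbytes: int) -> float:
        """Charge nbytes and return how long the merge thread should sleep first."""
        now = self.clock()
        # Refill for the elapsed time, never beyond the burst capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        self.tokens -= nbytes          # may go negative: that deficit is the debt to wait out
        return max(0.0, -self.tokens / self.rate)

# Deterministic demo with a fake clock: 100 bytes/s budget, 50-byte burst
fake_now = [0.0]
throttle = MergeIOThrottle(bytes_per_sec=100, burst_bytes=50, clock=lambda: fake_now[0])
first = throttle.delay_for(50)     # covered by the burst allowance
second = throttle.delay_for(100)   # 100 bytes over budget at 100 bytes/s
fake_now[0] = 2.0                  # two seconds pass
third = throttle.delay_for(50)     # debt repaid, allowance refilled
print(first, second, third)        # 0.0 1.0 0.0
```

A merge thread would call `time.sleep(throttle.delay_for(len(chunk)))` before each write; because catch-up merges are charged against the same budget, they can no longer consume all I/O bandwidth at once.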

Comparison of Approaches

| Approach | Write Latency | Query Latency | Index Size | Best Fit |
| --- | --- | --- | --- | --- |
| In-memory inverted index (single node) | < 1ms | < 5ms | Limited by RAM (~100M docs) | Development, small corpora, unit tests |
| Lucene/Elasticsearch (NRT segments) | 1-5s refresh interval | 10-100ms at scale | 50-70% of raw text (compressed) | General-purpose search, e-commerce, log search |
| Trie-based forward index + scan | < 1ms | O(corpus) - impractical | Same as source | Autocomplete only; never for full-text |
| Column-store (Druid/ClickHouse) bitmap index | Seconds to minutes (batch) | 5-50ms for aggregations | 10-30% of raw text | Analytics queries, time-series filtering, not ranked search |
| Hybrid: dense index + re-rank (BM25 + neural) | 5-10s (need embedding) | 50-200ms | 2x (store both index types) | Semantic search with relevance fallback |
| Dedicated search SaaS (Algolia, Typesense) | < 1s | < 10ms | Managed | High-QPS, latency-sensitive, ops burden traded for cost |

For a 10B document corpus at 50K QPS, the clear choice is a horizontally sharded Lucene-based architecture (Elasticsearch or OpenSearch) with NRT indexing and document sharding. The column-store approach would be competitive for analytics aggregations but cannot return ranked results efficiently. A dedicated SaaS solution is prohibitively expensive at 10B documents - Algolia’s pricing model breaks down well below this scale.

The most important architectural choice within the Lucene approach is the merge policy: an aggressive merge policy (fewer segments, better query performance) trades against write throughput and merge I/O. For a write-heavy corpus (news, social media), prefer a tiered policy with high merge factors. For a read-heavy corpus (stable product catalog), prefer a level-based policy that minimizes segment count at the cost of more frequent merges.

Key Takeaways

  • Inverted index structure: The mapping from terms to sorted posting lists is the foundational data structure - query time becomes a function of result set size, not corpus size.
  • Delta + variable-byte compression: Reduces posting list size by 3x, directly improving cache hit rates and I/O throughput at query time.
  • NRT indexing via in-memory segments: New documents land in a RAM buffer and become searchable within seconds; immutability of flushed segments eliminates write-read locking.
  • Tiered segment merging: Keeps segment count bounded (typically < 20 segments per shard) to maintain consistent query latency; left unchecked, segment proliferation is the #1 cause of query latency degradation.
  • BM25 with global stats: Document frequency must be computed across the entire corpus, not just the local shard - global_term_stats caching is mandatory for correct cross-shard ranking.
  • Document sharding over term sharding: Allows all shards to execute the full query in parallel (scatter-gather), whereas term sharding requires serial term lookups across shards.
  • Gallop search in DAAT intersection: Exponential skip search reduces the cost of intersecting imbalanced posting lists from O(n) to O(log n) per skip, making AND queries on rare terms practical.
  • Roaring Bitmaps for deletes: Provide fast membership tests (a container lookup plus, at worst, a binary search inside that container) and SIMD-accelerated filtering with 10-100x better memory efficiency than naive bitsets for sparse deletion sets.

The counter-intuitive lesson from building full-text search at scale is that the hardest problem is not the index structure itself - it is keeping the segment count bounded under mixed read-write load. A system that builds a beautiful inverted index but accumulates 500 small segments will serve queries 50x slower than a simpler system with 10 well-merged segments. The merge policy is the operational heart of a search engine, and getting it wrong feels like a gradual performance mystery rather than an obvious failure.

Frequently Asked Questions

Q: Why use BM25 instead of TF-IDF or a neural ranking model?

A: Plain TF-IDF applies neither document length normalization nor term-frequency saturation - longer documents score higher simply by having more term occurrences, which is a bias. BM25 fixes the first with the length normalization parameter b and the second with the saturation parameter k1. Neural ranking models (BERT-based cross-encoders) produce better relevance but must run a full model inference for every query-document pair, making them impractical for first-stage retrieval over 10B documents. The standard production pattern is BM25 for first-stage retrieval (100ms, returns top-1000 candidates) followed by a neural re-ranker on the top-K subset (50ms for 100 candidates).
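To make the role of b concrete, here is a minimal sketch of the per-term BM25 contribution, using the `log(1 + ...)` IDF form found in Lucene's BM25 implementation and the common defaults k1 = 1.2 and b = 0.75. The function name and argument names are chosen for this example only.

```python
import math


def bm25_term_score(tf, doc_len, avg_doc_len, doc_freq, num_docs, k1=1.2, b=0.75):
    """BM25 contribution of one term to one document's score."""
    # Rare terms (small doc_freq) get a large IDF; common terms get a small one.
    idf = math.log(1.0 + (num_docs - doc_freq + 0.5) / (doc_freq + 0.5))
    # b interpolates between no length normalization (b=0) and full normalization (b=1).
    length_norm = 1.0 - b + b * (doc_len / avg_doc_len)
    # k1 controls saturation: the tf factor can never exceed (k1 + 1).
    return idf * (tf * (k1 + 1.0)) / (tf + k1 * length_norm)


# Same term frequency, very different document lengths.
short_doc = bm25_term_score(tf=3, doc_len=100, avg_doc_len=500, doc_freq=1000, num_docs=1_000_000)
long_doc = bm25_term_score(tf=3, doc_len=5000, avg_doc_len=500, doc_freq=1000, num_docs=1_000_000)
print(short_doc > long_doc)
```

With b set to 0 the two documents would score identically, which is the bias the answer above describes.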

Q: Why not use PostgreSQL’s full-text search for this?

A: PostgreSQL’s tsvector/tsquery uses an inverted index (GIN index) and is excellent up to roughly 50M documents on a single node. Beyond that, it cannot be sharded natively, GIN maintenance becomes a bottleneck under heavy ingest (a plain REINDEX blocks writes to the table), and its posting list compression is less aggressive than Lucene’s PFOR encoding. At 10B documents you need a distributed architecture that PostgreSQL does not provide out of the box.

Q: How do you handle queries with very common terms like “the” or “is”?

A: Two approaches. First, stopword removal during analysis eliminates terms with near-zero discriminating power from both the index and queries. Second, for terms that are indexed but have very high document frequency (> 20% of corpus), the IDF is small relative to rare terms, so they contribute little weight to BM25 scores. The WAND (Weak AND) algorithm exploits this during top-K retrieval: it keeps an upper bound on each term's possible score contribution and skips any document whose combined upper bounds cannot beat the current K-th best score, so a high-df term rarely forces a document to be fully scored.

Q: How does near-real-time indexing interact with BM25 global statistics?

A: It introduces a staleness window. Global term stats (document frequencies) are updated asynchronously by a stats aggregator that periodically polls all shards. If you index 1M new documents about “Rust programming”, the IDF for “rust” temporarily appears inflated because the global doc_freq has not yet caught up. In practice, this staleness (typically 30-300 seconds) has negligible impact on ranking quality - IDF changes logarithmically with document frequency, so a 10% change in df shifts a term's IDF by only about 1-2% for typical query terms.
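The size of the staleness effect can be checked with a few lines of arithmetic. The sketch below uses the `log(1 + ...)` IDF form from Lucene's BM25 and assumed numbers: a 10B-document corpus and a term whose true document frequency has grown from 10M to 11M while the cached value still says 10M.

```python
import math


def bm25_idf(doc_freq, num_docs):
    """IDF component of BM25 (log(1 + ...) form)."""
    return math.log(1.0 + (num_docs - doc_freq + 0.5) / (doc_freq + 0.5))


def relative_idf_error(stale_df, fresh_df, num_docs):
    """Fractional overestimate of IDF when scoring with stale_df instead of fresh_df."""
    stale = bm25_idf(stale_df, num_docs)
    fresh = bm25_idf(fresh_df, num_docs)
    return (stale - fresh) / fresh


# Assumed scenario: 1M new documents containing the term have not reached the stats cache yet.
error = relative_idf_error(stale_df=10_000_000, fresh_df=11_000_000, num_docs=10_000_000_000)
print(f"IDF overestimated by {error:.2%}")
```

For this scenario the overestimate comes out a little above 1%. Rarer terms see a smaller relative shift and very common terms a larger one, because the absolute change is fixed at about ln(1.1) while the IDF itself varies.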

Q: What is the right shard count for 10B documents?

A: The target is 1-2 billion documents per shard, giving each shard node a manageable 600-700GB index size that fits on a single NVMe drive with room for OS page cache. On a Lucene-based engine, stay toward the low end of that range: a single Lucene index is capped at roughly 2.1 billion documents, so 2B leaves almost no headroom. With 10 shards at 1B docs each, a 10-node cluster handles 50K QPS. As the corpus grows to 20B documents, you add 10 more shards (rolling re-shard using consistent hashing to minimize data movement).
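The re-sharding step can be illustrated with a small consistent-hash ring. This is a sketch under assumptions: the `HashRing` class, the `shard-N` names and the choice of 64 virtual nodes per shard are invented for the example and do not describe Elasticsearch's routing.

```python
import bisect
import hashlib


def _hash(key):
    # Stable 64-bit hash; Python's built-in hash() is randomized per process.
    return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")


class HashRing:
    """Consistent-hash ring with virtual nodes (illustrative)."""

    def __init__(self, shards, vnodes=64):
        points = sorted((_hash(f"{s}#{v}"), s) for s in shards for v in range(vnodes))
        self._keys = [h for h, _ in points]
        self._owners = [s for _, s in points]

    def shard_for(self, doc_id):
        # The owner is the first ring point clockwise from the document's hash.
        i = bisect.bisect_right(self._keys, _hash(doc_id)) % len(self._keys)
        return self._owners[i]


old_ring = HashRing([f"shard-{i}" for i in range(10)])
new_ring = HashRing([f"shard-{i}" for i in range(20)])
docs = [f"doc-{i}" for i in range(2000)]
moved = [d for d in docs if old_ring.shard_for(d) != new_ring.shard_for(d)]
print(f"{len(moved) / len(docs):.0%} of documents change shard")
```

Doubling the shard count still relocates about half of the documents in expectation, but every relocated document lands on one of the new shards. No data moves between the existing ten, which is what keeps the re-shard rolling rather than global.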

Q: Can you use SSDs or must the index live in RAM for 100ms latency?

A: Modern NVMe SSDs (1-2M IOPS, 10-50us random read latency) are fast enough for posting list access, provided the hot head of the query term distribution stays in the OS page cache. A 128GB RAM server with 64GB allocated to page cache can keep the top 30% most-queried posting lists in memory, giving effectively RAM speeds for those terms. Lookups against the remaining 70% of posting lists hit NVMe at 50-100us per read, which is still well within the 100ms budget for a 3-5 term query.

Interview Questions

Q: Walk me through how you would shard an inverted index for 10 billion documents, and what tradeoffs you’d consider between document sharding and term sharding.

Expected depth: Explain that document sharding assigns each document to one shard (uniform via hash(doc_id)), enabling parallel query execution via scatter-gather. Term sharding assigns each term’s posting list to one shard, which avoids scatter-gather but requires one round-trip per query term (serial or parallel with N shards for N terms). Discuss global statistics for BM25: document sharding complicates IDF because doc_freq must be aggregated across shards. Cover re-sharding cost and consistent hashing to minimize data movement. Mention Elasticsearch uses document sharding.
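A candidate could back this answer with a short scatter-gather sketch. In the example below each shard is modeled as a plain dict from doc_id to precomputed per-term weights, and `search_shard` stands in for a real per-shard index lookup; both are hypothetical simplifications.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor


def search_shard(shard, query_terms, k):
    """Hypothetical per-shard search: returns the shard's top-k (score, doc_id) pairs."""
    hits = [
        (sum(weights.get(t, 0.0) for t in query_terms), doc_id)
        for doc_id, weights in shard.items()
    ]
    hits = [h for h in hits if h[0] > 0]
    return heapq.nlargest(k, hits)


def scatter_gather(shards, query_terms, k):
    """Fan the query out to all shards in parallel, then merge the local top-k lists."""
    with ThreadPoolExecutor(max_workers=len(shards)) as pool:
        partials = list(pool.map(lambda s: search_shard(s, query_terms, k), shards))
    # Each document lives on exactly one shard, so the global top-k is
    # guaranteed to be contained in the union of the local top-k lists.
    return heapq.nlargest(k, (hit for part in partials for hit in part))


shards = [
    {"d1": {"raft": 2.0, "paxos": 1.0}, "d2": {"raft": 0.5}},
    {"d3": {"paxos": 2.8}, "d4": {"raft": 1.0, "paxos": 1.5}},
]
top = scatter_gather(shards, ["raft", "paxos"], k=2)
print([doc_id for _, doc_id in top])
```

The coordinator only ever handles k results per shard, so its merge cost grows with the shard count rather than with the corpus size.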

Q: How would you design the segment merge policy to maintain sub-100ms query latency under a sustained write load of 100,000 documents per second?

Expected depth: Explain the tension between merge frequency (fewer segments = faster queries) and write amplification (merging consumes I/O that competes with ingestion). Describe tiered merge: group segments by size into tiers (10MB, 100MB, 1GB, 10GB), merge within a tier when segment count exceeds threshold. Discuss merge throttling (max merge I/O rate), double-buffering for in-memory segments, and the “merge storm” failure mode. Mention that Elasticsearch’s index.merge.scheduler.max_thread_count controls merge parallelism.
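The tier-and-threshold rule described above fits in a few lines. This is a simplified sketch, not Lucene's TieredMergePolicy: the 10MB base tier and 10x growth factor follow the example tiers in the answer, while `max_per_tier` and the function names are assumptions.

```python
from collections import defaultdict

MB = 1024 ** 2


def tier_of(size_bytes, base=10 * MB, factor=10):
    """Tier 0 holds segments up to `base` bytes; each higher tier is `factor` times larger."""
    tier, limit = 0, base
    while size_bytes > limit:
        limit *= factor
        tier += 1
    return tier


def pick_merges(segment_sizes, max_per_tier=10):
    """Return one merge batch (a list of segment sizes) for every tier over its limit."""
    tiers = defaultdict(list)
    for size in segment_sizes:
        tiers[tier_of(size)].append(size)
    merges = []
    for tier in sorted(tiers):                 # smallest tiers first: cheapest merges
        segs = sorted(tiers[tier])
        if len(segs) >= max_per_tier:
            merges.append(segs[:max_per_tier])  # merge the smallest segments in the tier
    return merges


# Twelve small flushes, three mid-size segments, one large segment.
sizes = [5 * MB] * 12 + [50 * MB] * 3 + [2000 * MB]
print(len(pick_merges(sizes)))
```

Only the tier holding the twelve small segments exceeds its threshold, so a single batch of ten is selected and the large segments are left alone. That is how a tiered policy bounds write amplification.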

Q: Explain how BM25 scoring works across multiple shards, specifically how you handle the global document frequency (IDF) calculation.

Expected depth: BM25 IDF requires knowing the total number of documents in the corpus and the number of documents containing each term globally. With 10 shards, each shard only knows its local stats. Solutions: (1) periodic global stat aggregation - each shard reports its per-term doc_freqs to a stats service, which the coordinator caches; (2) approximate IDF using local stats (good enough for most queries, biased for rare terms that cluster in one shard). Discuss staleness window and its negligible ranking impact due to log-scale IDF. Mention Elasticsearch’s search_type=dfs_query_then_fetch which does a preliminary stats collection pass.
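Solution (1) amounts to summing per-shard reports. The sketch below assumes each shard reports a `(num_docs, doc_freq)` pair, with `doc_freq` as a `Counter`. The report shape and the function names are illustrative, not an Elasticsearch API.

```python
import math
from collections import Counter


def aggregate_stats(shard_stats):
    """Sum per-shard (num_docs, doc_freq Counter) reports into corpus-wide statistics."""
    total_docs = 0
    global_df = Counter()
    for num_docs, doc_freq in shard_stats:
        total_docs += num_docs
        global_df.update(doc_freq)             # adds counts term by term
    return total_docs, global_df


def idf(doc_freq, num_docs):
    return math.log(1.0 + (num_docs - doc_freq + 0.5) / (doc_freq + 0.5))


# "raft" clusters entirely on shard 0; "search" is spread evenly.
shard_stats = [
    (1000, Counter({"raft": 200, "search": 100})),
    (1000, Counter({"search": 100})),
]
num_docs, global_df = aggregate_stats(shard_stats)
local_idf_raft = idf(200, 1000)                # what shard 0 would compute on its own
global_idf_raft = idf(global_df["raft"], num_docs)
print(local_idf_raft < global_idf_raft)
```

The evenly spread term gets nearly the same IDF either way, while the clustered term is undervalued by shard 0's local statistics. This is the bias mentioned for solution (2).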

Q: How would you implement near-real-time indexing such that documents are searchable within 5 seconds of ingestion, without blocking query performance?

Expected depth: Describe the NRT segment model: documents land in an in-memory hash index (immediately searchable via in-memory scan), then the segment flusher writes them to a compressed disk segment at 1-5 second intervals. Queries must check both in-memory and disk segments. Cover the refresh/reopen mechanism (Lucene’s DirectoryReader.openIfChanged), the cost of the in-memory scan (acceptable because the buffer is small), and delete-then-reindex for updates. Mention that the refresh interval is the primary lever controlling NRT latency vs. indexing throughput.
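The buffer-plus-segments model can be sketched as a toy, single-threaded class. `NrtIndex` and its methods are invented for this example and are not Lucene's API; a real implementation would also handle concurrency, deletes and on-disk compression.

```python
from collections import defaultdict


class NrtIndex:
    """Toy NRT index: a small mutable buffer plus immutable flushed segments."""

    def __init__(self):
        self._buffer = []        # (doc_id, token set) pairs awaiting a flush
        self._segments = []      # each segment: term -> frozenset of doc_ids

    def add(self, doc_id, text):
        self._buffer.append((doc_id, set(text.lower().split())))

    def flush(self):
        """Freeze the buffer into an immutable segment (the periodic 1-5s step)."""
        if not self._buffer:
            return
        postings = defaultdict(set)
        for doc_id, tokens in self._buffer:
            for tok in tokens:
                postings[tok].add(doc_id)
        self._segments.append({t: frozenset(ids) for t, ids in postings.items()})
        self._buffer = []

    def segment_count(self):
        return len(self._segments)

    def search(self, term):
        term = term.lower()
        hits = set()
        for seg in self._segments:               # flushed data: direct posting lookup
            hits |= seg.get(term, frozenset())
        for doc_id, tokens in self._buffer:      # unflushed data: linear scan of a small buffer
            if term in tokens:
                hits.add(doc_id)
        return hits


idx = NrtIndex()
idx.add(1, "Raft consensus")
idx.flush()
idx.add(2, "raft log compaction")
print(sorted(idx.search("raft")))
```

Document 2 is found before any flush has covered it, because the query consults the buffer as well as the segments. That double lookup is the price of near-real-time visibility.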

Q: Describe the DAAT (Document-at-a-Time) algorithm and explain why it outperforms TAAT (Term-at-a-Time) for AND queries on large posting lists.

Expected depth: TAAT processes one term at a time - retrieve all docs for term 1, all docs for term 2, then intersect. For a query like “A AND B” where A has 1M docs and B has 100 docs, TAAT reads all 1M docs for A before discovering that at most 100 can match B. DAAT advances all posting list iterators in lock-step, using gallop search to skip non-matching docIDs: when the B iterator is at docID 500, the A iterator jumps exponentially (1, 2, 4, 8…) to find 500. DAAT walks the shortest list and gallops through the longer ones, so the cost is roughly O(|smallest list| * log |largest list|) instead of O(sum of all list sizes).
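A galloping intersection over two sorted docID lists can be sketched as follows. The posting lists here are plain uncompressed Python lists for clarity; a production index would gallop over skip pointers in compressed blocks instead. The function names are chosen for this example.

```python
from bisect import bisect_left


def gallop_to(postings, start, target):
    """Return the first index >= start whose docID is >= target, via exponential probing."""
    step, lo, hi = 1, start, start
    while hi < len(postings) and postings[hi] < target:
        lo = hi + 1          # everything up to hi is known to be < target
        hi += step
        step *= 2            # probe distances grow 1, 2, 4, 8, ...
    # The answer lies in [lo, hi]; finish with a binary search over that window.
    return bisect_left(postings, target, lo, min(hi, len(postings)))


def intersect(short, long):
    """AND two sorted docID lists, driving iteration from the shorter one."""
    result, pos = [], 0
    for doc_id in short:
        pos = gallop_to(long, pos, doc_id)
        if pos == len(long):
            break            # the long list is exhausted; no further matches are possible
        if long[pos] == doc_id:
            result.append(doc_id)
    return result


a = list(range(0, 1_000_000, 3))                       # common term: ~333K postings
b = [6, 500, 999, 300_000, 999_999, 2_000_000]         # rare term: 6 postings
print(intersect(b, a))
```

Each lookup resumes from the previous position, so the long list is never rescanned, and the work per rare-term posting is logarithmic in the distance skipped.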
