Chapter 22: Consistency

When data is replicated across multiple servers, a fundamental question arises: what guarantees do readers have about the data they see? Consistency is the set of rules that govern the relationship between writes and subsequent reads in a distributed system.

Storage Quorum Replication

Our storage service runs as three replicas (N=3), each maintaining its own WAL and snapshot. Every value is tagged with a monotonically increasing version number:

See storage/src/engine.rs for versioning.
struct VersionedValue {
    value: String,
    version: u64,
}

// put assigns the next version
pub fn put(&mut self, key: String, value: String) -> u64 {
    let version = self.next_version;
    self.next_version += 1;
    self.append_wal(&format!("VPUT {}={}@{}", key, value, version));
    self.data.insert(key, VersionedValue { value, version });
    version
}

Replication Interface

storage/src/lib.rs The replication interface uses separate procedures that bypass the quorum logic, preventing cascading replication. A GET_PEERS procedure exposes the current replication topology:

pub const REPLICATE_PUT_PROCEDURE: ProcedureId = 5;
pub const REPLICATE_DELETE_PROCEDURE: ProcedureId = 6;
pub const GET_PEERS_PROCEDURE: ProcedureId = 7;

#[derive(Debug, Serializable, Deserializable)]
pub struct ReplicatePutArgs {
    pub key: String,
    pub value: String,
    pub version: i32,
}

#[derive(Debug, Serializable, Deserializable)]
pub struct GetPeersResult {
    pub peer_count: i32,
    pub quorum_w: i32,
    pub quorum_r: i32,
}

Each storage replica maintains its own data directory, derived from its port: storage_data_10600, storage_data_10601, storage_data_10602. This ensures replicas do not share WAL or snapshot files, which would corrupt the versioning guarantees. The scheduler assigns each instance a unique port, and each instance self-registers with discovery, making all three replicas discoverable via discovery::list("storage").

Quorum writes require W acks before returning to the client. With W=2 and N=3, the writing node performs a local write (which counts as one ack), then replicates to peers via discovery::list("storage") and waits for one additional peer ack:

See storage/src/main.rs for the quorum write path.
// Local write counts as 1
let version = engine.put(key.clone(), value.clone());

// Replicate to W-1 peers
let peers = discovery::list("storage").await;
let mut acks = 0;
for peer in &peers {
    if acks >= W - 1 { break; }
    let result = replicate_put(peer, key, value, version).await;
    if result == "OK" { acks += 1; }
}

Quorum reads work symmetrically: with R=2, the reading node reads locally and from one peer, returning the value with the highest version. The key invariant is W + R > N (2 + 2 > 3), which guarantees that any read quorum overlaps with any write quorum — at least one node in the read set has the latest write.

Version Gating

storage/src/engine.rs When a replication message arrives, the receiving node must decide whether to accept it. The put_versioned method implements last-writer-wins: a write is accepted only if its version is at least as recent as the current value. This prevents old replication messages (delayed by the network) from overwriting newer data:

pub fn put_versioned(
    &mut self, key: String, value: String, version: u64,
) -> bool {
    if let Some(current) = self.data.get(&key) {
        if version < current.version {
            return false; // Reject stale write
        }
    }
    self.append_wal(&format!("VPUT {}={}@{}", key, value, version));
    self.data.insert(key, VersionedValue { value, version });
    true
}

This version check is the core of conflict resolution. Because versions are monotonically increasing and assigned by the originating node, two concurrent writes to different nodes will have different versions, and the higher version always wins. This is a practical form of last-writer-wins that resolves conflicts without coordination.

Quorum Read Path

storage/src/main.rs A quorum read contacts R−1 peers (since the local read counts as one) and returns the value with the highest version found. The code filters out the reading node itself using discovery::list:

async fn get_peers(own_addr: &str) -> Vec<String> {
    let result = discovery::list(SYSTEM_NAME.to_string()).await;
    result.addresses.split(';')
        .filter(|s| !s.is_empty() && *s != own_addr)
        .map(|s| s.to_string())
        .collect()
}

// In the GET handler:
let local = engine.get_versioned(&key);
let mut best_value = local.map(|v| v.value.clone()).unwrap_or_default();
let mut best_found = if local.is_some() { 1 } else { 0 };

let peers = get_peers(&own_addr).await;
let needed = (quorum_r - 1) as usize;
let mut acks = 0;
for peer in &peers {
    if acks >= needed { break; }
    let result = storage::remote_get(peer, key.clone()).await;
    acks += 1;
    if result.found == 1 && best_found == 0 {
        best_value = result.value.clone();
        best_found = 1;
    }
}

The get_peers pattern — calling discovery::list and filtering out self — appears in both storage and caching. Each replica is both a server (handling client requests with quorum logic) and a peer (accepting replication requests directly).

Cache Consistency Modes

While storage uses fixed quorum parameters, our caching service supports three configurable consistency modes, switchable at runtime:

See caching/src/main.rs for mode implementation.

Eventual Consistency

The default mode. Writes go to the local cache and are replicated asynchronously in a background task. Reads serve only from the local cache. This provides the lowest latency but may serve stale data if a write has reached some replicas but not others.

// Eventual: fire-and-forget replication
tokio::spawn(async move {
    let peers = get_peers(&own_addr).await;
    for peer in &peers {
        let _ = replicate_set(peer, key, value, ttl, version).await;
    }
});

Quorum Consistency

Writes wait for one peer ack (W=2 of N=3) before returning. Reads fetch from one peer in addition to the local cache (R=2 of N=3), returning the highest-versioned value. Since W + R > N, quorum mode guarantees that reads always see the latest completed write:

// Quorum: wait for 1 peer ack (W=2 of N=3)
let peers = get_peers(&own_addr).await;
let mut acks = 0;
for peer in &peers {
    if acks >= 1 { break; }
    let result = caching::replicate_set(
        peer, key.clone(), value.clone(), ttl_secs, version,
    ).await;
    if !result.starts_with("ERROR") {
        acks += 1;
    }
}

Strong Consistency

Writes wait for all peer acks (W=N) before returning. Reads query all peers (R=N). This provides the strongest guarantees but the highest latency, since every operation must contact every replica. A single slow or unavailable replica blocks the entire operation:

// Strong: wait for ALL peer acks (W=N)
let peers = get_peers(&own_addr).await;
for peer in &peers {
    let _ = caching::replicate_set(
        peer, key.clone(), value.clone(), ttl_secs, version,
    ).await;
}

Runtime Mode Switching

caching/src/lib.rs The consistency mode can be changed at runtime via the MODE_PROCEDURE (id=7). Sending an empty mode string returns the current mode; sending a mode name switches to it. This allows operators to dynamically trade off between latency and consistency without restarting the service:

pub const MODE_PROCEDURE: ProcedureId = 7;

#[derive(Debug, Serializable, Deserializable)]
pub struct ModeArgs {
    pub mode: String,
}

#[derive(Debug, Serializable, Deserializable)]
pub struct ModeResult {
    pub mode: String,
}

The consistency dashboard uses this procedure to display and control the mode. In practice, an operator might switch to strong consistency during a data migration, then return to eventual mode for normal operation.

Trade-offs

The CAP theorem formalizes the fundamental tension: a distributed system can provide at most two of three properties — Consistency, Availability, and Partition tolerance. Since network partitions are unavoidable, the practical choice is between consistency (reject requests during partitions) and availability (serve potentially stale data during partitions).

Our system makes this trade-off configurable at two levels. Storage defaults to quorum consistency (W=2, R=2) because durable data demands stronger guarantees. Caching defaults to eventual consistency because stale cache entries are merely a performance issue, not a correctness issue — the backing store remains the source of truth.

The quorum parameters W and R can be tuned independently. Setting R=1 (read from local only) maximizes read speed at the cost of potentially stale reads. Setting W=3 (write to all replicas) maximizes write durability but makes writes vulnerable to a single slow replica. The invariant W + R > N must hold for reads to always see the latest write.

Version-based conflict resolution has an important limitation: it resolves conflicts by picking the highest version, which means one write always “wins” and the other is silently discarded. For simple key-value stores this is acceptable, but applications that need to merge concurrent updates (like collaborative editors) require more sophisticated techniques like CRDTs (Conflict-free Replicated Data Types) or application-level merge functions.

Visit the consistency dashboard to change modes and observe the effect on latency and behavior.

Cross-region latency makes these consistency trade-offs even more consequential. When peers span continents, strong consistency may add hundreds of milliseconds to every operation. Chapter 24: Geo Replication examines how our system uses local quorum for fast writes and asynchronous replication for cross-region consistency.