Chapter 3: Consensus

In a distributed system, multiple servers must often agree on a shared state: who is the leader, what entries are in the log, whether a transaction should commit. Reaching this agreement in the presence of failures is the problem of consensus. Consensus is one of the most fundamental and challenging problems in distributed computing, and getting it right is essential for building reliable planetary-scale computers.

The difficulty arises from the nature of distributed systems: messages can be delayed, reordered, or lost; servers can crash and restart; and there is no global clock. Despite these challenges, consensus algorithms allow a group of servers (called an ensemble or cluster) to behave as a single coherent unit, even when some members fail.

Quorum-Based Consensus

The most widely used approach to consensus is quorum-based voting: a decision takes effect once a majority of servers (a quorum) has accepted it. The key insight is that any two majorities of the same ensemble must overlap in at least one server, so every later quorum contains at least one server that knows about each earlier decision. This overlap ensures that decisions are not lost even when some servers fail. A system of five servers can tolerate two failures; a system of three can tolerate one.
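
The arithmetic behind these numbers can be checked directly. The sketch below uses illustrative function names that are not part of the book's code: it computes the quorum size n / 2 + 1 (integer division), the number of failures an n-member ensemble tolerates, and brute-forces the overlap property for small ensembles by encoding each subset of servers as a bitmask.

```rust
/// Smallest number of servers that forms a majority of an `n`-member ensemble.
fn quorum_size(n: usize) -> usize {
    n / 2 + 1
}

/// How many servers can fail while the survivors can still form a quorum.
fn tolerated_failures(n: usize) -> usize {
    n - quorum_size(n)
}

/// Brute-force check (small `n` only) that every pair of quorums shares a server.
/// Bit i of a mask is set when server i belongs to the subset.
fn all_quorums_overlap(n: u32) -> bool {
    let q = quorum_size(n as usize) as u32;
    let quorums: Vec<u32> = (0u32..(1 << n))
        .filter(|set| set.count_ones() >= q)
        .collect();
    quorums
        .iter()
        .all(|a| quorums.iter().all(|b| (a & b) != 0))
}
```

Note that a four-member ensemble still tolerates only one failure (its quorum is three), which is why ensembles usually have an odd number of members.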

Our implementation follows the Raft consensus algorithm, designed by Diego Ongaro and John Ousterhout for understandability. Raft divides the consensus problem into three sub-problems: leader election (choosing a single leader), log replication (the leader distributing entries to followers), and safety (ensuring that committed entries are never lost).

Roles and State

consensus/src/member.rs
Every member of the ensemble is in one of three roles at any given time: leader, follower, or candidate. The leader handles all client requests and replicates log entries to followers. Followers passively accept entries from the leader. A candidate is a follower that is attempting to become the new leader.

use std::sync::Arc;
use std::time::Instant;
// Async-aware locks: the code below calls `.lock().await` and `.read().await`.
use tokio::sync::{Mutex, RwLock};

use crate::{LogEntry, StateMachine};

#[derive(Copy, Clone, Debug, PartialEq)]
pub enum Role {
    Leader,
    Follower,
    Candidate,
}

pub struct Member {
    pub address: Arc<Mutex<String>>,
    pub role: Arc<Mutex<Role>>,
    pub peers: Arc<RwLock<Vec<String>>>,
    pub last_heartbeat: Arc<Mutex<Instant>>,
    pub term: Arc<Mutex<u64>>,
    pub log: Arc<RwLock<Vec<LogEntry>>>,
    pub commit_index: Arc<Mutex<usize>>,
    pub last_applied: Arc<Mutex<usize>>,
    pub state_machine: Arc<Mutex<dyn StateMachine + Send + Sync>>,
}

The Member struct contains all the state a consensus participant needs. The term is a logical clock that increases monotonically with each election; it allows members to detect stale leaders, because a message stamped with an older term must come from a leader of an earlier, superseded term. The log is an ordered sequence of entries that all members must agree upon. The commit_index records how many log entries have been replicated to a majority and are therefore safe to apply, and last_applied records how many of those entries the state machine has already consumed, so last_applied never exceeds commit_index. The state_machine is the application-specific logic that processes committed entries.
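
To make the role of the term concrete, here is a minimal sketch of the rule Raft applies to every incoming message: a message from an older term is rejected as stale, and a message from a newer term makes the receiver adopt that term and step down to follower. TermState and Verdict are simplified, single-threaded stand-ins invented for this example, not the book's types.

```rust
#[derive(Copy, Clone, Debug, PartialEq)]
enum Role {
    Leader,
    Follower,
    Candidate,
}

/// Simplified stand-in for the `term` and `role` fields of Member.
struct TermState {
    term: u64,
    role: Role,
}

#[derive(Debug, PartialEq)]
enum Verdict {
    /// The sender is behind: its message belongs to an earlier term.
    RejectStale,
    /// The message is current and can be processed.
    Accept,
}

impl TermState {
    /// Applies Raft's term comparison to a message stamped with `msg_term`.
    fn observe(&mut self, msg_term: u64) -> Verdict {
        if msg_term < self.term {
            return Verdict::RejectStale;
        }
        if msg_term > self.term {
            // A newer term exists, so any leadership or candidacy we hold is stale.
            self.term = msg_term;
            self.role = Role::Follower;
        }
        Verdict::Accept
    }
}
```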

consensus/src/lib.rs
Each log entry records the term in which it was created, an action identifier, and a payload. The StateMachine trait defines how the application processes committed entries:

#[derive(Serializable, Deserializable, Clone, Debug)]
pub struct LogEntry {
    term: u64,
    action: u32,
    payload: String,
}

#[async_trait]
pub trait StateMachine {
    async fn apply(&mut self, action: u32, payload: String);
    async fn handle(&mut self, request: Request) -> Response;
}
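
To illustrate the contract, the sketch below implements a tiny key-value store as a state machine. It simplifies the trait to a synchronous `apply` so that no async runtime is needed, and the action codes `SET` and `DELETE` and the `key=value` payload format are hypothetical choices for this example. The property that matters is determinism: two replicas that apply the same committed entries in the same order end up in the same state.

```rust
use std::collections::BTreeMap;

/// Hypothetical action codes for this example.
const SET: u32 = 1;
const DELETE: u32 = 2;

/// Synchronous simplification of the book's async StateMachine trait.
trait StateMachine {
    fn apply(&mut self, action: u32, payload: String);
}

#[derive(Default, Debug, PartialEq)]
struct KvStore {
    data: BTreeMap<String, String>,
}

impl StateMachine for KvStore {
    fn apply(&mut self, action: u32, payload: String) {
        match action {
            // SET payloads have the form "key=value"; malformed ones are ignored.
            SET => {
                if let Some((key, value)) = payload.split_once('=') {
                    self.data.insert(key.to_string(), value.to_string());
                }
            }
            // DELETE payloads are just the key.
            DELETE => {
                self.data.remove(&payload);
            }
            // Unknown actions are ignored, identically on every replica.
            _ => {}
        }
    }
}
```

Ignoring malformed or unknown entries, rather than panicking, keeps replicas in lockstep: every replica sees the same entry and makes the same choice.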

The Main Loop

A member's lifecycle is a loop that switches behavior based on its current role. On startup, a member joins the ensemble by contacting the existing leader (discovered via the discovery service) or, if no leader exists, initializing a new ensemble as the first leader:

pub async fn run(&self) {
    self.join_ensemble().await;

    loop {
        let current_role = {
            let role_lock = self.role.lock().await;
            *role_lock
        };

        match current_role {
            Role::Follower => self.run_as_follower().await,
            Role::Leader => self.run_as_leader().await,
            Role::Candidate => self.become_candidate().await,
        }
    }
}
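
The loop above only dispatches on the current role; the roles themselves change in response to a small set of events. As a hedged model of those transitions (the Event names are hypothetical and this is not code from the book), the standard Raft role changes can be written as a pure function:

```rust
#[derive(Copy, Clone, Debug, PartialEq)]
enum Role {
    Leader,
    Follower,
    Candidate,
}

/// Hypothetical events that drive role changes.
#[derive(Copy, Clone, Debug)]
enum Event {
    /// No heartbeat (or no election result) before the randomized timeout.
    ElectionTimeout,
    /// A candidate collected votes from a majority.
    WonElection,
    /// A message carried a higher term, or a candidate heard from a valid leader.
    SawLeaderOrNewerTerm,
}

fn next_role(current: Role, event: Event) -> Role {
    match (current, event) {
        // A follower, or a candidate whose election stalled, starts a new election.
        (Role::Follower | Role::Candidate, Event::ElectionTimeout) => Role::Candidate,
        (Role::Candidate, Event::WonElection) => Role::Leader,
        // Any role falls back to follower once it learns of a legitimate leader
        // or a newer term.
        (_, Event::SawLeaderOrNewerTerm) => Role::Follower,
        // Every other combination leaves the role unchanged.
        (role, _) => role,
    }
}
```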

Leader Election

consensus/src/follower.rs
A follower monitors heartbeats from the leader. If no heartbeat arrives within a randomized timeout (1500–3000 milliseconds), the follower assumes the leader has failed and transitions to the candidate role to start an election:

pub async fn run_as_follower(&self) {
    loop {
        let timeout = rand::thread_rng().gen_range(1500..3000);
        tokio::time::sleep(Duration::from_millis(timeout)).await;

        let last_heartbeat = *self.last_heartbeat.lock().await;
        if last_heartbeat.elapsed() >= Duration::from_millis(timeout as u64) {
            warn!("Too long since last heartbeat from leader");
            self.become_candidate().await;
            break;
        }
    }
}

The randomized timeout is critical. If all followers used the same timeout, they would all start elections simultaneously, splitting votes and preventing any candidate from winning. Randomization ensures that in most cases, a single follower times out first and wins the election before others start their own.
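
A small simulation shows why randomization helps. The sketch below draws one election timeout per follower from the same 1500 to 3000 ms range and reports which follower would fire first and how far ahead of the runner-up it is. It uses a tiny xorshift generator only so the example needs no external crates; the book's code uses the rand crate instead.

```rust
/// Minimal xorshift64 generator (not cryptographic); the seed must be non-zero.
struct XorShift(u64);

impl XorShift {
    fn next_u64(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }

    /// Approximately uniform value in [lo, hi).
    fn range(&mut self, lo: u64, hi: u64) -> u64 {
        lo + self.next_u64() % (hi - lo)
    }
}

/// Draws one timeout per follower and returns (index of the first follower to
/// time out, its timeout in ms, gap in ms to the second-earliest timeout).
fn first_to_time_out(followers: usize, seed: u64) -> (usize, u64, u64) {
    assert!(followers >= 2, "need at least two followers to compare");
    assert!(seed != 0, "xorshift seed must be non-zero");
    let mut rng = XorShift(seed);
    let timeouts: Vec<u64> = (0..followers).map(|_| rng.range(1500, 3000)).collect();

    // Order follower indices by when their timers would fire.
    let mut order: Vec<usize> = (0..followers).collect();
    order.sort_by_key(|&i| timeouts[i]);
    let (first, second) = (order[0], order[1]);
    (first, timeouts[first], timeouts[second] - timeouts[first])
}
```

With identical timeouts the gap would always be zero. With uniformly random ones, the expected gap between the two earliest of four draws over a 1500 ms window is 1500 / 5 = 300 ms, far longer than a round trip on a local network, so the first candidate usually collects its votes before a rival appears.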

consensus/src/candidate.rs
A candidate increments its term (to distinguish this election from previous ones), votes for itself, and requests votes from all peers. If it receives votes from a majority, it becomes the leader:

pub async fn become_candidate(&self) {
    let mut term = self.term.lock().await;
    *term += 1;
    drop(term);

    *self.role.lock().await = Role::Candidate;
    self.start_election().await;
}

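async fn start_election(&self) {
    let term = *self.term.lock().await;
    // Clone the peer list so the read lock is not held across network calls.
    // `peers` is assumed to exclude this member, so the ensemble has
    // peers.len() + 1 members and a majority is (peers.len() + 1) / 2 + 1.
    let peers = self.peers.read().await.clone();
    let needed_votes = (peers.len() + 1) / 2 + 1;

    let mut votes = 1; // Start with 1 vote for self
    for peer in peers.iter() {
        if self.request_vote(peer, term).await {
            votes += 1;
        }
    }

    if *self.role.lock().await != Role::Candidate {
        return; // Role changed during election
    }

    if votes >= needed_votes {
        *self.role.lock().await = Role::Leader;
    } else {
        // Lost or split vote: wait a fresh randomized timeout before the run
        // loop starts the next election, so rival candidates do not retry in
        // lockstep.
        let backoff: u64 = rand::Rng::gen_range(&mut rand::thread_rng(), 1500..3000);
        tokio::time::sleep(std::time::Duration::from_millis(backoff)).await;
    }
}

The majority requirement ((peers.len() + 1) / 2 + 1, counting the candidate itself) is the heart of quorum-based consensus. In a five-member ensemble, three votes are needed; in a four-member ensemble, three are needed as well, since two of four is not a majority. Because Raft lets each member grant at most one vote per term, two candidates in the same term cannot both win: their majorities would have to overlap in at least one member, and that member votes for only one of them.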
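
The overlap argument depends on each member voting at most once per term. The voter's side of request_vote is not shown in this chapter, so the following is a minimal sketch under assumptions: a hypothetical voted_for field is stored alongside the term, and Raft's additional check that the candidate's log is at least as up to date as the voter's is deliberately omitted.

```rust
/// Voter-side state: the current term and who (if anyone) received our vote in it.
struct Voter {
    current_term: u64,
    voted_for: Option<String>,
}

impl Voter {
    /// Decides a vote request from `candidate` for `term`.
    /// Raft's log up-to-date check is omitted from this sketch.
    fn handle_request_vote(&mut self, candidate: &str, term: u64) -> bool {
        if term < self.current_term {
            return false; // Stale candidate from an earlier term.
        }
        if term > self.current_term {
            // New term: any vote cast in an earlier term no longer counts.
            self.current_term = term;
            self.voted_for = None;
        }
        if let Some(existing) = &self.voted_for {
            // Already voted this term: only repeat the vote for the same candidate.
            return existing == candidate;
        }
        self.voted_for = Some(candidate.to_string());
        true
    }
}
```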

Log Replication

consensus/src/leader.rs
Once elected, the leader registers with the discovery service so that clients and new members can find it. It then sends periodic heartbeats to followers by notifying them of the current peer list and term. These heartbeats serve double duty: they prevent followers from starting unnecessary elections and they keep the ensemble's peer lists synchronized:

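pub async fn run_as_leader(&self) {
    let address = self.address.lock().await.clone();
    discovery::register(ENSEMBLE_NAME.to_string(), address);

    let heartbeat_interval = Duration::from_millis(1500);
    let mut interval = time::interval(heartbeat_interval);
    loop {
        interval.tick().await;
        // Return to the main loop if this member has stepped down (for
        // example, after seeing a higher term), so it can run as a follower.
        if *self.role.lock().await != Role::Leader {
            return;
        }
        self.notify_members_of_peers().await;
    }
}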

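When a client submits a new entry, the leader appends it to its own log and then replicates the entry to all followers. The entry is only considered committed once a majority of members have acknowledged it. If a majority cannot be reached, this implementation removes the entry from the leader's log again. That rollback is a simplification: followers that did acknowledge the entry keep their copy, whereas Raft itself never deletes entries from a leader's log and instead leaves such an entry uncommitted until a later replication attempt succeeds: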

pub async fn handle_append_entry(&self, args: AppendEntryArgs) -> String {
    if *self.role.lock().await != Role::Leader {
        return "Member is not leader and cannot append entries.".to_string();
    }

    let entry = LogEntry {
        term: *self.term.lock().await,
        action: args.action,
        payload: args.payload,
    };

    let mut log = self.log.write().await;
    log.push(entry.clone());

    match self.replicate_entry_to_followers(&entry).await {
        Ok(_) => {
            let mut commit_index = self.commit_index.lock().await;
            *commit_index = log.len();
            format!("Appended entry: {:?}", entry)
        }
        Err(e) => {
            log.pop(); // Roll back
            e
        }
    }
}

The replication process contacts each follower in turn and counts acknowledgments, with the leader's own copy counting as one. The entry is committed only if a majority of the ensemble holds it:

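async fn replicate_entry_to_followers(
    &self, entry: &LogEntry,
) -> Result<(), String> {
    // Clone the peer list so the read lock is not held across network calls.
    // `peers` is assumed to exclude the leader, so the ensemble has
    // peers.len() + 1 members.
    let peers = self.peers.read().await.clone();
    let majority = (peers.len() + 1) / 2 + 1;

    let mut acks = 1; // The leader's own copy counts as one acknowledgment
    for peer in peers.iter() {
        if self.replicate_entry(peer, entry).await {
            acks += 1;
        }
    }

    if acks >= majority {
        Ok(())
    } else {
        Err("Failed to achieve consensus".to_string())
    }
}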
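
Counting acknowledgments one entry at a time works for this implementation. Full Raft leaders instead track, for every member, the highest log index known to be stored there (its match index) and derive the commit index from those numbers. The following is a minimal sketch of that calculation with hypothetical function names. Indices are 1-based counts of entries, matching how commit_index is used above, and the slice includes the leader's own log length. Raft also commits an entry by counting replicas only if it was created in the leader's current term, which the second function checks.

```rust
/// Highest log index stored on a majority of members.
/// `match_index` holds one value per member, including the leader itself.
fn majority_match_index(match_index: &[usize]) -> usize {
    assert!(!match_index.is_empty(), "ensemble must have at least one member");
    let mut sorted = match_index.to_vec();
    sorted.sort_unstable_by(|a, b| b.cmp(a)); // Descending order.
    let majority = sorted.len() / 2 + 1;
    // The majority-th largest value is held by at least `majority` members.
    sorted[majority - 1]
}

/// Advances the commit index to the largest majority-replicated index whose
/// entry belongs to `current_term`; otherwise keeps the old commit index.
/// `log_terms[i]` is the term of the entry at 1-based index i + 1.
fn advance_commit_index(
    match_index: &[usize],
    log_terms: &[u64],
    current_term: u64,
    commit_index: usize,
) -> usize {
    let mut n = majority_match_index(match_index).min(log_terms.len());
    while n > commit_index {
        if log_terms[n - 1] == current_term {
            return n;
        }
        n -= 1;
    }
    commit_index
}
```

For example, with match indices 5, 5, 3, 2, and 1 in a five-member ensemble, index 3 is stored on three members and can be committed if the entry at index 3 belongs to the current term.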

consensus/src/follower.rs
When a follower receives a replicated entry, it appends the entry to its log, updates its commit index, and applies any newly committed entries to its local state machine:

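pub async fn handle_replicate_entry(
    &self, args: ReplicateEntryArgs,
) -> String {
    if *self.role.lock().await != Role::Follower {
        return "Member is not follower".to_string();
    }

    // Keep using this write guard below: calling `self.log.read()` while it
    // is alive would deadlock.
    let mut log = self.log.write().await;
    log.push(args.entry);

    // Never advance the commit index past the end of the local log.
    let commit_index = args.commit_index.min(log.len());
    *self.commit_index.lock().await = commit_index;

    // Apply entries that are committed but not yet applied, then record the
    // progress so the same entries are not applied again on the next call.
    let mut last_applied = self.last_applied.lock().await;
    let mut state_machine = self.state_machine.lock().await;
    for i in *last_applied..commit_index {
        let entry = log[i].clone();
        state_machine.apply(entry.action, entry.payload).await;
    }
    *last_applied = (*last_applied).max(commit_index);

    ReplicateEntryResponse { ack: true }.serialize()
}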

Application: Distributed Locking

Consensus becomes truly powerful when applied to a specific domain. A distributed lock service demonstrates this: multiple clients need to coordinate exclusive access to shared resources, and the lock state must be consistent across all servers even when failures occur.

locking_v0/src/main.rs
The locking service builds on the consensus protocol. Each server maintains a map of locks, and lock operations (acquire and release) are recorded as log entries. Because the log is replicated through consensus, all servers agree on which locks are held and by whom:

struct Lock {
    is_locked: bool,
    owner: Option<String>,
}

struct Server {
    term: Arc<Mutex<i32>>,
    role: Arc<Mutex<Role>>,
    log: Arc<RwLock<Vec<LogEntry>>>,
    peers: Arc<RwLock<Vec<String>>>,
    locks: Arc<Mutex<HashMap<String, Lock>>>,
    commit_index: Arc<Mutex<usize>>,
    last_applied: Arc<Mutex<usize>>,
    // ...
}

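When a client requests a lock, the server checks whether it is available and, if so, records the new owner and appends an acquire entry to the consensus log. This first version updates the local lock table immediately and does not wait for the entry to be replicated; a consensus-backed lock service should report a lock as acquired only after its entry has been committed by a majority of servers, and apply it to the lock table at that point: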

async fn handle_acquire(&self, args: AcquireArgs) -> String {
    let lock_id = &args.lock_id;
    let mut locks = self.locks.lock().await;
    let lock = locks.entry(lock_id.to_string()).or_insert(Lock {
        is_locked: false,
        owner: None,
    });

    if lock.is_locked {
        return "Lock is already acquired".to_string();
    }

    // Record the new owner locally, then append the acquisition to the log.
    lock.is_locked = true;
    lock.owner = Some(args.owner.to_string());
    let entry = LogEntry {
        term: *self.term.lock().await,
        lock_id: lock_id.to_string(),
        action: LOCK_ACQUIRE,
        member_id: self.address.lock().await.to_string(),
    };
    self.log.write().await.push(entry);
    "Lock acquired".to_string()
}

New members joining the ensemble must catch up on the current lock state before they can participate. They do this by fetching the committed log from the leader and replaying it locally:

async fn join_ensemble(&self) {
    *self.is_full_member.lock().await = false;
    self.catch_up_log().await;
    // Clone the address so its mutex is not held across the network call.
    let address = self.address.lock().await.clone();
    let _ = self.request_full_membership(address).await;
}
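
Catching up works because applying the same committed entries in the same order always produces the same lock table. The sketch below models this with a simplified entry type; LockEntry, the ACQUIRE and RELEASE constants, and the owner field carried in each entry are assumptions for this example, not locking_v0's definitions. A new member replays only the leader's committed prefix and ignores any uncommitted tail.

```rust
use std::collections::HashMap;

// Hypothetical action codes for this example.
const ACQUIRE: u32 = 1;
const RELEASE: u32 = 2;

#[derive(Clone, Debug)]
struct LockEntry {
    lock_id: String,
    action: u32,
    owner: String,
}

/// Lock table: lock id -> current owner. A lock absent from the map is free.
type LockTable = HashMap<String, String>;

/// Applies one committed entry. Invalid operations (acquiring a held lock,
/// releasing a lock held by someone else) are no-ops, so every replica agrees.
fn apply(table: &mut LockTable, entry: &LockEntry) {
    match entry.action {
        ACQUIRE => {
            table.entry(entry.lock_id.clone()).or_insert_with(|| entry.owner.clone());
        }
        RELEASE => {
            if table.get(&entry.lock_id) == Some(&entry.owner) {
                table.remove(&entry.lock_id);
            }
        }
        _ => {}
    }
}

/// Builds a new member's lock table from the leader's log by replaying only the
/// first `commit_index` entries; later entries are not yet committed.
fn catch_up(leader_log: &[LockEntry], commit_index: usize) -> LockTable {
    let mut table = LockTable::new();
    for entry in &leader_log[..commit_index.min(leader_log.len())] {
        apply(&mut table, entry);
    }
    table
}
```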

Design Discussion

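Consensus algorithms like Raft make strong guarantees: once an entry is committed, it will not be lost even if a minority of servers fail. These guarantees come at a cost. Every write must be replicated to a majority of servers before it can be acknowledged. When the leader contacts followers in parallel, commit latency is set by the slowest server within the fastest majority, so a slow minority does not hold up commits; the replicate_entry_to_followers loop shown earlier contacts followers one at a time, so its latency is the sum of all follower round trips. Reads need care as well: to avoid returning stale data, a read must be served by a leader that has confirmed it is still the leader (for example, by exchanging heartbeats with a majority before answering) or that holds a valid read lease.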
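
The latency claim can be made concrete with a little arithmetic. Assuming the leader sends to all followers at once, it needs acknowledgments from quorum - 1 followers (its own copy counts as one), so commit latency is the (quorum - 1)-th fastest follower round trip. The sketch contrasts that with contacting followers one after another, where every round trip is awaited in turn. The function names are illustrative and the round-trip numbers in the tests are made up.

```rust
/// Commit latency when the leader contacts all followers in parallel:
/// the (quorum - 1)-th fastest follower round trip, because the leader's own
/// copy already counts toward the quorum.
fn parallel_commit_latency_ms(follower_rtts_ms: &[u64]) -> u64 {
    // Ensemble size is followers + 1; quorum is size / 2 + 1; minus the leader.
    let needed = (follower_rtts_ms.len() + 1) / 2;
    if needed == 0 {
        return 0; // A single-member ensemble commits locally.
    }
    let mut sorted = follower_rtts_ms.to_vec();
    sorted.sort_unstable();
    sorted[needed - 1]
}

/// Commit latency when followers are contacted one at a time and every
/// round trip completes before the next one starts.
fn sequential_commit_latency_ms(follower_rtts_ms: &[u64]) -> u64 {
    follower_rtts_ms.iter().sum()
}
```

For a five-member ensemble with follower round trips of 3, 10, 4, and 50 ms, the parallel version commits after 4 ms, unaffected by the 50 ms straggler, while the sequential loop takes 67 ms.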

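The choice of timeouts is critical for system behavior. The election timeout must be long enough that normal heartbeat delays do not trigger spurious elections, but short enough that actual leader failures are detected promptly. The Raft paper recommends that the time to broadcast a message to the ensemble be much smaller than the election timeout, and the same reasoning applies to the heartbeat interval. Our implementation uses 1500–3000 milliseconds for elections and 1500 milliseconds for heartbeats. Because the shortest possible election timeout equals the heartbeat interval, a single delayed heartbeat can trigger an unnecessary election; a heartbeat interval of a few hundred milliseconds would leave a wider margin on a local network. A geographically distributed system might use timeouts of several seconds.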
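
These constraints can be encoded as a startup check. The sketch below is an illustration rather than part of the book's code: the ElectionTiming struct and the safety factor of two heartbeats per minimum timeout are assumptions chosen for this example, since Raft states the relationship only as a qualitative inequality.

```rust
use std::time::Duration;

/// Hypothetical timing configuration for a consensus member.
struct ElectionTiming {
    heartbeat_interval: Duration,
    election_timeout_min: Duration,
    election_timeout_max: Duration,
}

/// Assumed safety factor: the shortest election timeout should span at least
/// this many heartbeat intervals, so one late heartbeat is tolerated.
const MIN_HEARTBEATS_PER_TIMEOUT: u32 = 2;

impl ElectionTiming {
    fn validate(&self) -> Result<(), String> {
        if self.election_timeout_min >= self.election_timeout_max {
            return Err("election timeout range is empty".to_string());
        }
        if self.heartbeat_interval * MIN_HEARTBEATS_PER_TIMEOUT > self.election_timeout_min {
            return Err(format!(
                "heartbeat interval {:?} is too close to the minimum election timeout {:?}",
                self.heartbeat_interval, self.election_timeout_min
            ));
        }
        Ok(())
    }
}
```

With the chapter's values (1500 ms heartbeats, 1500 to 3000 ms election timeouts) the check fails, while a 300 ms heartbeat interval passes.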

The StateMachine trait provides a clean separation between the consensus protocol and the application logic. Any application that can express its state changes as a sequence of (action, payload) pairs can be built on top of this consensus implementation: key-value stores, configuration services, lock managers, and coordination services.

In production, consensus-based systems like etcd, ZooKeeper, and Consul form the backbone of distributed coordination. They provide the primitives — leader election, distributed locking, configuration management, and service discovery — upon which larger systems are built.