Chapter 1: Systems

If you have used the Internet, then you have interacted with computer systems. A computer system is a collection of software and hardware that is used to perform some work. Unlike a computer program, which may run on a single piece of hardware, a computer system can be spread out across multiple programs, devices, servers, and even geographic locations.

Computer systems come in many shapes and sizes. You may already be familiar with some common systems that run on a single computer, such as operating systems and file storage systems. Systems are the glue that holds different hardware and software components together. This property of computer systems is fundamental: A computer system manages a set of components.

Systems are the primary building block for constructing planetary scale computers. A key feature of systems is that they are composable via standard interfaces. A well-designed system is to a collection of hardware and software what a well-abstracted, encapsulated function is to a program. The primary goal of any system is to provide a reusable way to accomplish some task.

Architectures

Because computer systems expose their functionality through well-defined interfaces, engineers have developed different ways to relate one system to another. If systems were hollow blocks in the physical world, we would have only a few ways to arrange them. We could put some blocks inside others, beside others, or on top of others.

Monolithic

A monolithic system architecture is like one big system that contains other systems inside of it. The big system exposes as many (or as few) interfaces to the systems it contains as needed, and it “ties together” all of the other systems. A web app that exposes all of its features as different HTTP endpoints served from a single binary is an example of a monolithic system.

Monolithic systems have the advantage that they do not need to communicate with other systems to get their work done, because all of the functionality they need is bundled in the same binary. A disadvantage of a monolithic system is that updating even a small component requires building and releasing a new version of the entire system.

You might choose a monolithic system when you need to avoid communication between components, when a single instance of your binary fits within the resources of one server (although many instances can run across many servers), and when your build and release process can handle the volume of changes to the components of the monolithic binary. With a suitable build and release process, a monolithic approach can support many hundreds of features and thousands of changes per day.

Micro

Micro systems are like smaller blocks that sit beside one another. Each micro system performs a specific, encapsulated task with a relatively simple interface. A system that stores and accesses values in persistent storage is an example of a micro system. Whereas monolithic systems exist as a single binary with all functionality sharing the same address space, micro systems exist as separate binaries and run as separate processes.

Splitting functionality into separate binaries comes with trade-offs. Different micro systems can be implemented in different programming languages, allowing for greater development flexibility. Micro systems can be updated and released individually, so updating one component does not require redeploying the others, which increases overall system availability. In addition, micro systems can be deployed across completely separate devices.

Micro systems also make operation more complex. A micro system must communicate with the other micro systems whose functionality it uses, which creates dependencies between them. Because micro systems may not be physically situated close to one another, latency becomes a key concern. Testing the interactions between sets of micro systems must be done carefully so as to reproduce real-world conditions.

Tiered

Tiered systems are a hybrid of monolithic and micro systems in which work is divided among a small number of tiers, and each tier communicates only with the tiers “above” and “below” it. A classic example is a three-tier web application with a presentation tier, an application tier, and a data tier. For lower volumes of work and smaller numbers of contributors, tiers can achieve some of the benefits of micro systems while retaining the lower operational costs of monolithic systems.
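As a minimal sketch of the tiered structure, assuming a hypothetical three-tier application, each struct below holds a handle only to the tier directly beneath it, so requests flow down and results flow back up. In a real deployment each tier would typically run as its own process; plain structs keep the example self-contained.

```rust
use std::collections::HashMap;

/// Hypothetical data tier: owns storage and answers lookups.
struct DataTier {
    users: HashMap<u32, String>,
}

impl DataTier {
    fn get_user(&self, id: u32) -> Option<&str> {
        self.users.get(&id).map(String::as_str)
    }
}

/// Hypothetical application tier: business logic; talks only to the data tier.
struct ApplicationTier {
    data: DataTier,
}

impl ApplicationTier {
    fn greeting_for(&self, id: u32) -> Result<String, String> {
        self.data
            .get_user(id)
            .map(|name| format!("Hello, {name}!"))
            .ok_or_else(|| format!("no user with id {id}"))
    }
}

/// Hypothetical presentation tier: formats output; talks only to the application tier.
struct PresentationTier {
    app: ApplicationTier,
}

impl PresentationTier {
    fn render(&self, id: u32) -> String {
        match self.app.greeting_for(id) {
            Ok(text) => format!("200 OK\n{text}"),
            Err(err) => format!("404 Not Found\n{err}"),
        }
    }
}
```

Because the presentation tier never touches the data tier directly, the data tier could later be replaced (for example, by a client for a separate storage system) without changing the presentation code.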

For higher volumes of work and larger numbers of contributors, there usually comes a point where tiered systems begin to suffer the same disadvantages as monolithic systems. At first, additional tiers can be added by factoring distinct work out into its own tier. However, some common functionality, such as service discovery, security, and privacy, may be needed across all the tiers, which calls for a micro system approach.

Communication

Systems are the building blocks of planetary scale computers. While a single system is useful, a collection of systems is truly powerful. To allow one system to harness the utility of another, the systems must interact. We usually call a system that relies on another system the client, and the system it relies on the server. There are several common ways to handle communication between a client and a server.

Shared Libraries

A shared library allows one system to run within the same process and address space as another system. The shared library implements some functionality that a system designer can use as if the library were part of the system itself. Either the library's compiled object code is linked into the system's binary at build time (static linking), or the operating system's dynamic loader links a previously compiled library object into the process at run time (dynamic linking).

Shared libraries have the lowest communication overhead of any interaction method between systems: a single function call. When a library is linked statically, a full copy of it is included in every system that uses it, which can lead to binary bloat. In addition, a system must be recompiled and redeployed to use any new shared library features, so shared libraries can increase the burden on system operators.

It is often helpful to use shared libraries in combination with other system interaction methods. For example, if clients must cache data that a server returns, then instead of having every client implement its own caching, a shared library can be provided that handles both communication with the server and caching.
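A minimal sketch of such a client library follows. The `fetch` closure is a hypothetical stand-in for the real request to the server; the library checks its local cache first and contacts the server only on a miss. Eviction and expiry, which a production cache would need, are omitted.

```rust
use std::collections::HashMap;

/// Hypothetical client library that wraps server communication with a cache.
pub struct CachingClient<F>
where
    F: FnMut(&str) -> String,
{
    fetch: F,                       // stands in for a request to the server
    cache: HashMap<String, String>, // values already fetched, by key
    server_calls: usize,            // how often the server was contacted
}

impl<F> CachingClient<F>
where
    F: FnMut(&str) -> String,
{
    pub fn new(fetch: F) -> Self {
        Self { fetch, cache: HashMap::new(), server_calls: 0 }
    }

    /// Returns the value for `key`, contacting the server only on a cache miss.
    pub fn get(&mut self, key: &str) -> String {
        if let Some(value) = self.cache.get(key) {
            return value.clone();
        }
        self.server_calls += 1;
        let value = (self.fetch)(key);
        self.cache.insert(key.to_string(), value.clone());
        value
    }

    pub fn server_calls(&self) -> usize {
        self.server_calls
    }
}
```

Every client that links this library gets the same caching behavior, and fixing a caching bug means updating one library rather than every client.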

Inter-Process

Inter-process communication (IPC) relies on the operating system to pass information between systems that run as separate processes. Strictly speaking, any means of passing information between processes is a valid form of inter-process communication: Files, pipes, shared memory, and sockets, to name a few. A system that runs on a device and provides some functionality to other systems on the same device via inter-process communication is sometimes called a “sidecar.”
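A minimal sketch of a sidecar exchange over a Unix domain socket, assuming a Unix-like operating system. `UnixStream::pair` creates two connected sockets; to keep the example self-contained, the hypothetical sidecar runs on a thread in the same process, whereas a real sidecar would be a separate process reached through a socket path. The sidecar here simply upper-cases each newline-terminated request.

```rust
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream; // Unix-only
use std::thread;

/// Hypothetical sidecar: replies to each line with the line upper-cased,
/// until the other end closes the socket.
fn run_sidecar(stream: UnixStream) -> std::io::Result<()> {
    let mut writer = stream.try_clone()?;
    for line in BufReader::new(stream).lines() {
        let request = line?;
        writeln!(writer, "{}", request.to_uppercase())?;
    }
    Ok(())
}

/// Sends each request to the sidecar and collects the replies.
fn call_sidecar(requests: &[&str]) -> std::io::Result<Vec<String>> {
    let (client, server) = UnixStream::pair()?;
    let sidecar = thread::spawn(move || run_sidecar(server));

    let mut writer = client.try_clone()?;
    let mut reader = BufReader::new(client);
    let mut replies = Vec::new();
    for request in requests {
        writeln!(writer, "{request}")?;
        let mut reply = String::new();
        reader.read_line(&mut reply)?;
        replies.push(reply.trim_end().to_string());
    }

    // Closing both handles to our end lets the sidecar see end-of-stream.
    drop(writer);
    drop(reader);
    sidecar.join().expect("sidecar thread panicked")?;
    Ok(replies)
}
```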

Unlike a shared library, a sidecar can be updated independently of the processes that use its functionality. In addition, inter-process communication has relatively low overhead compared to communication over a network. A downside of inter-process communication is that each sidecar contends for compute, memory, storage, network, and other resources on the devices it runs on.

It is usually best to deploy only a small number of widely used sidecars on each device. Because sidecars consume resources that the systems they serve also need, if you choose to use sidecars it is important to track their resource usage and decide how much is too much. You may also need to move a sidecar to a different form of interaction if its resource needs become too high.

Networks

Networks use protocols and relay devices to transfer information across vast distances. As networks provide the interconnect for planetary scale computers, we will devote an entire chapter to them. Here, though, we look at how systems can interact with each other using networks. We assume an Ethernet network using a packet-based network protocol.

To communicate over a network, a server listens on a socket and waits for a client to connect. A client uses the address of the server to send packets of information to it. If the server understands the packets, it can construct its own packets to send back to the client. This pattern repeats until the client or the server closes the connection, or the network breaks it.
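The exchange described above can be sketched with the standard library's blocking TCP sockets. This is a minimal illustration rather than the networking code used later in the chapter: the server runs on a local thread, binds to port 0 so that the operating system picks a free port, and, as a hypothetical protocol, answers each newline-terminated message with its length in bytes. The conversation ends when the client shuts down its side of the connection.

```rust
use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::thread;

/// Starts a one-connection server that answers each line with its length.
fn start_length_server() -> std::io::Result<(SocketAddr, thread::JoinHandle<()>)> {
    let listener = TcpListener::bind("127.0.0.1:0")?; // port 0: any free port
    let addr = listener.local_addr()?;
    let handle = thread::spawn(move || {
        // Wait for a client to connect.
        let (stream, _) = listener.accept().expect("accept failed");
        let mut writer = stream.try_clone().expect("clone failed");
        // Respond to each request until the client closes its side.
        for line in BufReader::new(stream).lines() {
            let Ok(line) = line else { break };
            if writeln!(writer, "{}", line.len()).is_err() {
                break;
            }
        }
    });
    Ok((addr, handle))
}

/// Connects to `addr`, sends each message, and collects the replies.
fn exchange(addr: SocketAddr, messages: &[&str]) -> std::io::Result<Vec<String>> {
    let stream = TcpStream::connect(addr)?;
    let mut writer = stream.try_clone()?;
    let mut reader = BufReader::new(stream);
    let mut replies = Vec::new();
    for message in messages {
        writeln!(writer, "{message}")?;
        let mut reply = String::new();
        reader.read_line(&mut reply)?;
        replies.push(reply.trim_end().to_string());
    }
    // Shutting down the write half tells the server no more requests follow.
    writer.shutdown(Shutdown::Write)?;
    Ok(replies)
}
```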

Networks unlock the power of distributed system interaction. When connected to wide area networks and the Internet, systems can communicate across the far reaches of the planet and even into space. In addition to latency (which can range from tens to hundreds of milliseconds across the planet), a downside of network interaction is that the reliability of the communication is limited by the reliability of the network and the protocol.
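A back-of-the-envelope calculation shows where the lower bound on that latency comes from. Assuming light travels through optical fiber at roughly 200,000 km/s (about two-thirds of its speed in a vacuum) and ignoring routing detours, queuing, and processing, a round trip between two points 20,000 km apart (roughly half of Earth's circumference) takes at least 200 ms.

```rust
/// Approximate speed of light in optical fiber, in km per second
/// (an assumption: roughly two-thirds of the speed of light in a vacuum).
const FIBER_KM_PER_SEC: f64 = 200_000.0;

/// Minimum round-trip time in milliseconds over `distance_km` of fiber,
/// ignoring queuing, processing, and indirect routes.
fn min_round_trip_ms(distance_km: f64) -> f64 {
    2.0 * distance_km / FIBER_KM_PER_SEC * 1000.0
}
```

Real round trips are longer, because cables do not follow the shortest path and every router along the way adds delay.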

Normalization

Because a client may run in a completely different operating environment (hardware, architecture, operating system, and so on) from the server it sends information to, the client must normalize any information it sends to the server. For example, if we simply sent the bits of memory exactly as they appeared on the client's device, there is no guarantee that the server would interpret those bits the same way the client does.
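Byte order is a concrete example. The standard library lets us view an `i32` as the little-endian bytes a machine such as an x86-64 processor stores in memory, and then reinterpret those same bytes as big-endian, as a machine with the opposite byte order would if the raw memory were copied across unchanged:

```rust
/// The bytes of `value` as laid out in memory on a little-endian machine.
fn little_endian_bytes(value: i32) -> [u8; 4] {
    value.to_le_bytes()
}

/// How a big-endian machine would read those same four bytes.
fn read_as_big_endian(bytes: [u8; 4]) -> i32 {
    i32::from_be_bytes(bytes)
}
```

The value 1 is stored as the bytes `[1, 0, 0, 0]`, which a big-endian reader interprets as 16,777,216. Agreeing on a common representation, as the normalization library below does with a text format, removes this ambiguity.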

To normalize the information exchanged between a client and a server, the sender performs a process called serialization, which converts data into an agreed-upon representation, and the receiver performs the inverse process, called deserialization, to reconstruct the data. This happens in both directions (for requests and for responses) and usually behind the scenes, out of sight of the client and server logic.

We can implement a normalization system as a shared library. We use the proc_macro, proc_macro2, syn, and quote crates to make it easier to introspect on the Rust syntax tree and to implement procedural macros that serialize and deserialize simple structs with i32, bool, and String fields. A full implementation would support many more data types.

normalization/src/lib.rs We start by defining the Serializable and Deserializable traits to annotate structs that support normalization. The traits provide functions that allow a struct to be converted to and from a String representation. We also define an enum for any errors we encounter during serialization and deserialization.

#[derive(Debug, PartialEq)]
pub enum NormalizationError {
    MissingField,
    InvalidFormat,
    ParseFailure,
}

pub trait Serializable {
    fn serialize(&self) -> String;
}

pub trait Deserializable: Sized {
    fn deserialize(input: &str) -> Result<Self, NormalizationError>;
}

normalization/normalization-macros/src/lib.rs We then implement serialization for simple structs. First, we define a helper function that, for a single field of a struct, generates code that converts the field's value to a string representation according to the field's type. We handle String types separately in order to escape special characters.

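use proc_macro::TokenStream;
use quote::quote;
use syn::{Data, DataStruct, DeriveInput, Fields, FieldsNamed, Type::Path, TypePath};

/// Generates code that renders one field as `name: value`.
fn generate_serialization_for_type(
    field_name: &Option<syn::Ident>,
    field_type: &TypePath,
) -> proc_macro2::TokenStream {
    if field_type.path.is_ident("i32") || field_type.path.is_ident("bool") {
        // Numbers and booleans are written using their Display representation.
        quote! { format!("{}: {}", stringify!(#field_name), self.#field_name) }
    } else if field_type.path.is_ident("String") {
        // Strings are quoted. Backslashes are escaped first, then the
        // delimiters (colon, quote, comma), so they cannot be misparsed.
        quote! {
            format!(
                "{}: \"{}\"",
                stringify!(#field_name),
                self.#field_name.replace("\\", "\\\\").replace(":", "\\:")
                    .replace("\"", "\\\"").replace(",", "\\,")
            )
        }
    } else {
        panic!("Unsupported type!");
    }
}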

At last, we get to the heart of the serialization routine. We define a procedural macro that generates a serialize method for a struct, converting each field to a string with the generate_serialization_for_type function and joining the results inside braces. The Rust compiler invokes this macro at compile time for every struct annotated with #[derive(Serializable)].

#[proc_macro_derive(Serializable)]
pub fn derive_serializable(input: TokenStream) -> TokenStream {
    let input = syn::parse_macro_input!(input as DeriveInput);
    let name = &input.ident;

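    // Only structs with named fields are supported.
    let expanded = match &input.data {
        Data::Struct(DataStruct {
            fields: Fields::Named(FieldsNamed { named, .. }),
            ..
        }) => {
            // One `format!` expression per field.
            let serialization_logic = named.iter().map(|f| {
                let field_name = &f.ident;
                match &f.ty {
                    Path(type_path) =>
                        generate_serialization_for_type(field_name, type_path),
                    _ => panic!("Unsupported type!"),
                }
            });

            // Emit a `serialize` method that joins the fields with commas
            // and wraps them in braces.
            quote! {
                impl #name {
                    pub fn serialize(&self) -> String {
                        let parts = vec![#(#serialization_logic),*];
                        format!("{{{}}}", parts.join(","))
                    }
                }
            }
        }
        _ => panic!("Only named structs are supported!"),
    };

    // `gen` is a reserved keyword in Rust 2024, so the result is named `expanded`.
    expanded.into()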
}

The deserialization macro follows a similar pattern in reverse: it parses a serialized String, splits it into key–value pairs (taking care to handle escaped commas and colons), and reconstructs each field of the struct according to its type.
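Since the deserialization macro is not listed here, the following is a hand-written sketch of the code such a macro might generate for the `Sample` struct defined below, assuming the format produced by the serialization macro. `split_unescaped` and `unescape` are hypothetical helpers, and `Sample` and `NormalizationError` are redeclared so that the example stands alone.

```rust
#[derive(Debug, PartialEq)]
pub enum NormalizationError {
    MissingField,
    InvalidFormat,
    ParseFailure,
}

#[derive(Debug, PartialEq)]
pub struct Sample {
    pub number: i32,
    pub flag: bool,
    pub text: String,
}

/// Hypothetical helper: splits `input` on `delim`, ignoring delimiters escaped
/// with a backslash. Returns at most `max_parts` pieces; escapes are kept.
fn split_unescaped(input: &str, delim: char, max_parts: usize) -> Vec<String> {
    let mut parts = vec![String::new()];
    let mut chars = input.chars();
    while let Some(c) = chars.next() {
        if c == delim && parts.len() < max_parts {
            parts.push(String::new());
            continue;
        }
        let current = parts.last_mut().unwrap();
        current.push(c);
        if c == '\\' {
            current.extend(chars.next()); // keep the escaped character
        }
    }
    parts
}

/// Hypothetical helper: removes backslash escapes (`\x` becomes `x`).
fn unescape(input: &str) -> String {
    let mut out = String::new();
    let mut chars = input.chars();
    while let Some(c) = chars.next() {
        if c == '\\' {
            out.extend(chars.next());
        } else {
            out.push(c);
        }
    }
    out
}

impl Sample {
    pub fn deserialize(input: &str) -> Result<Self, NormalizationError> {
        use NormalizationError::{InvalidFormat, MissingField, ParseFailure};
        let body = input
            .strip_prefix('{')
            .and_then(|s| s.strip_suffix('}'))
            .ok_or(InvalidFormat)?;
        let (mut number, mut flag, mut text) = (None, None, None);
        for pair in split_unescaped(body, ',', usize::MAX) {
            // Split each pair on its first unescaped colon.
            let parts = split_unescaped(&pair, ':', 2);
            let [key, value] = parts.as_slice() else {
                return Err(InvalidFormat);
            };
            let value = value.trim();
            match key.trim() {
                "number" => number = Some(value.parse::<i32>().map_err(|_| ParseFailure)?),
                "flag" => flag = Some(value.parse::<bool>().map_err(|_| ParseFailure)?),
                "text" => {
                    let quoted = value
                        .strip_prefix('"')
                        .and_then(|s| s.strip_suffix('"'))
                        .ok_or(InvalidFormat)?;
                    text = Some(unescape(quoted));
                }
                _ => return Err(InvalidFormat),
            }
        }
        Ok(Sample {
            number: number.ok_or(MissingField)?,
            flag: flag.ok_or(MissingField)?,
            text: text.ok_or(MissingField)?,
        })
    }
}
```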

normalization/src/lib.rs To use the normalization library, we re-export the derive macros under the same names as the traits and annotate a struct with them.

pub use normalization_macros::{Serializable, Deserializable};

#[derive(Serializable, Deserializable)]
pub struct Sample {
    pub number: i32,
    pub flag: bool,
    pub text: String,
}

normalization/tests/tests.rs Later on, when we want to use the derived functionality, we call the generated serialize and deserialize functions to convert our struct to and from its String representation.

use normalization::Sample;

#[test]
fn test_serialization() {
    let sample = Sample {
        number: 5,
        flag: true,
        text: "Hello".to_string(),
    };
    let serialized = sample.serialize();
    assert_eq!(serialized, "{number: 5,flag: true,text: \"Hello\"}");
}

#[test]
fn test_deserialization() {
    let serialized = "{number: 5,flag: true,text: \"Hello\"}";
    let deserialized = Sample::deserialize(serialized);

    match deserialized {
        Ok(sample) => {
            assert_eq!(sample.number, 5);
            assert_eq!(sample.flag, true);
            assert_eq!(sample.text, "Hello".to_string());
        }
        Err(e) => panic!("Deserialization failed with error: {:?}", e),
    }
}

Now that we can normalize the representation of structs, we can share them easily between systems. We next turn to an abstract interface for how one system can use another system to do work.

Remote Procedure Calls

Remote procedure calls (RPCs) provide an elegant wrapper around network communication. The goal of remote procedure calls is to provide a function-like interface between systems that may be distributed across vast distances. This interface combines the distribution benefits of network communication with the programmability benefits of shared libraries.

Remote procedure calls were proposed by Bruce Jay Nelson in 1981 and later implemented by Andrew Birrell and Nelson in 1984. At a high level, they consist of an interface that specifies how to transfer information between systems in a way that abstracts the fact that those systems may reside on separate devices far away from each other.

The remote procedure call interface is translated into code that performs serialization on the client, network transfer of a request from the client to a server, deserialization on the server, work on the server, serialization on the server, network transfer of a response from the server back to the client, and deserialization on the client.
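To make the seven steps concrete, here is an in-process sketch in which byte vectors stand in for the network. The function names, the decimal-text payload format, and the squaring procedure are all hypothetical; the point is only the order in which serialization, transfer, and work happen.

```rust
/// Client step 1: serialize the argument into bytes.
fn serialize_request(arg: u32) -> Vec<u8> {
    arg.to_string().into_bytes()
}

/// Server steps 3 to 5: deserialize, do the work, serialize the result.
fn server(request: Vec<u8>) -> Vec<u8> {
    let arg: u32 = String::from_utf8(request)
        .expect("request is not UTF-8")
        .parse()
        .expect("request is not a number");
    let result = u64::from(arg) * u64::from(arg); // the hypothetical procedure: squaring
    result.to_string().into_bytes()
}

/// Client step 7: deserialize the response.
fn deserialize_response(response: Vec<u8>) -> u64 {
    String::from_utf8(response)
        .expect("response is not UTF-8")
        .parse()
        .expect("response is not a number")
}

/// What the caller sees: an ordinary function call. Steps 2 and 6
/// (network transfer) are simulated by moving the byte vectors.
fn remote_square(arg: u32) -> u64 {
    let request = serialize_request(arg); // 1. serialize on the client
    let response = server(request);       // 2-6. transfer, server work, transfer back
    deserialize_response(response)        // 7. deserialize on the client
}
```

Code generated from a real remote procedure call interface follows the same shape, with a network connection in place of the direct call to `server`.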

Remote procedure calls are a fundamental aspect of planetary scale computers. If networks are the interconnect of planetary scale computers, remote procedure calls are the bus protocol. We can use the normalization system to perform serialization and deserialization between the client and the server. We use the tokio library to handle network communication since the details of network access are not the focus of this book.

rpc/src/lib.rs We begin by defining the interface between the remote procedure call client and server. The client sends a request to a server, specifying a number identifying the procedure it would like the server to execute as well as a payload containing data from the client. The server processes the request and sends a response back to the client containing a payload with the result of the request.

pub mod client;
pub mod server;

pub type ProcedureId = i32;
pub type Payload = String;

#[derive(Debug, Clone)]
pub struct Request {
    pub procedure_id: ProcedureId,
    pub payload: Payload,
}

#[derive(Debug, Clone)]
pub struct Response {
    pub payload: Payload,
}

A note on procedure identifiers and payloads: When you create a new procedure, or change the payload structure of an existing procedure, it is important to select a new, unique procedure identifier. This is because payload changes are not backwards compatible: A server implementing the old interface cannot correctly interpret payloads from a client using the new interface.

You also want to keep the implementations of the old interface available on the server until you are certain that no clients send requests to it anymore. For this reason, you should always deploy a new version of a server before deploying the corresponding new version of the client: Requests from old clients sent to new servers can still be serviced, but requests from new clients sent to old servers cannot.
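A minimal sketch of this practice, assuming hypothetical procedure identifiers: version 1 of an echo procedure takes a bare message, and version 2 adds a repeat count, so it gets a new identifier. The server keeps serving both identifiers, which is what allows it to be deployed before the new clients. `Request` and `Response` mirror the rpc types above, and the `count|message` payload encoding is invented for the example.

```rust
type ProcedureId = i32;

struct Request {
    procedure_id: ProcedureId,
    payload: String,
}

struct Response {
    payload: String,
}

// Hypothetical identifiers: a payload change gets a new identifier.
const ECHO_V1: ProcedureId = 1; // payload: "<message>"
const ECHO_V2: ProcedureId = 2; // payload: "<count>|<message>"

fn handler(request: Request) -> Response {
    let payload = match request.procedure_id {
        // Keep serving v1 until no client sends it anymore.
        ECHO_V1 => request.payload,
        ECHO_V2 => match request.payload.split_once('|') {
            Some((count, message)) => match count.parse::<usize>() {
                Ok(n) => message.repeat(n),
                Err(_) => "Invalid count".to_string(),
            },
            None => "Invalid payload".to_string(),
        },
        _ => "Unknown procedure".to_string(),
    };
    Response { payload }
}
```

Once monitoring shows that no requests arrive for `ECHO_V1`, its match arm can be deleted.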

rpc/src/server.rs A remote procedure call server listens on a socket for connections and spawns an asynchronous tokio task for each connection it accepts. The task reads requests from its connection and calls a request handler for each one. The request handler is implemented by the server: Depending on the procedure the client asks the server to perform, the server executes different code.

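use std::io;

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

use crate::{ProcedureId, Request, Response};

pub async fn start_server(
    addr: &str,
    handler: impl Fn(Request) -> Response + Send + Sync + 'static + Clone,
) -> io::Result<()> {
    let listener = TcpListener::bind(addr).await?;

    loop {
        // Accept a connection and give its task its own copy of the handler.
        let (mut socket, _) = listener.accept().await?;
        let handler = handler.clone();

        tokio::spawn(async move {
            let mut buffer = vec![0u8; 1024];
            // Serve requests on this connection until the client closes it.
            loop {
                let n = match socket.read(&mut buffer).await {
                    Ok(0) | Err(_) => break, // connection closed or failed
                    Ok(n) => n,
                };
                let data = String::from_utf8_lossy(&buffer[..n]);

                // Parse "<procedure_id>:<payload>"; close the connection if malformed.
                let Some((id, payload)) = data.split_once(':') else { break };
                let Ok(procedure_id) = id.trim().parse::<ProcedureId>() else { break };
                let request = Request {
                    procedure_id,
                    payload: payload.trim().to_string(),
                };

                let response = handler(request);
                if socket.write_all(response.payload.as_bytes()).await.is_err() {
                    break;
                }
            }
        });
    }
}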

Notice that each task keeps its network socket open, waiting for additional data (or for the client to close the socket). This allows a client to send a stream of requests to a server without establishing a new network connection for each request, which reduces the latency and resources needed to handle the additional requests. However, every open connection occupies a socket on the server, so if many clients hold long-lived connections, the server risks running out of sockets.

rpc/src/client.rs A remote procedure call client connects to a server at a particular address and sends a request asking the server to execute the identified procedure on the payload the client provides. It then waits for the server's response.

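use std::io;

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

use crate::{Request, Response};

pub async fn send_request(
    server_addr: &str,
    request: Request,
) -> io::Result<Response> {
    // Open a connection and send "<procedure_id>:<payload>\n".
    let mut stream = TcpStream::connect(server_addr).await?;
    let serialized = format!("{}:{}\n", request.procedure_id, request.payload);
    stream.write_all(serialized.as_bytes()).await?;

    // Read the response with a single read of up to 1024 bytes;
    // a larger response could be truncated.
    let mut buffer = vec![0u8; 1024];
    let n = stream.read(&mut buffer).await?;
    let response_data = String::from_utf8_lossy(&buffer[..n]);

    Ok(Response {
        payload: response_data.to_string(),
    })
}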

Client–Server

The most common system architecture is the client–server architecture, so named because it involves a client device communicating with a server device. In this architecture, the client sends a request to a server, and the server sends a response back to the client. A client could be a command line interface, a standalone binary, or even a server that itself needs to send a request to another server.

To solidify our understanding of the client–server architecture, and tie together normalization and remote procedure calls, we will examine a simple echo client and server. The echo client sends a request whose payload is a string that it would like the echo server to repeat back to the client. The echo server then inspects the string and returns it as the payload of a response back to the client.

echo/src/lib.rs We start by specifying the components of the echo system in a shared library. The library assigns an identifier to the procedure that the client can request and that the server can perform, and it defines the structure of the request payload.

use normalization::{Deserializable, NormalizationError, Serializable};
use rpc::ProcedureId;

pub const SYSTEM_NAME: &str = "echo";
pub const SYSTEM_ADDRESS: &str = "127.0.0.1:10100";
pub const ECHO_PROCEDURE: ProcedureId = 1;

#[derive(Serializable, Deserializable)]
pub struct EchoArgs {
    pub message: String,
}

echo/bin/server_v0.rs In the echo server, we wait for a client to send a request. When the server receives a request, it calls a handler function to process it. The handler checks whether the requested procedure is one that the server recognizes; if so, it deserializes the request payload and places the message it contains in the response.

use echo::{EchoArgs, ECHO_PROCEDURE};
use rpc::{Request, Response};

fn handle_echo(args: EchoArgs) -> String {
    args.message
}

fn handler(request: Request) -> Response {
    match request.procedure_id {
        ECHO_PROCEDURE => match EchoArgs::deserialize(&request.payload) {
            Ok(args) => Response { payload: handle_echo(args) },
            // Report malformed payloads instead of crashing the connection task.
            Err(e) => Response { payload: format!("Invalid request: {:?}", e) },
        },
        _ => Response {
            payload: "Unknown procedure".to_string(),
        },
    }
}

echo/bin/client_v0.rs The echo client is responsible for constructing a request to send to an echo server and sending the request. The client serializes the message, specifies the identity of the procedure, sends the request to the server, awaits the server's response, and prints the payload of the response.

let args = EchoArgs {
    message: "Hello RPC!".to_string(),
};
let serialized_args = args.serialize();

let request = Request {
    procedure_id: ECHO_PROCEDURE,
    payload: serialized_args,
};

let response = client::send_request(SYSTEM_ADDRESS, request)
    .await
    .expect("Failed to get response");
println!("Response: {}", response.payload);

The client–server architecture is a powerful abstraction with which to build planetary scale computers. It improves manageability by dividing the work that needs to be done into separate entities that can be more easily managed than one big entity. It encourages modularity and reusability by exposing simple interfaces with which to perform specific work. It provides isolation and scalability by allowing components of work to be run on a flexible number of threads or devices.

State

The echo system we just built was quite simple and did not need to manage any state across requests. However, many real systems (including the ones we look at next) need to manage some state in order to perform their work. A straightforward way to share state safely among request tasks is to protect it with a mutex (Mutex) that is owned through an atomically reference-counted pointer (Arc). We refine our rpc shared library with a function that allows us to share state among requests.
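The technique can be seen in isolation with only the standard library. In this sketch each simulated request thread receives a clone of an `Arc` (a new pointer to the same state, not a copy of the state), and the `Mutex` ensures that only one thread updates the state at a time. The per-client request counter is a hypothetical example of shared state.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Simulates `requests` requests from each client, each handled on its own
/// thread, all updating one shared per-client request counter.
fn count_requests(clients: &[&str], requests: usize) -> HashMap<String, usize> {
    let state: Arc<Mutex<HashMap<String, usize>>> = Arc::new(Mutex::new(HashMap::new()));
    let mut handles = Vec::new();

    for client in clients {
        for _ in 0..requests {
            // Cloning the Arc copies the pointer, not the map.
            let state = Arc::clone(&state);
            let client = client.to_string();
            handles.push(thread::spawn(move || {
                // Lock for exclusive access; the guard unlocks when dropped.
                let mut counts = state.lock().unwrap();
                *counts.entry(client).or_insert(0) += 1;
            }));
        }
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // All threads have finished, so this is the last reference to the state.
    Arc::try_unwrap(state).unwrap().into_inner().unwrap()
}
```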

rpc/src/server.rs The main difference compared to start_server is the use of a generic type parameter <T> to represent the state that we wish to share among requests. We add a parameter to our function to pass the shared state, and we change the type of our handler so that it also receives the shared state and returns a future, which lets the handler await (for example, while acquiring the lock). Before calling the handler, we clone the Arc so that the handler receives its own reference to the same shared state; the state itself is not copied.

// Imports needed in addition to those used by start_server.
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use tokio::sync::Mutex;

pub async fn start_server_with_state<T: Send + 'static>(
    addr: &str,
    handler: impl Fn(Request, Arc<Mutex<T>>) ->
        Pin<Box<dyn Future<Output = Response> + Send>>
        + Send + Sync + 'static + Clone,
    shared_state: Arc<Mutex<T>>,
) -> io::Result<()> {
    let listener = TcpListener::bind(addr).await?;

    loop {
        let (mut socket, _) = listener.accept().await?;
        let handler = handler.clone();
        // Another pointer to the same state for this connection's task.
        let shared_state = shared_state.clone();

        tokio::spawn(async move {
            let mut buffer = vec![0u8; 1024];
            loop {
                let n = match socket.read(&mut buffer).await {
                    Ok(0) | Err(_) => break, // connection closed or failed
                    Ok(n) => n,
                };
                let data = String::from_utf8_lossy(&buffer[..n]);

                // Parse "<procedure_id>:<payload>"; close the connection if malformed.
                let Some((id, payload)) = data.split_once(':') else { break };
                let Ok(procedure_id) = id.trim().parse::<crate::ProcedureId>() else { break };
                let request = Request {
                    procedure_id,
                    payload: payload.trim().to_string(),
                };

                let response = handler(request, shared_state.clone()).await;
                if socket.write_all(response.payload.as_bytes()).await.is_err() {
                    break;
                }
            }
        });
    }
}

Transfer

The voyage of a bit of information from one system to another is a fascinating journey. It involves multiple stages, potentially many thousands of miles, and hazards along the way that can prevent successful delivery. We next trace the chronological flow of information from one system to another, along with the concepts and systems involved.

Discovery

A client must be able to discover the location of a server it wishes to communicate with. However, a server can move between devices, or copies of the server can run on many devices. How can a client find the right server given this flux? When faced with a problem in implementing a planetary scale computer, we typically solve it with another system.

A discovery system is responsible for taking the identifier of a system that a client wants to connect to, choosing a device that runs a server for that system, and responding to the client with the chosen device's address. In other words, a discovery system takes as input an identifier for a system and responds with the address of a device that the client can connect to in order to communicate with the server.

discovery/src/lib.rs We begin by defining the payloads for the procedures that the discovery system implements. A server uses the register procedure to submit the name of the system it implements along with the address and port that the server is listening on. The query procedure allows a client to request a system by its name and receive a response containing the address and port of a server that implements that system.

#[derive(Debug, Serializable, Deserializable)]
pub struct RegisterArgs {
    pub name: String,
    pub address: String,
}

#[derive(Debug, Serializable, Deserializable)]
pub struct QueryArgs {
    pub name: String,
}

#[derive(Debug, Serializable, Deserializable)]
pub struct QueryResult {
    pub address: String,
}

The discovery system uses shared state to keep track of the registered systems. The registry it maintains maps each system name to its server addresses and records when each address last registered with the discovery system. When an address for a system is requested, the registry selects one at random and returns it. A cleanup function removes stale addresses, that is, addresses that have not re-registered within a fixed interval.

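use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};

use rand::seq::SliceRandom; // provides `choose` on slices (rand 0.8)

type Name = String;
type Address = String;

// How long an address may go without re-registering before it is removed.
// Illustrative value only; the text does not specify one.
const CLEANUP_DURATION: Duration = Duration::from_secs(30);

#[derive(Default)]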
pub struct Registry {
    registry: HashMap<Name, Vec<Address>>,
    last_ping: HashMap<Address, Instant>,
}

impl Registry {
    fn register(&mut self, name: Name, address: Address) {
        if let Some(time) = self.last_ping.get_mut(&address) {
            *time = Instant::now();
        } else {
            self.registry.entry(name)
                .or_insert_with(Vec::new)
                .push(address.clone());
            self.last_ping.insert(address, Instant::now());
        }
    }

    fn get_address(&self, name: &Name) -> Option<&Address> {
        self.registry.get(name)?.choose(&mut rand::thread_rng())
    }

    fn cleanup_stale(&mut self) {
        let now = Instant::now();
        let stale_addresses: HashSet<_> = self.last_ping.iter()
            .filter(|&(_, time)| now.duration_since(*time) > CLEANUP_DURATION)
            .map(|(address, _)| address.clone())
            .collect();

        for address in stale_addresses {
            self.last_ping.remove(&address);
            self.registry.retain(|_, v| {
                v.retain(|a| a != &address);
                !v.is_empty()
            });
        }
    }
}

The rpc shared library makes it easy to implement the logic for the discovery service. We define helper functions that dispatch each request on its procedure identifier and handle the register and query procedures using the shared registry state. Since the registry is shared among multiple request tasks, the request handler first acquires the registry's lock, which gives it exclusive access to the registry while it processes the request.

When a server becomes available, it registers its address with the discovery system as capable of serving requests for its system's identifier. Since servers can come and go, a server must periodically re-register with the discovery service, more often than the cleanup interval, so that its address is not removed as stale. Of course, even the discovery system can become unavailable. When this happens, a server retries connecting to the discovery system, waiting exponentially longer between attempts, up to a maximum delay.
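The retry schedule can be sketched as a pure function, assuming hypothetical parameters: an initial delay that doubles after every failed attempt and is capped at a maximum. Production implementations usually also add random jitter so that many servers do not retry in lockstep; that is omitted here.

```rust
use std::time::Duration;

/// Delays to wait before each of `attempts` retries: `initial`, then doubling,
/// but never more than `max`.
fn backoff_schedule(initial: Duration, max: Duration, attempts: usize) -> Vec<Duration> {
    let mut delays = Vec::with_capacity(attempts);
    let mut delay = initial.min(max);
    for _ in 0..attempts {
        delays.push(delay);
        // saturating_mul avoids overflow after many doublings.
        delay = delay.saturating_mul(2).min(max);
    }
    delays
}
```

With an initial delay of 100 ms and a maximum of 1 s, the first six retries wait 100, 200, 400, 800, 1000, and 1000 ms.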

A critical question, however, is how services discover the discovery service in the first place. One option is to maintain a fixed set of well-known addresses where a client or server should always be able to reach a discovery service. Other options include using the Domain Name System (DNS) or multicast to locate the discovery service.

Routing

A server's availability can change over time, and a server that a client discovered as available at one point in time can later become unavailable. Thus, it is helpful to build a layer of abstraction between a client's discovery of servers and the routing of requests to those servers. We call such a service the routing service; its primary job is to get requests from a client to a server efficiently and reliably.

The routing service (which a client can initially discover using the discovery service) resolves a service identifier to an available server for the service. Unlike the discovery service, the routing service also checks the health of the set of servers for a service and only routes requests to available servers. In addition, the routing service performs connection pooling and load balancing across the set of servers for a service.
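A minimal sketch of the load-balancing part of that job, assuming a hypothetical health flag per server that a health checker would update: requests rotate round-robin through the servers, skipping any currently marked unhealthy. Round-robin is only one simple policy, chosen here for illustration.

```rust
/// Hypothetical health-aware round-robin balancer.
struct Balancer {
    servers: Vec<(String, bool)>, // (address, healthy)
    next: usize,                  // where the next search starts
}

impl Balancer {
    fn new(addresses: &[&str]) -> Self {
        let servers = addresses.iter().map(|a| (a.to_string(), true)).collect();
        Self { servers, next: 0 }
    }

    /// Records the result of a health check for `address`.
    fn set_healthy(&mut self, address: &str, healthy: bool) {
        for server in &mut self.servers {
            if server.0 == address {
                server.1 = healthy;
            }
        }
    }

    /// Returns the next healthy server, or `None` if none are healthy.
    fn pick(&mut self) -> Option<&str> {
        let n = self.servers.len();
        for offset in 0..n {
            let i = (self.next + offset) % n;
            if self.servers[i].1 {
                self.next = (i + 1) % n;
                return Some(self.servers[i].0.as_str());
            }
        }
        None
    }
}
```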

routing/src/lib.rs We base our routing system on the concept of a connection pool. Each system that a client requests from the routing service has its own pool of connections, and each connection in that pool is an open network socket to a server for the system. Reusing sockets avoids the latency and resource overhead of establishing a new connection between a client and a server for every request.

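use std::collections::VecDeque;
use std::sync::Arc;

use tokio::net::TcpStream;
use tokio::sync::{Mutex, Semaphore};

pub struct ConnectionPool {
    system_name: String,
    pool: Arc<Mutex<VecDeque<TcpStream>>>,
    max_size: usize,
    // Limits how many connections can be checked out at once.
    semaphore: Arc<Semaphore>,
}

impl ConnectionPool {
    pub fn new(system_name: String, max_size: usize) -> Self {
        Self {
            system_name,
            pool: Arc::new(Mutex::new(VecDeque::new())),
            max_size,
            semaphore: Arc::new(Semaphore::new(max_size)),
        }
    }

    pub async fn get(&self) -> Option<TcpStream> {
        // Wait for a permit; forget it so that only `release` returns it.
        self.semaphore.acquire().await
            .expect("Unable to acquire permit").forget();

        // Reuse an idle connection if one is available.
        if let Some(conn) = self.pool.lock().await.pop_front() {
            return Some(conn);
        }

        // Otherwise, discover a server and connect without holding the
        // pool lock, so other callers are not blocked in the meantime.
        let address = discovery::query(self.system_name.clone()).await;
        let conn = TcpStream::connect(&address.address).await.ok();
        if conn.is_none() {
            // Return the permit, or a failed connection would shrink the pool.
            self.semaphore.add_permits(1);
        }
        conn
    }

    pub async fn release(&self, conn: TcpStream) {
        let mut pool = self.pool.lock().await;
        if pool.len() < self.max_size {
            pool.push_back(conn);
        }
        self.semaphore.add_permits(1);
    }
}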

The routing system can run in two modes: as a proxy system or as a shared library. The proxy version exposes a remote procedure call interface that routes requests between clients and servers, with the connection pools maintained on the routing server. The shared library version performs all of the routing functionality on the client, including managing the connection pools.

The shared library performs similar functionality to the proxy system, but within a structure that a client instantiates.

routing/src/lib.rs

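use std::collections::HashMap;

// Maximum number of connections kept per system.
const POOL_SIZE: usize = 10;

pub struct Router {
    // One connection pool per system name, created on first use.
    pools: Arc<Mutex<HashMap<String, Arc<ConnectionPool>>>>,
}

impl Router {
    pub fn new() -> Self {
        Self {
            pools: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    pub async fn send_request(
        &self,
        name: String,
        procedure_id: ProcedureId,
        payload: &str,
    ) -> Response {
        // Look up (or create) the pool, then release the map lock so requests
        // to other systems are not blocked while this one is in flight.
        let pool = {
            let mut pools = self.pools.lock().await;
            pools.entry(name.clone())
                .or_insert_with(|| Arc::new(ConnectionPool::new(name.clone(), POOL_SIZE)))
                .clone()
        };
        // Assumes route_request takes &ConnectionPool; the pool's methods only need &self.
        route_request(
            RouteArgs { name, procedure_id, payload: payload.to_string() },
            &pool,
        ).await
    }
}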

We can now update the echo system to use the discovery and routing systems to simplify the client–server communication process.

// echo client using routing
let args = EchoArgs {
    message: "Hello RPC!".to_string(),
};
let serialized_args = args.serialize();

let routing = Router::new();
let response = routing
    .send_request("echo".to_string(), ECHO_PROCEDURE, serialized_args.as_str())
    .await;
println!("Response: {}", response.payload);

Delivery

After a client discovers a server and routes a request to it, the request must be delivered to the server process. Because new requests can arrive while a server is still busy processing earlier ones, it is useful to maintain a queue of pending requests: new requests are placed in the queue, and requests are removed from it as they are delivered.
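A server-side delivery queue can be sketched as a bounded first-in, first-out buffer. The capacity is an assumption of this sketch, as is the choice to reject new requests when the queue is full rather than block. `DeliveryQueue` is a hypothetical name.

```rust
use std::collections::VecDeque;

/// A bounded FIFO of requests waiting to be delivered to the server process.
struct DeliveryQueue<T> {
    items: VecDeque<T>,
    capacity: usize,
}

impl<T> DeliveryQueue<T> {
    fn new(capacity: usize) -> Self {
        DeliveryQueue { items: VecDeque::with_capacity(capacity), capacity }
    }

    /// Adds a newly arrived request. If the queue is full, the request is
    /// handed back so the caller can reject it or send it elsewhere.
    fn enqueue(&mut self, request: T) -> Result<(), T> {
        if self.items.len() >= self.capacity {
            return Err(request);
        }
        self.items.push_back(request);
        Ok(())
    }

    /// Removes the oldest pending request for delivery.
    fn deliver(&mut self) -> Option<T> {
        self.items.pop_front()
    }
}
```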

The delivery queue can be maintained either on the client that sends the request or on the server that receives it. Placing the queue on the server relieves clients of managing flow control to avoid congesting the server, but without good load balancing, individual server queues can fill up.
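When the queue lives on the client instead, the client must do its own flow control. The following is a minimal sketch. It assumes a fixed window that caps how many requests may be outstanding at the server at once, while the rest wait locally. `ClientQueue` and the window size are hypothetical.

```rust
use std::collections::VecDeque;

/// Client-side delivery queue with window-based flow control.
struct ClientQueue<T> {
    pending: VecDeque<T>,
    in_flight: usize,
    window: usize,
}

impl<T> ClientQueue<T> {
    fn new(window: usize) -> Self {
        ClientQueue { pending: VecDeque::new(), in_flight: 0, window }
    }

    /// Queues a request locally until the window has room.
    fn push(&mut self, request: T) {
        self.pending.push_back(request);
    }

    /// Returns the next request to send, or `None` if the window is full
    /// or nothing is pending.
    fn next_to_send(&mut self) -> Option<T> {
        if self.in_flight >= self.window {
            return None;
        }
        let request = self.pending.pop_front()?;
        self.in_flight += 1;
        Some(request)
    }

    /// Called when the server answers a request, freeing one window slot.
    fn on_response(&mut self) {
        self.in_flight = self.in_flight.saturating_sub(1);
    }
}
```

A small window protects the server but limits throughput, while a large window does the opposite. Choosing its size is the client-side counterpart of sizing the server's queue.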

Concurrency

A server can keep accepting new requests while it processes earlier ones by using a request queue and a worker task. The server accepts incoming connections and places requests into a bounded channel, which provides natural back pressure: when the channel is full, the accept loop waits until the worker catches up. A separate worker task dequeues and processes requests one at a time:

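use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use tokio::task;

// Create a bounded channel that holds up to 100 in-flight requests.
// `tx` goes to the accept loop; `send().await` waits while the channel is full.
let (tx, mut rx) = mpsc::channel::<InflightRequest>(100);

// Spawn a worker task to process queued requests.
task::spawn(async move {
    while let Some(mut request) = rx.recv().await {
        let request_str = String::from_utf8_lossy(&request.data)
            .trim().to_string();
        let req: Request = Request::deserialize(&request_str).unwrap();
        let response = Response { result: req.a * req.b };
        let res_str = response.serialize();
        // Log write failures (e.g., a disconnected client) instead of panicking,
        // which would stop the worker and leave queued requests unprocessed.
        if let Err(e) = request.client_socket.write_all(res_str.as_bytes()).await {
            eprintln!("Failed to send response: {}", e);
        }
    }
});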
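The worker above handles one request at a time, so processing itself is still sequential. One way to process several requests at once is to let multiple workers share the same queue. The following is a minimal sketch that assumes plain standard-library threads and a `sync_channel` in place of Tokio tasks. `start_workers` and `handle` are hypothetical names.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Starts `workers` threads that pull from one shared request queue, so up to
/// `workers` requests are processed at the same time.
fn start_workers<T, F>(workers: usize, rx: Receiver<T>, handle: F) -> Vec<JoinHandle<()>>
where
    T: Send + 'static,
    F: Fn(T) + Send + Sync + 'static,
{
    // A Receiver has a single owner, so the workers share it behind a Mutex.
    let rx = Arc::new(Mutex::new(rx));
    let handle = Arc::new(handle);
    (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            let handle = Arc::clone(&handle);
            thread::spawn(move || loop {
                // Hold the lock only while taking the next request.
                let next = rx.lock().unwrap().recv();
                match next {
                    Ok(request) => (*handle)(request),
                    // Every sender has been dropped: no more work will arrive.
                    Err(_) => break,
                }
            })
        })
        .collect()
}
```

Because each worker releases the lock before handling its request, a slow request blocks only its own worker, not the whole queue.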

Timeouts

Faults in a planetary scale computer can cause a server to take longer to respond than a client expects — or to not respond at all. A software bug could cause a server to stop or slow down. A hardware fault could render the device a server is running on unreliable. Or, a network or power fault could take an entire building or geographic region offline.

To be resilient in the face of unpredictable bugs, faults, and outages, a system can bound the time spent on a unit of work with a timeout. Timeouts benefit clients by surfacing the fact that a request is taking too long, so the client can make an informed decision about what to do next, such as retrying or sending the request to another server. Timeouts also benefit servers by preventing long-running requests from occupying precious queue slots and system resources.

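use tokio::time::timeout; // REQUEST_TIMEOUT is a Duration constant defined elsewhere.

// Wrap the processing in a timeout. If REQUEST_TIMEOUT elapses first,
// the inner future is dropped and never resumed.
let process_result = timeout(REQUEST_TIMEOUT, async {
    let request_str = String::from_utf8_lossy(&request.data)
        .trim().to_string();
    let req: Request = Request::deserialize(&request_str).unwrap();
    let response = Response { result: req.a * req.b };
    let res_str = response.serialize();
    request.client_socket.write_all(res_str.as_bytes()).await
}).await;

match process_result {
    // Processed and answered within the deadline.
    Ok(Ok(())) => {}
    // Finished in time, but writing the response failed (e.g., the client hung up).
    Ok(Err(e)) => eprintln!("Failed to send response: {}", e),
    // Deadline exceeded: send a fatal response to the client.
    Err(_elapsed) => {
        let fatal_msg = "Fatal: Request timed out".as_bytes();
        let _ = request.client_socket.write_all(fatal_msg).await;
    }
}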
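Tokio's `timeout` bounds an async operation by dropping its future when the deadline passes. Without an async runtime, a similar bound on waiting can be sketched with the standard library's `Receiver::recv_timeout`. `call_with_timeout` is a hypothetical helper. Unlike the Tokio version, this sketch does not cancel the work: the worker thread runs to completion, and its late result is discarded.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Runs `work` on a separate thread and waits at most `limit` for its result.
/// Returns `None` if the deadline passes first or the work panics.
fn call_with_timeout<T, F>(limit: Duration, work: F) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // If we already timed out, the receiver is gone; ignore the send error.
        let _ = tx.send(work());
    });
    match rx.recv_timeout(limit) {
        Ok(result) => Some(result),
        Err(RecvTimeoutError::Timeout) => None,      // deadline passed
        Err(RecvTimeoutError::Disconnected) => None, // worker panicked
    }
}
```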

Retries

When a request times out or encounters a transient error, a client can retry the request. However, retries must be implemented carefully. If many clients retry at the same time, they can create a “thundering herd” that overwhelms the server. To mitigate this, we add random jitter to the retry delay, spreading out the retries over time:

use rand::Rng; // Provides gen_range (rand 0.8 API).
use std::time::Duration;

const MAX_RETRIES: u32 = 3;
const BASE_RETRY_DELAY: Duration = Duration::from_secs(1);
const JITTER_MS: u64 = 200;

for attempt in 1..=MAX_RETRIES {
    match send_request(&request).await {
        Ok(response) => {
            println!("5 * 10 = {}", response.result);
            break;
        }
        // Only timeouts are retried; other errors are reported immediately.
        Err(e) if e == "Fatal: Request timed out" => {
            if attempt == MAX_RETRIES {
                println!("Attempt {} failed. Giving up.", attempt);
                break;
            }
            // Add up to JITTER_MS of random delay so clients do not retry in lockstep.
            let jitter = rand::thread_rng().gen_range(0..JITTER_MS);
            let delay = BASE_RETRY_DELAY + Duration::from_millis(jitter);
            println!("Attempt {} failed. Retrying in {:?}...", attempt, delay);
            tokio::time::sleep(delay).await;
        }
        Err(e) => {
            println!("Error: {}", e);
            break;
        }
    }
}
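The loop above depends on Tokio and the `rand` crate. A synchronous, standard-library-only sketch of the same idea follows: a constant base delay plus random jitter, with a bounded number of attempts. The small xorshift generator stands in for `rand` and is an assumption of this sketch, as are the names `retry_with_jitter` and `is_retryable`. Instead of matching on an error string, the caller supplies a predicate that decides which errors are worth retrying.

```rust
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Tiny xorshift64 generator, used only to spread out retries.
/// Not suitable for anything security-related.
struct XorShift(u64);

impl XorShift {
    fn seeded() -> Self {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos() as u64)
            .unwrap_or(0);
        XorShift(nanos | 1) // xorshift must never start at zero
    }

    fn next(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }
}

/// Calls `op` at most `max_attempts` times (always at least once). After a
/// retryable error it sleeps `base` plus a random 0..jitter_ms milliseconds;
/// any other error, or the last failure, is returned to the caller.
fn retry_with_jitter<T, E, F, R>(
    max_attempts: u32,
    base: Duration,
    jitter_ms: u64,
    is_retryable: R,
    mut op: F,
) -> Result<T, E>
where
    F: FnMut() -> Result<T, E>,
    R: Fn(&E) -> bool,
{
    let mut rng = XorShift::seeded();
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(e) if attempt < max_attempts && is_retryable(&e) => {
                let jitter = if jitter_ms == 0 { 0 } else { rng.next() % jitter_ms };
                thread::sleep(base + Duration::from_millis(jitter));
                attempt += 1;
            }
            Err(e) => return Err(e),
        }
    }
}
```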

As systems grow, they eventually need to span multiple geographic regions. Chapter 24: Geo Replication explores how to replicate a full system stack across regions while keeping latency low and data consistent.