第1章: システム
インターネットを使ったことがあるなら、あなたはコンピュータシステムと対話したことがあります。コンピュータシステムとは、何らかの作業を行うために使用されるソフトウェアとハードウェアの集合体です。単一のハードウェア上で動作するコンピュータプログラムとは異なり、コンピュータシステムは複数のプログラム、デバイス、サーバー、さらには地理的な場所にまたがって分散しています。
コンピュータシステムにはさまざまな形態や規模があります。オペレーティングシステムやファイルストレージシステムなど、単一のコンピュータ上で動作する一般的なシステムにはすでに馴染みがあるかもしれません。システムは、異なるハードウェアとソフトウェアのコンポーネントをつなぎ合わせる接着剤です。コンピュータシステムのこの特性は根本的なものです:コンピュータシステムはコンポーネントの集合を管理します。
システムは、プラネタリスケールコンピュータを構築するための主要なビルディングブロックです。システムの重要な特徴は、標準インターフェースを介して合成可能であることです。適切に設計されたシステムは、ハードウェアとソフトウェアの集合に対して、抽象化されカプセル化された関数がプログラムに対して果たすのと同じ役割を果たします。あらゆるシステムの主要な目的は、何らかのタスクを達成するための再利用可能な方法を提供することです。
アーキテクチャ
コンピュータシステムはシステムの機能を公開する明確に定義されたインターフェースを提供するため、人間はあるシステムを別のシステムと関連付けるさまざまな方法を考案してきました。もしシステムが物理世界の中空のブロックであれば、ブロック同士を配置する方法は限られています。いくつかのブロックを他のブロックの中に、横に、あるいは上に置くことができるでしょう。
モノリシック
モノリシックシステムアーキテクチャは、他のシステムを内部に含む一つの大きなシステムのようなものです。この大きなシステムは、含まれるシステムに対して必要に応じたインターフェースを提供します。大きなシステムが他のすべてのシステムを“つなぎ合わせる”のです。すべての機能を異なるHTTPエンドポイントに公開するWebアプリは、モノリシックシステムの一例です。
モノリシックシステムは、必要なすべての機能が同じバイナリにバンドルされているため、作業を行うために他のシステムと通信する必要がないという利点があります。欠点は、システムの小さなコンポーネントを更新するためにシステム全体の新バージョンをビルドしてリリースする必要があることです。
コンポーネント間の通信を避ける必要がある場合、バイナリが単一サーバーのリソース内に収まる場合(ただし多くのサーバーに分散は可能)、そしてビルドとリリースプロセスがモノリシックバイナリへの変更量を処理できる場合に、モノリシックシステムを選択するとよいでしょう。適切なビルドとリリースプロセスがあれば、モノリシックアプローチは数百の機能と1日あたり数千の変更をサポートできます。
マイクロ
マイクロシステムは、互いに隣り合う小さなブロックのようなものです。各マイクロシステムは、比較的シンプルなインターフェースを持つ特定のカプセル化されたタスクを実行します。永続ストレージに値を格納しアクセスするシステムはマイクロシステムの一例です。モノリシックシステムがすべての機能が同じアドレス空間を共有する単一のバイナリとして存在するのに対し、マイクロシステムは別々のバイナリとして存在し、別々のプロセスとして実行されます。
機能を別々のバイナリに分離することにはトレードオフがあります。異なるマイクロシステムは異なるプログラミング言語で実装できるため、開発の柔軟性が高まります。マイクロシステムは個別に更新・リリースでき、全体のシステム可用性が向上します。さらに、マイクロシステムは完全に別のデバイスにデプロイすることも可能です。
マイクロシステムでは運用がより複雑になります。マイクロシステムは、自らが利用する機能を持つ他のマイクロシステムと通信する必要があり、依存関係が生じます。マイクロシステムは物理的に近くに配置されていない可能性があるため、レイテンシーが重要な懸念事項になります。マイクロサービス間のインタラクションのテストは、現実世界の特性を再現するよう慎重に行う必要があります。
ティアード
ティアードシステムは、モノリシックとマイクロシステムのハイブリッドで、作業が少数のティアに分割され、各ティアは“上”と“下”のティアとのみ通信します。少ない作業量と少数のコントリビュータでは、ティアードアーキテクチャはマイクロサービスの利点の一部を、モノリシックサービスに関連する低い運用コストで実現できます。
通常、作業量が増え、コントリビュータ数が増えると、ティアードサービスにもモノリシックサービスの欠点が現れます。最初は、別々の作業を独自のティアに切り出すことで追加のティアを増やせます。しかし、サービスディスカバリ、セキュリティ、プライバシーなどの共通機能はすべてのティアで必要になる場合があり、マイクロサービスアプローチが必要となります。
通信
システムはプラネタリスケールコンピュータのビルディングブロックです。単一のシステムも有用ですが、システムの集合体は真に強力です。あるシステムが別のシステムの有用性を活用するためには、システム同士が対話する必要があります。別のシステムに依存するシステムをクライアント、依存されるシステムをサーバーと呼びます。クライアントとサーバー間の通信を処理する一般的な方法がいくつか存在します。
共有ライブラリ
共有ライブラリにより、あるシステムを別のシステムと同じプロセスとアドレス空間内で実行できます。共有ライブラリは何らかの機能を実装し、システム設計者はそれがシステム自体の一部であるかのように利用できます。コンパイラは、コンパイル時に共有ライブラリを表すオブジェクトをコンパイルしてリンクするか、ランタイムにオペレーティングシステムが以前にコンパイルされたオブジェクトファイルを動的にリンクします。
共有ライブラリは、システム間のあらゆるインタラクション方法の中で最も低い通信オーバーヘッド——単一の関数呼び出し——を持ちます。共有システムの完全なコピーをそれを共有するシステムに含める必要があるため、共有ライブラリはバイナリの肥大化につながる可能性があります。さらに、新しい共有ライブラリ機能を使用するためにはシステムの再コンパイルと再デプロイが必要であり、システム運用者の負担を増大させる可能性があります。
共有ライブラリを他のシステムインタラクション方法と組み合わせると便利な場合が多いです。例えば、サーバーがキャッシュすべきデータをクライアントに返す場合、すべてのクライアントが独自のキャッシュ方法を実装する代わりに、キャッシュとサーバー通信を処理する共有ライブラリを提供できます。
プロセス間通信
プロセス間通信(IPC)は、システムがオペレーティングシステムを使用して互いに通信することに依存しています。厳密に言えば、プロセス間で情報を渡すあらゆる手段は有効なプロセス間通信の形式です:ファイル、パイプ、共有メモリ、ソケットなどがあります。デバイス上で動作し、プロセス間通信を通じて同じデバイス上の他のシステムに機能を提供するシステムは、時として“サイドカー”と呼ばれます。
共有ライブラリとは異なり、プロセス間通信では、サイドカープロセスをその機能を利用するプロセスとは独立して更新できます。さらに、プロセス間通信は他のシステムインタラクション形式と比較して比較的低い通信オーバーヘッドを持ちます。欠点は、各サイドカーがデバイス上のコンピュート、メモリ、ストレージ、ネットワーク、その他のリソースを競合することです。
デバイスにデプロイする広く使用されるサイドカーは少数にとどめるのが通常最善です。サイドカーはシステムリソースを制限するため、サイドカーシステムの使用を選択した場合は、リソース使用量を追跡し、どの程度が過剰かを判断することが重要です。リソースニーズが高くなりすぎた場合は、サイドカーを別のインタラクション形式に昇格させる必要が生じる場合もあります。
ネットワーク
ネットワークは、プロトコルとリレーデバイスを使用して広大な距離にわたって情報を転送します。ネットワークはプラネタリスケールコンピュータのインターコネクトを提供するため、専用の章を設けて詳しく説明します。ここでは、システムがネットワークを使用して互いにインタラクションする方法を見ていきます。パケットベースのネットワークプロトコルを使用するイーサネットネットワークを想定します。
ネットワークを介して通信するために、サーバーはソケットでリッスンしてクライアントの接続を待ちます。クライアントはサーバーのアドレスを使用して情報のパケットをサーバーに送信します。サーバーがパケットを理解した場合、独自のパケットを構築してクライアントに送り返すことができます。このパターンは、クライアント、サーバー、またはネットワークのいずれかが接続を閉じるまで繰り返されます。
ネットワークは分散システムインタラクションの力を解き放ちます。広域ネットワークやインターネットに接続すると、システムは地球の遠く離れた場所、さらには宇宙空間を超えて通信できます。レイテンシー(地球全体で数十から数百ミリ秒に及ぶ可能性がある)に加えて、ネットワークインタラクションの欠点は、通信の信頼性がネットワークとプロトコルの信頼性によって制限されることです。
正規化
クライアントは情報を転送したいサーバーとはまったく異なるオペレーティング環境(ハードウェア、アーキテクチャ、オペレーティングシステムなど)上にある可能性があるため、クライアントはサーバーに送信する情報を正規化する必要があります。例えば、クライアントのデバイス上のメモリにあるビットをそのままサーバーに送信した場合、サーバーがクライアントと同じ方法でビットを解釈する保証はありません。
クライアントが送信しサーバーが受信するビットを正規化するために、クライアントは送信する情報に対してシリアライゼーションというプロセスを実行し、サーバーは情報を再構築するためにデシリアライゼーションという逆のプロセスを実行します。このプロセスはクライアントとサーバーの裏側で行われます。
正規化システムを共有ライブラリとして実装できます。proc-macro、syn、quoteクレートを使用してRust構文木のイントロスペクションを容易にし、i32、bool、String要素を持つシンプルなstructのシリアライズとデシリアライズのための手続きマクロを実装します。完全な実装では他のデータ型やエスケープされたデリミタも処理します。
normalization/src/lib.rs
まず、正規化をサポートするstructにアノテーションを付けるためのSerializableトレイトとDeserializableトレイトを定義します。これらのトレイトは、structをString表現との間で変換する関数を提供します。また、シリアライゼーションとデシリアライゼーション中に発生するエラーのenumも定義します。
#[derive(Debug, PartialEq)]
pub enum NormalizationError {
MissingField,
InvalidFormat,
ParseFailure,
}
pub trait Serializable {
fn serialize(&self) -> String;
}
pub trait Deserializable: Sized {
fn deserialize(input: &str) -> Result<Self, NormalizationError>;
}
normalization/normalization-macros/src/lib.rs
次に、シンプルなstructのためのシリアライゼーション関数を実装します。まずRustのstructを受け取り、各フィールドについてフィールドの型に応じて値を文字列表現に変換する関数を定義します。特殊文字をエスケープするためにString型は別途処理します。
fn generate_serialization_for_type(
field_name: &Option<syn::Ident>,
field_type: &TypePath,
) -> proc_macro2::TokenStream {
if field_type.path.is_ident("i32") || field_type.path.is_ident("bool") {
quote! { format!("{}: {}", stringify!(#field_name), self.#field_name) }
} else if field_type.path.is_ident("String") {
quote! {
format!(
"{}: \"{}\"",
stringify!(#field_name),
self.#field_name.replace("\\", "\\\\").replace(":", "\\:")
.replace("\"", "\\\"").replace(",", "\\,")
)
}
} else {
panic!("Unsupported type!");
}
}
最後にシリアライゼーションルーチンの核心に到達します。generate_serialization_for_type関数を使用して構造体のフィールドをシリアライズされた文字列に変換するserialize関数を実装する手続きマクロを定義します。RustコンパイラはSerializableトレイトから派生した構造体に対してこの関数を呼び出します。
#[proc_macro_derive(Serializable)]
pub fn derive_serializable(input: TokenStream) -> TokenStream {
let input = syn::parse_macro_input!(input as DeriveInput);
let name = &input.ident;
let gen = match &input.data {
Data::Struct(DataStruct {
fields: Fields::Named(FieldsNamed { named, .. }),
..
}) => {
let serialization_logic = named.iter().map(|f| {
let field_name = &f.ident;
match &f.ty {
Path(type_path) =>
generate_serialization_for_type(field_name, type_path),
_ => panic!("Unsupported type!"),
}
});
quote! {
impl #name {
pub fn serialize(&self) -> String {
let parts = vec![#(#serialization_logic),*];
format!("{{{}}}", parts.join(","))
}
}
}
}
_ => panic!("Only named structs are supported!"),
};
gen.into()
}
デシリアライゼーションマクロも同様のパターンに従いますが逆方向です:シリアライズされたStringを解析し、キー–値ペアに分割し(エスケープされたカンマやコロンに注意しながら)、型に応じてstructの各フィールドを再構築します。
normalization/src/lib.rs
正規化ライブラリを使用するには、トレイトをuseしてstructをそれらからderiveとして宣言するだけです。
pub use normalization_macros::{Serializable, Deserializable};
#[derive(Serializable, Deserializable)]
pub struct Sample {
pub number: i32,
pub flag: bool,
pub text: String,
}
normalization/tests/tests.rs
後で派生したトレイトを使用したい場合は、structをString表現との間で正規化するためにserialize関数またはdeserialize関数を呼び出すだけです。
use normalization::Sample;
#[test]
fn test_serialization() {
let sample = Sample {
number: 5,
flag: true,
text: "Hello".to_string(),
};
let serialized = sample.serialize();
assert_eq!(serialized, "{number: 5,flag: true,text: \"Hello\"}");
}
#[test]
fn test_deserialization() {
let serialized = "{number: 5,flag: true,text: \"Hello\"}";
let deserialized = Sample::deserialize(serialized);
match deserialized {
Ok(sample) => {
assert_eq!(sample.number, 5);
assert_eq!(sample.flag, true);
assert_eq!(sample.text, "Hello".to_string());
}
Err(e) => panic!("Deserialization failed with error: {:?}", e),
}
}
structの表現を正規化できるようになったので、システム間で簡単に共有できます。次に、あるシステムが別のシステムを使用して作業を行うための抽象インターフェースについて説明します。
リモートプロシージャコール
リモートプロシージャコール(RPC)は、ネットワーク通信のエレガントなラッパーを提供します。RPCの目的は、広大な距離にわたって分散している可能性のあるシステム間に関数のようなインターフェースを提供することです。このインターフェースはネットワーク通信の分散性のメリットと共有ライブラリのプログラマビリティのメリットを実現します。
リモートプロシージャコールは1981年にAndrew Nelsonによって提案され、1984年にBirrellとNelsonによって実装されました。概略として、RPCはシステムが互いに離れた別のデバイス上にある可能性があるという事実を抽象化する方法でシステム間の情報転送方法を規定するインターフェースで構成されています。
RPCインターフェースは、クライアントでのシリアライゼーション、クライアントからサーバーへのリクエストのネットワーク転送、サーバーでのデシリアライゼーション、サーバーでの作業、サーバーでのシリアライゼーション、サーバーからクライアントへのレスポンスのネットワーク転送、クライアントでのデシリアライゼーションを実行するコードに変換されます。
RPCはプラネタリスケールコンピュータの基本的な側面です。ネットワークがプラネタリスケールコンピュータのインターコネクトであるならば、RPCはバスプロトコルです。クライアントとサーバー間のシリアライゼーションとデシリアライゼーションには正規化システムを使用できます。ネットワークアクセスの詳細は本書の焦点ではないため、ネットワーク通信にはtokioライブラリを使用します。
rpc/src/lib.rs
まず、RPCクライアントとサーバー間のインターフェースを定義します。クライアントはサーバーにリクエストを送信し、サーバーに実行してほしいプロシージャを識別する番号とクライアントからのデータを含むペイロードを指定します。サーバーはリクエストを処理し、リクエスト結果を含むペイロードを持つレスポンスをクライアントに返します。
pub mod client;
pub mod server;
pub type ProcedureId = i32;
pub type Payload = String;
#[derive(Debug, Clone)]
pub struct Request {
pub procedure_id: ProcedureId,
pub payload: Payload,
}
#[derive(Debug, Clone)]
pub struct Response {
pub payload: Payload,
}
プロシージャ識別子とペイロードについての注意点です。新しいプロシージャを作成したり既存のプロシージャ識別子のペイロード構造を更新する場合、新しいユニークなプロシージャ識別子を選択することが重要です。ペイロードの変更は、新しいインターフェースを使用するクライアントと古いインターフェースを実装するサーバー間で後方互換性がないためです。
また、クライアントが古いインターフェースにリクエストを送信しなくなったことが確実になるまで、サーバー上の古いインターフェースの既存の実装を利用可能な状態に保つ必要があります。このため、常に対応する新バージョンのクライアントをデプロイする前に新バージョンのサーバーをデプロイすべきです——古いクライアントのリクエストは新しいサーバーで引き続き処理できますが、新しいクライアントのリクエストは古いサーバーでは処理できません。
rpc/src/server.rs
RPCサーバーはソケットでリクエストをリッスンし、リクエストを処理するためのスレッドを生成し、リクエストハンドラを呼び出します。リクエストハンドラはサーバーによって実装されます。クライアントがサーバーに実行を求めるプロシージャに応じて、サーバーは異なるコードを実行します。
pub async fn start_server(
addr: &str,
handler: impl Fn(Request) -> Response + Send + Sync + 'static + Clone,
) -> io::Result<()> {
let listener = TcpListener::bind(addr).await?;
loop {
let (mut socket, _) = listener.accept().await?;
let handler = handler.clone();
tokio::spawn(async move {
loop {
let mut buffer = vec![0u8; 1024];
let n = socket.read(&mut buffer).await
.expect("Failed to read from socket");
let data = String::from_utf8_lossy(&buffer[..n]);
let parts: Vec<&str> = data.splitn(2, ':').collect();
let request = Request {
procedure_id: parts[0].parse().unwrap(),
payload: parts[1].trim().to_string(),
};
let response = handler(request);
socket.write_all(response.payload.as_bytes()).await
.expect("Failed to write to socket");
}
});
}
}
各スレッドがネットワークソケットを開いたまま維持し、追加データを待機する(またはソケットが閉じられるのを待つ)ことに注目してください。これによりクライアントは各リクエストごとにネットワーク接続を確立することなく、サーバーにリクエストのストリームを送信できます。これにより追加リクエストを処理するためのレイテンシーとリソースが削減されますが、リクエストが長時間実行される場合はサーバーのソケットが不足するリスクがあります。
rpc/src/client.rs
RPCクライアントは特定のアドレスのサーバーに接続し、クライアントが識別するプロシージャをクライアントが提供するペイロードで実行するリクエストを送信します。
pub async fn send_request(
server_addr: &str,
request: Request,
) -> io::Result<Response> {
let mut stream = TcpStream::connect(server_addr).await?;
let serialized = format!("{}:{}\n", request.procedure_id, request.payload);
stream.write_all(serialized.as_bytes()).await?;
let mut buffer = vec![0u8; 1024];
let n = stream.read(&mut buffer).await?;
let response_data = String::from_utf8_lossy(&buffer[..n]);
Ok(Response {
payload: response_data.to_string(),
})
}
クライアント–サーバー
最も一般的なシステムアーキテクチャはクライアント–サーバーアーキテクチャです。その名の通り、クライアントデバイスがサーバーデバイスと通信する構造です。このアーキテクチャでは、クライアントがサーバーにリクエストを送信し、サーバーがクライアントにレスポンスを返します。クライアントはコマンドラインインターフェース、スタンドアロンバイナリ、またはリクエストを送信したい別のサーバーである可能性があります。
クライアント–サーバーアーキテクチャの理解を深め、正規化とRPCをつなぎ合わせるために、シンプルなechoクライアントとサーバーを見てみましょう。echoクライアントはechoサーバーに返してほしい文字列をペイロードとするリクエストを送信します。echoサーバーはその文字列を検査しクライアントへのレスポンスのペイロードとして返します。
echo/src/lib.rs
まずechoシステムのコンポーネントを共有ライブラリで指定します。ライブラリはクライアントがリクエストできサーバーが実行できるプロシージャの識別子を提供します。また、リクエストとレスポンスのペイロードの構造も定義します。
use normalization::{Deserializable, NormalizationError, Serializable};
use rpc::ProcedureId;
pub const SYSTEM_NAME: &str = "echo";
pub const SYSTEM_ADDRESS: &str = "127.0.0.1:10100";
pub const ECHO_PROCEDURE: ProcedureId = 1;
#[derive(Serializable, Deserializable)]
pub struct EchoArgs {
pub message: String,
}
echo/bin/server_v0.rs
echoサーバーではクライアントがリクエストを送信するのを待ちます。サーバーがリクエストを受け取るとハンドラ関数を呼び出してリクエストを処理します。ハンドラはリクエストされたプロシージャがサーバーが認識するものかどうかを確認し、そうであればサーバーはリクエストをデシリアライズしてそのメッセージをレスポンスに格納します。
fn handle_echo(args: EchoArgs) -> String {
args.message
}
fn handler(request: Request) -> Response {
match request.procedure_id {
ECHO_PROCEDURE => {
let args: EchoArgs =
EchoArgs::deserialize(&request.payload)
.expect("Failed to deserialize");
Response { payload: handle_echo(args) }
}
_ => Response {
payload: "Unknown procedure".to_string(),
},
}
}
echo/bin/client_v0.rs
echoクライアントはechoサーバーに送信するリクエストを構築して送信する役割を担います。クライアントはメッセージをシリアライズし、プロシージャ識別子を指定し、リクエストをサーバーに送信し、サーバーのレスポンスを待ち、レスポンスのペイロードを出力します。
let args = EchoArgs {
message: "Hello RPC!".to_string(),
};
let serialized_args = args.serialize();
let request = Request {
procedure_id: ECHO_PROCEDURE,
payload: serialized_args,
};
let response = client::send_request(SYSTEM_ADDRESS, request)
.await
.expect("Failed to get response");
println!("Response: {}", response.payload);
クライアント–サーバーアーキテクチャはプラネタリスケールコンピュータを構築するための強力な抽象化です。作業を個別のエンティティに分割して管理性を向上させ、シンプルなインターフェースを公開してモジュール性と再利用性を促進し、柔軟な数のスレッドやデバイスで実行できるようにして分離性とスケーラビリティを提供します。
状態
先ほど構築したechoシステムは非常にシンプルでリクエスト間の状態管理を必要としませんでした。しかし多くの実際のシステム(これから見ていくものを含む)は作業を行うために何らかの状態を管理する必要があることが多いです。アトミック参照カウンタを使用してリクエストスレッド間で安全に状態を共有する簡単な方法があります。リクエスト間で状態を共有できる関数でrpc共有ライブラリを改良します。
rpc/src/server.rs
start_serverとの主な違いは共有したい状態を表すジェネリック型パラメータ<T>を使用していることです。共有状態を渡すためのパラメータを関数に追加しハンドラの型宣言も共有状態を受け取るように変更します。ハンドラを呼び出す前にハンドラが独自のコピーを持てるように共有状態をクローンします。
pub async fn start_server_with_state<T: Send + 'static>(
addr: &str,
handler: impl Fn(Request, Arc<Mutex<T>>) ->
Pin<Box<dyn Future<Output = Response> + Send>>
+ Send + Sync + 'static + Clone,
shared_state: Arc<Mutex<T>>,
) -> io::Result<()> {
let listener = TcpListener::bind(addr).await?;
loop {
let (mut socket, _) = listener.accept().await?;
let handler = handler.clone();
let shared_state = shared_state.clone();
tokio::spawn(async move {
let mut buffer = vec![0u8; 1024];
loop {
match socket.read(&mut buffer).await {
Ok(n) if n == 0 => break,
Ok(n) => {
let data = String::from_utf8_lossy(&buffer[..n]);
let parts: Vec<&str> = data.splitn(2, ':').collect();
let request = Request {
procedure_id: parts[0].parse().unwrap(),
payload: parts[1].trim().to_string(),
};
let response = handler(request, shared_state.clone()).await;
if socket.write_all(response.payload.as_bytes())
.await.is_err() { break; }
}
Err(_) => break,
}
}
});
}
}
転送
あるシステムから別のシステムに転送される情報の旅は魅力的な冒険です。複数の段階を経て潜在的に何千マイルもの距離を移動し、途中で配信の成功を妨げる危険を回避しなければなりません。次にあるシステムから別のシステムへの情報の時系列フローとそれに関わる概念やシステムをたどります。
ディスカバリ
クライアントは通信したいサーバーの位置を発見する方法を知る必要があります。しかしサーバーはデバイス間で移動する可能性があり、また多数のデバイスでサーバーのコピーが実行されている可能性もあります——この変動の中でクライアントはどのようにして正しいサーバーを見つけることができるのでしょうか?プラネタリスケールコンピュータの実装において問題に直面した場合、私たちは通常システムで問題を解決しようとします。
ディスカバリシステムはクライアントが接続したいシステムを受け取りクライアントが接続するデバイスを選択し選択したデバイスでクライアントに応答する役割を担います。したがってディスカバリシステムはシステムの識別子を入力として受け取りクライアントがサーバーと通信するために接続できるデバイスのアドレスで応答します。
discovery/src/lib.rs
まずディスカバリシステムが実装するプロシージャの識別子を定義します。サーバーはregisterプロシージャを使用して実装するシステムの名前とリッスンしているアドレスとポートを送信します。queryプロシージャによりクライアントはシステムを名前でリクエストしそのシステムを実装するサーバーのアドレスとポートを含むレスポンスを受け取ることができます。
#[derive(Debug, Serializable, Deserializable)]
pub struct RegisterArgs {
pub name: String,
pub address: String,
}
#[derive(Debug, Serializable, Deserializable)]
pub struct QueryArgs {
pub name: String,
}
#[derive(Debug, Serializable, Deserializable)]
pub struct QueryResult {
pub address: String,
}
ディスカバリシステムは登録されたシステムを追跡するために共有状態を使用します。レジストリにはシステム名からサーバーアドレスへのマッピングとアドレスが最後にディスカバリシステムに登録した時刻が含まれています。アドレスがリクエストされるとレジストリはランダムに一つを選択して返します。クリーンアップ関数は古い登録アドレスを削除します。
#[derive(Default)]
pub struct Registry {
registry: HashMap<Name, Vec<Address>>,
last_ping: HashMap<Address, Instant>,
}
impl Registry {
fn register(&mut self, name: Name, address: Address) {
if let Some(time) = self.last_ping.get_mut(&address) {
*time = Instant::now();
} else {
self.registry.entry(name)
.or_insert_with(Vec::new)
.push(address.clone());
self.last_ping.insert(address, Instant::now());
}
}
fn get_address(&self, name: &Name) -> Option<&Address> {
self.registry.get(name)?.choose(&mut rand::thread_rng())
}
fn cleanup_stale(&mut self) {
let now = Instant::now();
let stale_addresses: HashSet<_> = self.last_ping.iter()
.filter(|&(_, time)| now.duration_since(*time) > CLEANUP_DURATION)
.map(|(address, _)| address.clone())
.collect();
for address in stale_addresses {
self.last_ping.remove(&address);
self.registry.retain(|_, v| {
v.retain(|a| a != &address);
!v.is_empty()
});
}
}
}
rpc共有ライブラリによりディスカバリサービスのロジック実装が容易になります。リクエストのプロシージャ識別子に基づくヘルパー関数と共有レジストリ状態を使用したregisterおよびqueryプロシージャの処理関数を定義します。レジストリは複数のスレッド間で共有されるためリクエストハンドラ関数はリクエスト処理前にレジストリへの排他的アクセスを確保します。
サーバーが利用可能になると対応するシステム識別子へのリクエストを処理できるものとしてディスカバリシステムにアドレスを登録します。サーバーは追加と削除が行われるためシステムは定期的にディスカバリサービスに再登録する必要があります。もちろんディスカバリシステム自体も利用不可能になることがあります。その場合サーバーは指数関数的に増加する間隔で最大量までディスカバリシステムへの接続を再試行します。
しかし重要な問題としてサービスは最初にどのようにしてディスカバリサービスを見つけるのでしょうか?一つの選択肢はクライアントまたはサーバーが常にディスカバリサービスに到達できるアドレスのセットを維持することです。他にDNSやマルチキャストを使用してディスカバリサービスを特定する方法もあります。
ルーティング
サーバーの可用性は時間とともに変化する可能性がありある時点で発見した利用可能なサーバーが後で利用不可能になることがあります。そのためクライアントのサーバー発見とサーバーへのリクエストルーティングの間に抽象化レイヤーを構築することが有用です。このサービスをルーティングサービスと呼びその主な役割はクライアントからサーバーへリクエストを効率的かつ確実に届けることです。
ルーティングサービス(クライアントは最初にディスカバリサービスで発見できます)はサービス識別子をそのサービスの利用可能なサーバーに解決します。ディスカバリサービスとは異なりルーティングサービスはサービスのサーバーセットのヘルスも確認し利用可能なサーバーにのみリクエストをルーティングします。さらにルーティングサービスはコネクションプーリングとロードバランシングを実行します。
routing/src/lib.rs
ルーティングシステムはコネクションプールの概念に基づいています。各システムには独自のコネクションプールがあり、各コネクションはシステムのサーバーへのアクティブなネットワークソケットです。ソケットの再利用により接続確立のオーバーヘッドが排除されます。
pub struct ConnectionPool {
system_name: String,
pool: Arc<Mutex<VecDeque<TcpStream>>>,
max_size: usize,
semaphore: Arc<Semaphore>,
}
impl ConnectionPool {
pub fn new(system_name: String, max_size: usize) -> Self {
Self {
system_name,
pool: Arc::new(Mutex::new(VecDeque::new())),
max_size,
semaphore: Arc::new(Semaphore::new(max_size)),
}
}
pub async fn get(&self) -> Option<TcpStream> {
self.semaphore.acquire().await
.expect("Unable to acquire permit").forget();
let mut pool = self.pool.lock().await;
let mut conn = pool.pop_front();
if conn.is_none() {
let address = discovery::query(self.system_name.clone()).await;
conn = TcpStream::connect(&address.address).await.ok();
}
conn
}
pub async fn release(&self, conn: TcpStream) {
let mut pool = self.pool.lock().await;
if pool.len() < self.max_size {
pool.push_back(conn);
}
self.semaphore.add_permits(1);
}
}
ルーティングシステムはプロキシシステムと共有ライブラリの2つのモードで動作できます。プロキシ版はRPCインターフェースを使用し、共有ライブラリ版はクライアント上で全機能を実行します。
routing/src/lib.rs
共有ライブラリはプロキシと同様の機能をクライアントがインスタンス化する構造体内で実行します。
pub struct Router {
pools: Arc<Mutex<HashMap<String, ConnectionPool>>>,
}
impl Router {
pub fn new() -> Self {
Self {
pools: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn send_request(
&self,
name: String,
procedure_id: ProcedureId,
payload: &str,
) -> Response {
let mut pools = self.pools.lock().await;
let pool = pools.entry(name.clone())
.or_insert_with(|| ConnectionPool::new(name.clone(), 10));
route_request(
RouteArgs { name, procedure_id, payload: payload.to_string() },
pool,
).await
}
}
echoシステムを更新してディスカバリとルーティングシステムを使用し通信プロセスを簡素化できます。
// echo client using routing
let args = EchoArgs {
message: "Hello RPC!".to_string(),
};
let serialized_args = args.serialize();
let routing = Router::new();
let response = routing
.send_request("echo".to_string(), ECHO_PROCEDURE, serialized_args.as_str())
.await;
println!("Response: {}", response.payload);
配信
クライアントがサーバーを発見してリクエストをルーティングした後、リクエストはサーバープロセスに配信される必要があります。サーバーが前のリクエストを処理している間に別のリクエストが到着する可能性があるため、アクティブなリクエストのキューを維持することが有用です。新しいリクエストはキューに入れられ配信されたリクエストはキューから削除されます。
配信キューはリクエストが送信されるクライアント側、またはリクエストが受信されるサーバー側のいずれかで維持できます。サーバーにキューを配置するとフロー制御の管理が不要になりますが、適切なロードバランシングがないとサーバーキューが満杯になる可能性があります。
並行性
サーバーはリクエストキューとワーカータスクを使用して複数のリクエストを並行処理できます。サーバーは受信接続を受け入れリクエストを有界チャネルに入れ、自然なバックプレッシャーを提供します。別のワーカータスクがリクエストをデキューして処理します:
// Create a channel to hold in-flight requests.
let (tx, mut rx) = mpsc::channel::<InflightRequest>(100);
// Spawn a worker task to process requests.
task::spawn(async move {
while let Some(request) = rx.recv().await {
let request_str = String::from_utf8_lossy(&request.data)
.trim().to_string();
let req: Request = Request::deserialize(&request_str).unwrap();
let response = Response { result: req.a * req.b };
let res_str = response.serialize();
request.client_socket.write_all(res_str.as_bytes()).await.unwrap();
}
});
タイムアウト
プラネタリスケールコンピュータの障害によりサーバーの応答がクライアントの期待よりも長くなったり、まったく応答しなくなったりする可能性があります。ソフトウェアのバグ、ハードウェア障害、ネットワークや電源の障害などが原因となり得ます。
予測不可能なバグ、障害、停止に対して回復力を持つためにシステムはタイムアウトを使用して作業の試行に費やす時間を制限できます。タイムアウトはリクエストに時間がかかりすぎていることをクライアントに知らせ、貴重なキュースロットとシステムリソースの占有を防ぎます。
// Wrap the processing in a timeout.
let process_result = timeout(REQUEST_TIMEOUT, async {
let request_str = String::from_utf8_lossy(&request.data)
.trim().to_string();
let req: Request = Request::deserialize(&request_str).unwrap();
let response = Response { result: req.a * req.b };
let res_str = response.serialize();
request.client_socket.write_all(res_str.as_bytes()).await
}).await;
// Send a fatal response to the client on timeout.
if process_result.is_err() {
let fatal_msg = "Fatal: Request timed out".as_bytes();
let _ = request.client_socket.write_all(fatal_msg).await;
}
リトライ
リクエストがタイムアウトしたり一時的なエラーが発生した場合クライアントはリクエストを再試行できます。しかしリトライは慎重に実装する必要があります。多くのクライアントが同時にリトライすると、サーバーを圧倒する“サンダリングハード”が発生する可能性があります。これを緩和するためにリトライの遅延にランダムなジッターを追加しリトライを時間的に分散させます:
const MAX_RETRIES: u32 = 3;
const BASE_RETRY_DELAY: Duration = Duration::from_secs(1);
const JITTER_MS: u64 = 200;
for attempt in 1..=MAX_RETRIES {
let response = send_request(&request).await;
match response {
Ok(response) => {
println!("5 * 10 = {}", response.result);
break;
},
Err(e) if e == "Fatal: Request timed out" => {
let jitter = rand::thread_rng().gen_range(0..JITTER_MS);
let delay = BASE_RETRY_DELAY + Duration::from_millis(jitter);
println!("Attempt {} failed. Retrying in {:?}...", attempt, delay);
tokio::time::sleep(delay).await;
},
Err(e) => {
println!("Error: {}", e);
break;
}
}
}
システムが成長するにつれて最終的には複数の地理的リージョンにまたがる必要が出てきます。第24章: ジオレプリケーションではレイテンシーを低く保ちデータの一貫性を維持しながらフルシステムスタックをリージョン間でレプリケーションする方法を探ります。