第11章: スケジューリング

プラネタリスケールコンピュータは、数千から数百万台のマシンで構成され、各マシンは多くのプロセスを実行できます。スケジューリングとは、これらのマシン上で作業をいつどこで実行するかを決定する技術と科学です。スケジューリングサービスは、フリート全体を統括します——サービスプロセスのスポーン、ポートの割り当て、ヘルスのモニタリング、望ましい状態と実際の状態の調整を行います。

スケジューラのデータモデル

scheduling/src/main.rs スケジューラは2つのコアデータ構造を維持します。ServiceSpecは望ましい状態を記述します:サービス名、Cargoマニフェストパス、オプションのバイナリ名、望ましいレプリカ数。Instanceは現実を記述します:ID、ポート、OSプロセスID、ヘルスステータスを持つ実行中のプロセスです。

struct ServiceSpec {
    name: String,
    manifest_path: String,
    bin_name: String,
    desired_replicas: i32,
}

struct Instance {
    id: String,
    service_name: String,
    port: u16,
    pid: u32,
    status: String, // "starting", "healthy", "unhealthy", "stopped"
}

望ましい状態と実際の状態のギャップが、すべてのスケジューリング決定を駆動します。サービススペックが3レプリカを指定しているのに2インスタンスしか実行されていない場合、スケジューラはもう1つスポーンします。インスタンスのヘルスチェックが失敗した場合、スケジューラはそれを不健全としてマークし、置き換える可能性があります。

プロセスのスポーン

echo/src/bin/server_v1.rs — このパターンはすべてのサービスに現れます スケジューラは各サービスをstd::process::Command経由でOSプロセスとしてスポーンし、PORT環境変数を通じて割り当てられたポートを渡します。システム内のすべてのサービスは起動時にこの変数をチェックします:

let addr = std::env::var("PORT")
    .map(|p| format!("127.0.0.1:{}", p))
    .unwrap_or_else(|_| SYSTEM_ADDRESS.to_string());

この3行のパターンはすべてのサービスのmain.rsに現れます。サービスはスタンドアロンでよく知られたポートで実行する(開発用)ことも、動的に割り当てられたポートを受け入れる(スケジューラによる管理時)こともできます。スポーンされたプロセスはディスカバリに自己登録し、他のサービスから即座に発見可能になります。

フリートブートストラップ

scheduling/src/main.rs 起動時に、スケジューラはハードコードされた構成テーブルからフリート全体をブートストラップします。各エントリはサービス名、Cargoマニフェストパス、オプションのバイナリ名、レプリカ数、ベースポートを指定します。単一レプリカのサービスはよく知られたポートを取得します。マルチレプリカのサービスはベースからの連番ポートを取得します。

const DEFAULT_FLEET: &[FleetEntry] = &[
    FleetEntry { name: "security",      manifest_path: "security/Cargo.toml",
                 bin_name: "", replicas: 1, base_port: 11100 },
    FleetEntry { name: "configuration", manifest_path: "configuration/Cargo.toml",
                 bin_name: "", replicas: 1, base_port: 10500 },
    FleetEntry { name: "echo",          manifest_path: "echo/Cargo.toml",
                 bin_name: "server_v1", replicas: 3, base_port: 10100 },
    FleetEntry { name: "frontend",      manifest_path: "frontend/Cargo.toml",
                 bin_name: "", replicas: 2, base_port: 8081 },
    // ... storage, caching, routing, monitoring, release
];

scheduling/src/main.rsspawn_instance 各エントリに対して、スケジューラはspawn_instanceを呼び出します。これはマニフェストパスでcargo runコマンドを構築し、PORT環境変数を通じて割り当てられたポートを渡します。子プロセスIDは、スケジューラが後でインスタンスを必要に応じてキルできるようにキャプチャされます。

fn spawn_instance(&mut self, spec: &ServiceSpec, port: u16)
    -> Option<Instance>
{
    let mut cmd = Command::new("cargo");
    cmd.arg("run")
       .arg("--manifest-path").arg(&manifest);
    if !spec.bin_name.is_empty() {
        cmd.arg("--bin").arg(&spec.bin_name);
    }
    cmd.env("PORT", port.to_string());
    match cmd.spawn() {
        Ok(child) => Some(Instance {
            id, service_name: spec.name.clone(),
            port, pid: child.id(),
            status: "starting".to_string(),
        }),
        Err(e) => None,
    }
}

ヘルスモニタリング

scheduling/src/main.rshealth_check_loop バックグラウンドループが5秒ごとに各インスタンスにTCP接続を試みてプローブします。接続が成功すればインスタンスは健全とマークされます。失敗すれば不健全とマークされます。これは最もシンプルなヘルスチェックです——本番のスケジューラはアプリケーションレベルのヘルスエンドポイント、リソース使用率、レスポンスレイテンシーもチェックするでしょう。

async fn health_check_loop(shared_state: Arc<Mutex<SchedulerState>>) {
    loop {
        sleep(Duration::from_secs(5)).await;
        let mut state = shared_state.lock().await;
        for instance in state.instances.iter_mut() {
            if instance.status == "stopped" { continue; }
            let addr = format!("127.0.0.1:{}", instance.port);
            match tokio::net::TcpStream::connect(&addr).await {
                Ok(_)  => { instance.status = "healthy".to_string(); }
                Err(_) => { instance.status = "unhealthy".to_string(); }
            }
        }
    }
}

RPCインターフェース

scheduling/src/lib.rs スケジューリングサービスはRPCフレームワークを通じて5つのプロシージャを公開します。SCHEDULE_SERVICEは新しいサービススペックを登録して調整します。LIST_INSTANCESはすべての実行中インスタンスを返します。SCALE_SERVICEはレプリカ数を更新します。STOP_INSTANCEはプロセスIDで特定のインスタンスをキルします。GET_SERVICEは1つのサービスのスペックとインスタンスを返します。

pub const SCHEDULE_SERVICE_PROCEDURE: ProcedureId = 401;
pub const LIST_INSTANCES_PROCEDURE: ProcedureId = 402;
pub const SCALE_SERVICE_PROCEDURE: ProcedureId = 403;
pub const STOP_INSTANCE_PROCEDURE: ProcedureId = 404;
pub const GET_SERVICE_PROCEDURE: ProcedureId = 405;

#[derive(Debug, Serializable, Deserializable)]
pub struct ScheduleServiceArgs {
    pub name: String,
    pub manifest_path: String,
    pub bin_name: String,
    pub replicas: i32,
}

スケジューリングダッシュボードは、これらのプロシージャを使用してフリートを表示し、運用者がサービスをスケールしたり個々のインスタンスを停止したりできるようにします。