diff --git a/out/openapi.json b/out/openapi.json index 54b20af602..1e34f8af55 100644 --- a/out/openapi.json +++ b/out/openapi.json @@ -299,6 +299,31 @@ ] } }, + "/health/fanout": { + "get": { + "tags": [ + "health" + ], + "operationId": "health_fanout", + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HealthFanoutResponse" + } + } + } + } + }, + "security": [ + { + "bearer_auth": [] + } + ] + } + }, "/namespaces": { "get": { "tags": [ @@ -395,7 +420,7 @@ "/runner-configs": { "get": { "tags": [ - "runner_configs" + "runner_configs::list" ], "operationId": "runner_configs_list", "parameters": [ @@ -463,7 +488,7 @@ "/runner-configs/serverless-health-check": { "post": { "tags": [ - "runner_configs" + "runner_configs::serverless_health_check" ], "operationId": "runner_configs_serverless_health_check", "parameters": [ @@ -508,7 +533,7 @@ "/runner-configs/{runner_name}": { "put": { "tags": [ - "runner_configs" + "runner_configs::upsert" ], "operationId": "runner_configs_upsert", "parameters": [ @@ -559,7 +584,7 @@ }, "delete": { "tags": [ - "runner_configs" + "runner_configs::delete" ], "operationId": "runner_configs_delete", "parameters": [ @@ -599,6 +624,59 @@ ] } }, + "/runner-configs/{runner_name}/refresh-metadata": { + "post": { + "tags": [ + "runner_configs::refresh_metadata" + ], + "operationId": "runner_configs_refresh_metadata", + "parameters": [ + { + "name": "runner_name", + "in": "path", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "namespace", + "in": "query", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/RunnerConfigsRefreshMetadataRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/RunnerConfigsRefreshMetadataResponse" + } + } + } + } + }, + "security": [ + { + "bearer_auth": [] + } + ] + } + }, "/runners": { "get": { "tags": [ @@ -999,6 +1077,50 @@ }, "additionalProperties": false }, + "DatacenterHealth": { + "type": "object", + "required": [ + "datacenter_label", + "datacenter_name", + "status" + ], + "properties": { + "datacenter_label": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "datacenter_name": { + "type": "string" + }, + "error": { + "type": [ + "string", + "null" + ] + }, + "response": { + "oneOf": [ + { + "type": "null" + }, + { + "$ref": "#/components/schemas/HealthResponse" + } + ] + }, + "rtt_ms": { + "type": [ + "number", + "null" + ], + "format": "double" + }, + "status": { + "$ref": "#/components/schemas/HealthStatus" + } + } + }, "DatacentersListResponse": { "type": "object", "required": [ @@ -1018,6 +1140,46 @@ }, "additionalProperties": false }, + "HealthFanoutResponse": { + "type": "object", + "required": [ + "datacenters" + ], + "properties": { + "datacenters": { + "type": "array", + "items": { + "$ref": "#/components/schemas/DatacenterHealth" + } + } + } + }, + "HealthResponse": { + "type": "object", + "required": [ + "runtime", + "status", + "version" + ], + "properties": { + "runtime": { + "type": "string" + }, + "status": { + "type": "string" + }, + "version": { + "type": "string" + } + } + }, + "HealthStatus": { + "type": "string", + "enum": [ + "ok", + "error" + ] + }, "Namespace": { "type": "object", "required": [ @@ -1340,7 +1502,78 @@ } } }, - "RunnerConfigsServerlessHealthCheckError": { + "RunnerConfigsRefreshMetadataRequest": { + "type": "object", + "additionalProperties": false + }, + "RunnerConfigsRefreshMetadataResponse": { + "type": "object", + "additionalProperties": false + }, + "RunnerConfigsServerlessHealthCheckRequest": { + "type": "object", + "required": [ + "url" + ], + "properties": { + "headers": { + "type": "object", + "additionalProperties": { + "type": "string" + }, + "propertyNames": { + "type": "string" + } + }, + "url": { + "type": "string" + } + }, + "additionalProperties": false + }, + "RunnerConfigsServerlessHealthCheckResponse": { + "oneOf": [ + { + "type": "object", + "required": [ + "success" + ], + "properties": { + "success": { + "type": "object", + "required": [ + "version" + ], + "properties": { + "version": { + "type": "string" + } + } + } + } + }, + { + "type": "object", + "required": [ + "failure" + ], + "properties": { + "failure": { + "type": "object", + "required": [ + "error" + ], + "properties": { + "error": { + "$ref": "#/components/schemas/RunnerConfigsServerlessMetadataError" + } + } + } + } + } + ] + }, + "RunnerConfigsServerlessMetadataError": { "oneOf": [ { "type": "object", @@ -1444,69 +1677,6 @@ } ] }, - "RunnerConfigsServerlessHealthCheckRequest": { - "type": "object", - "required": [ - "url" - ], - "properties": { - "headers": { - "type": "object", - "additionalProperties": { - "type": "string" - }, - "propertyNames": { - "type": "string" - } - }, - "url": { - "type": "string" - } - }, - "additionalProperties": false - }, - "RunnerConfigsServerlessHealthCheckResponse": { - "oneOf": [ - { - "type": "object", - "required": [ - "success" - ], - "properties": { - "success": { - "type": "object", - "required": [ - "version" - ], - "properties": { - "version": { - "type": "string" - } - } - } - } - }, - { - "type": "object", - "required": [ - "failure" - ], - "properties": { - "failure": { - "type": "object", - "required": [ - "error" - ], - "properties": { - "error": { - "$ref": "#/components/schemas/RunnerConfigsServerlessHealthCheckError" - } - } - } - } - } - ] - }, "RunnerConfigsUpsertRequestBody": { "type": "object", "required": [ diff --git a/packages/core/api-public/src/health.rs b/packages/core/api-public/src/health.rs new file mode 100644 index 0000000000..c5c3a33279 --- /dev/null +++ b/packages/core/api-public/src/health.rs @@ -0,0 +1,155 @@ +use anyhow::Result; +use axum::{extract::Extension, response::IntoResponse, Json}; +use futures_util::StreamExt; +use rivet_api_builder::ApiError; +use serde::{Deserialize, Serialize}; +use std::time::Instant; +use utoipa::ToSchema; + +use crate::ctx::ApiCtx; + +#[derive(Debug, Serialize, Deserialize, ToSchema)] +#[schema(as = HealthFanoutResponse)] +pub struct FanoutResponse { + pub datacenters: Vec, +} + +#[derive(Debug, Serialize, Deserialize, ToSchema)] +pub struct DatacenterHealth { + pub datacenter_label: u16, + pub datacenter_name: String, + pub status: HealthStatus, + pub rtt_ms: Option, + pub response: Option, + pub error: Option, +} + +#[derive(Debug, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "snake_case")] +pub enum HealthStatus { + Ok, + Error, +} + +#[derive(Debug, Serialize, Deserialize, ToSchema)] +pub struct HealthResponse { + pub runtime: String, + pub status: String, + pub version: String, +} + +#[utoipa::path( + get, + operation_id = "health_fanout", + path = "/health/fanout", + responses( + (status = 200, body = FanoutResponse), + ), + security(("bearer_auth" = [])), +)] +pub async fn fanout(Extension(ctx): Extension) -> impl IntoResponse { + match fanout_inner(ctx).await { + Ok(response) => Json(response).into_response(), + Err(err) => ApiError::from(err).into_response(), + } +} + +async fn fanout_inner(ctx: ApiCtx) -> Result { + // Require datacenter read permissions to access health status + ctx.auth().await?; + + let dcs = &ctx.config().topology().datacenters; + + tracing::debug!(datacenters = dcs.len(), "starting health fanout"); + + let results = futures_util::stream::iter(dcs.clone().into_iter().map(|dc| { + let ctx = ctx.clone(); + + async move { + let start = Instant::now(); + + if dc.datacenter_label == ctx.config().dc_label() { + // Local datacenter - check directly + let response = HealthResponse { + runtime: "engine".to_string(), + status: "ok".to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + }; + + DatacenterHealth { + datacenter_label: dc.datacenter_label, + datacenter_name: dc.name.clone(), + status: HealthStatus::Ok, + rtt_ms: Some(start.elapsed().as_secs_f64() * 1000.0), + response: Some(response), + error: None, + } + } else { + // Remote datacenter - HTTP request + match send_health_check(&ctx, &dc).await { + Ok(response) => DatacenterHealth { + datacenter_label: dc.datacenter_label, + datacenter_name: dc.name.clone(), + status: HealthStatus::Ok, + rtt_ms: Some(start.elapsed().as_secs_f64() * 1000.0), + response: Some(response), + error: None, + }, + Err(err) => { + tracing::warn!( + ?dc.datacenter_label, + ?err, + "health check failed for datacenter" + ); + + DatacenterHealth { + datacenter_label: dc.datacenter_label, + datacenter_name: dc.name.clone(), + status: HealthStatus::Error, + rtt_ms: Some(start.elapsed().as_secs_f64() * 1000.0), + response: None, + error: Some(err.to_string()), + } + } + } + } + } + })) + .buffer_unordered(16) + .collect::>() + .await; + + tracing::debug!(results = results.len(), "health fanout completed"); + + Ok(FanoutResponse { + datacenters: results, + }) +} + +async fn send_health_check( + ctx: &ApiCtx, + dc: &rivet_config::config::topology::Datacenter, +) -> Result { + let client = rivet_pools::reqwest::client().await?; + let url = dc.peer_url.join("/health")?; + + tracing::debug!( + ?dc.datacenter_label, + ?url, + "sending health check to remote datacenter" + ); + + let res = client + .get(url) + .timeout(std::time::Duration::from_secs(5)) + .send() + .await?; + + if res.status().is_success() { + let response = res.json::().await?; + Ok(response) + } else { + anyhow::bail!("Health check returned status: {}", res.status()) + } +} + diff --git a/packages/core/api-public/src/lib.rs b/packages/core/api-public/src/lib.rs index bed4376314..7bd3bcebf4 100644 --- a/packages/core/api-public/src/lib.rs +++ b/packages/core/api-public/src/lib.rs @@ -2,6 +2,7 @@ pub mod actors; pub mod ctx; pub mod datacenters; mod errors; +pub mod health; pub mod metadata; pub mod namespaces; pub mod router; diff --git a/packages/core/api-public/src/router.rs b/packages/core/api-public/src/router.rs index ae7c64a66a..ad8665c57f 100644 --- a/packages/core/api-public/src/router.rs +++ b/packages/core/api-public/src/router.rs @@ -3,12 +3,12 @@ use axum::{ middleware::{self, Next}, response::{IntoResponse, Redirect, Response}, }; -use reqwest::header::{AUTHORIZATION, HeaderMap}; +use reqwest::header::{HeaderMap, AUTHORIZATION}; use rivet_api_builder::{create_router, extract::FailedExtraction}; use tower_http::cors::CorsLayer; use utoipa::OpenApi; -use crate::{actors, ctx, datacenters, metadata, namespaces, runner_configs, runners, ui}; +use crate::{actors, ctx, datacenters, health, metadata, namespaces, runner_configs, runners, ui}; #[derive(OpenApi)] #[openapi( @@ -28,6 +28,7 @@ use crate::{actors, ctx, datacenters, metadata, namespaces, runner_configs, runn runner_configs::serverless_health_check::serverless_health_check, runner_configs::refresh_metadata::refresh_metadata, datacenters::list, + health::fanout, ), components( schemas(rivet_types::keys::namespace::runner_config::RunnerConfigVariant) @@ -91,6 +92,8 @@ pub async fn router( .route("/runners/names", axum::routing::get(runners::list_names)) // MARK: Datacenters .route("/datacenters", axum::routing::get(datacenters::list)) + // MARK: Health + .route("/health/fanout", axum::routing::get(health::fanout)) // MARK: UI .route("/ui", axum::routing::get(ui::serve_index)) .route("/ui/", axum::routing::get(ui::serve_index))