From 6bda80f40f1fd821cffa950f01bd72e5aa282db6 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Tue, 7 Oct 2025 13:00:56 -0700 Subject: [PATCH] feat(serverless): regional serverless configs --- Cargo.lock | 3 + out/openapi.json | 124 +++--- packages/common/api-builder/src/extract.rs | 1 - packages/common/api-types/src/lib.rs | 1 + .../api-types/src/runner_configs/list.rs | 29 ++ .../api-types/src/runner_configs/mod.rs | 1 + packages/common/api-util/src/lib.rs | 29 +- packages/common/types/Cargo.toml | 1 + packages/common/types/src/keys/mod.rs | 1 + .../common/types/src/keys/namespace/mod.rs | 1 + .../types/src/keys/namespace/runner_config.rs | 25 ++ packages/common/types/src/lib.rs | 1 + packages/common/types/src/namespaces.rs | 61 --- packages/common/types/src/runner_configs.rs | 63 +++ packages/common/universaldb/src/utils/keys.rs | 2 +- packages/core/api-peer/src/runner_configs.rs | 47 +- packages/core/api-public/Cargo.toml | 6 +- packages/core/api-public/src/actors/list.rs | 2 +- .../core/api-public/src/actors/list_names.rs | 2 +- packages/core/api-public/src/router.rs | 2 +- .../core/api-public/src/runner_configs.rs | 211 +++++++-- packages/core/api-public/src/runners.rs | 4 +- packages/core/pegboard-serverless/src/lib.rs | 6 +- packages/services/namespace/Cargo.toml | 2 + packages/services/namespace/src/keys.rs | 419 ------------------ packages/services/namespace/src/keys/mod.rs | 190 ++++++++ .../namespace/src/keys/runner_config.rs | 260 +++++++++++ .../namespace/src/ops/runner_config/delete.rs | 17 +- .../src/ops/runner_config/delete_default.rs | 43 ++ .../runner_config/{get_local.rs => get.rs} | 26 +- .../src/ops/runner_config/get_default.rs | 33 ++ .../src/ops/runner_config/get_global.rs | 106 ----- .../namespace/src/ops/runner_config/list.rs | 19 +- .../namespace/src/ops/runner_config/mod.rs | 6 +- .../namespace/src/ops/runner_config/upsert.rs | 24 +- .../src/ops/runner_config/upsert_default.rs | 45 ++ packages/services/namespace/src/utils.rs | 10 +- .../src/ops/runner/find_dc_with_runner.rs | 93 ++-- .../pegboard/src/workflows/actor/runtime.rs | 5 +- scripts/api/add-serverless.ts | 20 +- 40 files changed, 1093 insertions(+), 848 deletions(-) create mode 100644 packages/common/api-types/src/runner_configs/list.rs create mode 100644 packages/common/api-types/src/runner_configs/mod.rs create mode 100644 packages/common/types/src/keys/namespace/mod.rs create mode 100644 packages/common/types/src/keys/namespace/runner_config.rs create mode 100644 packages/common/types/src/runner_configs.rs delete mode 100644 packages/services/namespace/src/keys.rs create mode 100644 packages/services/namespace/src/keys/mod.rs create mode 100644 packages/services/namespace/src/keys/runner_config.rs create mode 100644 packages/services/namespace/src/ops/runner_config/delete_default.rs rename packages/services/namespace/src/ops/runner_config/{get_local.rs => get.rs} (66%) create mode 100644 packages/services/namespace/src/ops/runner_config/get_default.rs delete mode 100644 packages/services/namespace/src/ops/runner_config/get_global.rs create mode 100644 packages/services/namespace/src/ops/runner_config/upsert_default.rs diff --git a/Cargo.lock b/Cargo.lock index 752a259544..c3b695d857 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2762,6 +2762,8 @@ name = "namespace" version = "25.7.3" dependencies = [ "anyhow", + "epoxy", + "epoxy-protocol", "gasoline", "internal", "reqwest", @@ -4634,6 +4636,7 @@ dependencies = [ "rivet-runner-protocol", "rivet-util", "serde", + "strum", "universaldb", "utoipa", "vbare", diff --git a/out/openapi.json b/out/openapi.json index 99c579754a..27d4727d5d 100644 --- a/out/openapi.json +++ b/out/openapi.json @@ -1242,7 +1242,13 @@ "runner_configs": { "type": "object", "additionalProperties": { - "$ref": "#/components/schemas/RunnerConfig" + "type": "object", + "additionalProperties": { + "$ref": "#/components/schemas/RunnerConfig" + }, + "propertyNames": { + "type": "string" + } }, "propertyNames": { "type": "string" @@ -1252,68 +1258,74 @@ "additionalProperties": false }, "RunnerConfigsUpsertRequest": { - "oneOf": [ - { - "type": "object", - "required": [ - "serverless" - ], - "properties": { - "serverless": { - "type": "object", - "required": [ - "url", - "headers", - "request_lifespan", - "slots_per_runner", - "min_runners", - "max_runners", - "runners_margin" - ], - "properties": { - "headers": { - "type": "object", - "additionalProperties": { - "type": "string" + "type": "object", + "additionalProperties": { + "oneOf": [ + { + "type": "object", + "required": [ + "serverless" + ], + "properties": { + "serverless": { + "type": "object", + "required": [ + "url", + "headers", + "request_lifespan", + "slots_per_runner", + "min_runners", + "max_runners", + "runners_margin" + ], + "properties": { + "headers": { + "type": "object", + "additionalProperties": { + "type": "string" + }, + "propertyNames": { + "type": "string" + } }, - "propertyNames": { + "max_runners": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "min_runners": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "request_lifespan": { + "type": "integer", + "format": "int32", + "description": "Seconds.", + "minimum": 0 + }, + "runners_margin": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "slots_per_runner": { + "type": "integer", + "format": "int32", + "minimum": 0 + }, + "url": { "type": "string" } - }, - "max_runners": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "min_runners": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "request_lifespan": { - "type": "integer", - "format": "int32", - "description": "Seconds.", - "minimum": 0 - }, - "runners_margin": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "slots_per_runner": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "url": { - "type": "string" } } } } - } - ] + ] + }, + "propertyNames": { + "type": "string" + } }, "RunnerConfigsUpsertResponse": { "type": "object" diff --git a/packages/common/api-builder/src/extract.rs b/packages/common/api-builder/src/extract.rs index 116034df9d..274fd87abe 100644 --- a/packages/common/api-builder/src/extract.rs +++ b/packages/common/api-builder/src/extract.rs @@ -1,4 +1,3 @@ -use anyhow::anyhow; use axum::{ extract::{ Request, diff --git a/packages/common/api-types/src/lib.rs b/packages/common/api-types/src/lib.rs index 345bda71ec..29ad6a120e 100644 --- a/packages/common/api-types/src/lib.rs +++ b/packages/common/api-types/src/lib.rs @@ -2,4 +2,5 @@ pub mod actors; pub mod datacenters; pub mod namespaces; pub mod pagination; +pub mod runner_configs; pub mod runners; diff --git a/packages/common/api-types/src/runner_configs/list.rs b/packages/common/api-types/src/runner_configs/list.rs new file mode 100644 index 0000000000..b9ded225d1 --- /dev/null +++ b/packages/common/api-types/src/runner_configs/list.rs @@ -0,0 +1,29 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; +use utoipa::IntoParams; + +use crate::pagination::Pagination; + +#[derive(Debug, Serialize, Deserialize, Clone, IntoParams)] +#[serde(deny_unknown_fields)] +#[into_params(parameter_in = Query)] +pub struct ListQuery { + pub namespace: String, + pub limit: Option, + pub cursor: Option, + pub variant: Option, + #[serde(default)] + pub runner_names: Option, +} + +#[derive(Deserialize, Clone)] +#[serde(deny_unknown_fields)] +pub struct ListPath {} + +#[derive(Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct ListResponse { + pub runner_configs: HashMap, + pub pagination: Pagination, +} diff --git a/packages/common/api-types/src/runner_configs/mod.rs b/packages/common/api-types/src/runner_configs/mod.rs new file mode 100644 index 0000000000..d17e233fbf --- /dev/null +++ b/packages/common/api-types/src/runner_configs/mod.rs @@ -0,0 +1 @@ +pub mod list; diff --git a/packages/common/api-util/src/lib.rs b/packages/common/api-util/src/lib.rs index e6876d3696..4883babb89 100644 --- a/packages/common/api-util/src/lib.rs +++ b/packages/common/api-util/src/lib.rs @@ -102,7 +102,7 @@ where Q: Serialize + Clone + Send + 'static, F: Fn(ApiCtx, Q) -> Fut + Clone + Send + 'static, Fut: Future> + Send, - A: Fn(I, &mut R), + A: Fn(u16, I, &mut R), R: Default + Send + 'static, { let dcs = &ctx.config().topology().datacenters; @@ -117,19 +117,22 @@ where async move { if dc.datacenter_label == ctx.config().dc_label() { // Local datacenter - use direct API call - local_handler(ctx, query).await + (dc.datacenter_label, local_handler(ctx, query).await) } else { // Remote datacenter - HTTP request - request_remote_datacenter::( - ctx.config(), + ( dc.datacenter_label, - &endpoint, - Method::GET, - headers, - Some(&query), - Option::<&()>::None, + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &endpoint, + Method::GET, + headers, + Some(&query), + Option::<&()>::None, + ) + .await, ) - .await } } })) @@ -141,11 +144,11 @@ where let result_count = results.len(); let mut errors = Vec::new(); let mut aggregated = R::default(); - for res in results { + for (dc_label, res) in results { match res { - Ok(data) => aggregator(data, &mut aggregated), + Ok(data) => aggregator(dc_label, data, &mut aggregated), Err(err) => { - tracing::error!(?err, "failed to request edge dc"); + tracing::error!(?dc_label, ?err, "failed to request edge dc"); errors.push(err); } } diff --git a/packages/common/types/Cargo.toml b/packages/common/types/Cargo.toml index 49bb8b9549..ae417ad704 100644 --- a/packages/common/types/Cargo.toml +++ b/packages/common/types/Cargo.toml @@ -13,6 +13,7 @@ rivet-data.workspace = true rivet-runner-protocol.workspace = true rivet-util.workspace = true serde.workspace = true +strum.workspace = true universaldb.workspace = true utoipa.workspace = true vbare.workspace = true diff --git a/packages/common/types/src/keys/mod.rs b/packages/common/types/src/keys/mod.rs index 38311a6f72..82726c01bd 100644 --- a/packages/common/types/src/keys/mod.rs +++ b/packages/common/types/src/keys/mod.rs @@ -1 +1,2 @@ +pub mod namespace; pub mod pegboard; diff --git a/packages/common/types/src/keys/namespace/mod.rs b/packages/common/types/src/keys/namespace/mod.rs new file mode 100644 index 0000000000..45f0e9ec4b --- /dev/null +++ b/packages/common/types/src/keys/namespace/mod.rs @@ -0,0 +1 @@ +pub mod runner_config; diff --git a/packages/common/types/src/keys/namespace/runner_config.rs b/packages/common/types/src/keys/namespace/runner_config.rs new file mode 100644 index 0000000000..7dc326fb8b --- /dev/null +++ b/packages/common/types/src/keys/namespace/runner_config.rs @@ -0,0 +1,25 @@ +use gas::prelude::*; +use utoipa::ToSchema; + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, strum::FromRepr, ToSchema)] +#[serde(rename_all = "snake_case")] +pub enum RunnerConfigVariant { + Serverless = 0, +} + +impl RunnerConfigVariant { + pub fn parse(v: &str) -> Option { + match v { + "serverless" => Some(RunnerConfigVariant::Serverless), + _ => None, + } + } +} + +impl std::fmt::Display for RunnerConfigVariant { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RunnerConfigVariant::Serverless => write!(f, "serverless"), + } + } +} diff --git a/packages/common/types/src/lib.rs b/packages/common/types/src/lib.rs index 37904cce2e..ed3c57a928 100644 --- a/packages/common/types/src/lib.rs +++ b/packages/common/types/src/lib.rs @@ -3,4 +3,5 @@ pub mod datacenters; pub mod keys; pub mod msgs; pub mod namespaces; +pub mod runner_configs; pub mod runners; diff --git a/packages/common/types/src/namespaces.rs b/packages/common/types/src/namespaces.rs index 5dd220e655..c7e6f71d59 100644 --- a/packages/common/types/src/namespaces.rs +++ b/packages/common/types/src/namespaces.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use gas::prelude::*; use utoipa::ToSchema; @@ -10,62 +8,3 @@ pub struct Namespace { pub display_name: String, pub create_ts: i64, } - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "snake_case")] -pub enum RunnerConfig { - Serverless { - url: String, - headers: HashMap, - /// Seconds. - request_lifespan: u32, - slots_per_runner: u32, - min_runners: u32, - max_runners: u32, - runners_margin: u32, - }, -} - -impl From for rivet_data::generated::namespace_runner_config_v1::Data { - fn from(value: RunnerConfig) -> Self { - match value { - RunnerConfig::Serverless { - url, - headers, - request_lifespan, - slots_per_runner, - min_runners, - max_runners, - runners_margin, - } => rivet_data::generated::namespace_runner_config_v1::Data::Serverless( - rivet_data::generated::namespace_runner_config_v1::Serverless { - url, - headers: headers.into(), - request_lifespan, - slots_per_runner, - min_runners, - max_runners, - runners_margin, - }, - ), - } - } -} - -impl From for RunnerConfig { - fn from(value: rivet_data::generated::namespace_runner_config_v1::Data) -> Self { - match value { - rivet_data::generated::namespace_runner_config_v1::Data::Serverless(o) => { - RunnerConfig::Serverless { - url: o.url, - headers: o.headers.into(), - request_lifespan: o.request_lifespan, - slots_per_runner: o.slots_per_runner, - min_runners: o.min_runners, - max_runners: o.max_runners, - runners_margin: o.runners_margin, - } - } - } - } -} diff --git a/packages/common/types/src/runner_configs.rs b/packages/common/types/src/runner_configs.rs new file mode 100644 index 0000000000..4cd1be9d99 --- /dev/null +++ b/packages/common/types/src/runner_configs.rs @@ -0,0 +1,63 @@ +use std::collections::HashMap; + +use gas::prelude::*; +use utoipa::ToSchema; + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "snake_case")] +pub enum RunnerConfig { + Serverless { + url: String, + headers: HashMap, + /// Seconds. + request_lifespan: u32, + slots_per_runner: u32, + min_runners: u32, + max_runners: u32, + runners_margin: u32, + }, +} + +impl From for rivet_data::generated::namespace_runner_config_v1::Data { + fn from(value: RunnerConfig) -> Self { + match value { + RunnerConfig::Serverless { + url, + headers, + request_lifespan, + slots_per_runner, + min_runners, + max_runners, + runners_margin, + } => rivet_data::generated::namespace_runner_config_v1::Data::Serverless( + rivet_data::generated::namespace_runner_config_v1::Serverless { + url, + headers: headers.into(), + request_lifespan, + slots_per_runner, + min_runners, + max_runners, + runners_margin, + }, + ), + } + } +} + +impl From for RunnerConfig { + fn from(value: rivet_data::generated::namespace_runner_config_v1::Data) -> Self { + match value { + rivet_data::generated::namespace_runner_config_v1::Data::Serverless(o) => { + RunnerConfig::Serverless { + url: o.url, + headers: o.headers.into(), + request_lifespan: o.request_lifespan, + slots_per_runner: o.slots_per_runner, + min_runners: o.min_runners, + max_runners: o.max_runners, + runners_margin: o.runners_margin, + } + } + } + } +} diff --git a/packages/common/universaldb/src/utils/keys.rs b/packages/common/universaldb/src/utils/keys.rs index 4c0a261948..1f1a11a7d8 100644 --- a/packages/common/universaldb/src/utils/keys.rs +++ b/packages/common/universaldb/src/utils/keys.rs @@ -59,7 +59,7 @@ define_keys! { (31, DBS, "dbs"), (32, ACTOR, "actor"), (33, BY_NAME, "by_name"), - // 34 + (34, DEFAULT, "default"), (35, REMAINING_MEMORY, "remaining_memory"), (36, REMAINING_CPU, "remaining_cpu"), (37, TOTAL_MEMORY, "total_memory"), diff --git a/packages/core/api-peer/src/runner_configs.rs b/packages/core/api-peer/src/runner_configs.rs index 31f38a0fd0..2623cc9d3a 100644 --- a/packages/core/api-peer/src/runner_configs.rs +++ b/packages/core/api-peer/src/runner_configs.rs @@ -1,36 +1,11 @@ -use std::collections::HashMap; - use anyhow::Result; use namespace::utils::runner_config_variant; use rivet_api_builder::ApiCtx; -use rivet_api_types::pagination::Pagination; +use rivet_api_types::{pagination::Pagination, runner_configs::list::*}; +use rivet_types::keys::namespace::runner_config::RunnerConfigVariant; use serde::{Deserialize, Serialize}; use utoipa::{IntoParams, ToSchema}; -#[derive(Debug, Serialize, Deserialize, Clone, IntoParams)] -#[serde(deny_unknown_fields)] -#[into_params(parameter_in = Query)] -pub struct ListQuery { - pub namespace: String, - pub limit: Option, - pub cursor: Option, - pub variant: Option, - #[serde(default)] - pub runner_names: Option, -} - -#[derive(Deserialize)] -#[serde(deny_unknown_fields)] -pub struct ListPath {} - -#[derive(Deserialize, Serialize, ToSchema)] -#[serde(deny_unknown_fields)] -#[schema(as = RunnerConfigsListResponse)] -pub struct ListResponse { - pub runner_configs: HashMap, - pub pagination: Pagination, -} - pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result { let namespace = ctx .op(namespace::ops::resolve_for_name_global::Input { @@ -41,7 +16,7 @@ pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result Result Result Result "/actors", peer_query, |ctx, query| async move { rivet_api_peer::actors::list::list(ctx, (), query).await }, - |res, agg| agg.extend(res.actors), + |_, res, agg| agg.extend(res.actors), ) .await?; diff --git a/packages/core/api-public/src/actors/list_names.rs b/packages/core/api-public/src/actors/list_names.rs index db6b2f2713..1d0a643b7e 100644 --- a/packages/core/api-public/src/actors/list_names.rs +++ b/packages/core/api-public/src/actors/list_names.rs @@ -63,7 +63,7 @@ async fn list_names_inner( |ctx, query| async move { rivet_api_peer::actors::list_names::list_names(ctx, (), query).await }, - |res, agg| agg.extend(res.names), + |_, res, agg| agg.extend(res.names), ) .await?; diff --git a/packages/core/api-public/src/router.rs b/packages/core/api-public/src/router.rs index c96111fdf5..154cc3ed3b 100644 --- a/packages/core/api-public/src/router.rs +++ b/packages/core/api-public/src/router.rs @@ -28,7 +28,7 @@ use crate::{actors, ctx, datacenters, namespaces, runner_configs, runners, ui}; datacenters::list, ), components( - schemas(namespace::keys::RunnerConfigVariant) + schemas(rivet_types::keys::namespace::runner_config::RunnerConfigVariant) ), security( ("bearer_auth" = []) ), modifiers(&SecurityAddon), diff --git a/packages/core/api-public/src/runner_configs.rs b/packages/core/api-public/src/runner_configs.rs index 1267f93294..cb5818e7c4 100644 --- a/packages/core/api-public/src/runner_configs.rs +++ b/packages/core/api-public/src/runner_configs.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use anyhow::Result; use axum::{ http::HeaderMap, @@ -7,12 +9,22 @@ use rivet_api_builder::{ ApiError, extract::{Extension, Json, Path, Query}, }; - use rivet_api_peer::runner_configs::*; -use rivet_api_util::request_remote_datacenter; +use rivet_api_types::{pagination::Pagination, runner_configs::list::*}; +use rivet_api_util::{fanout_to_datacenters, request_remote_datacenter}; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; use crate::ctx::ApiCtx; +#[derive(Deserialize, Serialize, ToSchema)] +#[serde(deny_unknown_fields)] +#[schema(as = RunnerConfigsListResponse)] +pub struct ListResponse { + pub runner_configs: HashMap>, + pub pagination: Pagination, +} + #[utoipa::path( get, operation_id = "runner_configs_list", @@ -45,23 +57,52 @@ async fn list_inner( ) -> Result { ctx.auth().await?; - if ctx.config().is_leader() { - rivet_api_peer::runner_configs::list(ctx.into(), path, query).await - } else { - let leader_dc = ctx.config().leader_dc()?; - request_remote_datacenter::( - ctx.config(), - leader_dc.datacenter_label, - "/runner-configs", - axum::http::Method::GET, - headers, - Some(&query), - Option::<&()>::None, - ) - .await - } + let runner_configs = fanout_to_datacenters::< + rivet_api_types::runner_configs::list::ListResponse, + _, + _, + _, + _, + HashMap>, + >( + ctx.clone().into(), + headers, + "/runner-configs", + query.clone(), + move |ctx, query| { + let path = path.clone(); + async move { rivet_api_peer::runner_configs::list(ctx, path, query).await } + }, + |dc_label, res, agg| { + for (runner_name, runner_config) in res.runner_configs { + let entry = agg.entry(runner_name).or_insert_with(HashMap::new); + + entry.insert( + ctx.config() + .dc_for_label(dc_label) + .expect("dc should exist") + .name + .clone(), + runner_config, + ); + } + }, + ) + .await?; + + Ok(ListResponse { + runner_configs, + pagination: Pagination { cursor: None }, + }) } +#[derive(Deserialize, Serialize, ToSchema)] +#[serde(deny_unknown_fields)] +#[schema(as = RunnerConfigsUpsertRequest)] +pub struct UpsertRequest( + #[schema(inline)] HashMap, +); + #[utoipa::path( put, operation_id = "runner_configs_upsert", @@ -94,25 +135,77 @@ async fn upsert_inner( headers: HeaderMap, path: UpsertPath, query: UpsertQuery, - body: UpsertRequest, + mut body: UpsertRequest, ) -> Result { ctx.auth().await?; - if ctx.config().is_leader() { - rivet_api_peer::runner_configs::upsert(ctx.into(), path, query, body).await - } else { - let leader_dc = ctx.config().leader_dc()?; - request_remote_datacenter::( - ctx.config(), - leader_dc.datacenter_label, - &format!("/runner-configs/{}", path.runner_name), - axum::http::Method::PUT, - headers, - Some(&query), - Some(&body), - ) - .await + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + // Upsert default to epoxy + if let Some(default_config) = body.0.remove("default") { + ctx.op(namespace::ops::runner_config::upsert_default::Input { + namespace_id: namespace.namespace_id, + name: path.runner_name.clone(), + config: default_config, + }) + .await?; + } + + for dc in &ctx.config().topology().datacenters { + if let Some(runner_config) = body.0.remove(&dc.name) { + if ctx.config().dc_label() == dc.datacenter_label { + rivet_api_peer::runner_configs::upsert( + ctx.clone().into(), + path.clone(), + query.clone(), + rivet_api_peer::runner_configs::UpsertRequest(runner_config), + ) + .await?; + } else { + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &format!("/runner-configs/{}", path.runner_name), + axum::http::Method::PUT, + headers.clone(), + Some(&query), + Some(&runner_config), + ) + .await?; + } + } else { + if ctx.config().dc_label() == dc.datacenter_label { + rivet_api_peer::runner_configs::delete( + ctx.clone().into(), + DeletePath { + runner_name: path.runner_name.clone(), + }, + DeleteQuery { + namespace: query.namespace.clone(), + }, + ) + .await?; + } else { + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &format!("/runner-configs/{}", path.runner_name), + axum::http::Method::DELETE, + headers.clone(), + Some(&query), + Option::<&()>::None, + ) + .await?; + } + } } + + Ok(UpsertResponse {}) } #[utoipa::path( @@ -148,19 +241,45 @@ async fn delete_inner( ) -> Result { ctx.auth().await?; - if ctx.config().is_leader() { - rivet_api_peer::runner_configs::delete(ctx.into(), path, query).await - } else { - let leader_dc = ctx.config().leader_dc()?; - request_remote_datacenter::( - ctx.config(), - leader_dc.datacenter_label, - &format!("/runner-configs/{}", path.runner_name), - axum::http::Method::DELETE, - headers, - Some(&query), - Option::<&()>::None, - ) - .await + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + for dc in &ctx.config().topology().datacenters { + if ctx.config().dc_label() == dc.datacenter_label { + rivet_api_peer::runner_configs::delete( + ctx.clone().into(), + DeletePath { + runner_name: path.runner_name.clone(), + }, + DeleteQuery { + namespace: query.namespace.clone(), + }, + ) + .await?; + } else { + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &format!("/runner-configs/{}", path.runner_name), + axum::http::Method::DELETE, + headers.clone(), + Some(&query), + Option::<&()>::None, + ) + .await?; + } } + + // Delete default from epoxy + ctx.op(namespace::ops::runner_config::delete_default::Input { + namespace_id: namespace.namespace_id, + name: path.runner_name.clone(), + }) + .await?; + + Ok(DeleteResponse {}) } diff --git a/packages/core/api-public/src/runners.rs b/packages/core/api-public/src/runners.rs index b4a22454e2..0c91a1ff55 100644 --- a/packages/core/api-public/src/runners.rs +++ b/packages/core/api-public/src/runners.rs @@ -46,7 +46,7 @@ async fn list_inner(ctx: ApiCtx, headers: HeaderMap, query: ListQuery) -> Result "/runners", query.clone(), |ctx, query| async move { rivet_api_peer::runners::list(ctx, (), query).await }, - |res, agg| agg.extend(res.runners), + |_, res, agg| agg.extend(res.runners), ) .await?; @@ -136,7 +136,7 @@ async fn list_names_inner( "/runners/names", peer_query, |ctx, query| async move { rivet_api_peer::runners::list_names(ctx, (), query).await }, - |res, agg| agg.extend(res.names), + |_, res, agg| agg.extend(res.names), ) .await?; diff --git a/packages/core/pegboard-serverless/src/lib.rs b/packages/core/pegboard-serverless/src/lib.rs index 52ec7b47d1..a30d4b23e5 100644 --- a/packages/core/pegboard-serverless/src/lib.rs +++ b/packages/core/pegboard-serverless/src/lib.rs @@ -15,7 +15,7 @@ use pegboard::keys; use reqwest::header::{HeaderName, HeaderValue}; use reqwest_eventsource as sse; use rivet_runner_protocol as protocol; -use rivet_types::namespaces::RunnerConfig; +use rivet_types::runner_configs::RunnerConfig; use tokio::{sync::oneshot, task::JoinHandle, time::Duration}; use universaldb::options::StreamingMode; use universaldb::utils::IsolationLevel::*; @@ -94,7 +94,7 @@ async fn tick( .await?; let runner_configs = ctx - .op(namespace::ops::runner_config::get_global::Input { + .op(namespace::ops::runner_config::get::Input { runners: serverless_data .iter() .map(|(ns_id, runner_name, _)| (*ns_id, runner_name.clone())) @@ -102,7 +102,7 @@ async fn tick( }) .await?; - tracing::debug!(?serverless_data, ?runner_configs); + tracing::info!(?runner_configs, "------------------"); for (ns_id, runner_name, desired_slots) in &serverless_data { let runner_config = runner_configs diff --git a/packages/services/namespace/Cargo.toml b/packages/services/namespace/Cargo.toml index 984e61b0c5..c9d6e953e4 100644 --- a/packages/services/namespace/Cargo.toml +++ b/packages/services/namespace/Cargo.toml @@ -7,6 +7,8 @@ edition.workspace = true [dependencies] anyhow.workspace = true +epoxy-protocol.workspace = true +epoxy.workspace = true gas.workspace = true internal.workspace = true reqwest.workspace = true diff --git a/packages/services/namespace/src/keys.rs b/packages/services/namespace/src/keys.rs deleted file mode 100644 index 1349f8d065..0000000000 --- a/packages/services/namespace/src/keys.rs +++ /dev/null @@ -1,419 +0,0 @@ -use std::result::Result::Ok; - -use anyhow::*; -use gas::prelude::*; -use serde::Serialize; -use universaldb::prelude::*; -use utoipa::ToSchema; -use vbare::OwnedVersionedData; - -pub fn subspace() -> universaldb::utils::Subspace { - universaldb::utils::Subspace::new(&(RIVET, NAMESPACE)) -} - -#[derive(Debug)] -pub struct NameKey { - namespace_id: Id, -} - -impl NameKey { - pub fn new(namespace_id: Id) -> Self { - NameKey { namespace_id } - } - - pub fn namespace_id(&self) -> Id { - self.namespace_id - } -} - -impl FormalKey for NameKey { - type Value = String; - - fn deserialize(&self, raw: &[u8]) -> Result { - String::from_utf8(raw.to_vec()).map_err(Into::into) - } - - fn serialize(&self, value: Self::Value) -> Result> { - Ok(value.into_bytes()) - } -} - -impl TuplePack for NameKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let t = (DATA, self.namespace_id, NAME); - t.pack(w, tuple_depth) - } -} - -impl<'de> TupleUnpack<'de> for NameKey { - fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (_, namespace_id, _)) = <(usize, Id, usize)>::unpack(input, tuple_depth)?; - - let v = NameKey { namespace_id }; - - Ok((input, v)) - } -} - -#[derive(Debug)] -pub struct DisplayNameKey { - namespace_id: Id, -} - -impl DisplayNameKey { - pub fn new(namespace_id: Id) -> Self { - DisplayNameKey { namespace_id } - } -} - -impl FormalKey for DisplayNameKey { - type Value = String; - - fn deserialize(&self, raw: &[u8]) -> Result { - String::from_utf8(raw.to_vec()).map_err(Into::into) - } - - fn serialize(&self, value: Self::Value) -> Result> { - Ok(value.into_bytes()) - } -} - -impl TuplePack for DisplayNameKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let t = (DATA, self.namespace_id, DISPLAY_NAME); - t.pack(w, tuple_depth) - } -} - -impl<'de> TupleUnpack<'de> for DisplayNameKey { - fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (_, namespace_id, _)) = <(usize, Id, usize)>::unpack(input, tuple_depth)?; - - let v = DisplayNameKey { namespace_id }; - - Ok((input, v)) - } -} - -#[derive(Debug)] -pub struct CreateTsKey { - namespace_id: Id, -} - -impl CreateTsKey { - pub fn new(namespace_id: Id) -> Self { - CreateTsKey { namespace_id } - } -} - -impl FormalKey for CreateTsKey { - // Timestamp. - type Value = i64; - - fn deserialize(&self, raw: &[u8]) -> Result { - Ok(i64::from_be_bytes(raw.try_into()?)) - } - - fn serialize(&self, value: Self::Value) -> Result> { - Ok(value.to_be_bytes().to_vec()) - } -} - -impl TuplePack for CreateTsKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let t = (DATA, self.namespace_id, CREATE_TS); - t.pack(w, tuple_depth) - } -} - -impl<'de> TupleUnpack<'de> for CreateTsKey { - fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (_, namespace_id, _)) = <(usize, Id, usize)>::unpack(input, tuple_depth)?; - let v = CreateTsKey { namespace_id }; - - Ok((input, v)) - } -} - -#[derive(Debug)] -pub struct ByNameKey { - name: String, -} - -impl ByNameKey { - pub fn new(name: String) -> Self { - ByNameKey { name } - } -} - -impl FormalKey for ByNameKey { - /// Namespace id. - type Value = Id; - - fn deserialize(&self, raw: &[u8]) -> Result { - Ok(Id::from_slice(raw)?) - } - - fn serialize(&self, value: Self::Value) -> Result> { - Ok(value.as_bytes()) - } -} - -impl TuplePack for ByNameKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let t = (BY_NAME, &self.name); - t.pack(w, tuple_depth) - } -} - -impl<'de> TupleUnpack<'de> for ByNameKey { - fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (_, name)) = <(usize, String)>::unpack(input, tuple_depth)?; - - let v = ByNameKey { name }; - - Ok((input, v)) - } -} - -#[derive(Clone, Copy, Debug, Serialize, Deserialize, strum::FromRepr, ToSchema)] -#[serde(rename_all = "snake_case")] -pub enum RunnerConfigVariant { - Serverless = 0, -} - -impl RunnerConfigVariant { - pub fn parse(v: &str) -> Option { - match v { - "serverless" => Some(RunnerConfigVariant::Serverless), - _ => None, - } - } -} - -impl std::fmt::Display for RunnerConfigVariant { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - RunnerConfigVariant::Serverless => write!(f, "serverless"), - } - } -} - -#[derive(Debug)] -pub struct RunnerConfigKey { - pub namespace_id: Id, - pub name: String, -} - -impl RunnerConfigKey { - pub fn new(namespace_id: Id, name: String) -> Self { - RunnerConfigKey { namespace_id, name } - } - - pub fn subspace(namespace_id: Id) -> RunnerConfigSubspaceKey { - RunnerConfigSubspaceKey::new(namespace_id) - } -} - -impl FormalKey for RunnerConfigKey { - type Value = rivet_types::namespaces::RunnerConfig; - - fn deserialize(&self, raw: &[u8]) -> Result { - Ok( - rivet_data::versioned::NamespaceRunnerConfig::deserialize_with_embedded_version(raw)? - .into(), - ) - } - - fn serialize(&self, value: Self::Value) -> Result> { - rivet_data::versioned::NamespaceRunnerConfig::latest(value.into()) - .serialize_with_embedded_version( - rivet_data::PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION, - ) - } -} - -impl TuplePack for RunnerConfigKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let t = (RUNNER, CONFIG, DATA, self.namespace_id, &self.name); - t.pack(w, tuple_depth) - } -} - -impl<'de> TupleUnpack<'de> for RunnerConfigKey { - fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (_, _, _, namespace_id, name)) = - <(usize, usize, usize, Id, String)>::unpack(input, tuple_depth)?; - - let v = RunnerConfigKey { namespace_id, name }; - - Ok((input, v)) - } -} - -pub struct RunnerConfigSubspaceKey { - pub namespace_id: Id, -} - -impl RunnerConfigSubspaceKey { - pub fn new(namespace_id: Id) -> Self { - RunnerConfigSubspaceKey { namespace_id } - } -} - -impl TuplePack for RunnerConfigSubspaceKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let mut offset = VersionstampOffset::None { size: 0 }; - - let t = (RUNNER, CONFIG, DATA, self.namespace_id); - offset += t.pack(w, tuple_depth)?; - - Ok(offset) - } -} - -#[derive(Debug)] -pub struct RunnerConfigByVariantKey { - pub namespace_id: Id, - pub variant: RunnerConfigVariant, - pub name: String, -} - -impl RunnerConfigByVariantKey { - pub fn new(namespace_id: Id, variant: RunnerConfigVariant, name: String) -> Self { - RunnerConfigByVariantKey { - namespace_id, - name, - variant, - } - } - - pub fn subspace(namespace_id: Id) -> RunnerConfigByVariantSubspaceKey { - RunnerConfigByVariantSubspaceKey::new(namespace_id) - } - - pub fn subspace_with_variant( - namespace_id: Id, - variant: RunnerConfigVariant, - ) -> RunnerConfigByVariantSubspaceKey { - RunnerConfigByVariantSubspaceKey::new_with_variant(namespace_id, variant) - } -} - -impl FormalKey for RunnerConfigByVariantKey { - type Value = rivet_types::namespaces::RunnerConfig; - - fn deserialize(&self, raw: &[u8]) -> Result { - Ok( - rivet_data::versioned::NamespaceRunnerConfig::deserialize_with_embedded_version(raw)? - .into(), - ) - } - - fn serialize(&self, value: Self::Value) -> Result> { - rivet_data::versioned::NamespaceRunnerConfig::latest(value.into()) - .serialize_with_embedded_version( - rivet_data::PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION, - ) - } -} - -impl TuplePack for RunnerConfigByVariantKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let t = ( - RUNNER, - CONFIG, - BY_VARIANT, - self.namespace_id, - self.variant as usize, - &self.name, - ); - t.pack(w, tuple_depth) - } -} - -impl<'de> TupleUnpack<'de> for RunnerConfigByVariantKey { - fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (_, _, _, namespace_id, variant, name)) = - <(usize, usize, usize, Id, usize, String)>::unpack(input, tuple_depth)?; - let variant = RunnerConfigVariant::from_repr(variant).ok_or_else(|| { - PackError::Message(format!("invalid runner config variant `{variant}` in key").into()) - })?; - - let v = RunnerConfigByVariantKey { - namespace_id, - variant, - name, - }; - - Ok((input, v)) - } -} - -pub struct RunnerConfigByVariantSubspaceKey { - pub namespace_id: Id, - pub variant: Option, -} - -impl RunnerConfigByVariantSubspaceKey { - pub fn new(namespace_id: Id) -> Self { - RunnerConfigByVariantSubspaceKey { - namespace_id, - variant: None, - } - } - - pub fn new_with_variant(namespace_id: Id, variant: RunnerConfigVariant) -> Self { - RunnerConfigByVariantSubspaceKey { - namespace_id, - variant: Some(variant), - } - } -} - -impl TuplePack for RunnerConfigByVariantSubspaceKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - let mut offset = VersionstampOffset::None { size: 0 }; - - let t = (RUNNER, CONFIG, BY_VARIANT, self.namespace_id); - offset += t.pack(w, tuple_depth)?; - - if let Some(variant) = self.variant { - offset += (variant as usize).pack(w, tuple_depth)?; - } - - Ok(offset) - } -} diff --git a/packages/services/namespace/src/keys/mod.rs b/packages/services/namespace/src/keys/mod.rs new file mode 100644 index 0000000000..b04883c981 --- /dev/null +++ b/packages/services/namespace/src/keys/mod.rs @@ -0,0 +1,190 @@ +use anyhow::Result; +use gas::prelude::*; +use universaldb::prelude::*; + +pub mod runner_config; + +pub fn subspace() -> universaldb::utils::Subspace { + universaldb::utils::Subspace::new(&(RIVET, NAMESPACE)) +} + +#[derive(Debug)] +pub struct NameKey { + namespace_id: Id, +} + +impl NameKey { + pub fn new(namespace_id: Id) -> Self { + NameKey { namespace_id } + } + + pub fn namespace_id(&self) -> Id { + self.namespace_id + } +} + +impl FormalKey for NameKey { + type Value = String; + + fn deserialize(&self, raw: &[u8]) -> Result { + String::from_utf8(raw.to_vec()).map_err(Into::into) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.into_bytes()) + } +} + +impl TuplePack for NameKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = (DATA, self.namespace_id, NAME); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for NameKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, namespace_id, _)) = <(usize, Id, usize)>::unpack(input, tuple_depth)?; + + let v = NameKey { namespace_id }; + + Ok((input, v)) + } +} + +#[derive(Debug)] +pub struct DisplayNameKey { + namespace_id: Id, +} + +impl DisplayNameKey { + pub fn new(namespace_id: Id) -> Self { + DisplayNameKey { namespace_id } + } +} + +impl FormalKey for DisplayNameKey { + type Value = String; + + fn deserialize(&self, raw: &[u8]) -> Result { + String::from_utf8(raw.to_vec()).map_err(Into::into) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.into_bytes()) + } +} + +impl TuplePack for DisplayNameKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = (DATA, self.namespace_id, DISPLAY_NAME); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for DisplayNameKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, namespace_id, _)) = <(usize, Id, usize)>::unpack(input, tuple_depth)?; + + let v = DisplayNameKey { namespace_id }; + + Ok((input, v)) + } +} + +#[derive(Debug)] +pub struct CreateTsKey { + namespace_id: Id, +} + +impl CreateTsKey { + pub fn new(namespace_id: Id) -> Self { + CreateTsKey { namespace_id } + } +} + +impl FormalKey for CreateTsKey { + // Timestamp. + type Value = i64; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok(i64::from_be_bytes(raw.try_into()?)) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.to_be_bytes().to_vec()) + } +} + +impl TuplePack for CreateTsKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = (DATA, self.namespace_id, CREATE_TS); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for CreateTsKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, namespace_id, _)) = <(usize, Id, usize)>::unpack(input, tuple_depth)?; + let v = CreateTsKey { namespace_id }; + + Ok((input, v)) + } +} + +#[derive(Debug)] +pub struct ByNameKey { + name: String, +} + +impl ByNameKey { + pub fn new(name: String) -> Self { + ByNameKey { name } + } +} + +impl FormalKey for ByNameKey { + /// Namespace id. + type Value = Id; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok(Id::from_slice(raw)?) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.as_bytes()) + } +} + +impl TuplePack for ByNameKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = (BY_NAME, &self.name); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for ByNameKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, name)) = <(usize, String)>::unpack(input, tuple_depth)?; + + let v = ByNameKey { name }; + + Ok((input, v)) + } +} diff --git a/packages/services/namespace/src/keys/runner_config.rs b/packages/services/namespace/src/keys/runner_config.rs new file mode 100644 index 0000000000..f9a178f33f --- /dev/null +++ b/packages/services/namespace/src/keys/runner_config.rs @@ -0,0 +1,260 @@ +use anyhow::Result; +use gas::prelude::*; +use rivet_types::keys::namespace::runner_config::RunnerConfigVariant; +use universaldb::prelude::*; +use vbare::OwnedVersionedData; + +#[derive(Debug)] +pub struct DataKey { + pub namespace_id: Id, + pub name: String, +} + +impl DataKey { + pub fn new(namespace_id: Id, name: String) -> Self { + DataKey { namespace_id, name } + } + + pub fn subspace(namespace_id: Id) -> DataSubspaceKey { + DataSubspaceKey::new(namespace_id) + } +} + +impl FormalKey for DataKey { + type Value = rivet_types::runner_configs::RunnerConfig; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok( + rivet_data::versioned::NamespaceRunnerConfig::deserialize_with_embedded_version(raw)? + .into(), + ) + } + + fn serialize(&self, value: Self::Value) -> Result> { + rivet_data::versioned::NamespaceRunnerConfig::latest(value.into()) + .serialize_with_embedded_version( + rivet_data::PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION, + ) + } +} + +impl TuplePack for DataKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = (RUNNER, CONFIG, DATA, self.namespace_id, &self.name); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for DataKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _, _, namespace_id, name)) = + <(usize, usize, usize, Id, String)>::unpack(input, tuple_depth)?; + + let v = DataKey { namespace_id, name }; + + Ok((input, v)) + } +} + +pub struct DataSubspaceKey { + pub namespace_id: Id, +} + +impl DataSubspaceKey { + pub fn new(namespace_id: Id) -> Self { + DataSubspaceKey { namespace_id } + } +} + +impl TuplePack for DataSubspaceKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let mut offset = VersionstampOffset::None { size: 0 }; + + let t = (RUNNER, CONFIG, DATA, self.namespace_id); + offset += t.pack(w, tuple_depth)?; + + Ok(offset) + } +} + +#[derive(Debug)] +pub struct ByVariantKey { + pub namespace_id: Id, + pub variant: RunnerConfigVariant, + pub name: String, +} + +impl ByVariantKey { + pub fn new(namespace_id: Id, variant: RunnerConfigVariant, name: String) -> Self { + ByVariantKey { + namespace_id, + name, + variant, + } + } + + pub fn subspace(namespace_id: Id) -> ByVariantSubspaceKey { + ByVariantSubspaceKey::new(namespace_id) + } + + pub fn subspace_with_variant( + namespace_id: Id, + variant: RunnerConfigVariant, + ) -> ByVariantSubspaceKey { + ByVariantSubspaceKey::new_with_variant(namespace_id, variant) + } +} + +impl FormalKey for ByVariantKey { + type Value = rivet_types::runner_configs::RunnerConfig; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok( + rivet_data::versioned::NamespaceRunnerConfig::deserialize_with_embedded_version(raw)? + .into(), + ) + } + + fn serialize(&self, value: Self::Value) -> Result> { + rivet_data::versioned::NamespaceRunnerConfig::latest(value.into()) + .serialize_with_embedded_version( + rivet_data::PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION, + ) + } +} + +impl TuplePack for ByVariantKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = ( + RUNNER, + CONFIG, + BY_VARIANT, + self.namespace_id, + self.variant as usize, + &self.name, + ); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for ByVariantKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _, _, namespace_id, variant, name)) = + <(usize, usize, usize, Id, usize, String)>::unpack(input, tuple_depth)?; + let variant = RunnerConfigVariant::from_repr(variant).ok_or_else(|| { + PackError::Message(format!("invalid runner config variant `{variant}` in key").into()) + })?; + + let v = ByVariantKey { + namespace_id, + variant, + name, + }; + + Ok((input, v)) + } +} + +pub struct ByVariantSubspaceKey { + pub namespace_id: Id, + pub variant: Option, +} + +impl ByVariantSubspaceKey { + pub fn new(namespace_id: Id) -> Self { + ByVariantSubspaceKey { + namespace_id, + variant: None, + } + } + + pub fn new_with_variant(namespace_id: Id, variant: RunnerConfigVariant) -> Self { + ByVariantSubspaceKey { + namespace_id, + variant: Some(variant), + } + } +} + +impl TuplePack for ByVariantSubspaceKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let mut offset = VersionstampOffset::None { size: 0 }; + + let t = (RUNNER, CONFIG, BY_VARIANT, self.namespace_id); + offset += t.pack(w, tuple_depth)?; + + if let Some(variant) = self.variant { + offset += (variant as usize).pack(w, tuple_depth)?; + } + + Ok(offset) + } +} + +#[derive(Debug)] +pub struct DefaultKey { + pub namespace_id: Id, + pub name: String, +} + +impl DefaultKey { + pub fn new(namespace_id: Id, name: String) -> Self { + DefaultKey { namespace_id, name } + } +} + +impl FormalKey for DefaultKey { + type Value = rivet_types::runner_configs::RunnerConfig; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok( + rivet_data::versioned::NamespaceRunnerConfig::deserialize_with_embedded_version(raw)? + .into(), + ) + } + + fn serialize(&self, value: Self::Value) -> Result> { + rivet_data::versioned::NamespaceRunnerConfig::latest(value.into()) + .serialize_with_embedded_version( + rivet_data::PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION, + ) + } +} + +impl TuplePack for DefaultKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = (RUNNER, CONFIG, DEFAULT, self.namespace_id, &self.name); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for DefaultKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _, _, namespace_id, name)) = + <(usize, usize, usize, Id, String)>::unpack(input, tuple_depth)?; + + let v = DefaultKey { namespace_id, name }; + + Ok((input, v)) + } +} diff --git a/packages/services/namespace/src/ops/runner_config/delete.rs b/packages/services/namespace/src/ops/runner_config/delete.rs index 6f169b48c8..1600d780b1 100644 --- a/packages/services/namespace/src/ops/runner_config/delete.rs +++ b/packages/services/namespace/src/ops/runner_config/delete.rs @@ -1,5 +1,4 @@ use gas::prelude::*; -use rivet_cache::CacheKey; use universaldb::utils::IsolationLevel::*; use crate::{errors, keys, utils::runner_config_variant}; @@ -22,13 +21,13 @@ pub async fn namespace_runner_config_delete(ctx: &OperationCtx, input: &Input) - // Read existing config to determine variant let runner_config_key = - keys::RunnerConfigKey::new(input.namespace_id, input.name.clone()); + keys::runner_config::DataKey::new(input.namespace_id, input.name.clone()); if let Some(config) = tx.read_opt(&runner_config_key, Serializable).await? { tx.delete(&runner_config_key); // Clear secondary idx - tx.delete(&keys::RunnerConfigByVariantKey::new( + tx.delete(&keys::runner_config::ByVariantKey::new( input.namespace_id, runner_config_variant(&config), input.name.clone(), @@ -40,15 +39,9 @@ pub async fn namespace_runner_config_delete(ctx: &OperationCtx, input: &Input) - .custom_instrument(tracing::info_span!("runner_config_upsert_tx")) .await?; - // Purge cache in all dcs - ctx.op(internal::ops::cache::purge_global::Input { - base_key: "namespace.runner_config.{}.get_global".to_string(), - keys: vec![(input.namespace_id, input.name.as_str()).cache_key().into()], - }) - .await?; - - // Bump autoscaler in all dcs - ctx.op(internal::ops::bump_serverless_autoscaler_global::Input {}) + // Bump autoscaler + ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) + .send() .await?; Ok(()) diff --git a/packages/services/namespace/src/ops/runner_config/delete_default.rs b/packages/services/namespace/src/ops/runner_config/delete_default.rs new file mode 100644 index 0000000000..6a6198e229 --- /dev/null +++ b/packages/services/namespace/src/ops/runner_config/delete_default.rs @@ -0,0 +1,43 @@ +use epoxy_protocol::protocol; +use gas::prelude::*; + +use crate::keys; + +#[derive(Debug)] +pub struct Input { + pub namespace_id: Id, + pub name: String, +} + +#[operation] +pub async fn namespace_runner_config_delete_default( + ctx: &OperationCtx, + input: &Input, +) -> Result<()> { + let key = keys::runner_config::DefaultKey::new(input.namespace_id, input.name.clone()); + let key_packed = keys::subspace().pack(&key); + + let result = ctx + .op(epoxy::ops::propose::Input { + proposal: protocol::Proposal { + commands: vec![protocol::Command { + kind: protocol::CommandKind::SetCommand(protocol::SetCommand { + key: key_packed, + value: None, + }), + }], + }, + }) + .await?; + + ensure!( + matches!(result, epoxy::ops::propose::ProposalResult::Committed), + "proposal failed" + ); + + // Bump autoscaler in all dcs + ctx.op(internal::ops::bump_serverless_autoscaler_global::Input {}) + .await?; + + Ok(()) +} diff --git a/packages/services/namespace/src/ops/runner_config/get_local.rs b/packages/services/namespace/src/ops/runner_config/get.rs similarity index 66% rename from packages/services/namespace/src/ops/runner_config/get_local.rs rename to packages/services/namespace/src/ops/runner_config/get.rs index a03c64ee99..ff6c5201a4 100644 --- a/packages/services/namespace/src/ops/runner_config/get_local.rs +++ b/packages/services/namespace/src/ops/runner_config/get.rs @@ -14,11 +14,12 @@ pub struct Input { pub struct RunnerConfig { pub namespace_id: Id, pub name: String, - pub config: rivet_types::namespaces::RunnerConfig, + pub config: rivet_types::runner_configs::RunnerConfig, + pub is_default: bool, } #[operation] -pub async fn namespace_runner_config_get_local( +pub async fn namespace_runner_config_get( ctx: &OperationCtx, input: &Input, ) -> Result> { @@ -37,19 +38,36 @@ pub async fn namespace_runner_config_get_local( let tx = tx.with_subspace(keys::subspace()); let runner_config_key = - keys::RunnerConfigKey::new(namespace_id, runner_name.clone()); + keys::runner_config::DataKey::new(namespace_id, runner_name.clone()); // Runner config not found let Some(runner_config) = tx.read_opt(&runner_config_key, Serializable).await? else { - return Ok(None); + let default_config = ctx + .op(crate::ops::runner_config::get_default::Input { + namespace_id, + name: runner_name.clone(), + }) + .await?; + + let Some(default_config) = default_config else { + return Ok(None); + }; + + return Ok(Some(RunnerConfig { + namespace_id, + name: runner_name, + config: default_config, + is_default: true, + })); }; Ok(Some(RunnerConfig { namespace_id, name: runner_name, config: runner_config, + is_default: false, })) } }) diff --git a/packages/services/namespace/src/ops/runner_config/get_default.rs b/packages/services/namespace/src/ops/runner_config/get_default.rs new file mode 100644 index 0000000000..c76e4c1fee --- /dev/null +++ b/packages/services/namespace/src/ops/runner_config/get_default.rs @@ -0,0 +1,33 @@ +use gas::prelude::*; +use rivet_types::runner_configs::RunnerConfig; +use universaldb::prelude::FormalKey; + +use crate::keys; + +#[derive(Debug)] +pub struct Input { + pub namespace_id: Id, + pub name: String, +} + +#[operation] +pub async fn namespace_runner_config_get_default( + ctx: &OperationCtx, + input: &Input, +) -> Result> { + let key = keys::runner_config::DefaultKey::new(input.namespace_id, input.name.clone()); + let key_packed = keys::subspace().pack(&key); + + let data = ctx + .op(epoxy::ops::kv::get_optimistic::Input { + replica_id: ctx.config().epoxy_replica_id(), + key: key_packed, + }) + .await?; + + let Some(value) = data.value else { + return Ok(None); + }; + + Ok(Some(key.deserialize(&value)?)) +} diff --git a/packages/services/namespace/src/ops/runner_config/get_global.rs b/packages/services/namespace/src/ops/runner_config/get_global.rs deleted file mode 100644 index af634e4c20..0000000000 --- a/packages/services/namespace/src/ops/runner_config/get_global.rs +++ /dev/null @@ -1,106 +0,0 @@ -use gas::prelude::*; -use rivet_types::namespaces::RunnerConfig; -use std::collections::HashMap; - -#[derive(Debug)] -pub struct Input { - pub runners: Vec<(Id, String)>, -} - -#[operation] -pub async fn namespace_runner_config_get_global( - ctx: &OperationCtx, - input: &Input, -) -> Result> { - if ctx.config().is_leader() { - ctx.op(super::get_local::Input { - runners: input.runners.clone(), - }) - .await - } else { - let leader_dc = ctx.config().leader_dc()?; - let client = rivet_pools::reqwest::client().await?; - - ctx.cache() - .clone() - .request() - .fetch_all_json( - &format!("namespace.runner_config.get_global"), - input.runners.clone(), - { - let leader_dc = leader_dc.clone(); - let client = client.clone(); - - move |mut cache, runners| { - let leader_dc = leader_dc.clone(); - let client = client.clone(); - - async move { - let namespaces = ctx - .op(crate::ops::get_global::Input { - namespace_ids: runners - .iter() - .map(|(ns_id, _)| *ns_id) - .collect(), - }) - .await?; - - let mut runner_names_by_namespace_id = - HashMap::with_capacity(runners.len()); - - for (namespace_id, runner_name) in runners { - let runner_names = runner_names_by_namespace_id - .entry(namespace_id) - .or_insert_with(Vec::new); - runner_names.push(runner_name); - } - - // TODO: Parallelize - for (namespace_id, runner_names) in runner_names_by_namespace_id { - let namespace = namespaces - .iter() - .find(|n| n.namespace_id == namespace_id) - .context("namespace not found")?; - let url = leader_dc.api_peer_url.join("/runner-configs")?; - let res = client - .get(url) - .query(&[("namespace", &namespace.name)]) - .query( - &runner_names - .iter() - .map(|runner_name| ("runner", runner_name)) - .collect::>(), - ) - .send() - .await?; - - let res = - rivet_api_util::parse_response::(res) - .await?; - - for (runner_name, runner_config) in res.runner_configs { - cache.resolve( - &(namespace_id, runner_name.clone()), - super::get_local::RunnerConfig { - namespace_id, - name: runner_name, - config: runner_config, - }, - ); - } - } - - Ok(cache) - } - } - }, - ) - .await - } -} - -// TODO: Cyclical dependency with api_peer -#[derive(Deserialize)] -struct RunnerConfigListResponse { - runner_configs: HashMap, -} diff --git a/packages/services/namespace/src/ops/runner_config/list.rs b/packages/services/namespace/src/ops/runner_config/list.rs index 7344b88de3..12501d8160 100644 --- a/packages/services/namespace/src/ops/runner_config/list.rs +++ b/packages/services/namespace/src/ops/runner_config/list.rs @@ -1,6 +1,7 @@ use futures_util::{StreamExt, TryStreamExt}; use gas::prelude::*; -use rivet_types::namespaces::RunnerConfig; +use rivet_types::keys::namespace::runner_config::RunnerConfigVariant; +use rivet_types::runner_configs::RunnerConfig; use universaldb::options::StreamingMode; use universaldb::utils::IsolationLevel::*; @@ -9,11 +10,12 @@ use crate::{errors, keys}; #[derive(Debug)] pub struct Input { pub namespace_id: Id, - pub variant: Option, + pub variant: Option, pub after_name: Option, pub limit: usize, } +// TODO: Needs to return default configs if they exist (currently no way to list from epoxy) #[operation] pub async fn namespace_runner_config_list( ctx: &OperationCtx, @@ -30,14 +32,14 @@ pub async fn namespace_runner_config_list( let (start, end) = if let Some(variant) = input.variant { let (start, end) = keys::subspace() - .subspace(&keys::RunnerConfigByVariantKey::subspace_with_variant( + .subspace(&keys::runner_config::ByVariantKey::subspace_with_variant( input.namespace_id, variant, )) .range(); let start = if let Some(name) = &input.after_name { - tx.pack(&keys::RunnerConfigByVariantKey::new( + tx.pack(&keys::runner_config::ByVariantKey::new( input.namespace_id, variant, name.clone(), @@ -49,11 +51,11 @@ pub async fn namespace_runner_config_list( (start, end) } else { let (start, end) = keys::subspace() - .subspace(&keys::RunnerConfigKey::subspace(input.namespace_id)) + .subspace(&keys::runner_config::DataKey::subspace(input.namespace_id)) .range(); let start = if let Some(name) = &input.after_name { - tx.pack(&keys::RunnerConfigKey::new( + tx.pack(&keys::runner_config::DataKey::new( input.namespace_id, name.clone(), )) @@ -76,10 +78,11 @@ pub async fn namespace_runner_config_list( Ok(entry) => { if input.variant.is_some() { let (key, config) = - tx.read_entry::(&entry)?; + tx.read_entry::(&entry)?; Ok((key.name, config)) } else { - let (key, config) = tx.read_entry::(&entry)?; + let (key, config) = + tx.read_entry::(&entry)?; Ok((key.name, config)) } } diff --git a/packages/services/namespace/src/ops/runner_config/mod.rs b/packages/services/namespace/src/ops/runner_config/mod.rs index 3c44a67d92..d03e1ff62e 100644 --- a/packages/services/namespace/src/ops/runner_config/mod.rs +++ b/packages/services/namespace/src/ops/runner_config/mod.rs @@ -1,5 +1,7 @@ pub mod delete; -pub mod get_global; -pub mod get_local; +pub mod delete_default; +pub mod get; +pub mod get_default; pub mod list; pub mod upsert; +pub mod upsert_default; diff --git a/packages/services/namespace/src/ops/runner_config/upsert.rs b/packages/services/namespace/src/ops/runner_config/upsert.rs index 0d2cb0b798..9f801b2fbf 100644 --- a/packages/services/namespace/src/ops/runner_config/upsert.rs +++ b/packages/services/namespace/src/ops/runner_config/upsert.rs @@ -1,6 +1,5 @@ use gas::prelude::*; -use rivet_cache::CacheKey; -use rivet_types::namespaces::RunnerConfig; +use rivet_types::runner_configs::RunnerConfig; use universaldb::options::MutationType; use crate::{errors, keys, utils::runner_config_variant}; @@ -14,23 +13,19 @@ pub struct Input { #[operation] pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> Result<()> { - if !ctx.config().is_leader() { - return Err(errors::Namespace::NotLeader.build()); - } - ctx.udb()? .run(|tx| async move { let tx = tx.with_subspace(keys::subspace()); // TODO: Once other types of configs get added, delete previous config before writing tx.write( - &keys::RunnerConfigKey::new(input.namespace_id, input.name.clone()), + &keys::runner_config::DataKey::new(input.namespace_id, input.name.clone()), input.config.clone(), )?; // Write to secondary idx tx.write( - &keys::RunnerConfigByVariantKey::new( + &keys::runner_config::ByVariantKey::new( input.namespace_id, runner_config_variant(&input.config), input.name.clone(), @@ -107,16 +102,9 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) - .await? .map_err(|err| err.build())?; - // Purge cache in all dcs - let variant_str = serde_json::to_string(&runner_config_variant(&input.config))?; - ctx.op(internal::ops::cache::purge_global::Input { - base_key: format!("namespace.runner_config.{variant_str}.get_global"), - keys: vec![(input.namespace_id, input.name.as_str()).cache_key().into()], - }) - .await?; - - // Bump autoscaler in all dcs - ctx.op(internal::ops::bump_serverless_autoscaler_global::Input {}) + // Bump autoscaler + ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) + .send() .await?; Ok(()) diff --git a/packages/services/namespace/src/ops/runner_config/upsert_default.rs b/packages/services/namespace/src/ops/runner_config/upsert_default.rs new file mode 100644 index 0000000000..64ec5b6b6b --- /dev/null +++ b/packages/services/namespace/src/ops/runner_config/upsert_default.rs @@ -0,0 +1,45 @@ +use epoxy_protocol::protocol; +use gas::prelude::*; +use rivet_types::runner_configs::RunnerConfig; +use universaldb::prelude::FormalKey; + +use crate::keys; + +#[derive(Debug)] +pub struct Input { + pub namespace_id: Id, + pub name: String, + pub config: RunnerConfig, +} + +#[operation] +pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> Result<()> { + let key = keys::runner_config::DefaultKey::new(input.namespace_id, input.name.clone()); + let key_packed = keys::subspace().pack(&key); + let runner_config_packed = key.serialize(input.config.clone())?; + + // Propagate default runner config over epoxy + let result = ctx + .op(epoxy::ops::propose::Input { + proposal: protocol::Proposal { + commands: vec![protocol::Command { + kind: protocol::CommandKind::SetCommand(protocol::SetCommand { + key: key_packed, + value: Some(runner_config_packed), + }), + }], + }, + }) + .await?; + + ensure!( + matches!(result, epoxy::ops::propose::ProposalResult::Committed), + "proposal failed" + ); + + // Bump autoscaler in all dcs + ctx.op(internal::ops::bump_serverless_autoscaler_global::Input {}) + .await?; + + Ok(()) +} diff --git a/packages/services/namespace/src/utils.rs b/packages/services/namespace/src/utils.rs index 296ef4805f..03defad796 100644 --- a/packages/services/namespace/src/utils.rs +++ b/packages/services/namespace/src/utils.rs @@ -1,9 +1,9 @@ -use rivet_types::namespaces::RunnerConfig; +use rivet_types::{ + keys::namespace::runner_config::RunnerConfigVariant, runner_configs::RunnerConfig, +}; -use crate::keys; - -pub fn runner_config_variant(runner_config: &RunnerConfig) -> keys::RunnerConfigVariant { +pub fn runner_config_variant(runner_config: &RunnerConfig) -> RunnerConfigVariant { match runner_config { - RunnerConfig::Serverless { .. } => keys::RunnerConfigVariant::Serverless, + RunnerConfig::Serverless { .. } => RunnerConfigVariant::Serverless, } } diff --git a/packages/services/pegboard/src/ops/runner/find_dc_with_runner.rs b/packages/services/pegboard/src/ops/runner/find_dc_with_runner.rs index bd7cb41957..afc77c50b7 100644 --- a/packages/services/pegboard/src/ops/runner/find_dc_with_runner.rs +++ b/packages/services/pegboard/src/ops/runner/find_dc_with_runner.rs @@ -1,11 +1,11 @@ use std::time::Duration; use anyhow::Result; -use futures_util::{StreamExt, TryFutureExt, stream::FuturesUnordered}; +use futures_util::{FutureExt, StreamExt, TryFutureExt, stream::FuturesUnordered}; use gas::prelude::*; -use rivet_api_types::runners::list as runners_list; +use rivet_api_types::{runner_configs::list as runner_configs_list, runners::list as runners_list}; use rivet_api_util::{HeaderMap, Method, request_remote_datacenter}; -use rivet_types::namespaces::RunnerConfig; +use rivet_types::runner_configs::RunnerConfig; use serde::de::DeserializeOwned; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -67,7 +67,7 @@ async fn find_dc_with_runner_inner(ctx: &OperationCtx, input: &Input) -> Result< // Check if serverless runner config exists let res = ctx - .op(namespace::ops::runner_config::get_global::Input { + .op(namespace::ops::runner_config::get::Input { runners: vec![(input.namespace_id, input.runner_name.clone())], }) .await?; @@ -92,14 +92,14 @@ async fn find_dc_with_runner_inner(ctx: &OperationCtx, input: &Input) -> Result< .next() .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; - // Fanout to all datacenters - let runners = + // Fanout two requests to all datacenters: runner list, and runner config list (with specific name) + let runners_fut = race_request_to_datacenters::( ctx, Default::default(), "/runners", runners_list::ListQuery { - namespace: namespace.name, + namespace: namespace.name.clone(), name: Some(input.runner_name.clone()), runner_ids: None, include_stopped: Some(false), @@ -108,9 +108,34 @@ async fn find_dc_with_runner_inner(ctx: &OperationCtx, input: &Input) -> Result< }, |res| !res.runners.is_empty(), ) - .await?; + .map(|res| res.map(|x| x.map(|x| x.0))) + .boxed(); + + let runner_configs_fut = race_request_to_datacenters::< + runner_configs_list::ListQuery, + runner_configs_list::ListResponse, + _, + >( + ctx, + Default::default(), + "/runner-configs", + runner_configs_list::ListQuery { + namespace: namespace.name.clone(), + variant: None, + runner_names: Some(input.runner_name.clone()), + limit: Some(1), + cursor: None, + }, + |res| !res.runner_configs.is_empty(), + ) + .map(|res| res.map(|x| x.map(|x| x.0))) + .boxed(); + + let mut futs = [runners_fut, runner_configs_fut] + .into_iter() + .collect::>(); - Ok(runners.map(|x| x.0)) + Ok(futs.next().await.transpose()?.flatten()) } const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); @@ -131,33 +156,31 @@ where { // Create futures for all dcs except the current let dcs = &ctx.config().topology().datacenters; - let mut responses = futures_util::stream::iter( - dcs.iter() - .filter(|dc| dc.datacenter_label != ctx.config().dc_label()) - .map(|dc| { - let headers = headers.clone(); - let query = query.clone(); - async move { - tokio::time::timeout( - REQUEST_TIMEOUT, - // Remote datacenter - request_remote_datacenter::( - ctx.config(), - dc.datacenter_label, - &endpoint, - Method::GET, - headers, - Some(&query), - Option::<&()>::None, - ) - .map_ok(|x| (dc.datacenter_label, x)), + let mut responses = dcs + .iter() + .filter(|dc| dc.datacenter_label != ctx.config().dc_label()) + .map(|dc| { + let headers = headers.clone(); + let query = query.clone(); + async move { + tokio::time::timeout( + REQUEST_TIMEOUT, + // Remote datacenter + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &endpoint, + Method::GET, + headers, + Some(&query), + Option::<&()>::None, ) - .await - } - }), - ) - .collect::>() - .await; + .map_ok(|x| (dc.datacenter_label, x)), + ) + .await + } + }) + .collect::>(); // Collect responses until we reach quorum or all futures complete while let Some(out) = responses.next().await { diff --git a/packages/services/pegboard/src/workflows/actor/runtime.rs b/packages/services/pegboard/src/workflows/actor/runtime.rs index 0d5ff45ce3..ce608db648 100644 --- a/packages/services/pegboard/src/workflows/actor/runtime.rs +++ b/packages/services/pegboard/src/workflows/actor/runtime.rs @@ -5,6 +5,7 @@ use futures_util::{FutureExt, TryStreamExt}; use gas::prelude::*; use rivet_metrics::KeyValue; use rivet_runner_protocol as protocol; +use rivet_types::keys::namespace::runner_config::RunnerConfigVariant; use std::time::Instant; use universaldb::options::{ConflictRangeType, MutationType, StreamingMode}; use universaldb::utils::{FormalKey, IsolationLevel::*}; @@ -114,9 +115,9 @@ async fn allocate_actor( let for_serverless = tx .with_subspace(namespace::keys::subspace()) .exists( - &namespace::keys::RunnerConfigByVariantKey::new( + &namespace::keys::runner_config::ByVariantKey::new( namespace_id, - namespace::keys::RunnerConfigVariant::Serverless, + RunnerConfigVariant::Serverless, input.runner_name_selector.clone(), ), Serializable, diff --git a/scripts/api/add-serverless.ts b/scripts/api/add-serverless.ts index ff384bd8db..38f16400dd 100755 --- a/scripts/api/add-serverless.ts +++ b/scripts/api/add-serverless.ts @@ -37,15 +37,17 @@ const response = await fetch( "Content-Type": "application/json", }, body: JSON.stringify({ - serverless: { - url: serverlessUrl, - headers: {}, - runners_margin: 1, - min_runners: 1, - max_runners: 3, - slots_per_runner: 100, - request_lifespan: 15 * 60, - }, + default: { + serverless: { + url: serverlessUrl, + headers: {}, + runners_margin: 1, + min_runners: 1, + max_runners: 3, + slots_per_runner: 100, + request_lifespan: 15 * 60, + }, + } }), }, );