diff --git a/out/errors/actor.no_runners_available.json b/out/errors/actor.no_runners_available.json new file mode 100644 index 0000000000..b187a48345 --- /dev/null +++ b/out/errors/actor.no_runners_available.json @@ -0,0 +1,5 @@ +{ + "code": "no_runners_available", + "group": "actor", + "message": "No runners are available in any datacenter. Validate the runner is listed in the Connect tab and that the runner's name matches the requested runner name." +} \ No newline at end of file diff --git a/out/errors/guard.invalid_regional_host.json b/out/errors/guard.invalid_regional_host.json new file mode 100644 index 0000000000..339d076f19 --- /dev/null +++ b/out/errors/guard.invalid_regional_host.json @@ -0,0 +1,5 @@ +{ + "code": "invalid_regional_host", + "group": "guard", + "message": "Request must use a regional URL for this datacenter." +} \ No newline at end of file diff --git a/out/errors/guard.must_use_regional_host.json b/out/errors/guard.must_use_regional_host.json new file mode 100644 index 0000000000..79c579751a --- /dev/null +++ b/out/errors/guard.must_use_regional_host.json @@ -0,0 +1,5 @@ +{ + "code": "must_use_regional_host", + "group": "guard", + "message": "Request must use a regional URL for this datacenter." +} \ No newline at end of file diff --git a/out/openapi.json b/out/openapi.json index e56645a2ad..99c579754a 100644 --- a/out/openapi.json +++ b/out/openapi.json @@ -939,17 +939,21 @@ "Datacenter": { "type": "object", "required": [ - "datacenter_label", - "name" + "label", + "name", + "url" ], "properties": { - "datacenter_label": { + "label": { "type": "integer", "format": "int32", "minimum": 0 }, "name": { "type": "string" + }, + "url": { + "type": "string" } }, "additionalProperties": false diff --git a/packages/common/config/src/config/topology.rs b/packages/common/config/src/config/topology.rs index 0c36d2092e..89605fa1f5 100644 --- a/packages/common/config/src/config/topology.rs +++ b/packages/common/config/src/config/topology.rs @@ -48,19 +48,20 @@ impl Default for Topology { Topology { datacenter_label: 1, datacenters: vec![Datacenter { - name: "local".into(), + name: "default".into(), datacenter_label: 1, is_leader: true, - api_peer_url: Url::parse(&format!( + public_url: Url::parse(&format!( "http://127.0.0.1:{}", - crate::defaults::ports::API_PEER + crate::defaults::ports::GUARD )) .unwrap(), - guard_url: Url::parse(&format!( + api_peer_url: Url::parse(&format!( "http://127.0.0.1:{}", - crate::defaults::ports::GUARD + crate::defaults::ports::API_PEER )) .unwrap(), + valid_hosts: None, }], } } @@ -72,8 +73,28 @@ pub struct Datacenter { pub name: String, pub datacenter_label: u16, pub is_leader: bool, - /// Url of the api-peer service + /// Public origin that can be used to connect to this region. + pub public_url: Url, + /// URL of the api-peer service pub api_peer_url: Url, - /// Url of the peer's guard server - pub guard_url: Url, + /// List of hosts that are valid to connect to this region with. This is used in regional + /// endpoints to validate that incoming requests to this datacenter are going to a + /// region-specific domain. + /// + /// IMPORTANT: Do not use a global origin that routes to multiple different regions. This will + /// cause unpredictable behavior when requests are expected to go to a specific region. + #[serde(default)] + pub valid_hosts: Option>, +} + +impl Datacenter { + pub fn is_valid_regional_host(&self, host: &str) -> bool { + if let Some(valid_hosts) = &self.valid_hosts { + // Check if host is in the valid_hosts list + valid_hosts.iter().any(|valid_host| valid_host == host) + } else { + // Check if host matches the origin of public_url + self.public_url.host_str() == Some(host) + } + } } diff --git a/packages/common/test-deps/src/lib.rs b/packages/common/test-deps/src/lib.rs index 1dc9d69efe..db84ee673b 100644 --- a/packages/common/test-deps/src/lib.rs +++ b/packages/common/test-deps/src/lib.rs @@ -48,8 +48,9 @@ impl TestDeps { name: format!("dc-{dc_id}"), datacenter_label: dc_id, is_leader: dc_id == dc_ids[0], // First DC in list is leader + public_url: Url::parse(&format!("http://127.0.0.1:{guard_port}"))?, api_peer_url: Url::parse(&format!("http://127.0.0.1:{api_peer_port}"))?, - guard_url: Url::parse(&format!("http://127.0.0.1:{guard_port}"))?, + valid_hosts: None, }); ports.push((api_peer_port, guard_port)); } diff --git a/packages/common/types/src/datacenters.rs b/packages/common/types/src/datacenters.rs index 4db7b95e65..fff1b27355 100644 --- a/packages/common/types/src/datacenters.rs +++ b/packages/common/types/src/datacenters.rs @@ -4,6 +4,7 @@ use utoipa::ToSchema; #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] #[serde(deny_unknown_fields)] pub struct Datacenter { - pub datacenter_label: u16, + pub label: u16, pub name: String, + pub url: String, } diff --git a/packages/core/api-public/src/actors/create.rs b/packages/core/api-public/src/actors/create.rs index 77f313a09e..46b1abaad1 100644 --- a/packages/core/api-public/src/actors/create.rs +++ b/packages/core/api-public/src/actors/create.rs @@ -68,15 +68,21 @@ async fn create_inner( ) -> Result { ctx.skip_auth(); - // Determine which datacenter to create the actor in - let target_dc_label = if let Some(dc_name) = &query.datacenter { - ctx.config() - .dc_for_name(dc_name) - .ok_or_else(|| crate::errors::Datacenter::NotFound.build())? - .datacenter_label - } else { - ctx.config().dc_label() - }; + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + let target_dc_label = super::utils::find_dc_for_actor_creation( + &ctx, + namespace.namespace_id, + &query.namespace, + &body.runner_name_selector, + query.datacenter.as_ref().map(String::as_str), + ) + .await?; let query = rivet_api_types::actors::create::CreateQuery { namespace: query.namespace, diff --git a/packages/core/api-public/src/actors/get_or_create.rs b/packages/core/api-public/src/actors/get_or_create.rs index 88faa1b4a7..379eb5ac1d 100644 --- a/packages/core/api-public/src/actors/get_or_create.rs +++ b/packages/core/api-public/src/actors/get_or_create.rs @@ -14,7 +14,6 @@ use utoipa::{IntoParams, ToSchema}; use crate::actors::utils; use crate::ctx::ApiCtx; -use crate::errors; #[derive(Debug, Deserialize, IntoParams)] #[serde(deny_unknown_fields)] @@ -123,15 +122,14 @@ async fn get_or_create_inner( } // Actor doesn't exist for any key, create it - // Determine which datacenter to create the actor in - let target_dc_label = if let Some(dc_name) = &query.datacenter { - ctx.config() - .dc_for_name(dc_name) - .ok_or_else(|| errors::Datacenter::NotFound.build())? - .datacenter_label - } else { - ctx.config().dc_label() - }; + let target_dc_label = super::utils::find_dc_for_actor_creation( + &ctx, + namespace.namespace_id, + &query.namespace, + &body.runner_name_selector, + query.datacenter.as_ref().map(String::as_str), + ) + .await?; let actor_id = Id::new_v1(target_dc_label); diff --git a/packages/core/api-public/src/actors/utils.rs b/packages/core/api-public/src/actors/utils.rs index b2fcca8ec3..f62b537af0 100644 --- a/packages/core/api-public/src/actors/utils.rs +++ b/packages/core/api-public/src/actors/utils.rs @@ -184,3 +184,39 @@ pub fn extract_duplicate_key_error(err: &anyhow::Error) -> Option { None } + +/// Determine the datacenter label to create the actor in. +pub async fn find_dc_for_actor_creation( + ctx: &ApiCtx, + namespace_id: Id, + namespace_name: &str, + runner_name: &str, + dc_name: Option<&str>, +) -> Result { + let target_dc_label = if let Some(dc_name) = &dc_name { + // Use user-configured DC + ctx.config() + .dc_for_name(dc_name) + .ok_or_else(|| crate::errors::Datacenter::NotFound.build())? + .datacenter_label + } else { + // Find the nearest DC with runners + let res = ctx + .op(pegboard::ops::runner::find_dc_with_runner::Input { + namespace_id, + runner_name: runner_name.into(), + }) + .await?; + if let Some(dc_label) = res.dc_label { + dc_label + } else { + return Err(pegboard::errors::Actor::NoRunnersAvailable { + namespace: namespace_name.into(), + runner_name: runner_name.into(), + } + .build()); + } + }; + + Ok(target_dc_label) +} diff --git a/packages/core/api-public/src/datacenters.rs b/packages/core/api-public/src/datacenters.rs index 00c578cb74..be5e07f750 100644 --- a/packages/core/api-public/src/datacenters.rs +++ b/packages/core/api-public/src/datacenters.rs @@ -32,8 +32,9 @@ async fn list_inner(ctx: ApiCtx) -> Result { .datacenters .iter() .map(|dc| Datacenter { - datacenter_label: dc.datacenter_label, + label: dc.datacenter_label, name: dc.name.clone(), + url: dc.public_url.to_string(), }) .collect(), pagination: Pagination { cursor: None }, diff --git a/packages/core/guard/server/src/errors.rs b/packages/core/guard/server/src/errors.rs index 87af34af4a..633467c5d7 100644 --- a/packages/core/guard/server/src/errors.rs +++ b/packages/core/guard/server/src/errors.rs @@ -69,3 +69,16 @@ pub struct ActorDestroyed { pub struct ActorReadyTimeout { pub actor_id: Id, } + +#[derive(RivetError, Serialize)] +#[error( + "guard", + "must_use_regional_host", + "Request must use a regional URL for this datacenter.", + "Invalid host {host} for datacenter {datacenter}. Please use one of the following hosts: {valid_hosts}" +)] +pub struct MustUseRegionalHost { + pub host: String, + pub datacenter: String, + pub valid_hosts: String, +} diff --git a/packages/core/guard/server/src/routing/pegboard_gateway.rs b/packages/core/guard/server/src/routing/pegboard_gateway.rs index a79e63619a..4a23425160 100644 --- a/packages/core/guard/server/src/routing/pegboard_gateway.rs +++ b/packages/core/guard/server/src/routing/pegboard_gateway.rs @@ -77,14 +77,14 @@ pub async fn route_request( targets: vec![RouteTarget { actor_id: Some(actor_id), host: peer_dc - .guard_url + .public_url .host() - .context("peer dc guard_url has no host")? + .context("peer dc public_url has no host")? .to_string(), port: peer_dc - .guard_url + .public_url .port() - .context("peer dc guard_url has no port")?, + .context("peer dc public_url has no port")?, path: path.to_owned(), }], timeout: RoutingTimeout { diff --git a/packages/core/guard/server/src/routing/runner.rs b/packages/core/guard/server/src/routing/runner.rs index 64db4a1ffa..34ce2a6449 100644 --- a/packages/core/guard/server/src/routing/runner.rs +++ b/packages/core/guard/server/src/routing/runner.rs @@ -11,7 +11,7 @@ pub(crate) const WS_PROTOCOL_TOKEN: &str = "rivet_token."; pub async fn route_request( ctx: &StandaloneCtx, target: &str, - _host: &str, + host: &str, _path: &str, headers: &hyper::HeaderMap, ) -> Result> { @@ -19,6 +19,30 @@ pub async fn route_request( return Ok(None); } + // Validate that the host is valid for the current datacenter + let current_dc = ctx.config().topology().current_dc()?; + if !current_dc.is_valid_regional_host(host) { + tracing::warn!(?host, datacenter = ?current_dc.name, "invalid host for current datacenter"); + + // Determine valid hosts for error message + let valid_hosts = if let Some(hosts) = ¤t_dc.valid_hosts { + hosts.join(", ") + } else { + current_dc + .public_url + .host_str() + .map(|h| h.to_string()) + .unwrap_or_else(|| "unknown".to_string()) + }; + + return Err(crate::errors::MustUseRegionalHost { + host: host.to_string(), + datacenter: current_dc.name.clone(), + valid_hosts, + } + .build()); + } + let is_websocket = headers .get("upgrade") .and_then(|v| v.to_str().ok()) diff --git a/packages/core/pegboard-serverless/src/lib.rs b/packages/core/pegboard-serverless/src/lib.rs index 4e6ef41c05..a1a57d8cb5 100644 --- a/packages/core/pegboard-serverless/src/lib.rs +++ b/packages/core/pegboard-serverless/src/lib.rs @@ -21,6 +21,7 @@ use universaldb::options::StreamingMode; use universaldb::utils::IsolationLevel::*; use vbare::OwnedVersionedData; +const X_RIVET_ENDPOINT: HeaderName = HeaderName::from_static("x-rivet-endpoint"); const X_RIVET_TOKEN: HeaderName = HeaderName::from_static("x-rivet-token"); const X_RIVET_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivet-total-slots"); const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name"); @@ -266,6 +267,8 @@ async fn outbound_handler( shutdown_rx: oneshot::Receiver<()>, draining: Arc, ) -> Result<()> { + let current_dc = ctx.config().topology().current_dc()?; + let client = rivet_pools::reqwest::client_no_timeout().await?; let headers = headers .into_iter() @@ -276,6 +279,10 @@ async fn outbound_handler( v.parse::().ok()?, )) }) + .chain(std::iter::once(( + X_RIVET_ENDPOINT, + HeaderValue::try_from(current_dc.public_url.to_string())?, + ))) .chain(std::iter::once(( X_RIVET_TOTAL_SLOTS, HeaderValue::try_from(slots_per_runner)?, diff --git a/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs b/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs index 0adbe0fd6c..1e738cdc47 100644 --- a/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs +++ b/packages/services/epoxy/src/workflows/coordinator/reconfigure.rs @@ -87,7 +87,7 @@ pub async fn check_config_changes( replica_id: dc.datacenter_label as u64, status: status.into(), api_peer_url: dc.api_peer_url.to_string(), - guard_url: dc.guard_url.to_string(), + guard_url: dc.public_url.to_string(), } }) .collect::>(); @@ -277,7 +277,7 @@ fn should_abort_reconfigure( return Ok(true); } - if url::Url::parse(&replica.guard_url)? != current_dc.guard_url { + if url::Url::parse(&replica.guard_url)? != current_dc.public_url { tracing::info!( "config changed during reconfigure (guard_url changed), aborting reconfigure" ); diff --git a/packages/services/pegboard/src/errors.rs b/packages/services/pegboard/src/errors.rs index e564f98e05..dbd3173e63 100644 --- a/packages/services/pegboard/src/errors.rs +++ b/packages/services/pegboard/src/errors.rs @@ -53,6 +53,16 @@ pub enum Actor { "Actor key is already reserved in the datacenter '{datacenter_label}'. Either remove the datacenter constraint to automatically create this actor in the correct datacenter or provide the datacenter that matches." )] KeyReservedInDifferentDatacenter { datacenter_label: u16 }, + + #[error( + "no_runners_available", + "No runners are available in any datacenter. Validate the runner is listed in the Connect tab and that the runner's name matches the requested runner name.", + "No runners with name '{runner_name}' are available in any datacenter for the namespace '{namespace}'. Validate the runner is listed in the Connect tab and that the runner's name matches the requested runner name." + )] + NoRunnersAvailable { + namespace: String, + runner_name: String, + }, } #[derive(RivetError, Debug, Clone, Deserialize, Serialize)] diff --git a/packages/services/pegboard/src/ops/runner/find_dc_with_runner.rs b/packages/services/pegboard/src/ops/runner/find_dc_with_runner.rs new file mode 100644 index 0000000000..5a583c3e2e --- /dev/null +++ b/packages/services/pegboard/src/ops/runner/find_dc_with_runner.rs @@ -0,0 +1,164 @@ +use std::time::Duration; + +use anyhow::Result; +use futures_util::{StreamExt, TryFutureExt, stream::FuturesUnordered}; +use gas::prelude::*; +use rivet_api_types::runners::list as runners_list; +use rivet_api_util::{HeaderMap, Method, request_remote_datacenter}; +use serde::de::DeserializeOwned; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Input { + pub namespace_id: Id, + pub runner_name: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Output { + pub dc_label: Option, +} + +/// Finds a datacenter that contains a given runner name in a namespace. +/// +/// This is core to determining which datacenter actors should run in, since actors can only run in +/// datacenters with supported runners. +#[operation] +pub async fn find_dc_with_runner(ctx: &OperationCtx, input: &Input) -> Result { + // TODO: We should figure out how to pre-emptively validate this cache so we don't have + // "stutters" where every 15s we have a high request duration + ctx.cache() + .clone() + .request() + .ttl(15_000) + .fetch_one_json( + "runner.find_dc_with_runner", + (input.namespace_id, input.runner_name.clone()), + { + move |mut cache, key| async move { + let dc_id = find_dc_with_runner_inner(ctx, input).await?; + + if let Some(dc_id) = dc_id { + cache.resolve(&key, dc_id); + } + + Ok(cache) + } + }, + ) + .await + .map(|dc_label| Output { dc_label }) +} + +async fn find_dc_with_runner_inner(ctx: &OperationCtx, input: &Input) -> Result> { + // Check if this DC has any runners + let res = ctx + .op(super::list_for_ns::Input { + namespace_id: input.namespace_id, + name: Some(input.runner_name.clone()), + include_stopped: false, + created_before: None, + limit: 1, + }) + .await?; + if !res.runners.is_empty() { + return Ok(Some(ctx.config().dc_label())); + } + + // Get namespace + let namespace = ctx + .op(namespace::ops::get_global::Input { + namespace_ids: vec![input.namespace_id], + }) + .await? + .into_iter() + .next() + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + // Fanout to all datacenters + let runners = + race_request_to_datacenters::( + ctx, + Default::default(), + "/runners", + runners_list::ListQuery { + namespace: namespace.name, + name: Some(input.runner_name.clone()), + runner_ids: None, + include_stopped: Some(false), + limit: Some(1), + cursor: None, + }, + |res| !res.runners.is_empty(), + ) + .await?; + + Ok(runners.map(|x| x.0)) +} + +const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); + +/// Helper fn that will send a request to all datacenters and return the response of the first +/// datacenter that matches `filter`. +pub async fn race_request_to_datacenters( + ctx: &OperationCtx, + headers: HeaderMap, + endpoint: &str, + query: Q, + filter: F, +) -> Result> +where + R: DeserializeOwned + Send + 'static, + Q: Serialize + Clone + Send + 'static, + F: Fn(&R) -> bool, +{ + // Create futures for all dcs except the current + let dcs = &ctx.config().topology().datacenters; + let mut responses = futures_util::stream::iter( + dcs.iter() + .filter(|dc| dc.datacenter_label != ctx.config().dc_label()) + .map(|dc| { + let headers = headers.clone(); + let query = query.clone(); + async move { + tokio::time::timeout( + REQUEST_TIMEOUT, + // Remote datacenter + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &endpoint, + Method::GET, + headers, + Some(&query), + Option::<&()>::None, + ) + .map_ok(|x| (dc.datacenter_label, x)), + ) + .await + } + }), + ) + .collect::>() + .await; + + // Collect responses until we reach quorum or all futures complete + while let Some(out) = responses.next().await { + match out { + std::result::Result::Ok(result) => match result { + std::result::Result::Ok((dc_label, response)) => { + if filter(&response) { + return Ok(Some((dc_label, response))); + } + } + std::result::Result::Err(err) => { + tracing::warn!(?err, "received error from replica"); + } + }, + std::result::Result::Err(err) => { + tracing::warn!(?err, "received timeout from replica"); + } + } + } + + Ok(None) +} diff --git a/packages/services/pegboard/src/ops/runner/mod.rs b/packages/services/pegboard/src/ops/runner/mod.rs index 1885c60f23..601f9bd3ee 100644 --- a/packages/services/pegboard/src/ops/runner/mod.rs +++ b/packages/services/pegboard/src/ops/runner/mod.rs @@ -1,3 +1,4 @@ +pub mod find_dc_with_runner; pub mod get; pub mod get_by_key; pub mod list_for_ns;