Skip to content

Commit e6be189

Browse files
committed
chore(core): update default region name to default
1 parent f5e24c8 commit e6be189

File tree

10 files changed

+250
-16
lines changed

10 files changed

+250
-16
lines changed

out/errors/guard.invalid_regional_host.json

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

out/errors/guard.must_use_regional_host.json

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/common/config/src/config/topology.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,19 +48,20 @@ impl Default for Topology {
4848
Topology {
4949
datacenter_label: 1,
5050
datacenters: vec![Datacenter {
51-
name: "local".into(),
51+
name: "default".into(),
5252
datacenter_label: 1,
5353
is_leader: true,
54-
api_peer_url: Url::parse(&format!(
54+
public_url: Url::parse(&format!(
5555
"http://127.0.0.1:{}",
56-
crate::defaults::ports::API_PEER
56+
crate::defaults::ports::GUARD
5757
))
5858
.unwrap(),
59-
guard_url: Url::parse(&format!(
59+
api_peer_url: Url::parse(&format!(
6060
"http://127.0.0.1:{}",
61-
crate::defaults::ports::GUARD
61+
crate::defaults::ports::API_PEER
6262
))
6363
.unwrap(),
64+
valid_hosts: None,
6465
}],
6566
}
6667
}
@@ -72,8 +73,28 @@ pub struct Datacenter {
7273
pub name: String,
7374
pub datacenter_label: u16,
7475
pub is_leader: bool,
75-
/// Url of the api-peer service
76+
/// Public origin that can be used to connect to this region.
77+
pub public_url: Url,
78+
/// URL of the api-peer service
7679
pub api_peer_url: Url,
77-
/// Url of the peer's guard server
78-
pub guard_url: Url,
80+
/// List of hosts that are valid to connect to this region with. This is used in regional
81+
/// endpoints to validate that incoming requests to this datacenter are going to a
82+
/// region-specific domain.
83+
///
84+
/// IMPORTANT: Do not use a global origin that routes to multiple different regions. This will
85+
/// cause unpredictable behavior when requests are expected to go to a specific region.
86+
#[serde(default)]
87+
pub valid_hosts: Option<Vec<String>>,
88+
}
89+
90+
impl Datacenter {
91+
pub fn is_valid_regional_host(&self, host: &str) -> bool {
92+
if let Some(valid_hosts) = &self.valid_hosts {
93+
// Check if host is in the valid_hosts list
94+
valid_hosts.iter().any(|valid_host| valid_host == host)
95+
} else {
96+
// No explicit host list: compare against the host component of public_url (scheme and port are not checked)
97+
self.public_url.host_str() == Some(host)
98+
}
99+
}
79100
}

packages/common/test-deps/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,9 @@ impl TestDeps {
4848
name: format!("dc-{dc_id}"),
4949
datacenter_label: dc_id,
5050
is_leader: dc_id == dc_ids[0], // First DC in list is leader
51+
public_url: Url::parse(&format!("http://127.0.0.1:{guard_port}"))?,
5152
api_peer_url: Url::parse(&format!("http://127.0.0.1:{api_peer_port}"))?,
52-
guard_url: Url::parse(&format!("http://127.0.0.1:{guard_port}"))?,
53+
valid_hosts: None,
5354
});
5455
ports.push((api_peer_port, guard_port));
5556
}

packages/core/guard/server/src/errors.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,16 @@ pub struct ActorDestroyed {
6969
pub struct ActorReadyTimeout {
7070
pub actor_id: Id,
7171
}
72+
73+
#[derive(RivetError, Serialize)]
74+
#[error(
75+
"guard",
76+
"must_use_regional_host",
77+
"Request must use a regional URL for this datacenter.",
78+
"Invalid host {host} for datacenter {datacenter}. Please use one of the following hosts: {valid_hosts}"
79+
)]
80+
pub struct MustUseRegionalHost {
81+
pub host: String,
82+
pub datacenter: String,
83+
pub valid_hosts: String,
84+
}

packages/core/guard/server/src/routing/pegboard_gateway.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,14 @@ pub async fn route_request(
7777
targets: vec![RouteTarget {
7878
actor_id: Some(actor_id),
7979
host: peer_dc
80-
.guard_url
80+
.public_url
8181
.host()
82-
.context("peer dc guard_url has no host")?
82+
.context("peer dc public_url has no host")?
8383
.to_string(),
8484
port: peer_dc
85-
.guard_url
85+
.public_url
8686
.port()
87-
.context("peer dc guard_url has no port")?,
87+
.context("peer dc public_url has no port")?,
8888
path: path.to_owned(),
8989
}],
9090
timeout: RoutingTimeout {

packages/core/guard/server/src/routing/runner.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,38 @@ pub(crate) const WS_PROTOCOL_TOKEN: &str = "rivet_token.";
1111
pub async fn route_request(
1212
ctx: &StandaloneCtx,
1313
target: &str,
14-
_host: &str,
14+
host: &str,
1515
_path: &str,
1616
headers: &hyper::HeaderMap,
1717
) -> Result<Option<RoutingOutput>> {
1818
if target != "runner" {
1919
return Ok(None);
2020
}
2121

22+
// Validate that the host is valid for the current datacenter
23+
let current_dc = ctx.config().topology().current_dc()?;
24+
if !current_dc.is_valid_regional_host(host) {
25+
tracing::warn!(?host, datacenter = ?current_dc.name, "invalid host for current datacenter");
26+
27+
// Determine valid hosts for error message
28+
let valid_hosts = if let Some(hosts) = &current_dc.valid_hosts {
29+
hosts.join(", ")
30+
} else {
31+
current_dc
32+
.public_url
33+
.host_str()
34+
.map(|h| h.to_string())
35+
.unwrap_or_else(|| "unknown".to_string())
36+
};
37+
38+
return Err(crate::errors::MustUseRegionalHost {
39+
host: host.to_string(),
40+
datacenter: current_dc.name.clone(),
41+
valid_hosts,
42+
}
43+
.build());
44+
}
45+
2246
let is_websocket = headers
2347
.get("upgrade")
2448
.and_then(|v| v.to_str().ok())

packages/services/epoxy/src/workflows/coordinator/reconfigure.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ pub async fn check_config_changes(
8787
replica_id: dc.datacenter_label as u64,
8888
status: status.into(),
8989
api_peer_url: dc.api_peer_url.to_string(),
90-
guard_url: dc.guard_url.to_string(),
90+
guard_url: dc.public_url.to_string(),
9191
}
9292
})
9393
.collect::<Vec<types::ReplicaConfig>>();
@@ -277,7 +277,7 @@ fn should_abort_reconfigure(
277277
return Ok(true);
278278
}
279279

280-
if url::Url::parse(&replica.guard_url)? != current_dc.guard_url {
280+
if url::Url::parse(&replica.guard_url)? != current_dc.public_url {
281281
tracing::info!(
282282
"config changed during reconfigure (guard_url changed), aborting reconfigure"
283283
);
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
use std::time::Duration;
2+
3+
use anyhow::Result;
4+
use futures_util::{StreamExt, TryFutureExt, stream::FuturesUnordered};
5+
use gas::prelude::*;
6+
use rivet_api_types::runners::list as runners_list;
7+
use rivet_api_util::{HeaderMap, Method, request_remote_datacenter};
8+
use serde::de::DeserializeOwned;
9+
10+
#[derive(Debug, Clone, Serialize, Deserialize)]
11+
pub struct Input {
12+
pub namespace_id: Id,
13+
pub runner_name: String,
14+
}
15+
16+
#[derive(Debug, Clone, Serialize, Deserialize)]
17+
pub struct Output {
18+
pub dc_label: Option<u16>,
19+
}
20+
21+
/// Finds a datacenter that contains a given runner name in a namespace.
22+
///
23+
/// This is core to determining which datacenter actors should run in, since actors can only run in
24+
/// datacenters with supported runners.
25+
#[operation]
26+
pub async fn find_dc_with_runner(ctx: &OperationCtx, input: &Input) -> Result<Output> {
27+
// TODO: We should figure out how to pre-emptively validate this cache so we don't have
28+
// "stutters" where every 15s we have a high request duration
29+
ctx.cache()
30+
.clone()
31+
.request()
32+
.ttl(15_000)
33+
.fetch_one_json(
34+
"runner.find_dc_with_runner",
35+
(input.namespace_id, input.runner_name.clone()),
36+
{
37+
move |mut cache, key| async move {
38+
let dc_id = find_dc_with_runner_inner(ctx, input).await?;
39+
40+
if let Some(dc_id) = dc_id {
41+
cache.resolve(&key, dc_id);
42+
}
43+
44+
Ok(cache)
45+
}
46+
},
47+
)
48+
.await
49+
.map(|dc_label| Output { dc_label })
50+
}
51+
52+
async fn find_dc_with_runner_inner(ctx: &OperationCtx, input: &Input) -> Result<Option<u16>> {
53+
// Check if this DC has any runners
54+
let res = ctx
55+
.op(super::list_for_ns::Input {
56+
namespace_id: input.namespace_id,
57+
name: Some(input.runner_name.clone()),
58+
include_stopped: false,
59+
created_before: None,
60+
limit: 1,
61+
})
62+
.await?;
63+
if !res.runners.is_empty() {
64+
return Ok(Some(ctx.config().dc_label()));
65+
}
66+
67+
// Get namespace
68+
let namespace = ctx
69+
.op(namespace::ops::get_global::Input {
70+
namespace_ids: vec![input.namespace_id],
71+
})
72+
.await?
73+
.into_iter()
74+
.next()
75+
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;
76+
77+
// Fanout to all datacenters
78+
let runners =
79+
race_request_to_datacenters::<runners_list::ListQuery, runners_list::ListResponse, _>(
80+
ctx,
81+
Default::default(),
82+
"/runners",
83+
runners_list::ListQuery {
84+
namespace: namespace.name,
85+
name: Some(input.runner_name.clone()),
86+
runner_ids: None,
87+
include_stopped: Some(false),
88+
limit: Some(1),
89+
cursor: None,
90+
},
91+
|res| !res.runners.is_empty(),
92+
)
93+
.await?;
94+
95+
Ok(runners.map(|x| x.0))
96+
}
97+
98+
const REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
99+
100+
/// Helper fn that will send a request to every datacenter except the current one and return the response of the first
101+
/// datacenter that matches `filter`.
102+
pub async fn race_request_to_datacenters<Q, R, F>(
103+
ctx: &OperationCtx,
104+
headers: HeaderMap,
105+
endpoint: &str,
106+
query: Q,
107+
filter: F,
108+
) -> Result<Option<(u16, R)>>
109+
where
110+
R: DeserializeOwned + Send + 'static,
111+
Q: Serialize + Clone + Send + 'static,
112+
F: Fn(&R) -> bool,
113+
{
114+
// Create futures for all dcs except the current
115+
let dcs = &ctx.config().topology().datacenters;
116+
let mut responses = futures_util::stream::iter(
117+
dcs.iter()
118+
.filter(|dc| dc.datacenter_label != ctx.config().dc_label())
119+
.map(|dc| {
120+
let headers = headers.clone();
121+
let query = query.clone();
122+
async move {
123+
tokio::time::timeout(
124+
REQUEST_TIMEOUT,
125+
// Remote datacenter
126+
request_remote_datacenter::<R>(
127+
ctx.config(),
128+
dc.datacenter_label,
129+
&endpoint,
130+
Method::GET,
131+
headers,
132+
Some(&query),
133+
Option::<&()>::None,
134+
)
135+
.map_ok(|x| (dc.datacenter_label, x)),
136+
)
137+
.await
138+
}
139+
}),
140+
)
141+
.collect::<FuturesUnordered<_>>()
142+
.await;
143+
144+
// Return the first response that satisfies `filter`; errors and timeouts are logged and skipped
145+
while let Some(out) = responses.next().await {
146+
match out {
147+
std::result::Result::Ok(result) => match result {
148+
std::result::Result::Ok((dc_label, response)) => {
149+
if filter(&response) {
150+
return Ok(Some((dc_label, response)));
151+
}
152+
}
153+
std::result::Result::Err(err) => {
154+
tracing::warn!(?err, "received error from replica");
155+
}
156+
},
157+
std::result::Result::Err(err) => {
158+
tracing::warn!(?err, "received timeout from replica");
159+
}
160+
}
161+
}
162+
163+
Ok(None)
164+
}

packages/services/pegboard/src/ops/runner/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod find_dc_with_runner;
12
pub mod get;
23
pub mod get_by_key;
34
pub mod list_for_ns;

0 commit comments

Comments
 (0)