Skip to content

Commit aec7521

Browse files
authored
fix(guard): fix dc proxy reqs (#3216)
1 parent ab93c52 commit aec7521

File tree

11 files changed

+112
-73
lines changed

11 files changed

+112
-73
lines changed

packages/common/cache/build/src/req_config.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,12 +303,20 @@ impl RequestConfig {
303303
let payload = serde_json::to_vec(&message)?;
304304

305305
if let Err(err) = ups
306-
.publish(CACHE_PURGE_TOPIC, &payload, universalpubsub::PublishOpts::broadcast())
306+
.publish(
307+
CACHE_PURGE_TOPIC,
308+
&payload,
309+
universalpubsub::PublishOpts::broadcast(),
310+
)
307311
.await
308312
{
309313
tracing::error!(?err, "failed to publish cache purge message");
310314
} else {
311-
tracing::debug!(base_key, keys_count = cache_keys.len(), "published cache purge message");
315+
tracing::debug!(
316+
base_key,
317+
keys_count = cache_keys.len(),
318+
"published cache purge message"
319+
);
312320
}
313321
}
314322

packages/common/config/src/config/topology.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ impl Default for Topology {
6161
crate::defaults::ports::API_PEER
6262
))
6363
.unwrap(),
64+
proxy_url: None,
6465
valid_hosts: None,
6566
}],
6667
}
@@ -77,6 +78,9 @@ pub struct Datacenter {
7778
pub public_url: Url,
7879
/// URL of the api-peer service
7980
pub peer_url: Url,
81+
/// URL of the guard service that other datacenters can access privately. Goes to the same place as
82+
/// public_url.
83+
pub proxy_url: Option<Url>,
8084
/// List of hosts that are valid to connect to this region with. This is used in regional
8185
/// endpoints to validate that incoming requests to this datacenter are going to a
8286
/// region-specific domain.
@@ -98,14 +102,18 @@ impl Datacenter {
98102
}
99103
}
100104

101-
pub fn public_url_host(&self) -> Result<&str> {
102-
self.public_url.host_str().context("no host")
105+
pub fn proxy_url(&self) -> &Url {
106+
self.proxy_url.as_ref().unwrap_or(&self.public_url)
103107
}
104108

105-
pub fn public_url_port(&self) -> Result<u16> {
106-
self.public_url
109+
pub fn proxy_url_host(&self) -> Result<&str> {
110+
self.proxy_url().host_str().context("no host")
111+
}
112+
113+
pub fn proxy_url_port(&self) -> Result<u16> {
114+
self.proxy_url()
107115
.port()
108-
.or_else(|| match self.public_url.scheme() {
116+
.or_else(|| match self.proxy_url().scheme() {
109117
"http" => Some(80),
110118
"https" => Some(443),
111119
_ => None,

packages/common/test-deps/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ impl TestDeps {
5050
is_leader: dc_id == dc_ids[0], // First DC in list is leader
5151
public_url: Url::parse(&format!("http://127.0.0.1:{guard_port}"))?,
5252
peer_url: Url::parse(&format!("http://127.0.0.1:{api_peer_port}"))?,
53+
proxy_url: None,
5354
valid_hosts: None,
5455
});
5556
ports.push((api_peer_port, guard_port));

packages/core/api-public/src/health.rs

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use anyhow::Result;
2-
use axum::{Json, extract::Extension, response::IntoResponse};
1+
use anyhow::{bail, Result};
2+
use axum::{extract::Extension, response::IntoResponse, Json};
33
use futures_util::StreamExt;
44
use rivet_api_builder::ApiError;
55
use serde::{Deserialize, Serialize};
@@ -87,7 +87,7 @@ async fn fanout_inner(ctx: ApiCtx) -> Result<FanoutResponse> {
8787
}
8888
} else {
8989
// Remote datacenter - HTTP request
90-
match send_health_check(&ctx, &dc).await {
90+
match send_health_checks(&ctx, &dc).await {
9191
Ok(response) => DatacenterHealth {
9292
datacenter_label: dc.datacenter_label,
9393
datacenter_name: dc.name.clone(),
@@ -128,30 +128,40 @@ async fn fanout_inner(ctx: ApiCtx) -> Result<FanoutResponse> {
128128
}
129129

130130
#[tracing::instrument(skip_all)]
131-
async fn send_health_check(
131+
async fn send_health_checks(
132132
ctx: &ApiCtx,
133133
dc: &rivet_config::config::topology::Datacenter,
134134
) -> Result<HealthResponse> {
135135
let client = rivet_pools::reqwest::client().await?;
136-
let url = dc.peer_url.join("/health")?;
136+
let peer_url = dc.peer_url.join("/health")?;
137+
let proxy_url = dc.proxy_url().join("/health")?;
137138

138139
tracing::debug!(
139140
?dc.datacenter_label,
140-
?url,
141-
"sending health check to remote datacenter"
141+
?peer_url,
142+
?proxy_url,
143+
"sending health checks to remote datacenter"
142144
);
143145

144-
let res = client
145-
.get(url)
146-
.timeout(std::time::Duration::from_secs(5))
147-
.send()
148-
.await?;
146+
let (peer_res, proxy_res) = tokio::try_join!(
147+
client
148+
.get(peer_url)
149+
.timeout(std::time::Duration::from_secs(5))
150+
.send(),
151+
client
152+
.get(proxy_url)
153+
.timeout(std::time::Duration::from_secs(5))
154+
.send()
155+
)?;
156+
157+
if !peer_res.status().is_success() {
158+
bail!("Peer health check returned status: {}", peer_res.status())
159+
}
149160

150-
if res.status().is_success() {
151-
let response = res.json::<HealthResponse>().await?;
161+
if proxy_res.status().is_success() {
162+
let response = proxy_res.json::<HealthResponse>().await?;
152163
Ok(response)
153164
} else {
154-
anyhow::bail!("Health check returned status: {}", res.status())
165+
bail!("Proxy health check returned status: {}", proxy_res.status())
155166
}
156167
}
157-

packages/core/guard/core/src/proxy_service.rs

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use anyhow::{Result, anyhow, bail};
1+
use anyhow::{Context, Result, bail};
22
use bytes::Bytes;
33
use futures_util::{SinkExt, StreamExt};
44
use http_body_util::{BodyExt, Full};
@@ -943,11 +943,16 @@ impl ProxyService {
943943
}
944944
Err(err) => {
945945
if !err.is_connect() || attempts >= max_attempts {
946-
tracing::error!(?err, "Request error after {} attempts", attempts);
947-
return Err(errors::UpstreamError(
948-
"Failed to connect to runner. Make sure your runners are healthy and do not have any crash logs."
949-
.to_string(),
950-
)
946+
tracing::error!(
947+
?err,
948+
?target,
949+
"Request error after {} attempts",
950+
attempts
951+
);
952+
953+
return Err(errors::UpstreamError(format!(
954+
"Failed to connect to runner: {err}. Make sure your runners are healthy."
955+
))
951956
.build());
952957
} else {
953958
// Request connect error, might retry
@@ -1058,30 +1063,37 @@ impl ProxyService {
10581063
req_parts: &hyper::http::request::Parts,
10591064
target: &RouteTarget,
10601065
) -> Result<hyper::http::request::Builder> {
1061-
// Build the target URI using the url crate to properly handle IPv6 addresses
1062-
let mut url = Url::parse("http://example.com")?;
1066+
let scheme = if target.port == 443 { "https" } else { "http" };
10631067

1064-
// Wrap IPv6 addresses in brackets if not already wrapped
1068+
// Bracket raw IPv6 hosts
10651069
let host = if target.host.contains(':') && !target.host.starts_with('[') {
10661070
format!("[{}]", target.host)
10671071
} else {
10681072
target.host.clone()
10691073
};
10701074

1071-
url.set_host(Some(&host))
1072-
.map_err(|_| anyhow!("Failed to set host: {}", host))?;
1073-
url.set_port(Some(target.port))
1074-
.map_err(|_| anyhow!("Failed to set port"))?;
1075-
url.set_path(&target.path);
1076-
let uri = url.to_string();
1075+
// Ensure path starts with a leading slash
1076+
let path = if target.path.starts_with('/') {
1077+
target.path.clone()
1078+
} else {
1079+
format!("/{}", target.path)
1080+
};
1081+
1082+
let url = Url::parse(&format!("{scheme}://{host}:{}{}", target.port, path))
1083+
.context("invalid scheme/host/port when building URL")?;
10771084

1085+
// Build the proxied request
10781086
let mut builder = hyper::Request::builder()
10791087
.method(req_parts.method.clone())
1080-
.uri(&uri);
1088+
.uri(url.to_string());
10811089

10821090
// Add proxy headers
1083-
let headers = builder.headers_mut().unwrap();
1084-
add_proxy_headers_with_addr(headers, &req_parts.headers, self.remote_addr)?;
1091+
{
1092+
let headers = builder
1093+
.headers_mut()
1094+
.expect("request builder unexpectedly in error state");
1095+
add_proxy_headers_with_addr(headers, &req_parts.headers, self.remote_addr)?;
1096+
}
10851097

10861098
Ok(builder)
10871099
}

packages/core/guard/server/src/cache/actor.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,17 @@ pub fn build_cache_key(target: &str, path: &str, headers: &hyper::HeaderMap) ->
1414
ensure!(target == "actor", "wrong target");
1515

1616
// Find actor to route to
17-
let actor_id_str = headers.get(X_RIVET_ACTOR).ok_or_else(|| {
18-
crate::errors::MissingHeader {
19-
header: X_RIVET_ACTOR.to_string(),
20-
}
21-
.build()
22-
})?;
23-
let actor_id = Id::parse(actor_id_str.to_str()?)?;
17+
let actor_id_str = headers
18+
.get(X_RIVET_ACTOR)
19+
.ok_or_else(|| {
20+
crate::errors::MissingHeader {
21+
header: X_RIVET_ACTOR.to_string(),
22+
}
23+
.build()
24+
})?
25+
.to_str()
26+
.context("invalid x-rivet-actor header")?;
27+
let actor_id = Id::parse(actor_id_str).context("invalid x-rivet-actor header")?;
2428

2529
// Create a hash using target, actor_id, and path
2630
let mut hasher = DefaultHasher::new();

packages/core/guard/server/src/routing/pegboard_gateway.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ pub async fn route_request(
5151
// For HTTP, use the x-rivet-actor header
5252
headers
5353
.get(X_RIVET_ACTOR)
54-
.and_then(|x| x.to_str().ok())
54+
.map(|x| x.to_str())
55+
.transpose()
56+
.context("invalid x-rivet-actor header")?
5557
.ok_or_else(|| {
5658
crate::errors::MissingHeader {
5759
header: X_RIVET_ACTOR.to_string(),
@@ -61,7 +63,7 @@ pub async fn route_request(
6163
};
6264

6365
// Find actor to route to
64-
let actor_id = Id::parse(actor_id_str)?;
66+
let actor_id = Id::parse(actor_id_str).context("invalid x-rivet-actor header")?;
6567

6668
// Route to peer dc where the actor lives
6769
if actor_id.label() != ctx.config().dc_label() {
@@ -76,12 +78,12 @@ pub async fn route_request(
7678
targets: vec![RouteTarget {
7779
actor_id: Some(actor_id),
7880
host: peer_dc
79-
.public_url_host()
80-
.context("bad peer dc public url host")?
81+
.proxy_url_host()
82+
.context("bad peer dc proxy url host")?
8183
.to_string(),
8284
port: peer_dc
83-
.public_url_port()
84-
.context("bad peer dc public url port")?,
85+
.proxy_url_port()
86+
.context("bad peer dc proxy url port")?,
8587
path: path.to_owned(),
8688
}],
8789
timeout: RoutingTimeout {

packages/core/pegboard-serverless/src/lib.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ const X_RIVET_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivet-total-s
2828
const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name");
2929
const X_RIVET_NAMESPACE_ID: HeaderName = HeaderName::from_static("x-rivet-namespace-id");
3030

31-
const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(10);
31+
const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(5);
3232

3333
struct OutboundConnection {
3434
handle: JoinHandle<()>,
@@ -387,15 +387,14 @@ async fn outbound_handler(
387387
_ = shutdown_rx => {}
388388
}
389389

390-
// Stop runner
391390
draining.store(true, Ordering::SeqCst);
392391

393392
ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
394393
.send()
395394
.await?;
396395

397396
if let Some(runner_id) = runner_id {
398-
stop_runner(ctx, runner_id).await?;
397+
drain_runner(ctx, runner_id).await?;
399398
}
400399

401400
// Continue waiting on req while draining
@@ -421,7 +420,7 @@ async fn outbound_handler(
421420
let runner_id_local =
422421
Id::parse(&init.runner_id).context("invalid runner id")?;
423422
runner_id = Some(runner_id_local);
424-
stop_runner(ctx, runner_id_local).await?;
423+
drain_runner(ctx, runner_id_local).await?;
425424
}
426425
}
427426
}
@@ -440,7 +439,6 @@ async fn outbound_handler(
440439
_ = tokio::time::sleep(DRAIN_GRACE_PERIOD) => {
441440
tracing::debug!("reached drain grace period before runner shut down")
442441
}
443-
444442
}
445443

446444
// Close connection
@@ -456,7 +454,7 @@ async fn outbound_handler(
456454
Ok(())
457455
}
458456

459-
async fn stop_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> {
457+
async fn drain_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> {
460458
let res = ctx
461459
.signal(pegboard::workflows::runner::Forward {
462460
inner: protocol::ToServer::ToServerStopping,
@@ -484,7 +482,7 @@ async fn stop_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> {
484482

485483
/// Send a stop message to the client.
486484
///
487-
/// This will close the runner's WebSocket..
485+
/// This will close the runner's WebSocket.
488486
async fn publish_to_client_stop(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> {
489487
let receiver_subject =
490488
pegboard::pubsub_subjects::RunnerReceiverSubject::new(runner_id).to_string();

packages/infra/engine/src/run_config.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,12 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result<RunConfigData> {
2424
|config, pools| Box::pin(pegboard_serverless::start(config, pools)),
2525
),
2626
// Core services
27-
Service::new(
28-
"tracing_reconfigure",
29-
ServiceKind::Core,
30-
|config, pools| Box::pin(rivet_tracing_reconfigure::start(config, pools)),
31-
),
32-
Service::new(
33-
"cache_purge",
34-
ServiceKind::Core,
35-
|config, pools| Box::pin(rivet_cache_purge::start(config, pools)),
36-
),
27+
Service::new("tracing_reconfigure", ServiceKind::Core, |config, pools| {
28+
Box::pin(rivet_tracing_reconfigure::start(config, pools))
29+
}),
30+
Service::new("cache_purge", ServiceKind::Core, |config, pools| {
31+
Box::pin(rivet_cache_purge::start(config, pools))
32+
}),
3733
];
3834

3935
Ok(RunConfigData { services })

packages/services/cache-purge/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R
2020
while let Ok(NextOutput::Message(msg)) = sub.next().await {
2121
match serde_json::from_slice::<CachePurgeMessage>(&msg.payload) {
2222
Ok(purge_msg) => {
23-
tracing::info!(
23+
tracing::debug!(
2424
base_key = ?purge_msg.base_key,
2525
keys_count = purge_msg.keys.len(),
2626
"received cache purge request"

0 commit comments

Comments
 (0)