Skip to content

Commit 6bda5b8

Browse files
committed
chore(guard): clean up actor gateway hotpath txns (#3113)
1 parent 51ad7be commit 6bda5b8

File tree

5 files changed

+126
-61
lines changed

5 files changed

+126
-61
lines changed

packages/core/guard/server/src/routing/pegboard_gateway.rs

Lines changed: 7 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use anyhow::Result;
44
use gas::prelude::*;
55
use hyper::header::HeaderName;
66
use rivet_guard_core::proxy_service::{RouteConfig, RouteTarget, RoutingOutput, RoutingTimeout};
7-
use universaldb::utils::IsolationLevel::*;
87

98
use super::SEC_WEBSOCKET_PROTOCOL;
109
use crate::{errors, shared_state::SharedState};
@@ -93,26 +92,6 @@ pub async fn route_request(
9392
})));
9493
}
9594

96-
// Lookup actor
97-
find_actor(ctx, shared_state, actor_id, path).await
98-
}
99-
100-
struct FoundActor {
101-
workflow_id: Id,
102-
sleeping: bool,
103-
destroyed: bool,
104-
}
105-
106-
/// Find an actor by actor_id
107-
#[tracing::instrument(skip_all, fields(%actor_id, %path))]
108-
async fn find_actor(
109-
ctx: &StandaloneCtx,
110-
shared_state: &SharedState,
111-
actor_id: Id,
112-
path: &str,
113-
) -> Result<Option<RoutingOutput>> {
114-
// TODO: Optimize this down to a single FDB call
115-
11695
// Create subs before checking if actor exists/is not destroyed
11796
let mut ready_sub = ctx
11897
.subscribe::<pegboard::workflows::actor::Ready>(("actor_id", actor_id))
@@ -124,37 +103,11 @@ async fn find_actor(
124103
.subscribe::<pegboard::workflows::actor::DestroyStarted>(("actor_id", actor_id))
125104
.await?;
126105

127-
let actor_res = tokio::time::timeout(
128-
Duration::from_secs(5),
129-
ctx.udb()?
130-
.run(|tx| async move {
131-
let tx = tx.with_subspace(pegboard::keys::subspace());
132-
133-
let workflow_id_key = pegboard::keys::actor::WorkflowIdKey::new(actor_id);
134-
let sleep_ts_key = pegboard::keys::actor::SleepTsKey::new(actor_id);
135-
let destroy_ts_key = pegboard::keys::actor::DestroyTsKey::new(actor_id);
136-
137-
let (workflow_id_entry, sleeping, destroyed) = tokio::try_join!(
138-
tx.read_opt(&workflow_id_key, Serializable),
139-
tx.exists(&sleep_ts_key, Serializable),
140-
tx.exists(&destroy_ts_key, Serializable),
141-
)?;
142-
143-
let Some(workflow_id) = workflow_id_entry else {
144-
return Ok(None);
145-
};
146-
147-
Ok(Some(FoundActor {
148-
workflow_id,
149-
sleeping,
150-
destroyed,
151-
}))
152-
})
153-
.custom_instrument(tracing::info_span!("actor_exists_tx")),
154-
)
155-
.await??;
156-
157-
let Some(actor) = actor_res else {
106+
// Fetch actor info
107+
let Some(actor) = ctx
108+
.op(pegboard::ops::actor::get_for_gateway::Input { actor_id })
109+
.await?
110+
else {
158111
return Err(errors::ActorNotFound { actor_id }.build());
159112
};
160113

@@ -170,15 +123,8 @@ async fn find_actor(
170123
.await?;
171124
}
172125

173-
// Check if actor is connectable and get runner_id
174-
let get_runner_fut = ctx.op(pegboard::ops::actor::get_runner::Input {
175-
actor_ids: vec![actor_id],
176-
});
177-
let res = tokio::time::timeout(Duration::from_secs(5), get_runner_fut).await??;
178-
let actor = res.actors.into_iter().next().filter(|x| x.is_connectable);
179-
180-
let runner_id = if let Some(actor) = actor {
181-
actor.runner_id
126+
let runner_id = if let (Some(runner_id), true) = (actor.runner_id, actor.connectable) {
127+
runner_id
182128
} else {
183129
tracing::debug!(?actor_id, "waiting for actor to become ready");
184130

packages/services/pegboard/src/keys/actor.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,3 +269,47 @@ impl<'de> TupleUnpack<'de> for DestroyTsKey {
269269
Ok((input, v))
270270
}
271271
}
272+
273+
#[derive(Debug)]
274+
pub struct NamespaceIdKey {
275+
actor_id: Id,
276+
}
277+
278+
impl NamespaceIdKey {
279+
pub fn new(actor_id: Id) -> Self {
280+
NamespaceIdKey { actor_id }
281+
}
282+
}
283+
284+
impl FormalKey for NamespaceIdKey {
285+
type Value = Id;
286+
287+
fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
288+
Ok(Id::from_slice(raw)?)
289+
}
290+
291+
fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
292+
Ok(value.as_bytes())
293+
}
294+
}
295+
296+
impl TuplePack for NamespaceIdKey {
297+
fn pack<W: std::io::Write>(
298+
&self,
299+
w: &mut W,
300+
tuple_depth: TupleDepth,
301+
) -> std::io::Result<VersionstampOffset> {
302+
let t = (ACTOR, DATA, self.actor_id, NAMESPACE_ID);
303+
t.pack(w, tuple_depth)
304+
}
305+
}
306+
307+
impl<'de> TupleUnpack<'de> for NamespaceIdKey {
308+
fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
309+
let (input, (_, _, actor_id, _)) = <(usize, usize, Id, usize)>::unpack(input, tuple_depth)?;
310+
311+
let v = NamespaceIdKey { actor_id };
312+
313+
Ok((input, v))
314+
}
315+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
use gas::prelude::*;
2+
use universaldb::utils::IsolationLevel::*;
3+
4+
use crate::keys;
5+
6+
#[derive(Debug)]
7+
pub struct Input {
8+
pub actor_id: Id,
9+
}
10+
11+
#[derive(Debug)]
12+
pub struct Output {
13+
pub namespace_id: Id,
14+
pub workflow_id: Id,
15+
pub sleeping: bool,
16+
pub destroyed: bool,
17+
pub connectable: bool,
18+
pub runner_id: Option<Id>,
19+
}
20+
21+
#[operation]
22+
#[timeout = 5]
23+
pub async fn pegboard_actor_get_for_gateway(
24+
ctx: &OperationCtx,
25+
input: &Input,
26+
) -> Result<Option<Output>> {
27+
ctx.udb()?
28+
.run(|tx| async move {
29+
let tx = tx.with_subspace(keys::subspace());
30+
31+
let namespace_id_key = keys::actor::NamespaceIdKey::new(input.actor_id);
32+
let workflow_id_key = keys::actor::WorkflowIdKey::new(input.actor_id);
33+
let sleep_ts_key = keys::actor::SleepTsKey::new(input.actor_id);
34+
let destroy_ts_key = keys::actor::DestroyTsKey::new(input.actor_id);
35+
let connectable_key = keys::actor::ConnectableKey::new(input.actor_id);
36+
let runner_id_key = keys::actor::RunnerIdKey::new(input.actor_id);
37+
38+
let (
39+
namespace_id_entry,
40+
workflow_id_entry,
41+
sleeping,
42+
destroyed,
43+
connectable,
44+
runner_id,
45+
) = tokio::try_join!(
46+
tx.read_opt(&namespace_id_key, Serializable),
47+
tx.read_opt(&workflow_id_key, Serializable),
48+
tx.exists(&sleep_ts_key, Serializable),
49+
tx.exists(&destroy_ts_key, Serializable),
50+
tx.exists(&connectable_key, Serializable),
51+
tx.read_opt(&runner_id_key, Serializable),
52+
)?;
53+
54+
let (Some(namespace_id), Some(workflow_id)) = (namespace_id_entry, workflow_id_entry)
55+
else {
56+
return Ok(None);
57+
};
58+
59+
Ok(Some(Output {
60+
namespace_id,
61+
workflow_id,
62+
sleeping,
63+
destroyed,
64+
connectable,
65+
runner_id,
66+
}))
67+
})
68+
.custom_instrument(tracing::info_span!("actor_get_for_gateway_tx"))
69+
.await
70+
}

packages/services/pegboard/src/ops/actor/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod create;
22
pub mod get;
3+
pub mod get_for_gateway;
34
pub mod get_for_key;
45
pub mod get_reservation_for_key;
56
pub mod get_runner;

packages/services/pegboard/src/workflows/actor/setup.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ pub async fn insert_state_and_db(ctx: &ActivityCtx, input: &InitStateAndUdbInput
9494
&keys::actor::WorkflowIdKey::new(input.actor_id),
9595
ctx.workflow_id(),
9696
)?;
97+
tx.write(
98+
&keys::actor::NamespaceIdKey::new(input.actor_id),
99+
input.namespace_id,
100+
)?;
97101

98102
Ok(())
99103
})

0 commit comments

Comments
 (0)