Skip to content

Commit 8bcfeb9

Browse files
committed
fix(pb): fix sleeping + serverless state issues (#3089)
1 parent 249290b commit 8bcfeb9

File tree

12 files changed

+113
-60
lines changed

12 files changed

+113
-60
lines changed

packages/common/gasoline/core/src/builder/common/signal.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,16 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
4141
}
4242
}
4343

44+
// TODO: Get rid of this
45+
// NOTE: This is a bad implementation because it disregards other errors that may have happened earlier
46+
pub fn bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING(mut self) -> Self {
47+
if let Some(BuilderError::CannotDispatchFromOpInWorkflow) = &self.error {
48+
self.error = None;
49+
}
50+
51+
self
52+
}
53+
4454
pub fn to_workflow_id(mut self, workflow_id: Id) -> Self {
4555
if self.error.is_some() {
4656
return self;

packages/common/gasoline/core/src/ctx/standalone.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub struct StandaloneCtx {
3030
pools: rivet_pools::Pools,
3131
cache: rivet_cache::Cache,
3232
msg_ctx: MessageCtx,
33+
from_workflow: bool,
3334
}
3435

3536
impl StandaloneCtx {
@@ -61,33 +62,42 @@ impl StandaloneCtx {
6162
pools,
6263
cache,
6364
msg_ctx,
65+
from_workflow: false,
6466
})
6567
}
6668

6769
#[tracing::instrument(skip_all)]
6870
pub fn new_from_activity(ctx: &ActivityCtx, req_id: Id) -> WorkflowResult<Self> {
69-
StandaloneCtx::new(
71+
let mut ctx = StandaloneCtx::new(
7072
ctx.db().clone(),
7173
ctx.config().clone(),
7274
ctx.pools().clone(),
7375
ctx.cache().clone(),
7476
ctx.name(),
7577
ctx.ray_id(),
7678
req_id,
77-
)
79+
)?;
80+
81+
ctx.from_workflow = true;
82+
83+
Ok(ctx)
7884
}
7985

8086
#[tracing::instrument(skip_all)]
8187
pub fn new_from_operation(ctx: &OperationCtx, req_id: Id) -> WorkflowResult<Self> {
82-
StandaloneCtx::new(
88+
let mut ctx = StandaloneCtx::new(
8389
ctx.db().clone(),
8490
ctx.config().clone(),
8591
ctx.pools().clone(),
8692
ctx.cache().clone(),
8793
ctx.name(),
8894
ctx.ray_id(),
8995
req_id,
90-
)
96+
)?;
97+
98+
ctx.from_workflow = ctx.from_workflow;
99+
100+
Ok(ctx)
91101
}
92102
}
93103

@@ -107,7 +117,7 @@ impl StandaloneCtx {
107117
self.config.clone(),
108118
self.ray_id,
109119
input,
110-
false,
120+
self.from_workflow,
111121
)
112122
}
113123

@@ -134,7 +144,7 @@ impl StandaloneCtx {
134144
self.config.clone(),
135145
self.ray_id,
136146
body,
137-
false,
147+
self.from_workflow,
138148
)
139149
}
140150

@@ -153,7 +163,7 @@ impl StandaloneCtx {
153163
&self.pools,
154164
&self.cache,
155165
self.ray_id,
156-
false,
166+
self.from_workflow,
157167
input,
158168
)
159169
.in_current_span()

packages/common/universaldb/src/transaction.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ impl Transaction {
150150
.map_err(Into::into)
151151
}
152152

153-
pub fn atomic_op<'de, T: FormalKey + TuplePack + TupleUnpack<'de>>(
153+
pub fn atomic_op<'de, T: std::fmt::Debug + FormalKey + TuplePack + TupleUnpack<'de>>(
154154
&self,
155155
key: &'de T,
156156
param: &[u8],

packages/core/bootstrap/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R
1313
)?;
1414

1515
tokio::try_join!(
16-
setup_epoxy_coordinator(&ctx),
17-
setup_epoxy_replica(&ctx),
16+
async {
17+
// Replicas must exist before coordinator
18+
setup_epoxy_replica(&ctx).await?;
19+
setup_epoxy_coordinator(&ctx).await
20+
},
1821
create_default_namespace(&ctx),
1922
)?;
2023

packages/core/pegboard-serverless/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,11 @@ async fn tick(
145145

146146
// Log warning and reset to 0 if negative
147147
let adjusted_desired_slots = if *desired_slots < 0 {
148-
tracing::warn!(
148+
tracing::error!(
149149
?ns_id,
150150
?runner_name,
151-
desired_slots = ?desired_slots,
152-
"Negative desired_slots detected, resetting to 0"
151+
?desired_slots,
152+
"negative desired slots, scaling to 0"
153153
);
154154
0
155155
} else {

packages/services/epoxy/src/replica/message_request.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ pub async fn message_request(
104104
replica_id: req.replica_id,
105105
status: req.status.into(),
106106
})
107+
.bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING()
107108
.to_workflow::<crate::workflows::coordinator::Workflow>()
108109
.tag("replica", replica_id)
109110
.send()
@@ -121,6 +122,7 @@ pub async fn message_request(
121122
ctx.signal(crate::workflows::replica::BeginLearning {
122123
config: req.config.clone().into(),
123124
})
125+
.bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING()
124126
.to_workflow::<crate::workflows::replica::Workflow>()
125127
.tag("replica", replica_id)
126128
.send()

packages/services/pegboard/src/workflows/actor/mod.rs

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,9 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
294294
ctx,
295295
input.actor_id,
296296
state.generation,
297-
state.runner_workflow_id,
297+
state.runner_workflow_id.context(
298+
"should have runner_workflow_id set if sleeping",
299+
)?,
298300
)
299301
.await?;
300302
}
@@ -311,7 +313,9 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
311313
ctx,
312314
input.actor_id,
313315
state.generation,
314-
state.runner_workflow_id,
316+
state.runner_workflow_id.context(
317+
"should have runner_workflow_id set if stopping",
318+
)?,
315319
)
316320
.await?;
317321
}
@@ -330,7 +334,9 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
330334
.await?;
331335

332336
ctx.msg(Ready {
333-
runner_id: state.runner_id,
337+
runner_id: state
338+
.runner_id
339+
.context("should have runner_id set if running")?,
334340
})
335341
.tag("actor_id", input.actor_id)
336342
.send()
@@ -355,20 +361,28 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
355361
}
356362
}
357363
Main::Wake(_sig) => {
358-
// Ignore wake if we are not sleeping. This is expected to happen under certain
359-
// circumstances.
360364
if state.sleeping {
361-
state.alarm_ts = None;
362-
state.sleeping = false;
363-
364-
if runtime::reschedule_actor(ctx, &input, state).await? {
365-
// Destroyed early
366-
return Ok(Loop::Break(runtime::LifecycleRes {
367-
generation: state.generation,
368-
// False here because if we received the destroy signal, it is
369-
// guaranteed that we did not allocate another actor.
370-
kill: false,
371-
}));
365+
if state.runner_id.is_none() {
366+
state.alarm_ts = None;
367+
state.sleeping = false;
368+
state.will_wake = false;
369+
370+
if runtime::reschedule_actor(ctx, &input, state).await? {
371+
// Destroyed early
372+
return Ok(Loop::Break(runtime::LifecycleRes {
373+
generation: state.generation,
374+
// False here because if we received the destroy signal, it is
375+
// guaranteed that we did not allocate another actor.
376+
kill: false,
377+
}));
378+
}
379+
} else {
380+
state.will_wake = true;
381+
382+
tracing::debug!(
383+
actor_id=?input.actor_id,
384+
"cannot wake an actor that intends to sleep but has not stopped yet, deferring wake until after stop",
385+
);
372386
}
373387
} else {
374388
tracing::debug!(
@@ -447,27 +461,30 @@ async fn handle_stopped(
447461
) -> Result<Option<runtime::LifecycleRes>> {
448462
tracing::debug!(?code, "actor stopped");
449463

450-
// Reset retry count
464+
// Reset retry count on successful exit
451465
if let Some(protocol::StopCode::Ok) = code {
452466
state.reschedule_state = Default::default();
453467
}
454468

469+
// Clear stop gc timeout to prevent being marked as lost in the lifecycle loop
455470
state.gc_timeout_ts = None;
471+
state.runner_id = None;
472+
state.runner_workflow_id = None;
456473

457474
ctx.activity(runtime::DeallocateInput {
458475
actor_id: input.actor_id,
459476
})
460477
.await?;
461478

462-
// Allocate other pending actors from queue
479+
// Allocate other pending actors from queue since a slot has now cleared
463480
let res = ctx
464481
.activity(AllocatePendingActorsInput {
465482
namespace_id: input.namespace_id,
466483
name: input.runner_name_selector.clone(),
467484
})
468485
.await?;
469486

470-
// Dispatch pending allocs
487+
// Dispatch pending allocs (if any)
471488
for alloc in res.allocations {
472489
ctx.signal(alloc.signal)
473490
.to_workflow::<Workflow>()
@@ -476,6 +493,7 @@ async fn handle_stopped(
476493
.await?;
477494
}
478495

496+
// Handle rescheduling if not marked as sleeping
479497
if !state.sleeping {
480498
let failed = matches!(code, None | Some(protocol::StopCode::Error));
481499

@@ -487,7 +505,9 @@ async fn handle_stopped(
487505
ctx,
488506
input.actor_id,
489507
state.generation,
490-
state.runner_workflow_id,
508+
state
509+
.runner_workflow_id
510+
.context("should have runner_workflow_id set if not sleeping")?,
491511
)
492512
.await?;
493513
}
@@ -531,6 +551,20 @@ async fn handle_stopped(
531551
}
532552
}
533553
}
554+
// Rewake actor immediately after stopping if `will_wake` was set
555+
else if state.will_wake {
556+
state.will_wake = false;
557+
558+
if runtime::reschedule_actor(ctx, &input, state).await? {
559+
// Destroyed early
560+
return Ok(Some(runtime::LifecycleRes {
561+
generation: state.generation,
562+
// False here because if we received the destroy signal, it is
563+
// guaranteed that we did not allocate another actor.
564+
kill: false,
565+
}));
566+
}
567+
}
534568

535569
Ok(None)
536570
}

packages/services/pegboard/src/workflows/actor/runtime.rs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ use super::{
2020
pub struct LifecycleState {
2121
pub generation: u32,
2222

23-
// TODO: Make these optional? These might not match the properties in the workflow state but it shouldn't
24-
// matter for the functionality of the lifecycle loop
25-
pub runner_id: Id,
26-
pub runner_workflow_id: Id,
23+
// Set when currently running (not rescheduling or sleeping)
24+
pub runner_id: Option<Id>,
25+
pub runner_workflow_id: Option<Id>,
2726

2827
pub sleeping: bool,
28+
#[serde(default)]
29+
pub will_wake: bool,
2930
pub alarm_ts: Option<i64>,
3031
pub gc_timeout_ts: Option<i64>,
3132

@@ -36,9 +37,10 @@ impl LifecycleState {
3637
pub fn new(runner_id: Id, runner_workflow_id: Id) -> Self {
3738
LifecycleState {
3839
generation: 0,
39-
runner_id,
40-
runner_workflow_id,
40+
runner_id: Some(runner_id),
41+
runner_workflow_id: Some(runner_workflow_id),
4142
sleeping: false,
43+
will_wake: false,
4244
alarm_ts: None,
4345
gc_timeout_ts: Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS),
4446
reschedule_state: RescheduleState::default(),
@@ -352,6 +354,7 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<()
352354
tx.delete(&keys::actor::ConnectableKey::new(input.actor_id));
353355

354356
if let Some(runner_id) = runner_id {
357+
// Only clear slot if we have a runner id
355358
destroy::clear_slot(
356359
input.actor_id,
357360
namespace_id,
@@ -361,15 +364,6 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<()
361364
&tx,
362365
)
363366
.await?;
364-
} else if for_serverless {
365-
tx.atomic_op(
366-
&rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new(
367-
namespace_id,
368-
runner_name_selector.clone(),
369-
),
370-
&(-1i64).to_le_bytes(),
371-
MutationType::Add,
372-
);
373367
}
374368

375369
Ok(())
@@ -551,8 +545,8 @@ pub async fn reschedule_actor(
551545
// Update loop state
552546
if let Some((reschedule_state, res)) = res {
553547
state.generation = next_generation;
554-
state.runner_id = res.runner_id;
555-
state.runner_workflow_id = res.runner_workflow_id;
548+
state.runner_id = Some(res.runner_id);
549+
state.runner_workflow_id = Some(res.runner_workflow_id);
556550

557551
// Save reschedule state in global state
558552
state.reschedule_state = reschedule_state;

scripts/api/add-serverless.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@ if (!rivetToken) {
1515

1616
const endpoint =
1717
process.env.RIVET_ENDPOINT ||
18-
(await rl.question("Rivet Endpoint (default: https://api.rivet.gg): ")) ||
19-
"https://api.rivet.gg";
18+
(await rl.question("Rivet Endpoint (default: http://localhost:6420): ")) ||
19+
"http://localhost:6420";
2020
const namespace =
2121
(await rl.question("Namespace (default: default): ")) || "default";
2222
const runnerName =
2323
(await rl.question("Runner name (default: serverless): ")) || "serverless";
2424
const serverlessUrl =
2525
(await rl.question(
26-
"Serverless URL (default: http://localhost:8080/api/start): ",
27-
)) || "http://localhost:8080/api/start";
26+
"Serverless URL (default: http://localhost:3000/api/rivet/start): ",
27+
)) || "http://localhost:3000/api/rivet/start";
2828

2929
rl.close();
3030

scripts/api/delete-run-config.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ if (!rivetToken) {
1515

1616
const endpoint =
1717
process.env.RIVET_ENDPOINT ||
18-
(await rl.question("Rivet Endpoint (default: https://api.rivet.gg): ")) ||
19-
"https://api.rivet.gg";
18+
(await rl.question("Rivet Endpoint (default: http://localhost:6420): ")) ||
19+
"http://localhost:6420";
2020
const namespace =
2121
(await rl.question("Namespace (default: default): ")) || "default";
2222
const runnerName = await rl.question("Runner name to delete: ");

0 commit comments

Comments
 (0)