Skip to content

Commit bdfca35

Browse files
committed
fix(pb): fix various race condition edge cases (#3137)
1 parent dc96382 commit bdfca35

File tree

9 files changed

+173
-104
lines changed

9 files changed

+173
-104
lines changed

packages/common/gasoline/core/src/db/kv/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ impl DatabaseKv {
139139

140140
let workflow_name_key = keys::workflow::NameKey::new(workflow_id);
141141

142+
// TODO: This does not check if the workflow is silenced
142143
// Check if the workflow exists
143144
let Some(workflow_name_entry) = tx
144145
.get(&self.subspace.pack(&workflow_name_key), Serializable)

packages/common/util/id/src/lib.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,8 @@ fn base36_char_to_base10(c: char) -> Result<u8, IdError> {
289289
match c {
290290
'0'..='9' => Ok(c as u8 - b'0'),
291291
'a'..='z' => Ok(c as u8 - b'a' + 10),
292+
// Uppercase is treated the same as lowercase
293+
'A'..='Z' => Ok(c as u8 - b'A' + 10),
292294
_ => return Err(IdError::InvalidChar(c)),
293295
}
294296
}
@@ -313,4 +315,15 @@ mod tests {
313315
let parsed = Id::from_str(&s).unwrap();
314316
assert_eq!(parsed, id);
315317
}
318+
319+
#[test]
320+
fn test_v1_parse_uppercase() {
321+
let uuid = Uuid::new_v4();
322+
let label = 0xABCD;
323+
let id = Id::v1(uuid, label);
324+
let s = id.to_string().to_ascii_uppercase();
325+
assert_eq!(s.len(), 30);
326+
let parsed = Id::from_str(&s).unwrap();
327+
assert_eq!(parsed, id);
328+
}
316329
}

packages/core/pegboard-serverless/src/lib.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ fn spawn_connection(
236236
)
237237
.await
238238
{
239-
tracing::error!(?err, "outbound req failed");
239+
tracing::warn!(?err, "outbound req failed");
240240

241241
// TODO: Add backoff
242242
tokio::time::sleep(Duration::from_secs(1)).await;
@@ -368,15 +368,28 @@ async fn outbound_handler(
368368
// If runner_id is none at this point it means we did not send the stopping signal yet, so
369369
// send it now
370370
if runner_id.is_none() {
371-
stop_runner(ctx, Id::parse(&msg.data)?).await?;
371+
let data = BASE64.decode(msg.data).context("invalid base64 message")?;
372+
let payload =
373+
protocol::versioned::ToServerlessServer::deserialize_with_embedded_version(
374+
&data,
375+
)
376+
.context("invalid payload")?;
377+
378+
match payload {
379+
protocol::ToServerlessServer::ToServerlessServerInit(init) => {
380+
let runner_id =
381+
Id::parse(&init.runner_id).context("invalid runner id")?;
382+
stop_runner(ctx, runner_id).await?;
383+
}
384+
}
372385
}
373386
}
374387
Err(sse::Error::StreamEnded) => break,
375388
Err(err) => return Err(err.into()),
376389
}
377390
}
378391

379-
tracing::info!("outbound req stopped");
392+
tracing::debug!("outbound req stopped");
380393

381394
Ok(())
382395
}

packages/services/pegboard/src/ops/runner/update_alloc_idx.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,20 @@ pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input)
108108
continue;
109109
};
110110

111-
// Runner is expired, updating will do nothing
111+
// Runner is expired: AddIdx is invalid and UpdatePing will do nothing
112112
if expired_ts_entry.is_some() {
113-
notifications.push(RunnerNotification {
114-
runner_id: runner.runner_id,
115-
workflow_id,
116-
eligibility: RunnerEligibility::Expired,
117-
});
113+
match runner.action {
114+
Action::ClearIdx => {}
115+
Action::AddIdx | Action::UpdatePing { .. } => {
116+
notifications.push(RunnerNotification {
117+
runner_id: runner.runner_id,
118+
workflow_id,
119+
eligibility: RunnerEligibility::Expired,
120+
});
118121

119-
continue;
122+
continue;
123+
}
124+
}
120125
}
121126

122127
let remaining_millislots = (remaining_slots * 1000) / total_slots;

packages/services/pegboard/src/workflows/actor/mod.rs

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
266266
// Fake signal
267267
Main::Lost(Lost {
268268
generation: state.generation,
269+
force_reschedule: false,
269270
})
270271
}
271272
} else if let Some(alarm_ts) = state.alarm_ts {
@@ -365,7 +366,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
365366
protocol::ActorStateStopped { code, .. },
366367
) => {
367368
if let Some(res) =
368-
handle_stopped(ctx, &input, state, Some(code), false)
369+
handle_stopped(ctx, &input, state, Some(code), false, false)
369370
.await?
370371
{
371372
return Ok(Loop::Break(res));
@@ -386,7 +387,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
386387
state.sleeping = false;
387388
state.will_wake = false;
388389

389-
match runtime::reschedule_actor(ctx, &input, state).await? {
390+
match runtime::reschedule_actor(ctx, &input, state, false).await? {
390391
runtime::SpawnActorOutput::Allocated { .. } => {},
391392
runtime::SpawnActorOutput::Sleep => {
392393
state.sleeping = true;
@@ -416,6 +417,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
416417
actor_id=?input.actor_id,
417418
"cannot wake actor that is not sleeping",
418419
);
420+
421+
state.wake_for_alarm = false;
419422
}
420423
}
421424
Main::Lost(sig) => {
@@ -425,7 +428,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
425428
}
426429

427430
if let Some(res) =
428-
handle_stopped(ctx, &input, state, None, true).await?
431+
handle_stopped(ctx, &input, state, None, true, sig.force_reschedule).await?
429432
{
430433
return Ok(Loop::Break(res));
431434
}
@@ -485,8 +488,9 @@ async fn handle_stopped(
485488
state: &mut runtime::LifecycleState,
486489
code: Option<protocol::StopCode>,
487490
lost: bool,
491+
force_reschedule: bool,
488492
) -> Result<Option<runtime::LifecycleRes>> {
489-
tracing::debug!(?code, "actor stopped");
493+
tracing::debug!(?code, %force_reschedule, "actor stopped");
490494

491495
// Reset retry count on successful exit
492496
if let Some(protocol::StopCode::Ok) = code {
@@ -496,7 +500,7 @@ async fn handle_stopped(
496500
// Clear stop gc timeout to prevent being marked as lost in the lifecycle loop
497501
state.gc_timeout_ts = None;
498502
state.runner_id = None;
499-
state.runner_workflow_id = None;
503+
let old_runner_workflow_id = state.runner_workflow_id.take();
500504

501505
let deallocate_res = ctx
502506
.activity(runtime::DeallocateInput {
@@ -530,26 +534,44 @@ async fn handle_stopped(
530534
}
531535
}
532536

537+
// Reschedule no matter what
538+
if force_reschedule {
539+
match runtime::reschedule_actor(ctx, &input, state, true).await? {
540+
runtime::SpawnActorOutput::Allocated { .. } => {}
541+
// NOTE: This should be unreachable because force_reschedule is true
542+
runtime::SpawnActorOutput::Sleep => {
543+
state.sleeping = true;
544+
}
545+
runtime::SpawnActorOutput::Destroy => {
546+
// Destroyed early
547+
return Ok(Some(runtime::LifecycleRes {
548+
generation: state.generation,
549+
// False here because if we received the destroy signal, it is
550+
// guaranteed that we did not allocate another actor.
551+
kill: false,
552+
}));
553+
}
554+
}
555+
}
533556
// Handle rescheduling if not marked as sleeping
534-
if !state.sleeping {
557+
else if !state.sleeping {
535558
let failed = matches!(code, None | Some(protocol::StopCode::Error));
536559

537-
match (failed, input.crash_policy) {
538-
(true, CrashPolicy::Restart) => {
560+
match (input.crash_policy, failed) {
561+
(CrashPolicy::Restart, true) => {
539562
// Kill old actor immediately if lost
540563
if lost {
541564
destroy::kill(
542565
ctx,
543566
input.actor_id,
544567
state.generation,
545-
state
546-
.runner_workflow_id
568+
old_runner_workflow_id
547569
.context("should have runner_workflow_id set if not sleeping")?,
548570
)
549571
.await?;
550572
}
551573

552-
match runtime::reschedule_actor(ctx, &input, state).await? {
574+
match runtime::reschedule_actor(ctx, &input, state, false).await? {
553575
runtime::SpawnActorOutput::Allocated { .. } => {}
554576
// NOTE: It's not possible for `SpawnActorOutput::Sleep` to be returned here; the crash
555577
// policy is `Restart`.
@@ -564,7 +586,7 @@ async fn handle_stopped(
564586
}
565587
}
566588
}
567-
(true, CrashPolicy::Sleep) => {
589+
(CrashPolicy::Sleep, true) => {
568590
tracing::debug!(actor_id=?input.actor_id, "actor sleeping due to crash");
569591

570592
state.sleeping = true;
@@ -577,15 +599,6 @@ async fn handle_stopped(
577599
_ => {
578600
ctx.activity(runtime::SetCompleteInput {}).await?;
579601

580-
if lost {
581-
ctx.msg(Failed {
582-
error: errors::Actor::DestroyedWhileWaitingForReady,
583-
})
584-
.tag("actor_id", input.actor_id)
585-
.send()
586-
.await?;
587-
}
588-
589602
return Ok(Some(runtime::LifecycleRes {
590603
generation: state.generation,
591604
kill: lost,
@@ -596,9 +609,8 @@ async fn handle_stopped(
596609
// Rewake actor immediately after stopping if `will_wake` was set
597610
else if state.will_wake {
598611
state.sleeping = false;
599-
state.will_wake = false;
600612

601-
match runtime::reschedule_actor(ctx, &input, state).await? {
613+
match runtime::reschedule_actor(ctx, &input, state, false).await? {
602614
runtime::SpawnActorOutput::Allocated { .. } => {}
603615
runtime::SpawnActorOutput::Sleep => {
604616
state.sleeping = true;
@@ -613,10 +625,11 @@ async fn handle_stopped(
613625
}));
614626
}
615627
}
616-
617-
state.wake_for_alarm = false;
618628
}
619629

630+
state.wake_for_alarm = false;
631+
state.will_wake = false;
632+
620633
Ok(None)
621634
}
622635

@@ -651,6 +664,7 @@ pub struct Wake {}
651664
#[signal("pegboard_actor_lost")]
652665
pub struct Lost {
653666
pub generation: u32,
667+
pub force_reschedule: bool,
654668
}
655669

656670
#[signal("pegboard_actor_destroy")]

packages/services/pegboard/src/workflows/actor/runtime.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,11 @@ async fn update_runner(ctx: &ActivityCtx, input: &UpdateRunnerInput) -> Result<(
107107
struct AllocateActorInput {
108108
actor_id: Id,
109109
generation: u32,
110-
from_alarm: bool,
110+
force_allocate: bool,
111111
}
112112

113113
#[derive(Debug, Serialize, Deserialize)]
114+
#[serde(rename_all = "snake_case")]
114115
pub enum AllocateActorOutput {
115116
Allocated {
116117
runner_id: Id,
@@ -315,7 +316,7 @@ async fn allocate_actor(
315316

316317
// At this point in the txn there is no availability
317318

318-
match (crash_policy, input.from_alarm, has_valid_serverless) {
319+
match (crash_policy, input.force_allocate, has_valid_serverless) {
319320
(CrashPolicy::Sleep, false, false) => {
320321
Ok((for_serverless, AllocateActorOutput::Sleep))
321322
}
@@ -373,6 +374,11 @@ async fn allocate_actor(
373374
AllocateActorOutput::Pending {
374375
pending_allocation_ts,
375376
} => {
377+
tracing::warn!(
378+
actor_id=?input.actor_id,
379+
"failed to allocate (no availability), waiting for allocation",
380+
);
381+
376382
state.pending_allocation_ts = Some(*pending_allocation_ts);
377383
}
378384
AllocateActorOutput::Sleep => {}
@@ -473,14 +479,14 @@ pub async fn spawn_actor(
473479
ctx: &mut WorkflowCtx,
474480
input: &Input,
475481
generation: u32,
476-
from_alarm: bool,
482+
force_allocate: bool,
477483
) -> Result<SpawnActorOutput> {
478484
// Attempt allocation
479485
let allocate_res = ctx
480486
.activity(AllocateActorInput {
481487
actor_id: input.actor_id,
482488
generation,
483-
from_alarm,
489+
force_allocate,
484490
})
485491
.await?;
486492

@@ -524,11 +530,6 @@ pub async fn spawn_actor(
524530
AllocateActorOutput::Pending {
525531
pending_allocation_ts,
526532
} => {
527-
tracing::warn!(
528-
actor_id=?input.actor_id,
529-
"failed to allocate (no availability), waiting for allocation",
530-
);
531-
532533
// Bump the autoscaler so it can scale up
533534
ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
534535
.send()
@@ -611,6 +612,7 @@ pub async fn reschedule_actor(
611612
ctx: &mut WorkflowCtx,
612613
input: &Input,
613614
state: &mut LifecycleState,
615+
force_reschedule: bool,
614616
) -> Result<SpawnActorOutput> {
615617
tracing::debug!(actor_id=?input.actor_id, "rescheduling actor");
616618

@@ -653,7 +655,13 @@ pub async fn reschedule_actor(
653655
}
654656

655657
let next_generation = state.generation + 1;
656-
let spawn_res = spawn_actor(ctx, &input, next_generation, state.wake_for_alarm).await?;
658+
let spawn_res = spawn_actor(
659+
ctx,
660+
&input,
661+
next_generation,
662+
force_reschedule || state.wake_for_alarm,
663+
)
664+
.await?;
657665

658666
if let SpawnActorOutput::Allocated {
659667
runner_id,

0 commit comments

Comments
 (0)