Skip to content

Commit 51ad7be

Browse files
committed
fix(pb): clean up state management and allocation logic in actor wf (#3111)
1 parent 134fc7b commit 51ad7be

File tree

2 files changed

+289
-171
lines changed

2 files changed

+289
-171
lines changed

packages/services/pegboard/src/workflows/actor/mod.rs

Lines changed: 77 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -219,25 +219,39 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
219219
.send()
220220
.await?;
221221

222-
let Some(allocate_res) = runtime::spawn_actor(ctx, input, 0).await? else {
223-
// Destroyed early
224-
ctx.workflow(destroy::Input {
225-
namespace_id: input.namespace_id,
226-
actor_id: input.actor_id,
227-
name: input.name.clone(),
228-
key: input.key.clone(),
229-
generation: 0,
230-
kill: false,
231-
})
232-
.output()
233-
.await?;
222+
let lifecycle_state = match runtime::spawn_actor(ctx, input, 0, false).await? {
223+
runtime::SpawnActorOutput::Allocated {
224+
runner_id,
225+
runner_workflow_id,
226+
} => runtime::LifecycleState::new(runner_id, runner_workflow_id),
227+
runtime::SpawnActorOutput::Sleep => {
228+
ctx.activity(runtime::SetSleepingInput {
229+
actor_id: input.actor_id,
230+
})
231+
.await?;
234232

235-
return Ok(());
233+
runtime::LifecycleState::new_sleeping()
234+
}
235+
runtime::SpawnActorOutput::Destroy => {
236+
// Destroyed early
237+
ctx.workflow(destroy::Input {
238+
namespace_id: input.namespace_id,
239+
actor_id: input.actor_id,
240+
name: input.name.clone(),
241+
key: input.key.clone(),
242+
generation: 0,
243+
kill: false,
244+
})
245+
.output()
246+
.await?;
247+
248+
return Ok(());
249+
}
236250
};
237251

238252
let lifecycle_res = ctx
239253
.loope(
240-
runtime::LifecycleState::new(allocate_res.runner_id, allocate_res.runner_workflow_id),
254+
lifecycle_state,
241255
|ctx, state| {
242256
let input = input.clone();
243257

@@ -262,6 +276,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
262276
} else {
263277
tracing::debug!(actor_id=?input.actor_id, "actor wake");
264278

279+
state.wake_for_alarm = true;
280+
265281
// Fake signal
266282
Main::Wake(Wake {})
267283
}
@@ -370,16 +386,24 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
370386
state.sleeping = false;
371387
state.will_wake = false;
372388

373-
if runtime::reschedule_actor(ctx, &input, state).await? {
374-
// Destroyed early
375-
return Ok(Loop::Break(runtime::LifecycleRes {
376-
generation: state.generation,
377-
// False here because if we received the destroy signal, it is
378-
// guaranteed that we did not allocate another actor.
379-
kill: false,
380-
}));
389+
match runtime::reschedule_actor(ctx, &input, state).await? {
390+
runtime::SpawnActorOutput::Allocated { .. } => {},
391+
runtime::SpawnActorOutput::Sleep => {
392+
state.sleeping = true;
393+
}
394+
runtime::SpawnActorOutput::Destroy => {
395+
// Destroyed early
396+
return Ok(Loop::Break(runtime::LifecycleRes {
397+
generation: state.generation,
398+
// False here because if we received the destroy signal, it is
399+
// guaranteed that we did not allocate another actor.
400+
kill: false,
401+
}));
402+
}
381403
}
382-
} else {
404+
405+
state.wake_for_alarm = false;
406+
} else if !state.will_wake {
383407
state.will_wake = true;
384408

385409
tracing::debug!(
@@ -525,14 +549,19 @@ async fn handle_stopped(
525549
.await?;
526550
}
527551

528-
if runtime::reschedule_actor(ctx, &input, state).await? {
529-
// Destroyed early
530-
return Ok(Some(runtime::LifecycleRes {
531-
generation: state.generation,
532-
// False here because if we received the destroy signal, it is
533-
// guaranteed that we did not allocate another actor.
534-
kill: false,
535-
}));
552+
match runtime::reschedule_actor(ctx, &input, state).await? {
553+
runtime::SpawnActorOutput::Allocated { .. } => {}
554+
// NOTE: Its not possible for `SpawnActorOutput::Sleep` to be returned here, the crash
555+
// policy is `Restart`.
556+
runtime::SpawnActorOutput::Sleep | runtime::SpawnActorOutput::Destroy => {
557+
// Destroyed early
558+
return Ok(Some(runtime::LifecycleRes {
559+
generation: state.generation,
560+
// False here because if we received the destroy signal, it is
561+
// guaranteed that we did not allocate another actor.
562+
kill: false,
563+
}));
564+
}
536565
}
537566
}
538567
(true, CrashPolicy::Sleep) => {
@@ -566,17 +595,26 @@ async fn handle_stopped(
566595
}
567596
// Rewake actor immediately after stopping if `will_wake` was set
568597
else if state.will_wake {
598+
state.sleeping = false;
569599
state.will_wake = false;
570600

571-
if runtime::reschedule_actor(ctx, &input, state).await? {
572-
// Destroyed early
573-
return Ok(Some(runtime::LifecycleRes {
574-
generation: state.generation,
575-
// False here because if we received the destroy signal, it is
576-
// guaranteed that we did not allocate another actor.
577-
kill: false,
578-
}));
601+
match runtime::reschedule_actor(ctx, &input, state).await? {
602+
runtime::SpawnActorOutput::Allocated { .. } => {}
603+
runtime::SpawnActorOutput::Sleep => {
604+
state.sleeping = true;
605+
}
606+
runtime::SpawnActorOutput::Destroy => {
607+
// Destroyed early
608+
return Ok(Some(runtime::LifecycleRes {
609+
generation: state.generation,
610+
// False here because if we received the destroy signal, it is
611+
// guaranteed that we did not allocate another actor.
612+
kill: false,
613+
}));
614+
}
579615
}
616+
617+
state.wake_for_alarm = false;
580618
}
581619

582620
Ok(None)

0 commit comments

Comments
 (0)