
Commit 3d91fa5

fix(serverless): scale to 0, fix serverless slot count edgecase of destroying pending actor (#3092)
1 parent bd7a983

7 files changed (+106 -55 lines)
Lines changed: 1 addition & 0 deletions

```diff
@@ -1,4 +1,5 @@
 use gas::prelude::*;
 
+// TODO: Add namespace + runner name to this struct so bumps can be more targeted
 #[message("pegboard_bump_serverless_autoscaler")]
 pub struct BumpServerlessAutoscaler {}
```

packages/core/guard/server/src/routing/mod.rs

Lines changed: 0 additions & 1 deletion

```diff
@@ -3,7 +3,6 @@ use std::sync::Arc;
 use anyhow::*;
 use gas::prelude::*;
 use hyper::header::HeaderName;
-use hyper::{Request, body::Incoming as BodyIncoming};
 use rivet_guard_core::RoutingFn;
 
 use crate::{errors, shared_state::SharedState};
```

packages/core/pegboard-serverless/src/lib.rs

Lines changed: 13 additions & 17 deletions

```diff
@@ -279,22 +279,18 @@ async fn outbound_handler(
                 v.parse::<HeaderValue>().ok()?,
             ))
         })
-        .chain(std::iter::once((
-            X_RIVET_ENDPOINT,
-            HeaderValue::try_from(current_dc.public_url.to_string())?,
-        )))
-        .chain(std::iter::once((
-            X_RIVET_TOTAL_SLOTS,
-            HeaderValue::try_from(slots_per_runner)?,
-        )))
-        .chain(std::iter::once((
-            X_RIVET_RUNNER_NAME,
-            HeaderValue::try_from(runner_name)?,
-        )))
-        .chain(std::iter::once((
-            X_RIVET_NAMESPACE_ID,
-            HeaderValue::try_from(namespace_name)?,
-        )))
+        .chain([
+            (
+                X_RIVET_ENDPOINT,
+                HeaderValue::try_from(current_dc.public_url.to_string())?,
+            ),
+            (
+                X_RIVET_TOTAL_SLOTS,
+                HeaderValue::try_from(slots_per_runner)?,
+            ),
+            (X_RIVET_RUNNER_NAME, HeaderValue::try_from(runner_name)?),
+            (X_RIVET_NAMESPACE_ID, HeaderValue::try_from(namespace_name)?),
+        ])
         // Add token if auth is enabled
         .chain(
             ctx.config()
@@ -310,7 +306,7 @@ async fn outbound_handler(
         )
         .collect();
 
-    let mut req = client.get(url).headers(headers);
+    let req = client.get(url).headers(headers);
 
     let mut source = sse::EventSource::new(req).context("failed creating event source")?;
     let mut runner_id = None;
```
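A note on the refactor above: arrays implement `IntoIterator` by value since Rust 2021, so a single `.chain([...])` over an array of tuples replaces the stack of `.chain(std::iter::once(...))` calls with identical iteration order. A minimal standalone sketch (the tuple values are made up, not the real header constants):

```rust
fn main() {
    let base = vec![("x-base", "1")];

    // One `.chain` over an array replaces several `.chain(std::iter::once(...))`
    // calls; the resulting order is the same.
    let headers: Vec<(&str, &str)> = base
        .into_iter()
        .chain([("x-endpoint", "https://example.com"), ("x-slots", "10")])
        .collect();

    assert_eq!(headers.len(), 3);
    assert_eq!(headers[1], ("x-endpoint", "https://example.com"));
}
```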

packages/services/pegboard/src/workflows/actor/destroy.rs

Lines changed: 27 additions & 1 deletion

```diff
@@ -38,6 +38,14 @@ pub(crate) async fn pegboard_actor_destroy(ctx: &mut WorkflowCtx, input: &Input)
         kill(ctx, input.actor_id, input.generation, runner_workflow_id).await?;
     }
 
+    // If a slot was allocated at the time of actor destruction then bump the serverless autoscaler so it can scale down
+    // if needed
+    if res.allocated_serverless_slot {
+        ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
+            .send()
+            .await?;
+    }
+
     // Clear KV
     ctx.activity(ClearKvInput {
         actor_id: input.actor_id,
@@ -60,6 +68,7 @@ struct UpdateStateAndDbInput {
 #[derive(Debug, Serialize, Deserialize, Hash)]
 struct UpdateStateAndDbOutput {
     runner_workflow_id: Option<Id>,
+    allocated_serverless_slot: bool,
 }
 
 #[activity(UpdateStateAndDb)]
@@ -89,6 +98,17 @@ async fn update_state_and_db(
             &tx,
         )
         .await?;
+    } else if state.allocated_slot {
+        // Clear the serverless slot even if we do not have a runner id. This happens when the
+        // actor is destroyed while pending allocation
+        tx.atomic_op(
+            &rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new(
+                state.namespace_id,
+                state.runner_name_selector.clone(),
+            ),
+            &(-1i64).to_le_bytes(),
+            MutationType::Add,
+        );
     }
 
     // Update namespace indexes
@@ -125,7 +145,13 @@ async fn update_state_and_db(
     state.runner_id = None;
     let runner_workflow_id = state.runner_workflow_id.take();
 
-    Ok(UpdateStateAndDbOutput { runner_workflow_id })
+    let old_allocated_slot = state.allocated_slot;
+    state.allocated_slot = false;
+
+    Ok(UpdateStateAndDbOutput {
+        runner_workflow_id,
+        allocated_serverless_slot: state.for_serverless && old_allocated_slot,
+    })
 }
 
 #[derive(Debug, Serialize, Deserialize, Hash)]
```
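The `-1` decrement above relies on the storage layer's atomic add over little-endian integer encodings, which is what lets a pending actor's reserved slot be released without first reading the counter. A small sketch of the semantics this assumes (`atomic_add` is a hypothetical stand-in for the database's `MutationType::Add`, not a real API):

```rust
// Hypothetical stand-in for the storage engine's `MutationType::Add`:
// both operands are little-endian i64 byte strings, added without a read.
fn atomic_add(existing: [u8; 8], operand: i64) -> [u8; 8] {
    let current = i64::from_le_bytes(existing);
    current.wrapping_add(operand).to_le_bytes()
}

fn main() {
    // Two desired serverless slots; destroying a pending actor applies -1.
    let slots = 2i64.to_le_bytes();
    let after = atomic_add(slots, -1);
    assert_eq!(i64::from_le_bytes(after), 1);
}
```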

packages/services/pegboard/src/workflows/actor/mod.rs

Lines changed: 25 additions & 12 deletions

```diff
@@ -48,6 +48,8 @@ pub struct State {
 
     #[serde(default)]
     pub for_serverless: bool,
+    #[serde(default)]
+    pub allocated_slot: bool,
 
     pub start_ts: Option<i64>,
     // NOTE: This is not the alarm ts, this is when the actor started sleeping. See `LifecycleState` for alarm
@@ -83,6 +85,7 @@ impl State {
             create_complete_ts: None,
 
             for_serverless: false,
+            allocated_slot: false,
 
             start_ts: None,
             pending_allocation_ts: None,
@@ -471,26 +474,36 @@ async fn handle_stopped(
     state.runner_id = None;
     state.runner_workflow_id = None;
 
-    ctx.activity(runtime::DeallocateInput {
-        actor_id: input.actor_id,
-    })
-    .await?;
+    let deallocate_res = ctx
+        .activity(runtime::DeallocateInput {
+            actor_id: input.actor_id,
+        })
+        .await?;
 
     // Allocate other pending actors from queue since a slot has now cleared
-    let res = ctx
+    let allocate_pending_res = ctx
         .activity(AllocatePendingActorsInput {
             namespace_id: input.namespace_id,
             name: input.runner_name_selector.clone(),
         })
        .await?;
 
-    // Dispatch pending allocs (if any)
-    for alloc in res.allocations {
-        ctx.signal(alloc.signal)
-            .to_workflow::<Workflow>()
-            .tag("actor_id", alloc.actor_id)
-            .send()
-            .await?;
+    if allocate_pending_res.allocations.is_empty() {
+        // Bump autoscaler so it can scale down if needed
+        if deallocate_res.for_serverless {
+            ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
+                .send()
+                .await?;
+        }
+    } else {
+        // Dispatch pending allocs (if any)
+        for alloc in allocate_pending_res.allocations {
+            ctx.signal(alloc.signal)
+                .to_workflow::<Workflow>()
+                .tag("actor_id", alloc.actor_id)
+                .send()
+                .await?;
+        }
     }
 
     // Handle rescheduling if not marked as sleeping
```
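The rewritten branch above is the heart of the scale-to-0 behavior: when a slot frees up, the autoscaler is bumped only if no pending actor immediately claims it. A condensed sketch of that decision (the structs are illustrative stand-ins for the two activity outputs, not the workflow API):

```rust
// Illustrative stand-ins for the outputs of the Deallocate and
// AllocatePendingActors activities used in `handle_stopped`.
struct DeallocateRes {
    for_serverless: bool,
}
struct AllocatePendingRes {
    allocations: Vec<u64>, // pending actors that claimed the freed slot
}

// Bump only when nothing reused the slot and the actor held a serverless slot.
fn should_bump(dealloc: &DeallocateRes, pending: &AllocatePendingRes) -> bool {
    pending.allocations.is_empty() && dealloc.for_serverless
}

fn main() {
    let dealloc = DeallocateRes { for_serverless: true };
    let nobody_waiting = AllocatePendingRes { allocations: vec![] };
    // No pending work: the serverless runner pool may scale down, possibly to 0.
    assert!(should_bump(&dealloc, &nobody_waiting));
}
```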

packages/services/pegboard/src/workflows/actor/runtime.rs

Lines changed: 18 additions & 7 deletions

```diff
@@ -294,6 +294,7 @@ async fn allocate_actor(
         .record(dt, &[KeyValue::new("did_reserve", res.is_ok().to_string())]);
 
     state.for_serverless = for_serverless;
+    state.allocated_slot = true;
 
     match &res {
         Ok(res) => {
@@ -339,8 +340,13 @@ pub struct DeallocateInput {
     pub actor_id: Id,
 }
 
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DeallocateOutput {
+    pub for_serverless: bool,
+}
+
 #[activity(Deallocate)]
-pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<()> {
+pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<DeallocateOutput> {
     let mut state = ctx.state::<State>()?;
     let runner_name_selector = &state.runner_name_selector;
     let namespace_id = state.namespace_id;
@@ -353,8 +359,8 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<()
 
     tx.delete(&keys::actor::ConnectableKey::new(input.actor_id));
 
+    // Only clear slot if we have a runner id
     if let Some(runner_id) = runner_id {
-        // Only clear slot if we have a runner id
         destroy::clear_slot(
             input.actor_id,
             namespace_id,
@@ -374,8 +380,12 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<()
     state.connectable_ts = None;
     state.runner_id = None;
     state.runner_workflow_id = None;
+    // Slot was cleared by the above txn
+    state.allocated_slot = false;
 
-    Ok(())
+    Ok(DeallocateOutput {
+        for_serverless: state.for_serverless,
+    })
 }
 
 /// Returns None if a destroy signal was received while pending for allocation.
@@ -393,6 +403,11 @@ pub async fn spawn_actor(
     })
     .await?;
 
+    // Always bump the autoscaler so it can scale up
+    ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
+        .send()
+        .await?;
+
     let allocate_res = match allocate_res {
         Ok(x) => x,
         Err(pending_allocation_ts) => {
@@ -401,10 +416,6 @@ pub async fn spawn_actor(
                 "failed to allocate (no availability), waiting for allocation",
             );
 
-            ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
-                .send()
-                .await?;
-
             // If allocation fails, the allocate txn already inserted this actor into the queue. Now we wait for
             // an `Allocate` signal
             match ctx.listen::<PendingAllocation>().await? {
```
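Note how the bump in `spawn_actor` moved from the pending-allocation error path to immediately after the allocation attempt, so it now fires on successful allocations too and the autoscaler can scale up eagerly. A condensed control-flow sketch (types and the `bump` callback are illustrative only):

```rust
// Illustrative control flow: the autoscaler bump now happens on both
// outcomes of the allocation attempt, not only when the actor is pending.
enum AllocateRes {
    Allocated,
    Pending,
}

fn spawn_flow(res: AllocateRes, bump: &mut dyn FnMut()) {
    // Always bump the autoscaler so it can scale up.
    bump();

    match res {
        AllocateRes::Allocated => { /* continue with the allocated runner */ }
        AllocateRes::Pending => { /* wait for a PendingAllocation signal */ }
    }
}

fn main() {
    let mut bumps = 0;
    spawn_flow(AllocateRes::Allocated, &mut || bumps += 1);
    spawn_flow(AllocateRes::Pending, &mut || bumps += 1);
    assert_eq!(bumps, 2); // both paths bump the autoscaler
}
```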

packages/services/pegboard/src/workflows/runner.rs

Lines changed: 22 additions & 17 deletions

```diff
@@ -162,6 +162,10 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
             // NOTE: This should not be parallelized because signals should be sent in order
             // Forward to actor workflows
             for event in &events {
+                if event.index <= state.last_event_idx {
+                    tracing::warn!(idx=%event.index, "event already received, ignoring");
+                }
+
                 let actor_id =
                     crate::utils::event_actor_id(&event.inner).to_string();
                 let res = ctx
@@ -186,29 +190,28 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
                 }
             }
 
-            if !events.is_empty() {
+            // Check if events is empty
+            if let Some(last_event_idx) = events.last().map(|event| event.index) {
                 ctx.activity(InsertEventsInput {
                     events: events.clone(),
                 })
                 .await?;
 
+                state.last_event_idx = last_event_idx;
+
                 // Ack every 500 events
-                let last_event_idx = events.last().map(|event| event.index);
-                if let Some(last_event_idx) = last_event_idx {
-                    if last_event_idx > state.last_event_ack_idx.saturating_add(500)
-                    {
-                        state.last_event_ack_idx = last_event_idx;
-
-                        ctx.activity(SendMessageToRunnerInput {
-                            runner_id: input.runner_id,
-                            message: protocol::ToClient::ToClientAckEvents(
-                                protocol::ToClientAckEvents {
-                                    last_event_idx: state.last_event_ack_idx,
-                                },
-                            ),
-                        })
-                        .await?;
-                    }
+                if last_event_idx > state.last_event_ack_idx.saturating_add(500) {
+                    state.last_event_ack_idx = last_event_idx;
+
+                    ctx.activity(SendMessageToRunnerInput {
+                        runner_id: input.runner_id,
+                        message: protocol::ToClient::ToClientAckEvents(
+                            protocol::ToClientAckEvents {
+                                last_event_idx: state.last_event_ack_idx,
+                            },
+                        ),
+                    })
+                    .await?;
                 }
             }
         }
@@ -436,13 +439,15 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
 #[derive(Debug, Serialize, Deserialize)]
 struct LifecycleState {
     draining: bool,
+    last_event_idx: i64,
     last_event_ack_idx: i64,
 }
 
 impl LifecycleState {
     fn new() -> Self {
         LifecycleState {
             draining: false,
+            last_event_idx: -1,
             last_event_ack_idx: -1,
         }
     }
```
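The new `last_event_idx` field separates "highest event index seen" (used by the duplicate warning in the first hunk) from "highest index acked back to the runner" (`last_event_ack_idx`, batched every 500 events). A self-contained sketch of that bookkeeping (a simplified mirror of `LifecycleState`, not the workflow code itself):

```rust
// Simplified mirror of the runner workflow's event bookkeeping.
struct EventTracker {
    last_event_idx: i64,     // highest index seen; -1 means none yet
    last_event_ack_idx: i64, // highest index acked to the runner
}

impl EventTracker {
    fn new() -> Self {
        Self {
            last_event_idx: -1,
            last_event_ack_idx: -1,
        }
    }

    /// Returns true if this event index was already seen (a replayed event).
    fn observe(&mut self, idx: i64) -> bool {
        if idx <= self.last_event_idx {
            return true;
        }
        self.last_event_idx = idx;
        false
    }

    /// Acks are batched: send one only when more than 500 events are unacked.
    fn should_ack(&mut self) -> bool {
        if self.last_event_idx > self.last_event_ack_idx.saturating_add(500) {
            self.last_event_ack_idx = self.last_event_idx;
            return true;
        }
        false
    }
}

fn main() {
    let mut t = EventTracker::new();
    assert!(!t.observe(0)); // first delivery
    assert!(t.observe(0)); // replay of the same index is flagged
    for i in 1..=501 {
        t.observe(i);
    }
    assert!(t.should_ack()); // 501 > -1 + 500, so an ack batch goes out
}
```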
