diff --git a/packages/common/types/src/msgs/pegboard.rs b/packages/common/types/src/msgs/pegboard.rs
index ece441706b..0ea2d1154c 100644
--- a/packages/common/types/src/msgs/pegboard.rs
+++ b/packages/common/types/src/msgs/pegboard.rs
@@ -1,4 +1,5 @@
 use gas::prelude::*;
 
+// TODO: Add namespace + runner name to this struct so bumps can be more targeted
 #[message("pegboard_bump_serverless_autoscaler")]
 pub struct BumpServerlessAutoscaler {}
diff --git a/packages/core/guard/server/src/routing/mod.rs b/packages/core/guard/server/src/routing/mod.rs
index 057a54e54f..ac61338bd1 100644
--- a/packages/core/guard/server/src/routing/mod.rs
+++ b/packages/core/guard/server/src/routing/mod.rs
@@ -3,7 +3,6 @@ use std::sync::Arc;
 use anyhow::*;
 use gas::prelude::*;
 use hyper::header::HeaderName;
-use hyper::{Request, body::Incoming as BodyIncoming};
 use rivet_guard_core::RoutingFn;
 
 use crate::{errors, shared_state::SharedState};
diff --git a/packages/core/pegboard-serverless/src/lib.rs b/packages/core/pegboard-serverless/src/lib.rs
index 6be3b778b2..52ec7b47d1 100644
--- a/packages/core/pegboard-serverless/src/lib.rs
+++ b/packages/core/pegboard-serverless/src/lib.rs
@@ -279,22 +279,18 @@ async fn outbound_handler(
 				v.parse::<HeaderValue>().ok()?,
 			))
 		})
-		.chain(std::iter::once((
-			X_RIVET_ENDPOINT,
-			HeaderValue::try_from(current_dc.public_url.to_string())?,
-		)))
-		.chain(std::iter::once((
-			X_RIVET_TOTAL_SLOTS,
-			HeaderValue::try_from(slots_per_runner)?,
-		)))
-		.chain(std::iter::once((
-			X_RIVET_RUNNER_NAME,
-			HeaderValue::try_from(runner_name)?,
-		)))
-		.chain(std::iter::once((
-			X_RIVET_NAMESPACE_ID,
-			HeaderValue::try_from(namespace_name)?,
-		)))
+		.chain([
+			(
+				X_RIVET_ENDPOINT,
+				HeaderValue::try_from(current_dc.public_url.to_string())?,
+			),
+			(
+				X_RIVET_TOTAL_SLOTS,
+				HeaderValue::try_from(slots_per_runner)?,
+			),
+			(X_RIVET_RUNNER_NAME, HeaderValue::try_from(runner_name)?),
+			(X_RIVET_NAMESPACE_ID, HeaderValue::try_from(namespace_name)?),
+		])
 		// Add token if auth is enabled
 		.chain(
 			ctx.config()
@@ -310,7 +306,7 @@
 		)
 		.collect();
 
-	let mut req = client.get(url).headers(headers);
+	let req = client.get(url).headers(headers);
 	let mut source = sse::EventSource::new(req).context("failed creating event source")?;
 
 	let mut runner_id = None;
diff --git a/packages/services/pegboard/src/workflows/actor/destroy.rs b/packages/services/pegboard/src/workflows/actor/destroy.rs
index fe85bfb517..0a18fa6cf0 100644
--- a/packages/services/pegboard/src/workflows/actor/destroy.rs
+++ b/packages/services/pegboard/src/workflows/actor/destroy.rs
@@ -38,6 +38,14 @@ pub(crate) async fn pegboard_actor_destroy(ctx: &mut WorkflowCtx, input: &Input)
 		kill(ctx, input.actor_id, input.generation, runner_workflow_id).await?;
 	}
 
+	// If a slot was allocated at the time of actor destruction then bump the serverless autoscaler so it can scale down
+	// if needed
+	if res.allocated_serverless_slot {
+		ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
+			.send()
+			.await?;
+	}
+
 	// Clear KV
 	ctx.activity(ClearKvInput {
 		actor_id: input.actor_id,
@@ -60,6 +68,7 @@ struct UpdateStateAndDbInput {
 
 #[derive(Debug, Serialize, Deserialize, Hash)]
 struct UpdateStateAndDbOutput {
 	runner_workflow_id: Option<Id>,
+	allocated_serverless_slot: bool,
 }
 
@@ -89,6 +98,17 @@ async fn update_state_and_db(
 			&tx,
 		)
 		.await?;
+	} else if state.allocated_slot {
+		// Clear the serverless slot even if we do not have a runner id. This happens when the
+		// actor is destroyed while pending allocation
+		tx.atomic_op(
+			&rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new(
+				state.namespace_id,
+				state.runner_name_selector.clone(),
+			),
+			&(-1i64).to_le_bytes(),
+			MutationType::Add,
+		);
 	}
 
 	// Update namespace indexes
@@ -125,7 +145,13 @@ async fn update_state_and_db(
 	state.runner_id = None;
 	let runner_workflow_id = state.runner_workflow_id.take();
 
-	Ok(UpdateStateAndDbOutput { runner_workflow_id })
+	let old_allocated_slot = state.allocated_slot;
+	state.allocated_slot = false;
+
+	Ok(UpdateStateAndDbOutput {
+		runner_workflow_id,
+		allocated_serverless_slot: state.for_serverless && old_allocated_slot,
+	})
 }
 
 #[derive(Debug, Serialize, Deserialize, Hash)]
diff --git a/packages/services/pegboard/src/workflows/actor/mod.rs b/packages/services/pegboard/src/workflows/actor/mod.rs
index 494129398b..709313735e 100644
--- a/packages/services/pegboard/src/workflows/actor/mod.rs
+++ b/packages/services/pegboard/src/workflows/actor/mod.rs
@@ -48,6 +48,8 @@ pub struct State {
 
 	#[serde(default)]
 	pub for_serverless: bool,
+	#[serde(default)]
+	pub allocated_slot: bool,
 
 	pub start_ts: Option<i64>,
 	// NOTE: This is not the alarm ts, this is when the actor started sleeping. See `LifecycleState` for alarm
@@ -83,6 +85,7 @@ impl State {
 			create_complete_ts: None,
 
 			for_serverless: false,
+			allocated_slot: false,
 
 			start_ts: None,
 			pending_allocation_ts: None,
@@ -471,26 +474,36 @@
 	state.runner_id = None;
 	state.runner_workflow_id = None;
 
-	ctx.activity(runtime::DeallocateInput {
-		actor_id: input.actor_id,
-	})
-	.await?;
+	let deallocate_res = ctx
+		.activity(runtime::DeallocateInput {
+			actor_id: input.actor_id,
+		})
+		.await?;
 
 	// Allocate other pending actors from queue since a slot has now cleared
-	let res = ctx
+	let allocate_pending_res = ctx
 		.activity(AllocatePendingActorsInput {
 			namespace_id: input.namespace_id,
 			name: input.runner_name_selector.clone(),
 		})
 		.await?;
 
-	// Dispatch pending allocs (if any)
-	for alloc in res.allocations {
-		ctx.signal(alloc.signal)
-			.to_workflow::()
-			.tag("actor_id", alloc.actor_id)
-			.send()
-			.await?;
+	if allocate_pending_res.allocations.is_empty() {
+		// Bump autoscaler so it can scale down if needed
+		if deallocate_res.for_serverless {
+			ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
+				.send()
+				.await?;
+		}
+	} else {
+		// Dispatch pending allocs (if any)
+		for alloc in allocate_pending_res.allocations {
+			ctx.signal(alloc.signal)
+				.to_workflow::()
+				.tag("actor_id", alloc.actor_id)
+				.send()
+				.await?;
+		}
 	}
 
 	// Handle rescheduling if not marked as sleeping
diff --git a/packages/services/pegboard/src/workflows/actor/runtime.rs b/packages/services/pegboard/src/workflows/actor/runtime.rs
index c31556b133..0d5ff45ce3 100644
--- a/packages/services/pegboard/src/workflows/actor/runtime.rs
+++ b/packages/services/pegboard/src/workflows/actor/runtime.rs
@@ -294,6 +294,7 @@ async fn allocate_actor(
 		.record(dt, &[KeyValue::new("did_reserve", res.is_ok().to_string())]);
 
 	state.for_serverless = for_serverless;
+	state.allocated_slot = true;
 
 	match &res {
 		Ok(res) => {
@@ -339,8 +340,13 @@ pub struct DeallocateInput {
 	pub actor_id: Id,
 }
 
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DeallocateOutput {
+	pub for_serverless: bool,
+}
+
 #[activity(Deallocate)]
-pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<()> {
+pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<DeallocateOutput> {
 	let mut state = ctx.state::<State>()?;
 	let runner_name_selector = &state.runner_name_selector;
 	let namespace_id = state.namespace_id;
@@ -353,8 +359,8 @@
 
 	tx.delete(&keys::actor::ConnectableKey::new(input.actor_id));
 
+	// Only clear slot if we have a runner id
 	if let Some(runner_id) = runner_id {
-		// Only clear slot if we have a runner id
 		destroy::clear_slot(
 			input.actor_id,
 			namespace_id,
@@ -374,8 +380,12 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<()
 	state.connectable_ts = None;
 	state.runner_id = None;
 	state.runner_workflow_id = None;
+	// Slot was cleared by the above txn
+	state.allocated_slot = false;
 
-	Ok(())
+	Ok(DeallocateOutput {
+		for_serverless: state.for_serverless,
+	})
 }
 
 /// Returns None if a destroy signal was received while pending for allocation.
@@ -393,6 +403,11 @@ pub async fn spawn_actor(
 	})
 	.await?;
 
+	// Always bump the autoscaler so it can scale up
+	ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
+		.send()
+		.await?;
+
 	let allocate_res = match allocate_res {
 		Ok(x) => x,
 		Err(pending_allocation_ts) => {
@@ -401,10 +416,6 @@
 				"failed to allocate (no availability), waiting for allocation",
 			);
 
-			ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
-				.send()
-				.await?;
-
 			// If allocation fails, the allocate txn already inserted this actor into the queue. Now we wait for
 			// an `Allocate` signal
 			match ctx.listen::().await? {
diff --git a/packages/services/pegboard/src/workflows/runner.rs b/packages/services/pegboard/src/workflows/runner.rs
index 91aea6c390..508ce01c4c 100644
--- a/packages/services/pegboard/src/workflows/runner.rs
+++ b/packages/services/pegboard/src/workflows/runner.rs
@@ -162,6 +162,10 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
 			// NOTE: This should not be parallelized because signals should be sent in order
 			// Forward to actor workflows
 			for event in &events {
+				if event.index <= state.last_event_idx {
+					tracing::warn!(idx=%event.index, "event already received, ignoring");
+				}
+
 				let actor_id = crate::utils::event_actor_id(&event.inner).to_string();
 
 				let res = ctx
@@ -186,29 +190,28 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
 				}
 			}
 
-			if !events.is_empty() {
+			// Check if events is empty
+			if let Some(last_event_idx) = events.last().map(|event| event.index) {
 				ctx.activity(InsertEventsInput {
 					events: events.clone(),
 				})
 				.await?;
 
+				state.last_event_idx = last_event_idx;
+
 				// Ack every 500 events
-				let last_event_idx = events.last().map(|event| event.index);
-				if let Some(last_event_idx) = last_event_idx {
-					if last_event_idx > state.last_event_ack_idx.saturating_add(500)
-					{
-						state.last_event_ack_idx = last_event_idx;
-
-						ctx.activity(SendMessageToRunnerInput {
-							runner_id: input.runner_id,
-							message: protocol::ToClient::ToClientAckEvents(
-								protocol::ToClientAckEvents {
-									last_event_idx: state.last_event_ack_idx,
-								},
-							),
-						})
-						.await?;
-					}
+				if last_event_idx > state.last_event_ack_idx.saturating_add(500) {
+					state.last_event_ack_idx = last_event_idx;
+
+					ctx.activity(SendMessageToRunnerInput {
+						runner_id: input.runner_id,
+						message: protocol::ToClient::ToClientAckEvents(
+							protocol::ToClientAckEvents {
+								last_event_idx: state.last_event_ack_idx,
+							},
+						),
+					})
+					.await?;
 				}
 			}
 		}
@@ -436,6 +439,7 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
 
 #[derive(Debug, Serialize, Deserialize)]
 struct LifecycleState {
 	draining: bool,
+	last_event_idx: i64,
 	last_event_ack_idx: i64,
 }
 
@@ -443,6 +447,7 @@ impl LifecycleState {
 	fn new() -> Self {
 		LifecycleState {
 			draining: false,
+			last_event_idx: -1,
 			last_event_ack_idx: -1,
 		}
 	}
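
Note on the serverless slot accounting (not part of the patch): the sketch below models how the new `allocated_slot` flag, the desired-slots counter, and the autoscaler bumps are meant to interact. `ActorState`, `Autoscaler`, `allocate`, and `destroy_pending` are illustrative names that do not exist in the codebase; they stand in for the workflow state, the `ServerlessDesiredSlotsKey` counter, the allocate/deallocate activities, and the `BumpServerlessAutoscaler` message.

// Illustrative model of the new serverless slot accounting; not the actual workflow code.
struct ActorState {
	for_serverless: bool,
	allocated_slot: bool,
}

struct Autoscaler {
	desired_slots: i64, // stands in for the ServerlessDesiredSlotsKey counter
	bumps: u32,         // stands in for BumpServerlessAutoscaler messages
}

impl Autoscaler {
	fn bump(&mut self) {
		self.bumps += 1;
	}
}

// allocate_actor / spawn_actor: reserve a slot and always bump so the autoscaler can scale up.
fn allocate(state: &mut ActorState, scaler: &mut Autoscaler, for_serverless: bool) {
	state.for_serverless = for_serverless;
	state.allocated_slot = true;
	scaler.desired_slots += 1;
	scaler.bump();
}

// Destroy while still pending allocation: release the reserved slot even though there is
// no runner id, then bump so the autoscaler can scale back down.
fn destroy_pending(state: &mut ActorState, scaler: &mut Autoscaler) {
	if state.allocated_slot {
		scaler.desired_slots -= 1; // the atomic -1 on the desired-slots key
		state.allocated_slot = false;
		if state.for_serverless {
			scaler.bump();
		}
	}
}

fn main() {
	let mut state = ActorState { for_serverless: true, allocated_slot: false };
	let mut scaler = Autoscaler { desired_slots: 0, bumps: 0 };

	allocate(&mut state, &mut scaler, true);
	destroy_pending(&mut state, &mut scaler);

	// No leaked serverless slots, and the autoscaler was poked on both transitions.
	assert_eq!(scaler.desired_slots, 0);
	assert_eq!(scaler.bumps, 2);
}

The invariant the patch is aiming for: every reservation of a serverless slot is matched by exactly one release, whether the actor reached a runner or was destroyed while still pending allocation, and the autoscaler is bumped after each transition so it can converge.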
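Note on the runner event bookkeeping (not part of the patch): `LifecycleState` now tracks the last inserted event index separately from the last acked index. The sketch below shows the same bookkeeping as a plain function; `Event`, `handle_events`, and the `ack` callback are illustrative stand-ins for the protocol events, the workflow loop, and the SendMessageToRunner activity. Unlike the patch, which currently only logs a warning for an already-seen index, the sketch skips the duplicate outright.

// Illustrative bookkeeping for runner events; not the actual workflow code.
struct Event {
	index: i64,
}

struct LifecycleState {
	last_event_idx: i64,
	last_event_ack_idx: i64,
}

fn handle_events(state: &mut LifecycleState, events: &[Event], mut ack: impl FnMut(i64)) {
	for event in events {
		// The patch only warns here; this sketch also skips the duplicate.
		if event.index <= state.last_event_idx {
			continue;
		}
		// ... forward the event to the matching actor workflow ...
	}

	// Only do work when the batch is non-empty.
	if let Some(last_event_idx) = events.last().map(|event| event.index) {
		state.last_event_idx = last_event_idx;

		// Ack at most once per 500-event advance to bound ack traffic.
		if last_event_idx > state.last_event_ack_idx.saturating_add(500) {
			state.last_event_ack_idx = last_event_idx;
			ack(last_event_idx);
		}
	}
}

fn main() {
	let mut state = LifecycleState { last_event_idx: -1, last_event_ack_idx: -1 };
	let mut acked = Vec::new();

	let batch: Vec<Event> = (0..600).map(|index| Event { index }).collect();
	handle_events(&mut state, &batch, |idx| acked.push(idx));

	assert_eq!(state.last_event_idx, 599);
	assert_eq!(acked, vec![599]);
}

Acking only when the index has advanced by more than 500 keeps ack traffic bounded no matter how small the individual event batches are.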