1 change: 1 addition & 0 deletions packages/common/types/src/msgs/pegboard.rs
@@ -1,4 +1,5 @@
use gas::prelude::*;

// TODO: Add namespace + runner name to this struct so bumps can be more targeted
#[message("pegboard_bump_serverless_autoscaler")]
pub struct BumpServerlessAutoscaler {}
1 change: 0 additions & 1 deletion packages/core/guard/server/src/routing/mod.rs
@@ -3,7 +3,6 @@ use std::sync::Arc;
use anyhow::*;
use gas::prelude::*;
use hyper::header::HeaderName;
use hyper::{Request, body::Incoming as BodyIncoming};
use rivet_guard_core::RoutingFn;

use crate::{errors, shared_state::SharedState};
30 changes: 13 additions & 17 deletions packages/core/pegboard-serverless/src/lib.rs
@@ -279,22 +279,18 @@ async fn outbound_handler(
v.parse::<HeaderValue>().ok()?,
))
})
.chain(std::iter::once((
X_RIVET_ENDPOINT,
HeaderValue::try_from(current_dc.public_url.to_string())?,
)))
.chain(std::iter::once((
X_RIVET_TOTAL_SLOTS,
HeaderValue::try_from(slots_per_runner)?,
)))
.chain(std::iter::once((
X_RIVET_RUNNER_NAME,
HeaderValue::try_from(runner_name)?,
)))
.chain(std::iter::once((
X_RIVET_NAMESPACE_ID,
HeaderValue::try_from(namespace_name)?,
)))
.chain([
(
X_RIVET_ENDPOINT,
HeaderValue::try_from(current_dc.public_url.to_string())?,
),
(
X_RIVET_TOTAL_SLOTS,
HeaderValue::try_from(slots_per_runner)?,
),
(X_RIVET_RUNNER_NAME, HeaderValue::try_from(runner_name)?),
(X_RIVET_NAMESPACE_ID, HeaderValue::try_from(namespace_name)?),
])
// Add token if auth is enabled
.chain(
ctx.config()
@@ -310,7 +306,7 @@ async fn outbound_handler(
)
.collect();

let mut req = client.get(url).headers(headers);
let req = client.get(url).headers(headers);

let mut source = sse::EventSource::new(req).context("failed creating event source")?;
let mut runner_id = None;
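Note on the hunk above: the refactor collapses several `.chain(std::iter::once(...))` calls into a single `.chain([...])`, which works because a fixed-size array of `(HeaderName, HeaderValue)` pairs implements `IntoIterator`. A minimal standalone sketch of the pattern, using the `http` header types that `reqwest` re-exports; the header names are placeholders, not the crate's `X_RIVET_*` constants:

```rust
use anyhow::Result;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};

// Placeholder header names, used only to keep this sketch self-contained.
fn build_headers(endpoint: &str, total_slots: u32) -> Result<HeaderMap> {
    let x_endpoint = HeaderName::from_static("x-example-endpoint");
    let x_total_slots = HeaderName::from_static("x-example-total-slots");

    let headers: HeaderMap = std::iter::empty()
        // An array of pairs implements `IntoIterator`, so one `.chain([...])`
        // replaces a series of `.chain(std::iter::once(...))` calls.
        .chain([
            (x_endpoint, HeaderValue::try_from(endpoint)?),
            (x_total_slots, HeaderValue::from(total_slots)),
        ])
        .collect();

    Ok(headers)
}
```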
28 changes: 27 additions & 1 deletion packages/services/pegboard/src/workflows/actor/destroy.rs
@@ -38,6 +38,14 @@ pub(crate) async fn pegboard_actor_destroy(ctx: &mut WorkflowCtx, input: &Input)
kill(ctx, input.actor_id, input.generation, runner_workflow_id).await?;
}

// If a slot was still allocated at the time of actor destruction, bump the serverless
// autoscaler so it can scale down if needed
if res.allocated_serverless_slot {
ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
.send()
.await?;
}

// Clear KV
ctx.activity(ClearKvInput {
actor_id: input.actor_id,
@@ -60,6 +68,7 @@ struct UpdateStateAndDbInput {
#[derive(Debug, Serialize, Deserialize, Hash)]
struct UpdateStateAndDbOutput {
runner_workflow_id: Option<Id>,
allocated_serverless_slot: bool,
}

#[activity(UpdateStateAndDb)]
@@ -89,6 +98,17 @@ async fn update_state_and_db(
&tx,
)
.await?;
} else if state.allocated_slot {
// Clear the serverless slot even if we do not have a runner id. This happens when the
// actor is destroyed while pending allocation
tx.atomic_op(
&rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new(
state.namespace_id,
state.runner_name_selector.clone(),
),
&(-1i64).to_le_bytes(),
MutationType::Add,
);
}

// Update namespace indexes
@@ -125,7 +145,13 @@ async fn update_state_and_db(
state.runner_id = None;
let runner_workflow_id = state.runner_workflow_id.take();

Ok(UpdateStateAndDbOutput { runner_workflow_id })
let old_allocated_slot = state.allocated_slot;
state.allocated_slot = false;

Ok(UpdateStateAndDbOutput {
runner_workflow_id,
allocated_serverless_slot: state.for_serverless && old_allocated_slot,
})
}

#[derive(Debug, Serialize, Deserialize, Hash)]
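Note on the `else if state.allocated_slot` branch above: decrementing `ServerlessDesiredSlotsKey` with `(-1i64).to_le_bytes()` and `MutationType::Add` assumes the KV layer performs a FoundationDB-style atomic add, i.e. it treats both the stored value and the parameter as little-endian signed integers. A small sketch of that arithmetic under that assumption:

```rust
// Sketch of the little-endian i64 counter arithmetic the atomic Add mutation is
// assumed to perform on the desired-slots key.
fn apply_add(current: [u8; 8], param: [u8; 8]) -> [u8; 8] {
    i64::from_le_bytes(current)
        .wrapping_add(i64::from_le_bytes(param))
        .to_le_bytes()
}

fn main() {
    let desired_slots = 3i64.to_le_bytes();
    let after = apply_add(desired_slots, (-1i64).to_le_bytes());
    // One serverless slot released: the counter drops from 3 to 2.
    assert_eq!(i64::from_le_bytes(after), 2);
}
```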
37 changes: 25 additions & 12 deletions packages/services/pegboard/src/workflows/actor/mod.rs
@@ -48,6 +48,8 @@ pub struct State {

#[serde(default)]
pub for_serverless: bool,
#[serde(default)]
pub allocated_slot: bool,

pub start_ts: Option<i64>,
// NOTE: This is not the alarm ts, this is when the actor started sleeping. See `LifecycleState` for alarm
@@ -83,6 +85,7 @@ impl State {
create_complete_ts: None,

for_serverless: false,
allocated_slot: false,

start_ts: None,
pending_allocation_ts: None,
@@ -471,26 +474,36 @@ async fn handle_stopped(
state.runner_id = None;
state.runner_workflow_id = None;

ctx.activity(runtime::DeallocateInput {
actor_id: input.actor_id,
})
.await?;
let deallocate_res = ctx
.activity(runtime::DeallocateInput {
actor_id: input.actor_id,
})
.await?;

// Allocate other pending actors from queue since a slot has now cleared
let res = ctx
let allocate_pending_res = ctx
.activity(AllocatePendingActorsInput {
namespace_id: input.namespace_id,
name: input.runner_name_selector.clone(),
})
.await?;

// Dispatch pending allocs (if any)
for alloc in res.allocations {
ctx.signal(alloc.signal)
.to_workflow::<Workflow>()
.tag("actor_id", alloc.actor_id)
.send()
.await?;
if allocate_pending_res.allocations.is_empty() {
// Bump autoscaler so it can scale down if needed
if deallocate_res.for_serverless {
ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
.send()
.await?;
}
} else {
// Dispatch pending allocs (if any)
for alloc in allocate_pending_res.allocations {
ctx.signal(alloc.signal)
.to_workflow::<Workflow>()
.tag("actor_id", alloc.actor_id)
.send()
.await?;
}
}

// Handle rescheduling if not marked as sleeping
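Note on the restructured branch above: the autoscaler is only bumped when the freed slot could not be handed to a pending actor, and only if the stopped actor had occupied a serverless slot. A condensed sketch of that decision as a hypothetical helper (not code from the workflow):

```rust
/// Returns true when handle_stopped should bump the serverless autoscaler:
/// nothing from the pending queue took over the freed slot, and the actor
/// was running on a serverless runner.
fn should_bump_autoscaler(pending_allocations: usize, deallocated_serverless: bool) -> bool {
    pending_allocations == 0 && deallocated_serverless
}

fn main() {
    assert!(should_bump_autoscaler(0, true)); // slot freed for good, maybe scale down
    assert!(!should_bump_autoscaler(2, true)); // slot immediately re-used by pending actors
    assert!(!should_bump_autoscaler(0, false)); // not serverless, autoscaler not involved
}
```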
25 changes: 18 additions & 7 deletions packages/services/pegboard/src/workflows/actor/runtime.rs
@@ -294,6 +294,7 @@ async fn allocate_actor(
.record(dt, &[KeyValue::new("did_reserve", res.is_ok().to_string())]);

state.for_serverless = for_serverless;
state.allocated_slot = true;

match &res {
Ok(res) => {
@@ -339,8 +340,13 @@ pub struct DeallocateInput {
pub actor_id: Id,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct DeallocateOutput {
pub for_serverless: bool,
}

#[activity(Deallocate)]
pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<()> {
pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<DeallocateOutput> {
let mut state = ctx.state::<State>()?;
let runner_name_selector = &state.runner_name_selector;
let namespace_id = state.namespace_id;
@@ -353,8 +359,8 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<()

tx.delete(&keys::actor::ConnectableKey::new(input.actor_id));

// Only clear slot if we have a runner id
if let Some(runner_id) = runner_id {
// Only clear slot if we have a runner id
destroy::clear_slot(
input.actor_id,
namespace_id,
@@ -374,8 +380,12 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<()
state.connectable_ts = None;
state.runner_id = None;
state.runner_workflow_id = None;
// Slot was cleared by the above txn
state.allocated_slot = false;

Ok(())
Ok(DeallocateOutput {
for_serverless: state.for_serverless,
})
}

/// Returns None if a destroy signal was received while pending for allocation.
Expand All @@ -393,6 +403,11 @@ pub async fn spawn_actor(
})
.await?;

// Always bump the autoscaler so it can scale up
ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
.send()
.await?;

let allocate_res = match allocate_res {
Ok(x) => x,
Err(pending_allocation_ts) => {
@@ -401,10 +416,6 @@ pub async fn spawn_actor(
"failed to allocate (no availability), waiting for allocation",
);

ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {})
.send()
.await?;

// If allocation fails, the allocate txn already inserted this actor into the queue. Now we wait for
// an `Allocate` signal
match ctx.listen::<PendingAllocation>().await? {
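Note on the `allocated_slot` bookkeeping this file adds: the flag is set in `allocate_actor`, cleared by `deallocate` (and by destroy), and combined with `for_serverless` to decide whether a serverless slot was actually released. A hypothetical condensed view of that lifecycle; the real state lives in the workflow's persisted `State`:

```rust
// Condensed sketch of the allocated_slot / for_serverless bookkeeping introduced
// across allocate, deallocate, and destroy. Not the workflow code itself.
#[derive(Default)]
struct ActorSlotState {
    for_serverless: bool,
    allocated_slot: bool,
}

impl ActorSlotState {
    fn on_allocate(&mut self, for_serverless: bool) {
        self.for_serverless = for_serverless;
        self.allocated_slot = true;
    }

    /// Clears the slot and reports whether a serverless slot was actually
    /// released, so the caller can decide whether to bump the autoscaler.
    fn on_release(&mut self) -> bool {
        let released_serverless = self.for_serverless && self.allocated_slot;
        self.allocated_slot = false;
        released_serverless
    }
}

fn main() {
    let mut state = ActorSlotState::default();
    state.on_allocate(true);
    assert!(state.on_release()); // first release frees a serverless slot
    assert!(!state.on_release()); // idempotent: no double decrement on a repeat
}
```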
39 changes: 22 additions & 17 deletions packages/services/pegboard/src/workflows/runner.rs
@@ -162,6 +162,10 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
// NOTE: This should not be parallelized because signals should be sent in order
// Forward to actor workflows
for event in &events {
if event.index <= state.last_event_idx {
tracing::warn!(idx=%event.index, "event already received, ignoring");
continue;
}

let actor_id =
crate::utils::event_actor_id(&event.inner).to_string();
let res = ctx
@@ -186,29 +190,28 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
}
}

if !events.is_empty() {
// Proceed only if there is at least one event
if let Some(last_event_idx) = events.last().map(|event| event.index) {
ctx.activity(InsertEventsInput {
events: events.clone(),
})
.await?;

state.last_event_idx = last_event_idx;

// Ack every 500 events
let last_event_idx = events.last().map(|event| event.index);
if let Some(last_event_idx) = last_event_idx {
if last_event_idx > state.last_event_ack_idx.saturating_add(500)
{
state.last_event_ack_idx = last_event_idx;

ctx.activity(SendMessageToRunnerInput {
runner_id: input.runner_id,
message: protocol::ToClient::ToClientAckEvents(
protocol::ToClientAckEvents {
last_event_idx: state.last_event_ack_idx,
},
),
})
.await?;
}
if last_event_idx > state.last_event_ack_idx.saturating_add(500) {
state.last_event_ack_idx = last_event_idx;

ctx.activity(SendMessageToRunnerInput {
runner_id: input.runner_id,
message: protocol::ToClient::ToClientAckEvents(
protocol::ToClientAckEvents {
last_event_idx: state.last_event_ack_idx,
},
),
})
.await?;
}
}
}
@@ -436,13 +439,15 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
#[derive(Debug, Serialize, Deserialize)]
struct LifecycleState {
draining: bool,
last_event_idx: i64,
last_event_ack_idx: i64,
}

impl LifecycleState {
fn new() -> Self {
LifecycleState {
draining: false,
last_event_idx: -1,
last_event_ack_idx: -1,
}
}
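Note on the new `last_event_idx` field: it lets the runner workflow detect events it has already processed, while the existing `last_event_ack_idx` batches acks to at most one per 500 events. A standalone sketch of that bookkeeping, assuming duplicate events are skipped once detected:

```rust
// Standalone sketch of the dedup/ack bookkeeping; -1 means "nothing seen yet",
// mirroring LifecycleState::new in the diff above.
struct EventCursor {
    last_event_idx: i64,
    last_event_ack_idx: i64,
}

impl EventCursor {
    fn new() -> Self {
        Self { last_event_idx: -1, last_event_ack_idx: -1 }
    }

    /// Returns true when the event is new and should be forwarded to the actor workflow.
    fn accept(&mut self, idx: i64) -> bool {
        if idx <= self.last_event_idx {
            return false; // already processed, e.g. redelivered after a reconnect
        }
        self.last_event_idx = idx;
        true
    }

    /// Returns true when an ack message should be sent (at most every 500 events).
    fn should_ack(&mut self, idx: i64) -> bool {
        if idx > self.last_event_ack_idx.saturating_add(500) {
            self.last_event_ack_idx = idx;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut cursor = EventCursor::new();
    assert!(cursor.accept(0));
    assert!(!cursor.accept(0)); // duplicate delivery is dropped
    assert!(cursor.should_ack(501)); // first ack once the window is exceeded
    assert!(!cursor.should_ack(900)); // within 500 of the last ack, no new ack
}
```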