diff --git a/packages/services/pegboard/src/workflows/runner.rs b/packages/services/pegboard/src/workflows/runner.rs index 508ce01c4c..2bd68a981f 100644 --- a/packages/services/pegboard/src/workflows/runner.rs +++ b/packages/services/pegboard/src/workflows/runner.rs @@ -16,6 +16,8 @@ pub const RUNNER_ELIGIBLE_THRESHOLD_MS: i64 = util::duration::seconds(10); /// How long to wait after last ping before forcibly removing a runner from the database and deleting its /// workflow, evicting all actors. Note that the runner may still be running and can reconnect. const RUNNER_LOST_THRESHOLD_MS: i64 = util::duration::minutes(2); +/// An ack is sent once the newest event index exceeds the last acked index by more than this many events. +const EVENT_ACK_BATCH_SIZE: i64 = 500; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Input { @@ -199,8 +201,12 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> state.last_event_idx = last_event_idx; - // Ack every 500 events - if last_event_idx > state.last_event_ack_idx.saturating_add(500) { + // Ack events in batch + if last_event_idx + > state + .last_event_ack_idx + .saturating_add(EVENT_ACK_BATCH_SIZE) + { state.last_event_ack_idx = last_event_idx; ctx.activity(SendMessageToRunnerInput { diff --git a/sdks/typescript/runner/src/mod.ts b/sdks/typescript/runner/src/mod.ts index ee7c12d9ba..51cde134f2 100644 --- a/sdks/typescript/runner/src/mod.ts +++ b/sdks/typescript/runner/src/mod.ts @@ -10,6 +10,9 @@ import { setLogger, logger } from "./log.js"; const KV_EXPIRE: number = 30_000; const PROTOCOL_VERSION: number = 1; +/** Warn once the backlog significantly exceeds the server's ack batch size. 
*/ +const EVENT_BACKLOG_WARN_THRESHOLD = 10_000; + export interface ActorInstance { actorId: string; generation: number; @@ -92,8 +95,8 @@ export class Runner { #runnerLostTimeout?: NodeJS.Timeout; // Event storage for resending - #eventHistory: { event: protocol.EventWrapper; timestamp: number }[] = []; - #eventPruneInterval?: NodeJS.Timeout; + #eventHistory: protocol.EventWrapper[] = []; + #eventBacklogWarned: boolean = false; // Command acknowledgment #ackInterval?: NodeJS.Timeout; @@ -112,12 +115,6 @@ export class Runner { this.#tunnel = new Tunnel(this); - // TODO(RVT-4986): Prune when server acks events - // Start pruning old events every minute - this.#eventPruneInterval = setInterval(() => { - this.#pruneOldEvents(); - }, 60000); // Run every minute - // Start cleaning up old unsent KV requests every 15 seconds this.#kvCleanupInterval = setInterval(() => { this.#cleanupOldKvRequests(); @@ -290,12 +287,6 @@ export class Runner { this.#ackInterval = undefined; } - // Clear event prune interval - if (this.#eventPruneInterval) { - clearInterval(this.#eventPruneInterval); - this.#eventPruneInterval = undefined; - } - // Clear KV cleanup interval if (this.#kvCleanupInterval) { clearInterval(this.#kvCleanupInterval); @@ -401,7 +392,8 @@ export class Runner { // MARK: Runner protocol async #openPegboardWebSocket() { const protocols = ["rivet", `rivet_target.runner`]; - if (this.config.token) protocols.push(`rivet_token.${this.config.token}`); + if (this.config.token) + protocols.push(`rivet_token.${this.config.token}`); const WS = await importWebSocket(); const ws = new WS(this.pegboardUrl, protocols) as any as WebSocket; @@ -521,7 +513,7 @@ export class Runner { const commands = message.val; this.#handleCommands(commands); } else if (message.tag === "ToClientAckEvents") { - throw new Error("TODO"); + this.#handleAckEvents(message.val); } else if (message.tag === "ToClientKvResponse") { const kvResponse = message.val; this.#handleKvResponse(kvResponse); @@ -622,6 
+614,45 @@ export class Runner { } } + #handleAckEvents(ack: protocol.ToClientAckEvents) { + const lastAckedIdx = ack.lastEventIdx; + + const originalLength = this.#eventHistory.length; + this.#eventHistory = this.#eventHistory.filter( + (event) => event.index > lastAckedIdx, + ); + + const prunedCount = originalLength - this.#eventHistory.length; + if (prunedCount > 0) { + logger()?.info({ + msg: "pruned acknowledged events", + lastAckedIdx: lastAckedIdx.toString(), + prunedCount, + }); + } + + if (this.#eventHistory.length <= EVENT_BACKLOG_WARN_THRESHOLD) { + this.#eventBacklogWarned = false; + } + } + + /** Store an event in the resend history until the server acks it, warning once whenever the unacknowledged backlog grows past EVENT_BACKLOG_WARN_THRESHOLD. */ + #recordEvent(eventWrapper: protocol.EventWrapper) { + this.#eventHistory.push(eventWrapper); + + if ( + this.#eventHistory.length > EVENT_BACKLOG_WARN_THRESHOLD && + !this.#eventBacklogWarned + ) { + this.#eventBacklogWarned = true; + logger()?.warn({ + msg: "unacknowledged event backlog exceeds threshold", + backlogSize: this.#eventHistory.length, + threshold: EVENT_BACKLOG_WARN_THRESHOLD, + }); + } + } + #handleCommandStartActor(commandWrapper: protocol.CommandWrapper) { const startCommand = commandWrapper.inner .val as protocol.CommandStartActor; @@ -713,11 +744,7 @@ export class Runner { }, }; - // Store event in history for potential resending - this.#eventHistory.push({ - event: eventWrapper, - timestamp: Date.now(), - }); + this.#recordEvent(eventWrapper); logger()?.info({ msg: "sending event to server", @@ -774,11 +801,7 @@ export class Runner { }, }; - // Store event in history for potential resending - this.#eventHistory.push({ - event: eventWrapper, - timestamp: Date.now(), - }); + this.#recordEvent(eventWrapper); logger()?.info({ msg: "sending event to server", @@ -1131,11 +1154,7 @@ export class Runner { }, }; - // Store event in history for potential resending - this.#eventHistory.push({ - event: eventWrapper, - timestamp: Date.now(), - }); + 
this.#recordEvent(eventWrapper); this.__sendToServer({ tag: "ToServerEvents", @@ -1261,7 +1280,7 @@ export class Runner { tag: "ToServerlessServerInit", val: { runnerId: this.runnerId, - } + }, }); // Embed version @@ -1269,7 +1288,7 @@ export class Runner { buffer.writeUInt16LE(PROTOCOL_VERSION, 0); Buffer.from(data).copy(buffer, 2); - return buffer.toString('base64'); + return buffer.toString("base64"); } #scheduleReconnect() { @@ -1302,7 +1321,7 @@ export class Runner { #resendUnacknowledgedEvents(lastEventIdx: bigint) { const eventsToResend = this.#eventHistory.filter( - (item) => item.event.index > lastEventIdx, + (event) => event.index > lastEventIdx, ); if (eventsToResend.length === 0) return; @@ -1312,29 +1331,12 @@ export class Runner { //); // Resend events in batches - const events = eventsToResend.map((item) => item.event); this.__sendToServer({ tag: "ToServerEvents", - val: events, + val: eventsToResend, }); } - // TODO(RVT-4986): Prune when server acks events instead of based on old events - #pruneOldEvents() { - const fiveMinutesAgo = Date.now() - 5 * 60 * 1000; - const originalLength = this.#eventHistory.length; - - // Remove events older than 5 minutes - this.#eventHistory = this.#eventHistory.filter( - (item) => item.timestamp > fiveMinutesAgo, - ); - - const prunedCount = originalLength - this.#eventHistory.length; - if (prunedCount > 0) { - //logger()?.log(`Pruned ${prunedCount} old events from history`); - } - } - #cleanupOldKvRequests() { const thirtySecondsAgo = Date.now() - KV_EXPIRE; const toDelete: number[] = [];