Skip to content

Commit 483d769

Browse files
committed
fix(runner): handle ack events message (#3108)
Fixes RVT-5232
1 parent f178a61 commit 483d769

File tree

2 files changed

+62
-54
lines changed
  • packages/services/pegboard/src/workflows
  • sdks/typescript/runner/src

2 files changed

+62
-54
lines changed

packages/services/pegboard/src/workflows/runner.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ pub const RUNNER_ELIGIBLE_THRESHOLD_MS: i64 = util::duration::seconds(10);
1616
/// How long to wait after last ping before forcibly removing a runner from the database and deleting its
1717
/// workflow, evicting all actors. Note that the runner may still be running and can reconnect.
1818
const RUNNER_LOST_THRESHOLD_MS: i64 = util::duration::minutes(2);
19+
/// Batch size of how many events to ack.
20+
const EVENT_ACK_BATCH_SIZE: i64 = 500;
1921

2022
#[derive(Debug, Serialize, Deserialize, Clone)]
2123
pub struct Input {
@@ -199,8 +201,12 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
199201

200202
state.last_event_idx = last_event_idx;
201203

202-
// Ack every 500 events
203-
if last_event_idx > state.last_event_ack_idx.saturating_add(500) {
204+
// Ack events in batch
205+
if last_event_idx
206+
> state
207+
.last_event_ack_idx
208+
.saturating_add(EVENT_ACK_BATCH_SIZE)
209+
{
204210
state.last_event_ack_idx = last_event_idx;
205211

206212
ctx.activity(SendMessageToRunnerInput {

sdks/typescript/runner/src/mod.ts

Lines changed: 54 additions & 52 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)