diff --git a/sdks/typescript/runner/src/tunnel.ts b/sdks/typescript/runner/src/tunnel.ts index f8bd3c1009..99e934e1ff 100644 --- a/sdks/typescript/runner/src/tunnel.ts +++ b/sdks/typescript/runner/src/tunnel.ts @@ -16,7 +16,7 @@ interface PendingRequest { actorId?: string; } -interface PendingMessage { +interface PendingTunnelMessage { sentAt: number; requestIdStr: string; } @@ -24,10 +24,14 @@ interface PendingMessage { export class Tunnel { #runner: Runner; + /** Requests over the tunnel to the actor that are in flight. */ #actorPendingRequests: Map = new Map(); + /** WebSockets over the tunnel to the actor that are in flight. */ #actorWebSockets: Map = new Map(); - #pendingMessages: Map = new Map(); + /** Messages sent from the actor over the tunnel that have not been acked by the gateway. */ + #pendingTunnelMessages: Map = new Map(); + #gcInterval?: NodeJS.Timeout; constructor(runner: Runner) { @@ -65,7 +69,9 @@ export class Tunnel { ) { // TODO: Switch this with runner WS if (!this.#runner.__webSocketReady()) { - console.warn("Cannot send tunnel message, WebSocket not connected"); + logger()?.warn( + "cannot send tunnel message, socket not connected to engine", + ); return; } @@ -73,7 +79,7 @@ export class Tunnel { const messageId = generateUuidBuffer(); const requestIdStr = bufferToString(requestId); - this.#pendingMessages.set(bufferToString(messageId), { + this.#pendingTunnelMessages.set(bufferToString(messageId), { sentAt: Date.now(), requestIdStr, }); @@ -121,7 +127,7 @@ export class Tunnel { const now = Date.now(); const messagesToDelete: string[] = []; - for (const [messageId, pendingMessage] of this.#pendingMessages) { + for (const [messageId, pendingMessage] of this.#pendingTunnelMessages) { // Check if message is older than timeout if (now - pendingMessage.sentAt > MESSAGE_ACK_TIMEOUT) { messagesToDelete.push(messageId); @@ -161,9 +167,14 @@ export class Tunnel { } // Remove timed out messages - for (const messageId of messagesToDelete) { - this.#pendingMessages.delete(messageId); - console.warn(`Purged unacked message: ${messageId}`); + if (messagesToDelete.length > 0) { + logger()?.warn({ + msg: "purging unacked tunnel messages, this indicates that the Rivet Engine is disconnected or not responding", + count: messagesToDelete.length, + }); + for (const messageId of messagesToDelete) { + this.#pendingTunnelMessages.delete(messageId); + } } } @@ -214,9 +225,9 @@ export class Tunnel { if (message.messageKind.tag === "TunnelAck") { // Mark pending message as acknowledged and remove it const msgIdStr = bufferToString(message.messageId); - const pending = this.#pendingMessages.get(msgIdStr); + const pending = this.#pendingTunnelMessages.get(msgIdStr); if (pending) { - this.#pendingMessages.delete(msgIdStr); + this.#pendingTunnelMessages.delete(msgIdStr); } } else { this.#sendAck(message.requestId, message.messageId); @@ -438,7 +449,6 @@ export class Tunnel { const websocketHandler = this.#runner.config.websocket; if (!websocketHandler) { - console.error("No websocket handler configured for tunnel"); logger()?.error({ msg: "no websocket handler configured for tunnel", });