From 7c6f35c340117f6cf893b34d5eaeb03f324c4984 Mon Sep 17 00:00:00 2001 From: Milind L Date: Sun, 10 Nov 2024 14:12:49 +0530 Subject: [PATCH 1/4] Ensure that all raced promises are resolved after the race --- lib/kafkajs/_common.js | 7 +++++++ lib/kafkajs/_consumer.js | 42 +++++++++++++++++++++++----------------- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/lib/kafkajs/_common.js b/lib/kafkajs/_common.js index 2d02ce5d..bbe39994 100644 --- a/lib/kafkajs/_common.js +++ b/lib/kafkajs/_common.js @@ -660,6 +660,12 @@ class DeferredPromise extends Promise{ } } +/* Convenience function to resolve a DeferredPromise and return a new one. */ +function replaceDeferredPromise(promise) { + promise.resolve(); + return new DeferredPromise(); +} + /** * Utility class for time related functions */ @@ -856,6 +862,7 @@ module.exports = { checkIfKafkaJsKeysPresent, Lock, DeferredPromise, + replaceDeferredPromise, Timer, partitionKey, }; diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 743e04a6..a8883d75 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -18,11 +18,13 @@ const { Lock, partitionKey, DeferredPromise, + replaceDeferredPromise, Timer } = require('./_common'); const { Buffer } = require('buffer'); const MessageCache = require('./_consumer_cache'); const { hrtime } = require('process'); +const { LinkedList } = require('./_linked-list'); const ConsumerState = Object.freeze({ INIT: 0, @@ -170,9 +172,9 @@ class Consumer { #fetchInProgress; /** - * Promise that resolves when there is something we need to poll for (messages, rebalance, etc). + * List of DeferredPromises waiting on consumer queue to be non-empty. */ - #queueNonEmpty = new DeferredPromise(); + #queueWaiters = new LinkedList(); /** * Whether any rebalance callback is in progress. @@ -1270,8 +1272,9 @@ class Consumer { } #queueNonEmptyCb() { - /* Unconditionally resolve the promise - not a problem if it's already resolved. 
*/ - this.#queueNonEmpty.resolve(); + for (const waiter of this.#queueWaiters) { + waiter.resolve(); + } } async #nextFetchRetry() { @@ -1280,15 +1283,20 @@ class Consumer { } else { /* Backoff a little. If m is null, we might be without messages * or in available partition starvation, and calling consumeSingleCached - * in a tight loop will help no one. We still keep it to 1000ms because we - * want to keep polling, though (ideally) we could increase it all the way - * up to max.poll.interval.ms. + * in a tight loop will help no one. * In case there is any message in the queue, we'll be woken up before the - * timer expires. */ - await Timer.withTimeout(1000, this.#queueNonEmpty); - if (this.#queueNonEmpty.resolved) { - this.#queueNonEmpty = new DeferredPromise(); - } + * timer expires. We cannot just await the `waiter` promise. This is because + * it's possible, in the time between the fetch and when `nextFetchRetry` are + * called, that the queue becomes non-empty. In that case, this worker will + * halt for no reason. Having a bounded wait ensures that even if we miss + * being woken up, we will still make progress with this worker soon. */ + const waiter = new DeferredPromise(); + const waiterNode = this.#queueWaiters.addLast(waiter); + await Timer.withTimeout(1000, waiter); + + /* Resolves the "extra" promise that has been spawned when creating the timer. 
*/ + waiter.resolve(); + this.#queueWaiters.remove(waiterNode); } } @@ -1343,6 +1351,8 @@ class Consumer { interval = 1; await Timer.withTimeout(interval, this.#maxPollIntervalRestart); + this.#maxPollIntervalRestart = replaceDeferredPromise(this.#maxPollIntervalRestart); + now = hrtime.bigint(); if (now > (maxPollExpiration - 1000000n)) { @@ -1374,12 +1384,8 @@ class Consumer { let interval = Number(cacheExpiration - now) / 1e6; if (interval < 100) interval = 100; - const promises = Promise.race([this.#workerTerminationScheduled, - this.#maxPollIntervalRestart]); - await Timer.withTimeout(interval, - promises); - if (this.#maxPollIntervalRestart.resolved) - this.#maxPollIntervalRestart = new DeferredPromise(); + await Timer.withTimeout(interval, this.#maxPollIntervalRestart); + this.#maxPollIntervalRestart = replaceDeferredPromise(this.#maxPollIntervalRestart); } if (this.#maxPollIntervalRestart.resolved) this.#maxPollIntervalRestart = new DeferredPromise(); From f607cf1ab56b76a4e9543e3444641d1b3e7f59b1 Mon Sep 17 00:00:00 2001 From: Milind L Date: Mon, 11 Nov 2024 19:15:27 +0530 Subject: [PATCH 2/4] Add CHANGELOG.md entry --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 41ff7425..8c8ea3d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# confluent-kafka-javascript v0.4.1 + +v0.4.1 is a limited availability maintenance release. It is supported for all usage. + +## Enhancements + +1. Fixes an issue with unresolved raced Promises leaking in the consumer (#151). + + # confluent-kafka-javascript v0.4.0 v0.4.0 is a limited availability feature release. It is supported for all usage. 
From 620ee773a4eaad1a5b607f6e8d476d286d02d7bf Mon Sep 17 00:00:00 2001 From: Milind L Date: Mon, 11 Nov 2024 20:36:15 +0530 Subject: [PATCH 3/4] Reduce flakiness in test --- test/promisified/consumer/consumeMessages.spec.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/promisified/consumer/consumeMessages.spec.js b/test/promisified/consumer/consumeMessages.spec.js index de66c35d..9a4f2230 100644 --- a/test/promisified/consumer/consumeMessages.spec.js +++ b/test/promisified/consumer/consumeMessages.spec.js @@ -520,8 +520,10 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit let calls = 0; let failedSeek = false; + let eachMessageStarted = false; consumer.run({ eachMessage: async ({ message }) => { + eachMessageStarted = true; /* Take a long time to process the message. */ await sleep(7000); try { @@ -540,7 +542,7 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit /* Waiting for assignment and then a bit more means that the first eachMessage starts running. */ await waitFor(() => consumer.assignment().length > 0, () => { }, { delay: 50 }); - await sleep(200); + await waitFor(() => eachMessageStarted, () => { }, { delay: 50 }); await consumer.disconnect(); /* Even without explicitly waiting for it, a pending call to eachMessage must complete before disconnect does. */ From 6184a8dabfcb65b7eee8dd7c41d1be2a3d133375 Mon Sep 17 00:00:00 2001 From: Milind L Date: Mon, 11 Nov 2024 22:43:48 +0530 Subject: [PATCH 4/4] Address review comments --- lib/kafkajs/_common.js | 12 +++++------- lib/kafkajs/_consumer.js | 17 ++++++++--------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/lib/kafkajs/_common.js b/lib/kafkajs/_common.js index bbe39994..438fd5dc 100644 --- a/lib/kafkajs/_common.js +++ b/lib/kafkajs/_common.js @@ -660,12 +660,6 @@ class DeferredPromise extends Promise{ } } -/* Convenience function to resolve a DeferredPromise and return a new one. 
*/ -function replaceDeferredPromise(promise) { - promise.resolve(); - return new DeferredPromise(); -} - /** * Utility class for time related functions */ @@ -675,6 +669,11 @@ class Timer { * or the passed promise resolves, when it's passed, clearing the timeout * in any case. * + * WARNING: avoid calling `withTimeout` with the same promise + * more than once, as `Promise.race` will add more callbacks to it, + * creating a memory leak if the promise is never resolved or not resolved + * soon enough. + * * @param {number} timeoutMs The timeout in milliseconds. * @param {Promise|undefined} promise The promise to wait for, * alternatively to the timeout, or `undefined` to just wait for the timeout. @@ -862,7 +861,6 @@ module.exports = { checkIfKafkaJsKeysPresent, Lock, DeferredPromise, - replaceDeferredPromise, Timer, partitionKey, }; diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index a8883d75..b772a104 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -18,7 +18,6 @@ const { Lock, partitionKey, DeferredPromise, - replaceDeferredPromise, Timer } = require('./_common'); const { Buffer } = require('buffer'); @@ -1285,11 +1284,12 @@ class Consumer { * or in available partition starvation, and calling consumeSingleCached * in a tight loop will help no one. * In case there is any message in the queue, we'll be woken up before the - * timer expires. We cannot just await the `waiter` promise. This is because - * it's possible, in the time between the fetch and when `nextFetchRetry` are - * called, that the queue becomes non-empty. In that case, this worker will - * halt for no reason. Having a bounded wait ensures that even if we miss - * being woken up, we will still make progress with this worker soon. */ + * timer expires. + * We have a per-worker promise, otherwise we end up awakening + * other workers when they've already looped and just restarted awaiting. 
+ * The `Promise` passed to `Timer.withTimeout` cannot be reused + * in next call to this method, to avoid memory leaks caused + * by `Promise.race`. */ const waiter = new DeferredPromise(); const waiterNode = this.#queueWaiters.addLast(waiter); await Timer.withTimeout(1000, waiter); @@ -1351,8 +1351,6 @@ class Consumer { interval = 1; await Timer.withTimeout(interval, this.#maxPollIntervalRestart); - this.#maxPollIntervalRestart = replaceDeferredPromise(this.#maxPollIntervalRestart); - now = hrtime.bigint(); if (now > (maxPollExpiration - 1000000n)) { @@ -1385,7 +1383,8 @@ class Consumer { if (interval < 100) interval = 100; await Timer.withTimeout(interval, this.#maxPollIntervalRestart); - this.#maxPollIntervalRestart = replaceDeferredPromise(this.#maxPollIntervalRestart); + if (this.#maxPollIntervalRestart.resolved) + this.#maxPollIntervalRestart = new DeferredPromise(); } if (this.#maxPollIntervalRestart.resolved) this.#maxPollIntervalRestart = new DeferredPromise();