diff --git a/CHANGELOG.md b/CHANGELOG.md index 18a55b81..417d6bdf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ + # confluent-kafka-javascript v0.5.0 v0.5.0 is a limited availability feature release. It is supported for all usage. @@ -5,6 +6,7 @@ v0.5.0 is a limited availability feature release. It is supported for all usage. ## Enhancements 1. Add support for an Admin API to delete records.(#141). +2. Fixes an issue with unresolved raced Promises leaking in the consumer (#151). # confluent-kafka-javascript v0.4.0 diff --git a/lib/kafkajs/_common.js b/lib/kafkajs/_common.js index 2d02ce5d..438fd5dc 100644 --- a/lib/kafkajs/_common.js +++ b/lib/kafkajs/_common.js @@ -669,6 +669,11 @@ class Timer { * or the passed promise resolves, when it's passed, clearing the timeout * in any case. * + * WARNING: it must be avoided to call `withTimeout` with the same promise + * more than once, as `Promise.race` will add more callbacks to it, + * creating a memory leak if the promise is never resolved or not resolved + * soon enough. + * * @param {number} timeoutMs The timeout in milliseconds. * @param {Promise|undefined} promise The promise to wait for, * alternatively to the timeout, or `undefined` to just wait for the timeout. diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 743e04a6..b772a104 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -23,6 +23,7 @@ const { const { Buffer } = require('buffer'); const MessageCache = require('./_consumer_cache'); const { hrtime } = require('process'); +const { LinkedList } = require('./_linked-list'); const ConsumerState = Object.freeze({ INIT: 0, @@ -170,9 +171,9 @@ class Consumer { #fetchInProgress; /** - * Promise that resolves when there is something we need to poll for (messages, rebalance, etc). + * List of DeferredPromises waiting on consumer queue to be non-empty. */ - #queueNonEmpty = new DeferredPromise(); + #queueWaiters = new LinkedList(); /** * Whether any rebalance callback is in progress. @@ -1270,8 +1271,9 @@ class Consumer { } #queueNonEmptyCb() { - /* Unconditionally resolve the promise - not a problem if it's already resolved. */ - this.#queueNonEmpty.resolve(); + for (const waiter of this.#queueWaiters) { + waiter.resolve(); + } } async #nextFetchRetry() { @@ -1280,15 +1282,21 @@ class Consumer { } else { /* Backoff a little. If m is null, we might be without messages * or in available partition starvation, and calling consumeSingleCached - * in a tight loop will help no one. We still keep it to 1000ms because we - * want to keep polling, though (ideally) we could increase it all the way - * up to max.poll.interval.ms. + * in a tight loop will help no one. * In case there is any message in the queue, we'll be woken up before the - * timer expires. */ - await Timer.withTimeout(1000, this.#queueNonEmpty); - if (this.#queueNonEmpty.resolved) { - this.#queueNonEmpty = new DeferredPromise(); - } + * timer expires. + * We have a per-worker promise, otherwise we end up awakening + * other workers when they've already looped and just restarted awaiting. + * The `Promise` passed to `Timer.withTimeout` cannot be reused + * in next call to this method, to avoid memory leaks caused + * by `Promise.race`. */ + const waiter = new DeferredPromise(); + const waiterNode = this.#queueWaiters.addLast(waiter); + await Timer.withTimeout(1000, waiter); + + /* Resolves the "extra" promise that has been spawned when creating the timer. */ + waiter.resolve(); + this.#queueWaiters.remove(waiterNode); } } @@ -1374,10 +1382,7 @@ class Consumer { let interval = Number(cacheExpiration - now) / 1e6; if (interval < 100) interval = 100; - const promises = Promise.race([this.#workerTerminationScheduled, - this.#maxPollIntervalRestart]); - await Timer.withTimeout(interval, - promises); + await Timer.withTimeout(interval, this.#maxPollIntervalRestart); if (this.#maxPollIntervalRestart.resolved) this.#maxPollIntervalRestart = new DeferredPromise(); } diff --git a/test/promisified/consumer/consumeMessages.spec.js b/test/promisified/consumer/consumeMessages.spec.js index de66c35d..9a4f2230 100644 --- a/test/promisified/consumer/consumeMessages.spec.js +++ b/test/promisified/consumer/consumeMessages.spec.js @@ -520,8 +520,10 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit let calls = 0; let failedSeek = false; + let eachMessageStarted = false; consumer.run({ eachMessage: async ({ message }) => { + eachMessageStarted = true; /* Take a long time to process the message. */ await sleep(7000); try { @@ -540,7 +542,7 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit /* Waiting for assignment and then a bit more means that the first eachMessage starts running. */ await waitFor(() => consumer.assignment().length > 0, () => { }, { delay: 50 }); - await sleep(200); + await waitFor(() => eachMessageStarted, () => { }, { delay: 50 }); await consumer.disconnect(); /* Even without explicitly waiting for it, a pending call to eachMessage must complete before disconnect does. */