From 9d710becbead8709db1eeadecfd060a5370983ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1t=C3=A9=20Szab=C3=B3?= Date: Fri, 23 May 2025 21:56:24 +0200 Subject: [PATCH] Avoid leaking uv_async_t in Dispatcher::Deactivate() On disconnect, Dispatcher::Deactivate() calls uv_close() to close its active uv_async_t instance and nulls out the corresponding field, but never frees the underlying memory. To reproduce, run the following with node-rdkafka and librdkafka compiled with ASAN: ```js const { KafkaConsumer } = require('../'); const consumer = new KafkaConsumer({ 'group.id': 'kafka', 'metadata.broker.list': 'localhost:9092', }, {}); consumer.connect({ timeout: 2000 }, function (err) { if (err) { console.error('Error connecting to Kafka:', err); return; } consumer.disconnect(); }) ``` This should report: ``` Direct leak of 128 byte(s) in 1 object(s) allocated from: #0 0x7f049c3aa647 in operator new(unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:99 #1 0x7f0471d872a1 in NodeKafka::Callbacks::Dispatcher::Activate() ../src/callbacks.cc:69 #2 0x7f0471e75c06 in NodeKafka::Workers::KafkaConsumerConnect::HandleOKCallback() ../src/workers.cc:579 #3 0x7f0471dd679c in Nan::AsyncWorker::WorkComplete() ../node_modules/nan/nan.h:2008 #4 0x7f0471dd679c in Nan::AsyncExecuteComplete(uv_work_s*) ../node_modules/nan/nan.h:2365 #5 0x7f0471dd679c in Nan::AsyncExecuteComplete(uv_work_s*, int) ../node_modules/nan/nan.h:2369 #6 0x18bb75c in uv__work_done ../deps/uv/src/threadpool.c:329 #7 0x18bf0b2 in uv__async_io ../deps/uv/src/unix/async.c:176 #8 0x18d3b2a in uv__io_poll ../deps/uv/src/unix/linux.c:1485 #9 0x18bfdd6 in uv_run ../deps/uv/src/unix/core.c:447 #10 0xbc9be5 in node::SpinEventLoopInternal(node::Environment*) (/usr/bin/node+0xbc9be5) #11 0xd1d920 in node::NodeMainInstance::Run(node::ExitCode*, node::Environment*) [clone .part.0] (/usr/bin/node+0xd1d920) #12 0xd1e38c in node::NodeMainInstance::Run() (/usr/bin/node+0xd1e38c) #13 0xc710be in node::Start(int, char**) (/usr/bin/node+0xc710be) #14 0x7f049bdf0d09 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x23d09) ``` The [libuv documentation](https://docs.libuv.org/en/v1.x/handle.html#c.uv_close) says it's only safe to free the underlying memory in a close callback passed to uv_close(), or after such a callback has returned. So, use a unique_ptr with a custom deleter to accomplish this. --- src/callbacks.cc | 26 +++++++++++++++++--------- src/callbacks.h | 5 ++++- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/src/callbacks.cc b/src/callbacks.cc index c8f3b5c1..7df2392f 100644 --- a/src/callbacks.cc +++ b/src/callbacks.cc @@ -48,8 +48,7 @@ v8::Local TopicPartitionListToV8Array( return tp_array; } -Dispatcher::Dispatcher() { - async = NULL; +Dispatcher::Dispatcher(): async(nullptr, async_deleter) { uv_mutex_init(&async_lock); } @@ -66,8 +65,9 @@ Dispatcher::~Dispatcher() { // Only run this if we aren't already listening void Dispatcher::Activate() { if (!async) { - async = new uv_async_t; - uv_async_init(uv_default_loop(), async, AsyncMessage_); + async = std::unique_ptr( + new uv_async_t(), async_deleter); + uv_async_init(uv_default_loop(), async.get(), AsyncMessage_); async->data = this; } @@ -75,10 +75,7 @@ void Dispatcher::Activate() { // Should be able to run this regardless of whether it is active or not void Dispatcher::Deactivate() { - if (async) { - uv_close(reinterpret_cast(async), NULL); - async = NULL; - } + async.reset(); } bool Dispatcher::HasCallbacks() { @@ -87,7 +84,7 @@ bool Dispatcher::HasCallbacks() { void Dispatcher::Execute() { if (async) { - uv_async_send(async); + uv_async_send(async.get()); } } @@ -119,6 +116,17 @@ void Dispatcher::RemoveCallback(const v8::Local &cb) { } } +// Custom deleter for uv_async_t smart pointers +void Dispatcher::async_deleter(uv_async_t* async) { + uv_close( + reinterpret_cast(async), + // Release memory after uv_close() has finished. + [](uv_handle_t* handle) { + delete reinterpret_cast(handle); + } + ); +} + event_t::event_t(const RdKafka::Event &event) { message = ""; fac = ""; diff --git a/src/callbacks.h b/src/callbacks.h index 8fcb3311..7fa0f2c8 100644 --- a/src/callbacks.h +++ b/src/callbacks.h @@ -12,6 +12,7 @@ #include #include +#include #include #include @@ -49,7 +50,9 @@ class Dispatcher { dispatcher->Flush(); } - uv_async_t *async; + static inline void async_deleter(uv_async_t* async); + + std::unique_ptr async; }; struct event_t {