From e4df593ae77d6db0b2bd0795abbc9403c7cc6cce Mon Sep 17 00:00:00 2001 From: Stephen Parente Date: Tue, 16 Aug 2016 13:51:29 -0700 Subject: [PATCH] Do native isConnected checking inside a lock (#7) Fixes test case (as far as I can tell) for Issue #7 --- lib/producer.js | 11 ++++++++--- src/connection.cc | 24 ++++++++++++++++++++---- src/connection.h | 1 + src/errors.cc | 19 ++++++++++++++++--- src/errors.h | 2 ++ src/message.cc | 1 - src/topic.cc | 18 +++++++++++++++--- 7 files changed, 62 insertions(+), 14 deletions(-) diff --git a/lib/producer.js b/lib/producer.js index cc334bd1..734f6cec 100644 --- a/lib/producer.js +++ b/lib/producer.js @@ -155,10 +155,15 @@ function maybeTopic(name, config) { * @return {Topic} - a new Kafka Topic. */ Producer.prototype.Topic = function(name, config) { - if (!this._isConnected) { - throw new Error('Producer not connected'); + try { + if (!this._isConnected) { + throw new Error('Producer not connected'); + } + return new Kafka.Topic(name, config, this._client); + } catch (err) { + err.message = 'Error creating topic "' + name + '"": ' + err.message; + this.emit('error', LibrdKafkaError.create(err)); } - return new Kafka.Topic(name, config, this._client); }; /** diff --git a/src/connection.cc b/src/connection.cc index 13474142..93ebe8ef 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -94,11 +94,27 @@ RdKafka::Handle* Connection::GetClient() { } Baton Connection::CreateTopic(std::string topic_name) { - RdKafka::Topic* topic = - RdKafka::Topic::create(m_client, topic_name, NULL, m_errstr); + return CreateTopic(topic_name, NULL); +} + +Baton Connection::CreateTopic(std::string topic_name, RdKafka::Conf* conf) { + std::string errstr; + + RdKafka::Topic* topic = NULL; + + if (IsConnected()) { + scoped_mutex_lock lock(m_connection_lock); + if (IsConnected()) { + topic = RdKafka::Topic::create(m_client, topic_name, conf, errstr); + } else { + return Baton(RdKafka::ErrorCode::ERR__STATE); + } + } else { + return Baton(RdKafka::ErrorCode::ERR__STATE); + } - if (!m_errstr.empty()) { - return Baton(RdKafka::ErrorCode::ERR_TOPIC_EXCEPTION); + if (!errstr.empty()) { + return Baton(RdKafka::ErrorCode::ERR_TOPIC_EXCEPTION, errstr); } // Maybe do it this way later? Then we don't need to do static_cast diff --git a/src/connection.h b/src/connection.h index d0741fa2..f576a501 100644 --- a/src/connection.h +++ b/src/connection.h @@ -50,6 +50,7 @@ class Connection : public Nan::ObjectWrap { // Baton Baton CreateTopic(std::string); + Baton CreateTopic(std::string, RdKafka::Conf*); Baton GetMetadata(std::string, int); RdKafka::Handle* GetClient(); diff --git a/src/errors.cc b/src/errors.cc index 02f4055b..440a351e 100644 --- a/src/errors.cc +++ b/src/errors.cc @@ -13,31 +13,44 @@ namespace NodeKafka { -v8::Local RdKafkaError(const RdKafka::ErrorCode &err) { +v8::Local RdKafkaError(const RdKafka::ErrorCode &err, std::string errstr) { // NOLINT // int code = static_cast(err); v8::Local ret = Nan::New(); ret->Set(Nan::New("message").ToLocalChecked(), - Nan::New(RdKafka::err2str(err)).ToLocalChecked()); + Nan::New(errstr).ToLocalChecked()); ret->Set(Nan::New("code").ToLocalChecked(), Nan::New(code)); return ret; } +v8::Local RdKafkaError(const RdKafka::ErrorCode &err) { + return RdKafkaError(err, RdKafka::err2str(err)); +} + Baton::Baton(const RdKafka::ErrorCode &code) { m_err = code; } +Baton::Baton(const RdKafka::ErrorCode &code, std::string errstr) { + m_err = code; + m_errstr = errstr; +} + Baton::Baton(void* _data) { m_err = RdKafka::ERR_NO_ERROR; m_data = _data; } v8::Local Baton::ToObject() { - return RdKafkaError(m_err); + if (m_errstr.empty()) { + return RdKafkaError(m_err); + } else { + return RdKafkaError(m_err, m_errstr); + } } RdKafka::ErrorCode Baton::err() { diff --git a/src/errors.h b/src/errors.h index cc770ce0..2f85b9f6 100644 --- a/src/errors.h +++ b/src/errors.h @@ -24,6 +24,7 @@ class Baton { public: explicit Baton(const RdKafka::ErrorCode &); explicit Baton(void* data); + explicit Baton(const RdKafka::ErrorCode &, std::string); template T data() { return static_cast(m_data); @@ -35,6 +36,7 @@ class Baton { private: void* m_data; + std::string m_errstr; RdKafka::ErrorCode m_err; }; diff --git a/src/message.cc b/src/message.cc index a9d1ec26..00829138 100644 --- a/src/message.cc +++ b/src/message.cc @@ -90,7 +90,6 @@ Message::Message(RdKafka::Message *message): } Message::~Message() { - // @todo I think I'd rather check if _message is NULL if (m_message) { delete m_message; } diff --git a/src/topic.cc b/src/topic.cc index fd16feb4..71762dfb 100644 --- a/src/topic.cc +++ b/src/topic.cc @@ -30,12 +30,19 @@ namespace NodeKafka { */ Topic::Topic(std::string topic_name, RdKafka::Conf* config, Connection * handle) { // NOLINT - m_topic = - RdKafka::Topic::create(handle->GetClient(), topic_name, config, errstr); + Baton b = handle->CreateTopic(topic_name, config); + + if (b.err() != RdKafka::ERR_NO_ERROR) { + m_topic = NULL; + } else { + m_topic = b.data(); + } } Topic::~Topic() { - delete m_topic; + if (m_topic) { + delete m_topic; + } } Nan::Persistent Topic::constructor; @@ -90,7 +97,12 @@ void Topic::New(const Nan::FunctionCallbackInfo& info) { Connection* connection = ObjectWrap::Unwrap(info[2]->ToObject()); + if (!connection->IsConnected()) { + return Nan::ThrowError("Client is not connected"); + } + Topic* topic = new Topic(topic_name, config, connection); + // Wrap it topic->Wrap(info.This());