Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions lib/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,15 @@ function maybeTopic(name, config) {
* @return {Topic} - a new Kafka Topic.
*/
Producer.prototype.Topic = function(name, config) {
if (!this._isConnected) {
throw new Error('Producer not connected');
try {
if (!this._isConnected) {
throw new Error('Producer not connected');
}
return new Kafka.Topic(name, config, this._client);
} catch (err) {
err.message = 'Error creating topic "' + name + '"": ' + err.message;
this.emit('error', LibrdKafkaError.create(err));
}
return new Kafka.Topic(name, config, this._client);
};

/**
Expand Down
24 changes: 20 additions & 4 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,27 @@ RdKafka::Handle* Connection::GetClient() {
}

Baton Connection::CreateTopic(std::string topic_name) {
RdKafka::Topic* topic =
RdKafka::Topic::create(m_client, topic_name, NULL, m_errstr);
return CreateTopic(topic_name, NULL);
}

Baton Connection::CreateTopic(std::string topic_name, RdKafka::Conf* conf) {
std::string errstr;

RdKafka::Topic* topic = NULL;

if (IsConnected()) {
scoped_mutex_lock lock(m_connection_lock);
if (IsConnected()) {
topic = RdKafka::Topic::create(m_client, topic_name, conf, errstr);
} else {
return Baton(RdKafka::ErrorCode::ERR__STATE);
}
} else {
return Baton(RdKafka::ErrorCode::ERR__STATE);
}

if (!m_errstr.empty()) {
return Baton(RdKafka::ErrorCode::ERR_TOPIC_EXCEPTION);
if (!errstr.empty()) {
return Baton(RdKafka::ErrorCode::ERR_TOPIC_EXCEPTION, errstr);
}

// Maybe do it this way later? Then we don't need to do static_cast
Expand Down
1 change: 1 addition & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Connection : public Nan::ObjectWrap {

// Baton<RdKafka::Topic*>
Baton CreateTopic(std::string);
Baton CreateTopic(std::string, RdKafka::Conf*);
Baton GetMetadata(std::string, int);

RdKafka::Handle* GetClient();
Expand Down
19 changes: 16 additions & 3 deletions src/errors.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,44 @@

namespace NodeKafka {

v8::Local<v8::Object> RdKafkaError(const RdKafka::ErrorCode &err) {
v8::Local<v8::Object> RdKafkaError(const RdKafka::ErrorCode &err, std::string errstr) { // NOLINT
//
int code = static_cast<int>(err);

v8::Local<v8::Object> ret = Nan::New<v8::Object>();

ret->Set(Nan::New("message").ToLocalChecked(),
Nan::New<v8::String>(RdKafka::err2str(err)).ToLocalChecked());
Nan::New<v8::String>(errstr).ToLocalChecked());
ret->Set(Nan::New("code").ToLocalChecked(),
Nan::New<v8::Number>(code));

return ret;
}

v8::Local<v8::Object> RdKafkaError(const RdKafka::ErrorCode &err) {
return RdKafkaError(err, RdKafka::err2str(err));
}

Baton::Baton(const RdKafka::ErrorCode &code) {
m_err = code;
}

Baton::Baton(const RdKafka::ErrorCode &code, std::string errstr) {
m_err = code;
m_errstr = errstr;
}

Baton::Baton(void* _data) {
m_err = RdKafka::ERR_NO_ERROR;
m_data = _data;
}

v8::Local<v8::Object> Baton::ToObject() {
return RdKafkaError(m_err);
if (m_errstr.empty()) {
return RdKafkaError(m_err);
} else {
return RdKafkaError(m_err, m_errstr);
}
}

RdKafka::ErrorCode Baton::err() {
Expand Down
2 changes: 2 additions & 0 deletions src/errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Baton {
public:
explicit Baton(const RdKafka::ErrorCode &);
explicit Baton(void* data);
explicit Baton(const RdKafka::ErrorCode &, std::string);

template<typename T> T data() {
return static_cast<T>(m_data);
Expand All @@ -35,6 +36,7 @@ class Baton {

private:
void* m_data;
std::string m_errstr;
RdKafka::ErrorCode m_err;
};

Expand Down
1 change: 0 additions & 1 deletion src/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ Message::Message(RdKafka::Message *message):
}

Message::~Message() {
// @todo I think I'd rather check if _message is NULL
if (m_message) {
delete m_message;
}
Expand Down
18 changes: 15 additions & 3 deletions src/topic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,19 @@ namespace NodeKafka {
*/

Topic::Topic(std::string topic_name, RdKafka::Conf* config, Connection * handle) { // NOLINT
m_topic =
RdKafka::Topic::create(handle->GetClient(), topic_name, config, errstr);
Baton b = handle->CreateTopic(topic_name, config);

if (b.err() != RdKafka::ERR_NO_ERROR) {
m_topic = NULL;
} else {
m_topic = b.data<RdKafka::Topic*>();
}
}

Topic::~Topic() {
delete m_topic;
if (m_topic) {
delete m_topic;
}
}

Nan::Persistent<v8::Function> Topic::constructor;
Expand Down Expand Up @@ -90,7 +97,12 @@ void Topic::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {

Connection* connection = ObjectWrap::Unwrap<Connection>(info[2]->ToObject());

if (!connection->IsConnected()) {
return Nan::ThrowError("Client is not connected");
}

Topic* topic = new Topic(topic_name, config, connection);

// Wrap it
topic->Wrap(info.This());

Expand Down