Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions lib/kafka-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,18 @@ function KafkaConsumer(conf, topicConf) {

this.globalConfig = conf;
this.topicConfig = topicConf;

this._consumeTimeout = 1000;
}

/**
* Set the default consume timeout provided to c++land
* @param {number} timeoutMs - number of milliseconds to wait for a message to be fetched
*/
KafkaConsumer.prototype.setDefaultConsumeTimeout = function(timeoutMs) {
this._consumeTimeout = timeoutMs;
};

/**
* Get a stream representation of this KafkaConsumer
*
Expand Down Expand Up @@ -201,6 +211,8 @@ KafkaConsumer.prototype.unsubscribe = function() {
* is fetched.
*/
KafkaConsumer.prototype.consume = function(topics, cb) {
var timeoutMs = this._consumeTimeout || 1000;

var self = this;
// If topics is set and is an array or a string, or if we have both
// parameters, run it like this.
Expand All @@ -216,7 +228,7 @@ KafkaConsumer.prototype.consume = function(topics, cb) {
cb = function() {};
}

this._consumeLoop(topics, cb);
this._consumeLoop(timeoutMs, topics, cb);

} else if ((topics && typeof topics === 'number') || (topics && cb)) {

Expand All @@ -227,7 +239,7 @@ KafkaConsumer.prototype.consume = function(topics, cb) {
throw new TypeError('Callback must be a function');
}

this._consumeNum(numMessages, cb);
this._consumeNum(timeoutMs, numMessages, cb);

} else {
if (topics === undefined) {
Expand All @@ -238,7 +250,7 @@ KafkaConsumer.prototype.consume = function(topics, cb) {
cb = topics;
}

this._consumeOne(cb);
this._consumeOne(timeoutMs, cb);
}
return this;
};
Expand All @@ -251,7 +263,7 @@ KafkaConsumer.prototype.consume = function(topics, cb) {
* @private
* @see consume
*/
KafkaConsumer.prototype._consumeLoop = function(topics, cb) {
KafkaConsumer.prototype._consumeLoop = function(timeoutMs, topics, cb) {
var self = this;

this._client.subscribe(topics, function subscribeCallback(err) {
Expand All @@ -261,7 +273,7 @@ KafkaConsumer.prototype._consumeLoop = function(topics, cb) {
cb(err);
} else {

self._client.consumeLoop(function readCallback(err, message) {
self._client.consumeLoop(timeoutMs, function readCallback(err, message) {

if (err) {
// A few different types of errors here
Expand Down Expand Up @@ -297,11 +309,11 @@ KafkaConsumer.prototype._consumeLoop = function(topics, cb) {
* @private
* @see consume
*/
KafkaConsumer.prototype._consumeNum = function(numMessages, cb) {
KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {
var self = this;

try {
this._client.consume(numMessages, function(err, messages) {
this._client.consume(timeoutMs, numMessages, function(err, messages) {
if (err) {
err = new LibrdKafkaError(err);
if (cb) {
Expand Down Expand Up @@ -342,12 +354,12 @@ KafkaConsumer.prototype._consumeNum = function(numMessages, cb) {
* @private
* @see consume
*/
KafkaConsumer.prototype._consumeOne = function(cb) {
KafkaConsumer.prototype._consumeOne = function(timeoutMs, cb) {
// Otherwise, we run this method
var self = this;

try {
this._client.consume(function(err, message) {
this._client.consume(timeoutMs, function(err, message) {
if (err) {
err = new LibrdKafkaError(err);
if (cb) {
Expand Down
56 changes: 41 additions & 15 deletions src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ Baton Consumer::Subscribe(std::vector<std::string> topics) {
return Baton(RdKafka::ERR_NO_ERROR);
}

NodeKafka::Message* Consumer::Consume() {
NodeKafka::Message* Consumer::Consume(int timeout_ms) {
NodeKafka::Message* m;

if (IsConnected()) {
Expand All @@ -254,7 +254,7 @@ NodeKafka::Message* Consumer::Consume() {
} else {
RdKafka::KafkaConsumer* consumer =
dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
m = new NodeKafka::Message(consumer->consume(1000));
m = new NodeKafka::Message(consumer->consume(timeout_ms));

if (m->ConsumerShouldStop()) {
Unsubscribe();
Expand Down Expand Up @@ -720,39 +720,64 @@ NAN_METHOD(Consumer::NodeSubscribeSync) {
NAN_METHOD(Consumer::NodeConsumeLoop) {
Nan::HandleScope scope;

if (info.Length() < 1) {
if (info.Length() < 2) {
// Just throw an exception
return Nan::ThrowError("Invalid number of parameters");
}

if (!info[0]->IsFunction()) {
if (!info[0]->IsNumber()) {
return Nan::ThrowError("Need to specify a timeout");
}

if (!info[1]->IsFunction()) {
return Nan::ThrowError("Need to specify a callback");
}

int timeout_ms;
Nan::Maybe<uint32_t> maybeTimeout =
Nan::To<uint32_t>(info[0].As<v8::Number>());

if (maybeTimeout.IsNothing()) {
timeout_ms = 1000;
} else {
timeout_ms = static_cast<int>(maybeTimeout.FromJust());
}

Consumer* consumer = ObjectWrap::Unwrap<Consumer>(info.This());

v8::Local<v8::Function> cb = info[0].As<v8::Function>();
v8::Local<v8::Function> cb = info[1].As<v8::Function>();

Nan::Callback *callback = new Nan::Callback(cb);
Nan::AsyncQueueWorker(new Workers::ConsumerConsumeLoop(callback, consumer));
Nan::AsyncQueueWorker(
new Workers::ConsumerConsumeLoop(callback, consumer, timeout_ms));

info.GetReturnValue().Set(Nan::Null());
}

NAN_METHOD(Consumer::NodeConsume) {
Nan::HandleScope scope;

if (info.Length() < 1) {
if (info.Length() < 2) {
// Just throw an exception
return Nan::ThrowError("Invalid number of parameters");
}

if (info[0]->IsNumber()) {
if (!info[1]->IsFunction()) {
int timeout_ms;
Nan::Maybe<uint32_t> maybeTimeout =
Nan::To<uint32_t>(info[0].As<v8::Number>());

if (maybeTimeout.IsNothing()) {
timeout_ms = 1000;
} else {
timeout_ms = static_cast<int>(maybeTimeout.FromJust());
}

if (info[1]->IsNumber()) {
if (!info[2]->IsFunction()) {
return Nan::ThrowError("Need to specify a callback");
}

v8::Local<v8::Number> numMessagesNumber = info[0].As<v8::Number>();
v8::Local<v8::Number> numMessagesNumber = info[1].As<v8::Number>();
Nan::Maybe<uint32_t> numMessagesMaybe = Nan::To<uint32_t>(numMessagesNumber); // NOLINT

uint32_t numMessages;
Expand All @@ -764,21 +789,22 @@ NAN_METHOD(Consumer::NodeConsume) {

Consumer* consumer = ObjectWrap::Unwrap<Consumer>(info.This());

v8::Local<v8::Function> cb = info[1].As<v8::Function>();
v8::Local<v8::Function> cb = info[2].As<v8::Function>();
Nan::Callback *callback = new Nan::Callback(cb);
Nan::AsyncQueueWorker(
new Workers::ConsumerConsumeNum(callback, consumer, numMessages));
new Workers::ConsumerConsumeNum(callback, consumer, numMessages, timeout_ms)); // NOLINT

} else {
if (!info[0]->IsFunction()) {
if (!info[1]->IsFunction()) {
return Nan::ThrowError("Need to specify a callback");
}

Consumer* consumer = ObjectWrap::Unwrap<Consumer>(info.This());

v8::Local<v8::Function> cb = info[0].As<v8::Function>();
v8::Local<v8::Function> cb = info[1].As<v8::Function>();
Nan::Callback *callback = new Nan::Callback(cb);
Nan::AsyncQueueWorker(new Workers::ConsumerConsume(callback, consumer));
Nan::AsyncQueueWorker(
new Workers::ConsumerConsume(callback, consumer, timeout_ms));
}

info.GetReturnValue().Set(Nan::Null());
Expand Down
2 changes: 1 addition & 1 deletion src/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Consumer : public Connection {
std::string Name();

Baton Subscribe(std::vector<std::string>);
NodeKafka::Message* Consume();
NodeKafka::Message* Consume(int timeout_ms);

void ActivateDispatchers();
void DeactivateDispatchers();
Expand Down
24 changes: 15 additions & 9 deletions src/workers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,16 +388,18 @@ void ConsumerUnsubscribe::HandleErrorCallback() {
*/

ConsumerConsumeLoop::ConsumerConsumeLoop(Nan::Callback *callback,
Consumer* consumer) :
Consumer* consumer,
const int & timeout_ms) :
MessageWorker(callback),
consumer(consumer) {}
consumer(consumer),
m_timeout_ms(timeout_ms) {}

ConsumerConsumeLoop::~ConsumerConsumeLoop() {}

void ConsumerConsumeLoop::Execute(const ExecutionMessageBus& bus) {
// Do one check here before we move forward
while (consumer->IsConnected()) {
NodeKafka::Message* message = consumer->Consume();
NodeKafka::Message* message = consumer->Consume(m_timeout_ms);
if (message->errcode() == RdKafka::ERR__PARTITION_EOF) {
delete message;
usleep(1*1000);
Expand Down Expand Up @@ -466,18 +468,20 @@ void ConsumerConsumeLoop::HandleErrorCallback() {

ConsumerConsumeNum::ConsumerConsumeNum(Nan::Callback *callback,
Consumer* consumer,
const uint32_t & num_messages) :
const uint32_t & num_messages,
const int & timeout_ms) :
ErrorAwareWorker(callback),
m_consumer(consumer),
m_num_messages(num_messages) {}
m_num_messages(num_messages),
m_timeout_ms(timeout_ms) {}

ConsumerConsumeNum::~ConsumerConsumeNum() {}

void ConsumerConsumeNum::Execute() {
const int max = static_cast<int>(m_num_messages);
for (int i = 0; i < max; i++) {
// Get a message
NodeKafka::Message* message = m_consumer->Consume();
NodeKafka::Message* message = m_consumer->Consume(m_timeout_ms);
if (message->IsError()) {
if (message->errcode() != RdKafka::ERR__TIMED_OUT &&
message->errcode() != RdKafka::ERR__PARTITION_EOF) {
Expand Down Expand Up @@ -543,14 +547,16 @@ void ConsumerConsumeNum::HandleErrorCallback() {
*/

ConsumerConsume::ConsumerConsume(Nan::Callback *callback,
Consumer* consumer) :
Consumer* consumer,
const int & timeout_ms) :
ErrorAwareWorker(callback),
consumer(consumer) {}
consumer(consumer),
m_timeout_ms(timeout_ms) {}

ConsumerConsume::~ConsumerConsume() {}

void ConsumerConsume::Execute() {
_message = consumer->Consume();
_message = consumer->Consume(m_timeout_ms);
if (_message->IsError()) {
if (_message->errcode() != RdKafka::ERR__TIMED_OUT ||
_message->errcode() != RdKafka::ERR__PARTITION_EOF) {
Expand Down
9 changes: 6 additions & 3 deletions src/workers.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ class ConsumerUnsubscribe : public ErrorAwareWorker {

class ConsumerConsumeLoop : public MessageWorker {
public:
ConsumerConsumeLoop(Nan::Callback*, NodeKafka::Consumer*);
ConsumerConsumeLoop(Nan::Callback*, NodeKafka::Consumer*, const int &);
~ConsumerConsumeLoop();

void Execute(const ExecutionMessageBus&);
Expand All @@ -268,24 +268,26 @@ class ConsumerConsumeLoop : public MessageWorker {
void HandleMessageCallback(NodeKafka::Message*);
private:
NodeKafka::Consumer * consumer;
const int m_timeout_ms;
};

class ConsumerConsume : public ErrorAwareWorker {
public:
ConsumerConsume(Nan::Callback*, NodeKafka::Consumer*);
ConsumerConsume(Nan::Callback*, NodeKafka::Consumer*, const int &);
~ConsumerConsume();

void Execute();
void HandleOKCallback();
void HandleErrorCallback();
private:
NodeKafka::Consumer * consumer;
const int m_timeout_ms;
NodeKafka::Message* _message;
};

class ConsumerConsumeNum : public ErrorAwareWorker {
public:
ConsumerConsumeNum(Nan::Callback*, NodeKafka::Consumer*, const uint32_t &);
ConsumerConsumeNum(Nan::Callback*, NodeKafka::Consumer*, const uint32_t &, const int &); // NOLINT
~ConsumerConsumeNum();

void Execute();
Expand All @@ -294,6 +296,7 @@ class ConsumerConsumeNum : public ErrorAwareWorker {
private:
NodeKafka::Consumer * m_consumer;
const uint32_t m_num_messages;
const int m_timeout_ms;
std::vector<NodeKafka::Message*> m_messages;
};

Expand Down