diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js index dcd03abe..10797094 100644 --- a/lib/kafka-consumer.js +++ b/lib/kafka-consumer.js @@ -87,8 +87,18 @@ function KafkaConsumer(conf, topicConf) { this.globalConfig = conf; this.topicConfig = topicConf; + + this._consumeTimeout = 1000; } +/** + * Set the default consume timeout provided to c++land + * @param {number} timeoutMs - number of milliseconds to wait for a message to be fetched + */ +KafkaConsumer.prototype.setDefaultConsumeTimeout = function(timeoutMs) { + this._consumeTimeout = timeoutMs; +}; + /** * Get a stream representation of this KafkaConsumer * @@ -201,6 +211,8 @@ KafkaConsumer.prototype.unsubscribe = function() { * is fetched. */ KafkaConsumer.prototype.consume = function(topics, cb) { + var timeoutMs = this._consumeTimeout || 1000; + var self = this; // If topics is set and is an array or a string, or if we have both // parameters, run it like this. @@ -216,7 +228,7 @@ KafkaConsumer.prototype.consume = function(topics, cb) { cb = function() {}; } - this._consumeLoop(topics, cb); + this._consumeLoop(timeoutMs, topics, cb); } else if ((topics && typeof topics === 'number') || (topics && cb)) { @@ -227,7 +239,7 @@ KafkaConsumer.prototype.consume = function(topics, cb) { throw new TypeError('Callback must be a function'); } - this._consumeNum(numMessages, cb); + this._consumeNum(timeoutMs, numMessages, cb); } else { if (topics === undefined) { @@ -238,7 +250,7 @@ KafkaConsumer.prototype.consume = function(topics, cb) { cb = topics; } - this._consumeOne(cb); + this._consumeOne(timeoutMs, cb); } return this; }; @@ -251,7 +263,7 @@ KafkaConsumer.prototype.consume = function(topics, cb) { * @private * @see consume */ -KafkaConsumer.prototype._consumeLoop = function(topics, cb) { +KafkaConsumer.prototype._consumeLoop = function(timeoutMs, topics, cb) { var self = this; this._client.subscribe(topics, function subscribeCallback(err) { @@ -261,7 +273,7 @@ KafkaConsumer.prototype._consumeLoop = function(topics, cb) { cb(err); } else { - self._client.consumeLoop(function readCallback(err, message) { + self._client.consumeLoop(timeoutMs, function readCallback(err, message) { if (err) { // A few different types of errors here @@ -297,11 +309,11 @@ KafkaConsumer.prototype._consumeLoop = function(topics, cb) { * @private * @see consume */ -KafkaConsumer.prototype._consumeNum = function(numMessages, cb) { +KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) { var self = this; try { - this._client.consume(numMessages, function(err, messages) { + this._client.consume(timeoutMs, numMessages, function(err, messages) { if (err) { err = new LibrdKafkaError(err); if (cb) { @@ -342,12 +354,12 @@ KafkaConsumer.prototype._consumeNum = function(numMessages, cb) { * @private * @see consume */ -KafkaConsumer.prototype._consumeOne = function(cb) { +KafkaConsumer.prototype._consumeOne = function(timeoutMs, cb) { // Otherwise, we run this method var self = this; try { - this._client.consume(function(err, message) { + this._client.consume(timeoutMs, function(err, message) { if (err) { err = new LibrdKafkaError(err); if (cb) { diff --git a/src/consumer.cc b/src/consumer.cc index 094bb104..0ad3ef90 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -244,7 +244,7 @@ Baton Consumer::Subscribe(std::vector topics) { return Baton(RdKafka::ERR_NO_ERROR); } -NodeKafka::Message* Consumer::Consume() { +NodeKafka::Message* Consumer::Consume(int timeout_ms) { NodeKafka::Message* m; if (IsConnected()) { @@ -254,7 +254,7 @@ NodeKafka::Message* Consumer::Consume() { } else { RdKafka::KafkaConsumer* consumer = dynamic_cast(m_client); - m = new NodeKafka::Message(consumer->consume(1000)); + m = new NodeKafka::Message(consumer->consume(timeout_ms)); if (m->ConsumerShouldStop()) { Unsubscribe(); @@ -720,21 +720,36 @@ NAN_METHOD(Consumer::NodeSubscribeSync) { NAN_METHOD(Consumer::NodeConsumeLoop) { Nan::HandleScope scope; - if (info.Length() < 1) { + if (info.Length() < 2) { // Just throw an exception return Nan::ThrowError("Invalid number of parameters"); } - if (!info[0]->IsFunction()) { + if (!info[0]->IsNumber()) { + return Nan::ThrowError("Need to specify a timeout"); + } + + if (!info[1]->IsFunction()) { return Nan::ThrowError("Need to specify a callback"); } + int timeout_ms; + Nan::Maybe maybeTimeout = + Nan::To(info[0].As()); + + if (maybeTimeout.IsNothing()) { + timeout_ms = 1000; + } else { + timeout_ms = static_cast(maybeTimeout.FromJust()); + } + Consumer* consumer = ObjectWrap::Unwrap(info.This()); - v8::Local cb = info[0].As(); + v8::Local cb = info[1].As(); Nan::Callback *callback = new Nan::Callback(cb); - Nan::AsyncQueueWorker(new Workers::ConsumerConsumeLoop(callback, consumer)); + Nan::AsyncQueueWorker( + new Workers::ConsumerConsumeLoop(callback, consumer, timeout_ms)); info.GetReturnValue().Set(Nan::Null()); } @@ -742,17 +757,27 @@ NAN_METHOD(Consumer::NodeConsumeLoop) { NAN_METHOD(Consumer::NodeConsume) { Nan::HandleScope scope; - if (info.Length() < 1) { + if (info.Length() < 2) { // Just throw an exception return Nan::ThrowError("Invalid number of parameters"); } - if (info[0]->IsNumber()) { - if (!info[1]->IsFunction()) { + int timeout_ms; + Nan::Maybe maybeTimeout = + Nan::To(info[0].As()); + + if (maybeTimeout.IsNothing()) { + timeout_ms = 1000; + } else { + timeout_ms = static_cast(maybeTimeout.FromJust()); + } + + if (info[1]->IsNumber()) { + if (!info[2]->IsFunction()) { return Nan::ThrowError("Need to specify a callback"); } - v8::Local numMessagesNumber = info[0].As(); + v8::Local numMessagesNumber = info[1].As(); Nan::Maybe numMessagesMaybe = Nan::To(numMessagesNumber); // NOLINT uint32_t numMessages; @@ -764,21 +789,22 @@ NAN_METHOD(Consumer::NodeConsume) { Consumer* consumer = ObjectWrap::Unwrap(info.This()); - v8::Local cb = info[1].As(); + v8::Local cb = info[2].As(); Nan::Callback *callback = new Nan::Callback(cb); Nan::AsyncQueueWorker( - new Workers::ConsumerConsumeNum(callback, consumer, numMessages)); + new Workers::ConsumerConsumeNum(callback, consumer, numMessages, timeout_ms)); // NOLINT } else { - if (!info[0]->IsFunction()) { + if (!info[1]->IsFunction()) { return Nan::ThrowError("Need to specify a callback"); } Consumer* consumer = ObjectWrap::Unwrap(info.This()); - v8::Local cb = info[0].As(); + v8::Local cb = info[1].As(); Nan::Callback *callback = new Nan::Callback(cb); - Nan::AsyncQueueWorker(new Workers::ConsumerConsume(callback, consumer)); + Nan::AsyncQueueWorker( + new Workers::ConsumerConsume(callback, consumer, timeout_ms)); } info.GetReturnValue().Set(Nan::Null()); diff --git a/src/consumer.h b/src/consumer.h index 4612bbcb..adbedbd1 100644 --- a/src/consumer.h +++ b/src/consumer.h @@ -70,7 +70,7 @@ class Consumer : public Connection { std::string Name(); Baton Subscribe(std::vector); - NodeKafka::Message* Consume(); + NodeKafka::Message* Consume(int timeout_ms); void ActivateDispatchers(); void DeactivateDispatchers(); diff --git a/src/workers.cc b/src/workers.cc index 9eae5661..6ac0fd93 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -388,16 +388,18 @@ void ConsumerUnsubscribe::HandleErrorCallback() { */ ConsumerConsumeLoop::ConsumerConsumeLoop(Nan::Callback *callback, - Consumer* consumer) : + Consumer* consumer, + const int & timeout_ms) : MessageWorker(callback), - consumer(consumer) {} + consumer(consumer), + m_timeout_ms(timeout_ms) {} ConsumerConsumeLoop::~ConsumerConsumeLoop() {} void ConsumerConsumeLoop::Execute(const ExecutionMessageBus& bus) { // Do one check here before we move forward while (consumer->IsConnected()) { - NodeKafka::Message* message = consumer->Consume(); + NodeKafka::Message* message = consumer->Consume(m_timeout_ms); if (message->errcode() == RdKafka::ERR__PARTITION_EOF) { delete message; usleep(1*1000); @@ -466,10 +468,12 @@ void ConsumerConsumeLoop::HandleErrorCallback() { ConsumerConsumeNum::ConsumerConsumeNum(Nan::Callback *callback, Consumer* consumer, - const uint32_t & num_messages) : + const uint32_t & num_messages, + const int & timeout_ms) : ErrorAwareWorker(callback), m_consumer(consumer), - m_num_messages(num_messages) {} + m_num_messages(num_messages), + m_timeout_ms(timeout_ms) {} ConsumerConsumeNum::~ConsumerConsumeNum() {} @@ -477,7 +481,7 @@ void ConsumerConsumeNum::Execute() { const int max = static_cast(m_num_messages); for (int i = 0; i < max; i++) { // Get a message - NodeKafka::Message* message = m_consumer->Consume(); + NodeKafka::Message* message = m_consumer->Consume(m_timeout_ms); if (message->IsError()) { if (message->errcode() != RdKafka::ERR__TIMED_OUT && message->errcode() != RdKafka::ERR__PARTITION_EOF) { @@ -543,14 +547,16 @@ void ConsumerConsumeNum::HandleErrorCallback() { */ ConsumerConsume::ConsumerConsume(Nan::Callback *callback, - Consumer* consumer) : + Consumer* consumer, + const int & timeout_ms) : ErrorAwareWorker(callback), - consumer(consumer) {} + consumer(consumer), + m_timeout_ms(timeout_ms) {} ConsumerConsume::~ConsumerConsume() {} void ConsumerConsume::Execute() { - _message = consumer->Consume(); + _message = consumer->Consume(m_timeout_ms); if (_message->IsError()) { if (_message->errcode() != RdKafka::ERR__TIMED_OUT || _message->errcode() != RdKafka::ERR__PARTITION_EOF) { diff --git a/src/workers.h b/src/workers.h index 031696f1..1b5a5505 100644 --- a/src/workers.h +++ b/src/workers.h @@ -259,7 +259,7 @@ class ConsumerUnsubscribe : public ErrorAwareWorker { class ConsumerConsumeLoop : public MessageWorker { public: - ConsumerConsumeLoop(Nan::Callback*, NodeKafka::Consumer*); + ConsumerConsumeLoop(Nan::Callback*, NodeKafka::Consumer*, const int &); ~ConsumerConsumeLoop(); void Execute(const ExecutionMessageBus&); @@ -268,11 +268,12 @@ class ConsumerConsumeLoop : public MessageWorker { void HandleMessageCallback(NodeKafka::Message*); private: NodeKafka::Consumer * consumer; + const int m_timeout_ms; }; class ConsumerConsume : public ErrorAwareWorker { public: - ConsumerConsume(Nan::Callback*, NodeKafka::Consumer*); + ConsumerConsume(Nan::Callback*, NodeKafka::Consumer*, const int &); ~ConsumerConsume(); void Execute(); @@ -280,12 +281,13 @@ class ConsumerConsume : public ErrorAwareWorker { void HandleErrorCallback(); private: NodeKafka::Consumer * consumer; + const int m_timeout_ms; NodeKafka::Message* _message; }; class ConsumerConsumeNum : public ErrorAwareWorker { public: - ConsumerConsumeNum(Nan::Callback*, NodeKafka::Consumer*, const uint32_t &); + ConsumerConsumeNum(Nan::Callback*, NodeKafka::Consumer*, const uint32_t &, const int &); // NOLINT ~ConsumerConsumeNum(); void Execute(); @@ -294,6 +296,7 @@ class ConsumerConsumeNum : public ErrorAwareWorker { private: NodeKafka::Consumer * m_consumer; const uint32_t m_num_messages; + const int m_timeout_ms; std::vector m_messages; };