Merged
e2e/both.spec.js: 100 changes (68 additions, 32 deletions)
@@ -12,14 +12,16 @@ var t = require('assert');

var Kafka = require('../');
var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
const { createTopics, deleteTopics } = require('./topicUtils');
var eventListener = require('./listener');
var topic = 'test';
var topic2 = 'test2';
var topic;

describe('Consumer/Producer', function() {

var producer;
var consumer;
var grp;

let createdTopics = [];

beforeEach(function(done) {
var finished = 0;
@@ -36,12 +38,24 @@ describe('Consumer/Producer', function() {
return done(err);
}

if (finished === 2) {
if (finished === 3) {
done();
}
}

var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
topic = 'test' + crypto.randomBytes(20).toString('hex');

createTopics(
[{ topic, num_partitions: 1, replication_factor: 1 }],
kafkaBrokerList,
function(err) {
t.ifError(err);
maybeDone(err);
}
);

createdTopics.push(topic);

consumer = new Kafka.KafkaConsumer({
'metadata.broker.list': kafkaBrokerList,
@@ -53,7 +67,7 @@
'debug': 'all'
// paused: true,
}, {
'auto.offset.reset': 'largest'
'auto.offset.reset': 'smallest'
});

consumer.connect({}, function(err, d) {
@@ -99,16 +113,21 @@ describe('Consumer/Producer', function() {
return done(err);
}

if (finished === 2) {
if (finished === 3) {
done();
}
}

consumer.disconnect(function(err) {
producer.disconnect(function(err) {
maybeDone(err);
});

producer.disconnect(function(err) {
deleteTopics(createdTopics, kafkaBrokerList, function(err) {
createdTopics.length = 0;
maybeDone(err);
});

consumer.disconnect(function(err) {
maybeDone(err);
});
});
@@ -154,6 +173,7 @@ describe('Consumer/Producer', function() {
t.equal(position.length, 1);
t.deepStrictEqual(position[0].partition, 0);
t.ok(position[0].offset >= 0);
consumer.unsubscribe();
done();
});
};
@@ -180,6 +200,7 @@ describe('Consumer/Producer', function() {
consumer.consume(100000, function(err, messages) {
t.ifError(err);
t.equal(messages.length, 1);
consumer.unsubscribe();
done();
});
};
@@ -228,12 +249,13 @@ describe('Consumer/Producer', function() {

setTimeout(function() {
producer.produce(topic, null, buffer, null);
}, 500)
consumer.setDefaultConsumeTimeout(2000);
}, 500);
consumer.setDefaultConsumeTimeout(20000);
consumer.consume(1000, function(err, messages) {
t.ifError(err);
t.equal(messages.length, 1);
t.deepStrictEqual(events, ["data", "partition.eof"]);
consumer.unsubscribe();
done();
});
});
@@ -261,12 +283,13 @@ describe('Consumer/Producer', function() {

setTimeout(function() {
producer.produce(topic, null, buffer, null);
}, 2000)
consumer.setDefaultConsumeTimeout(3000);
}, 4000);
consumer.setDefaultConsumeTimeout(20000);
consumer.consume(1000, function(err, messages) {
t.ifError(err);
t.equal(messages.length, 1);
t.deepStrictEqual(events, ["partition.eof", "data", "partition.eof"]);
consumer.unsubscribe();
done();
});
});
@@ -276,7 +299,6 @@ describe('Consumer/Producer', function() {
var key = 'key';

crypto.randomBytes(4096, function(ex, buffer) {

producer.setPollInterval(10);

producer.once('delivery-report', function(err, report) {
@@ -292,6 +314,7 @@
t.equal(key, message.key, 'invalid message key');
t.equal(topic, message.topic, 'invalid message topic');
t.ok(message.offset >= 0, 'invalid message offset');
consumer.unsubscribe();
done();
});

@@ -306,15 +329,13 @@
});

it('should emit \'partition.eof\' events in consumeLoop', function(done) {

crypto.randomBytes(4096, function(ex, buffer) {
producer.setPollInterval(10);

producer.once('delivery-report', function(err, report) {
t.ifError(err);
});


var events = [];
var offsets = [];

@@ -337,11 +358,11 @@

setTimeout(function() {
producer.produce(topic, null, buffer);
}, 2000);
}, 4000);

setTimeout(function() {
producer.produce(topic, null, buffer);
}, 4000);
}, 6000);

setTimeout(function() {
t.deepStrictEqual(events, ['partition.eof', 'data', 'partition.eof', 'data', 'partition.eof']);
@@ -352,8 +373,9 @@
startOffset + 1,
startOffset + 1,
startOffset + 2 ]);
consumer.unsubscribe();
done();
}, 6000);
}, 8000);
});
});

@@ -386,16 +408,26 @@ describe('Consumer/Producer', function() {
run_headers_test(done, headers);
});

it('should be able to produce and consume messages with one header value as int: consumeLoop', function(done) {
it('should be able to produce and consume messages with one header value as string with unicode: consumeLoop', function(done) {
var headers = [
{ key: 10 }
{ key: '10👍' },
{ key: 'こんにちは' },
{ key: '🌍🌎🌏' }
];
run_headers_test(done, headers);
});

it('should be able to produce and consume messages with one header value as float: consumeLoop', function(done) {
it('should be able to produce and consume messages with one header value as string with emojis: consumeLoop', function(done) {
var headers = [
{ key: 1.11 }
{ key: '😀😃😄😁' }
];
run_headers_test(done, headers);
});

it('should be able to produce and consume messages with one header value as string in other languages: consumeLoop', function(done) {
var headers = [
{ key: '你好' },
{ key: 'Привет' }
];
run_headers_test(done, headers);
});
@@ -422,8 +454,8 @@ describe('Consumer/Producer', function() {

it('should be able to produce and consume messages with multiple headers with mixed values: consumeLoop', function(done) {
var headers = [
{ key1: 'value1' },
{ key2: Buffer.from('value2') },
{ key1: 'value1' },
{ key2: Buffer.from('value2') }
];
run_headers_test(done, headers);
});
@@ -440,7 +472,7 @@ describe('Consumer/Producer', function() {
const buffer = Buffer.from('value');
const key = 'key';
t.throws(
() => producer.produce(topic, null, buffer, key, null, "", headerCase),
() => producer.produce(topic, null, buffer, key, null, '', headerCase),
'must be string or buffer'
);
}
@@ -451,15 +483,16 @@ describe('Consumer/Producer', function() {
it('should be able to produce and consume messages: empty buffer key and empty value', function(done) {
var emptyString = '';
var key = Buffer.from(emptyString);
var value = Buffer.from('');
var value = Buffer.from(emptyString);

producer.setPollInterval(10);

consumer.once('data', function(message) {
t.notEqual(message.value, null, 'message should not be null');
t.equal(value.toString(), message.value.toString(), 'invalid message value');
t.equal(emptyString, message.key, 'invalid message key');
done();
consumer.unsubscribe();
done();
});

consumer.subscribe([topic]);
@@ -480,6 +513,7 @@ describe('Consumer/Producer', function() {
t.notEqual(message.value, null, 'message should not be null');
t.equal(value.toString(), message.value.toString(), 'invalid message value');
t.equal(key, message.key, 'invalid message key');
consumer.unsubscribe();
done();
});

@@ -500,6 +534,7 @@ describe('Consumer/Producer', function() {
consumer.once('data', function(message) {
t.equal(value, message.value, 'invalid message value');
t.equal(key, message.key, 'invalid message key');
consumer.unsubscribe();
done();
});

@@ -525,7 +560,7 @@ describe('Consumer/Producer', function() {

beforeEach(function(done) {
consumer = new Kafka.KafkaConsumer(consumerOpts, {
'auto.offset.reset': 'largest',
'auto.offset.reset': 'smallest',
});

consumer.connect({}, function(err, d) {
@@ -569,6 +604,7 @@ describe('Consumer/Producer', function() {
});

consumer.subscribe([topic]);
consumer.setDefaultConsumeTimeout(4000);
consumer.consume();

setTimeout(function() {
@@ -612,14 +648,15 @@ describe('Consumer/Producer', function() {
}
};
consumer = new Kafka.KafkaConsumer(consumerOpts, {
'auto.offset.reset': 'largest',
'auto.offset.reset': 'smallest',
});
eventListener(consumer);

consumer.connect({}, function(err, d) {
t.ifError(err);
t.equal(typeof d, 'object', 'metadata should be returned');
consumer.subscribe([topic]);
consumer.setDefaultConsumeTimeout(4000);
consumer.consume();
setTimeout(function() {
producer.produce(topic, null, Buffer.from(''), '');
@@ -668,6 +705,7 @@ describe('Consumer/Producer', function() {
t.equal(topic, message.topic, 'invalid message topic');
t.ok(message.offset >= 0, 'invalid message offset');
assert_headers_match(headers, message.headers);
consumer.unsubscribe();
done();
});

@@ -678,8 +716,6 @@ describe('Consumer/Producer', function() {
var timestamp = new Date().getTime();
producer.produce(topic, null, buffer, key, timestamp, "", headers);
}, 2000);

});
}

});
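
Note: the `createTopics` / `deleteTopics` helpers imported from `./topicUtils` are not included in this diff. As a point of reference, here is a minimal, hypothetical sketch of what such a helper might look like, assuming node-rdkafka's `AdminClient` API (`create`, `createTopic`, `deleteTopic`, `disconnect`) and the call signatures used in the spec above; the actual module in the PR may differ.

```js
// Hypothetical sketch of e2e/topicUtils.js; the real helper is not shown in this PR.
var Kafka = require('../');

// createTopics(topics, brokerList, cb)
// `topics` is an array of { topic, num_partitions, replication_factor } specs,
// matching the objects passed from both.spec.js above.
module.exports.createTopics = function(topics, brokerList, cb) {
  var client = Kafka.AdminClient.create({
    'client.id': 'kafka-test-admin',   // assumed client id
    'metadata.broker.list': brokerList
  });
  var remaining = topics.length;
  var failed = false;
  topics.forEach(function(spec) {
    // AdminClient#createTopic takes a topic spec object and a callback.
    client.createTopic(spec, function(err) {
      if (failed) { return; }
      if (err) {
        failed = true;
        client.disconnect();
        return cb(err);
      }
      if (--remaining === 0) {
        client.disconnect();
        cb(null);
      }
    });
  });
};

// deleteTopics(topicNames, brokerList, cb)
// Deletes every topic name in `topicNames`, then invokes `cb` once.
module.exports.deleteTopics = function(topicNames, brokerList, cb) {
  var client = Kafka.AdminClient.create({
    'client.id': 'kafka-test-admin',
    'metadata.broker.list': brokerList
  });
  var remaining = topicNames.length;
  if (remaining === 0) {
    client.disconnect();
    return cb(null);
  }
  var failed = false;
  topicNames.forEach(function(name) {
    // AdminClient#deleteTopic takes the topic name and a callback.
    client.deleteTopic(name, function(err) {
      if (failed) { return; }
      if (err) {
        failed = true;
        client.disconnect();
        return cb(err);
      }
      if (--remaining === 0) {
        client.disconnect();
        cb(null);
      }
    });
  });
};
```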