diff --git a/e2e/both.spec.js b/e2e/both.spec.js index 62983179..1b53840e 100644 --- a/e2e/both.spec.js +++ b/e2e/both.spec.js @@ -12,14 +12,16 @@ var t = require('assert'); var Kafka = require('../'); var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092'; +const { createTopics, deleteTopics } = require('./topicUtils'); var eventListener = require('./listener'); -var topic = 'test'; -var topic2 = 'test2'; +var topic; describe('Consumer/Producer', function() { - var producer; var consumer; + var grp; + + let createdTopics = []; beforeEach(function(done) { var finished = 0; @@ -36,12 +38,24 @@ describe('Consumer/Producer', function() { return done(err); } - if (finished === 2) { + if (finished === 3) { done(); } } - var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex'); + grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex'); + topic = 'test' + crypto.randomBytes(20).toString('hex'); + + createTopics( + [{ topic, num_partitions: 1, replication_factor: 1 }], + kafkaBrokerList, + function(err) { + t.ifError(err); + maybeDone(err); + } + ); + + createdTopics.push(topic); consumer = new Kafka.KafkaConsumer({ 'metadata.broker.list': kafkaBrokerList, @@ -53,7 +67,7 @@ describe('Consumer/Producer', function() { 'debug': 'all' // paused: true, }, { - 'auto.offset.reset': 'largest' + 'auto.offset.reset': 'smallest' }); consumer.connect({}, function(err, d) { @@ -99,16 +113,21 @@ describe('Consumer/Producer', function() { return done(err); } - if (finished === 2) { + if (finished === 3) { done(); } } - consumer.disconnect(function(err) { + producer.disconnect(function(err) { maybeDone(err); }); - producer.disconnect(function(err) { + deleteTopics(createdTopics, kafkaBrokerList, function(err) { + createdTopics.length = 0; + maybeDone(err); + }); + + consumer.disconnect(function(err) { maybeDone(err); }); }); @@ -154,6 +173,7 @@ describe('Consumer/Producer', function() { t.equal(position.length, 1); t.deepStrictEqual(position[0].partition, 0); t.ok(position[0].offset >= 0); + consumer.unsubscribe(); done(); }); }; @@ -180,6 +200,7 @@ describe('Consumer/Producer', function() { consumer.consume(100000, function(err, messages) { t.ifError(err); t.equal(messages.length, 1); + consumer.unsubscribe(); done(); }); }; @@ -228,12 +249,13 @@ describe('Consumer/Producer', function() { setTimeout(function() { producer.produce(topic, null, buffer, null); - }, 500) - consumer.setDefaultConsumeTimeout(2000); + }, 500); + consumer.setDefaultConsumeTimeout(20000); consumer.consume(1000, function(err, messages) { t.ifError(err); t.equal(messages.length, 1); t.deepStrictEqual(events, ["data", "partition.eof"]); + consumer.unsubscribe(); done(); }); }); @@ -261,12 +283,13 @@ describe('Consumer/Producer', function() { setTimeout(function() { producer.produce(topic, null, buffer, null); - }, 2000) - consumer.setDefaultConsumeTimeout(3000); + }, 4000); + consumer.setDefaultConsumeTimeout(20000); consumer.consume(1000, function(err, messages) { t.ifError(err); t.equal(messages.length, 1); t.deepStrictEqual(events, ["partition.eof", "data", "partition.eof"]); + consumer.unsubscribe(); done(); }); }); @@ -276,7 +299,6 @@ describe('Consumer/Producer', function() { var key = 'key'; crypto.randomBytes(4096, function(ex, buffer) { - producer.setPollInterval(10); producer.once('delivery-report', function(err, report) { @@ -292,6 +314,7 @@ describe('Consumer/Producer', function() { t.equal(key, message.key, 'invalid message key'); t.equal(topic, message.topic, 'invalid message topic'); t.ok(message.offset >= 0, 'invalid message offset'); + consumer.unsubscribe(); done(); }); @@ -306,7 +329,6 @@ describe('Consumer/Producer', function() { }); it('should emit \'partition.eof\' events in consumeLoop', function(done) { - crypto.randomBytes(4096, function(ex, buffer) { producer.setPollInterval(10); @@ -314,7 +336,6 @@ describe('Consumer/Producer', function() { t.ifError(err); }); - var events = []; var offsets = []; @@ -337,11 +358,11 @@ describe('Consumer/Producer', function() { setTimeout(function() { producer.produce(topic, null, buffer); - }, 2000); + }, 4000); setTimeout(function() { producer.produce(topic, null, buffer); - }, 4000); + }, 6000); setTimeout(function() { t.deepStrictEqual(events, ['partition.eof', 'data', 'partition.eof', 'data', 'partition.eof']); @@ -352,8 +373,9 @@ describe('Consumer/Producer', function() { startOffset + 1, startOffset + 1, startOffset + 2 ]); + consumer.unsubscribe(); done(); - }, 6000); + }, 8000); }); }); @@ -386,16 +408,26 @@ describe('Consumer/Producer', function() { run_headers_test(done, headers); }); - it('should be able to produce and consume messages with one header value as int: consumeLoop', function(done) { + it('should be able to produce and consume messages with one header value as string with unicode: consumeLoop', function(done) { var headers = [ - { key: 10 } + { key: '10πŸ‘' }, + { key: 'こんにけは' }, + { key: '🌍🌎🌏' } ]; run_headers_test(done, headers); }); - it('should be able to produce and consume messages with one header value as float: consumeLoop', function(done) { + it('should be able to produce and consume messages with one header value as string with emojis: consumeLoop', function(done) { var headers = [ - { key: 1.11 } + { key: 'πŸ˜€πŸ˜ƒπŸ˜„πŸ˜' } + ]; + run_headers_test(done, headers); + }); + + it('should be able to produce and consume messages with one header value as string in other languages: consumeLoop', function(done) { + var headers = [ + { key: 'δ½ ε₯½' }, + { key: 'ΠŸΡ€ΠΈΠ²Π΅Ρ‚' } ]; run_headers_test(done, headers); }); @@ -422,8 +454,8 @@ describe('Consumer/Producer', function() { it('should be able to produce and consume messages with multiple headers with mixed values: consumeLoop', function(done) { var headers = [ - { key1: 'value1' }, - { key2: Buffer.from('value2') }, + { key1: 'value1' }, + { key2: Buffer.from('value2') } ]; run_headers_test(done, headers); }); @@ -440,7 +472,7 @@ describe('Consumer/Producer', function() { const buffer = Buffer.from('value'); const key = 'key'; t.throws( - () => producer.produce(topic, null, buffer, key, null, "", headerCase), + () => producer.produce(topic, null, buffer, key, null, '', headerCase), 'must be string or buffer' ); } @@ -451,7 +483,7 @@ describe('Consumer/Producer', function() { it('should be able to produce and consume messages: empty buffer key and empty value', function(done) { var emptyString = ''; var key = Buffer.from(emptyString); - var value = Buffer.from(''); + var value = Buffer.from(emptyString); producer.setPollInterval(10); @@ -459,7 +491,8 @@ describe('Consumer/Producer', function() { t.notEqual(message.value, null, 'message should not be null'); t.equal(value.toString(), message.value.toString(), 'invalid message value'); t.equal(emptyString, message.key, 'invalid message key'); - done(); + consumer.unsubscribe(); + done(); }); consumer.subscribe([topic]); @@ -480,6 +513,7 @@ describe('Consumer/Producer', function() { t.notEqual(message.value, null, 'message should not be null'); t.equal(value.toString(), message.value.toString(), 'invalid message value'); t.equal(key, message.key, 'invalid message key'); + consumer.unsubscribe(); done(); }); @@ -500,6 +534,7 @@ describe('Consumer/Producer', function() { consumer.once('data', function(message) { t.equal(value, message.value, 'invalid message value'); t.equal(key, message.key, 'invalid message key'); + consumer.unsubscribe(); done(); }); @@ -525,7 +560,7 @@ describe('Consumer/Producer', function() { beforeEach(function(done) { consumer = new Kafka.KafkaConsumer(consumerOpts, { - 'auto.offset.reset': 'largest', + 'auto.offset.reset': 'smallest', }); consumer.connect({}, function(err, d) { @@ -569,6 +604,7 @@ describe('Consumer/Producer', function() { }); consumer.subscribe([topic]); + consumer.setDefaultConsumeTimeout(4000); consumer.consume(); setTimeout(function() { @@ -612,7 +648,7 @@ describe('Consumer/Producer', function() { } }; consumer = new Kafka.KafkaConsumer(consumerOpts, { - 'auto.offset.reset': 'largest', + 'auto.offset.reset': 'smallest', }); eventListener(consumer); @@ -620,6 +656,7 @@ describe('Consumer/Producer', function() { t.ifError(err); t.equal(typeof d, 'object', 'metadata should be returned'); consumer.subscribe([topic]); + consumer.setDefaultConsumeTimeout(4000); consumer.consume(); setTimeout(function() { producer.produce(topic, null, Buffer.from(''), ''); @@ -668,6 +705,7 @@ describe('Consumer/Producer', function() { t.equal(topic, message.topic, 'invalid message topic'); t.ok(message.offset >= 0, 'invalid message offset'); assert_headers_match(headers, message.headers); + consumer.unsubscribe(); done(); }); @@ -678,8 +716,6 @@ describe('Consumer/Producer', function() { var timestamp = new Date().getTime(); producer.produce(topic, null, buffer, key, timestamp, "", headers); }, 2000); - }); } - });