@@ -12,14 +12,16 @@ var t = require('assert');
1212
1313var Kafka = require ( '../' ) ;
1414var kafkaBrokerList = process . env . KAFKA_HOST || 'localhost:9092' ;
15+ const { createTopics, deleteTopics } = require ( './topicUtils' ) ;
1516var eventListener = require ( './listener' ) ;
16- var topic = 'test' ;
17- var topic2 = 'test2' ;
17+ var topic ;
1818
1919describe ( 'Consumer/Producer' , function ( ) {
20-
2120 var producer ;
2221 var consumer ;
22+ var grp ;
23+
24+ let createdTopics = [ ] ;
2325
2426 beforeEach ( function ( done ) {
2527 var finished = 0 ;
@@ -36,12 +38,24 @@ describe('Consumer/Producer', function() {
3638 return done ( err ) ;
3739 }
3840
39- if ( finished === 2 ) {
41+ if ( finished === 3 ) {
4042 done ( ) ;
4143 }
4244 }
4345
44- var grp = 'kafka-mocha-grp-' + crypto . randomBytes ( 20 ) . toString ( 'hex' ) ;
46+ grp = 'kafka-mocha-grp-' + crypto . randomBytes ( 20 ) . toString ( 'hex' ) ;
47+ topic = 'test' + crypto . randomBytes ( 20 ) . toString ( 'hex' ) ;
48+
49+ createTopics (
50+ [ { topic, num_partitions : 1 , replication_factor : 1 } ] ,
51+ kafkaBrokerList ,
52+ function ( err ) {
53+ t . ifError ( err ) ;
54+ maybeDone ( err ) ;
55+ }
56+ ) ;
57+
58+ createdTopics . push ( topic ) ;
4559
4660 consumer = new Kafka . KafkaConsumer ( {
4761 'metadata.broker.list' : kafkaBrokerList ,
@@ -53,7 +67,7 @@ describe('Consumer/Producer', function() {
5367 'debug' : 'all'
5468 // paused: true,
5569 } , {
56- 'auto.offset.reset' : 'largest '
70+ 'auto.offset.reset' : 'smallest '
5771 } ) ;
5872
5973 consumer . connect ( { } , function ( err , d ) {
@@ -99,16 +113,21 @@ describe('Consumer/Producer', function() {
99113 return done ( err ) ;
100114 }
101115
102- if ( finished === 2 ) {
116+ if ( finished === 3 ) {
103117 done ( ) ;
104118 }
105119 }
106120
107- consumer . disconnect ( function ( err ) {
121+ producer . disconnect ( function ( err ) {
108122 maybeDone ( err ) ;
109123 } ) ;
110124
111- producer . disconnect ( function ( err ) {
125+ deleteTopics ( createdTopics , kafkaBrokerList , function ( err ) {
126+ createdTopics . length = 0 ;
127+ maybeDone ( err ) ;
128+ } ) ;
129+
130+ consumer . disconnect ( function ( err ) {
112131 maybeDone ( err ) ;
113132 } ) ;
114133 } ) ;
@@ -154,6 +173,7 @@ describe('Consumer/Producer', function() {
154173 t . equal ( position . length , 1 ) ;
155174 t . deepStrictEqual ( position [ 0 ] . partition , 0 ) ;
156175 t . ok ( position [ 0 ] . offset >= 0 ) ;
176+ consumer . unsubscribe ( ) ;
157177 done ( ) ;
158178 } ) ;
159179 } ;
@@ -180,6 +200,7 @@ describe('Consumer/Producer', function() {
180200 consumer . consume ( 100000 , function ( err , messages ) {
181201 t . ifError ( err ) ;
182202 t . equal ( messages . length , 1 ) ;
203+ consumer . unsubscribe ( ) ;
183204 done ( ) ;
184205 } ) ;
185206 } ;
@@ -228,12 +249,13 @@ describe('Consumer/Producer', function() {
228249
229250 setTimeout ( function ( ) {
230251 producer . produce ( topic , null , buffer , null ) ;
231- } , 500 )
232- consumer . setDefaultConsumeTimeout ( 2000 ) ;
252+ } , 500 ) ;
253+ consumer . setDefaultConsumeTimeout ( 20000 ) ;
233254 consumer . consume ( 1000 , function ( err , messages ) {
234255 t . ifError ( err ) ;
235256 t . equal ( messages . length , 1 ) ;
236257 t . deepStrictEqual ( events , [ "data" , "partition.eof" ] ) ;
258+ consumer . unsubscribe ( ) ;
237259 done ( ) ;
238260 } ) ;
239261 } ) ;
@@ -261,12 +283,13 @@ describe('Consumer/Producer', function() {
261283
262284 setTimeout ( function ( ) {
263285 producer . produce ( topic , null , buffer , null ) ;
264- } , 2000 )
265- consumer . setDefaultConsumeTimeout ( 3000 ) ;
286+ } , 4000 ) ;
287+ consumer . setDefaultConsumeTimeout ( 20000 ) ;
266288 consumer . consume ( 1000 , function ( err , messages ) {
267289 t . ifError ( err ) ;
268290 t . equal ( messages . length , 1 ) ;
269291 t . deepStrictEqual ( events , [ "partition.eof" , "data" , "partition.eof" ] ) ;
292+ consumer . unsubscribe ( ) ;
270293 done ( ) ;
271294 } ) ;
272295 } ) ;
@@ -276,7 +299,6 @@ describe('Consumer/Producer', function() {
276299 var key = 'key' ;
277300
278301 crypto . randomBytes ( 4096 , function ( ex , buffer ) {
279-
280302 producer . setPollInterval ( 10 ) ;
281303
282304 producer . once ( 'delivery-report' , function ( err , report ) {
@@ -292,6 +314,7 @@ describe('Consumer/Producer', function() {
292314 t . equal ( key , message . key , 'invalid message key' ) ;
293315 t . equal ( topic , message . topic , 'invalid message topic' ) ;
294316 t . ok ( message . offset >= 0 , 'invalid message offset' ) ;
317+ consumer . unsubscribe ( ) ;
295318 done ( ) ;
296319 } ) ;
297320
@@ -306,15 +329,13 @@ describe('Consumer/Producer', function() {
306329 } ) ;
307330
308331 it ( 'should emit \'partition.eof\' events in consumeLoop' , function ( done ) {
309-
310332 crypto . randomBytes ( 4096 , function ( ex , buffer ) {
311333 producer . setPollInterval ( 10 ) ;
312334
313335 producer . once ( 'delivery-report' , function ( err , report ) {
314336 t . ifError ( err ) ;
315337 } ) ;
316338
317-
318339 var events = [ ] ;
319340 var offsets = [ ] ;
320341
@@ -337,11 +358,11 @@ describe('Consumer/Producer', function() {
337358
338359 setTimeout ( function ( ) {
339360 producer . produce ( topic , null , buffer ) ;
340- } , 2000 ) ;
361+ } , 4000 ) ;
341362
342363 setTimeout ( function ( ) {
343364 producer . produce ( topic , null , buffer ) ;
344- } , 4000 ) ;
365+ } , 6000 ) ;
345366
346367 setTimeout ( function ( ) {
347368 t . deepStrictEqual ( events , [ 'partition.eof' , 'data' , 'partition.eof' , 'data' , 'partition.eof' ] ) ;
@@ -352,8 +373,9 @@ describe('Consumer/Producer', function() {
352373 startOffset + 1 ,
353374 startOffset + 1 ,
354375 startOffset + 2 ] ) ;
376+ consumer . unsubscribe ( ) ;
355377 done ( ) ;
356- } , 6000 ) ;
378+ } , 8000 ) ;
357379 } ) ;
358380 } ) ;
359381
@@ -386,16 +408,26 @@ describe('Consumer/Producer', function() {
386408 run_headers_test ( done , headers ) ;
387409 } ) ;
388410
389- it ( 'should be able to produce and consume messages with one header value as int : consumeLoop' , function ( done ) {
411+ it ( 'should be able to produce and consume messages with one header value as string with unicode : consumeLoop' , function ( done ) {
390412 var headers = [
391- { key : 10 }
413+ { key : '10👍' } ,
414+ { key : 'こんにちは' } ,
415+ { key : '🌍🌎🌏' }
392416 ] ;
393417 run_headers_test ( done , headers ) ;
394418 } ) ;
395419
396- it ( 'should be able to produce and consume messages with one header value as float : consumeLoop' , function ( done ) {
420+ it ( 'should be able to produce and consume messages with one header value as string with emojis : consumeLoop' , function ( done ) {
397421 var headers = [
398- { key : 1.11 }
422+ { key : '😀😃😄😁' }
423+ ] ;
424+ run_headers_test ( done , headers ) ;
425+ } ) ;
426+
427+ it ( 'should be able to produce and consume messages with one header value as string in other languages: consumeLoop' , function ( done ) {
428+ var headers = [
429+ { key : '你好' } ,
430+ { key : 'Привет' }
399431 ] ;
400432 run_headers_test ( done , headers ) ;
401433 } ) ;
@@ -422,8 +454,8 @@ describe('Consumer/Producer', function() {
422454
423455 it ( 'should be able to produce and consume messages with multiple headers with mixed values: consumeLoop' , function ( done ) {
424456 var headers = [
425- { key1 : 'value1' } ,
426- { key2 : Buffer . from ( 'value2' ) } ,
457+ { key1 : 'value1' } ,
458+ { key2 : Buffer . from ( 'value2' ) }
427459 ] ;
428460 run_headers_test ( done , headers ) ;
429461 } ) ;
@@ -440,7 +472,7 @@ describe('Consumer/Producer', function() {
440472 const buffer = Buffer . from ( 'value' ) ;
441473 const key = 'key' ;
442474 t . throws (
443- ( ) => producer . produce ( topic , null , buffer , key , null , "" , headerCase ) ,
475+ ( ) => producer . produce ( topic , null , buffer , key , null , '' , headerCase ) ,
444476 'must be string or buffer'
445477 ) ;
446478 }
@@ -451,15 +483,16 @@ describe('Consumer/Producer', function() {
451483 it ( 'should be able to produce and consume messages: empty buffer key and empty value' , function ( done ) {
452484 var emptyString = '' ;
453485 var key = Buffer . from ( emptyString ) ;
454- var value = Buffer . from ( '' ) ;
486+ var value = Buffer . from ( emptyString ) ;
455487
456488 producer . setPollInterval ( 10 ) ;
457489
458490 consumer . once ( 'data' , function ( message ) {
459491 t . notEqual ( message . value , null , 'message should not be null' ) ;
460492 t . equal ( value . toString ( ) , message . value . toString ( ) , 'invalid message value' ) ;
461493 t . equal ( emptyString , message . key , 'invalid message key' ) ;
462- done ( ) ;
494+ consumer . unsubscribe ( ) ;
495+ done ( ) ;
463496 } ) ;
464497
465498 consumer . subscribe ( [ topic ] ) ;
@@ -480,6 +513,7 @@ describe('Consumer/Producer', function() {
480513 t . notEqual ( message . value , null , 'message should not be null' ) ;
481514 t . equal ( value . toString ( ) , message . value . toString ( ) , 'invalid message value' ) ;
482515 t . equal ( key , message . key , 'invalid message key' ) ;
516+ consumer . unsubscribe ( ) ;
483517 done ( ) ;
484518 } ) ;
485519
@@ -500,6 +534,7 @@ describe('Consumer/Producer', function() {
500534 consumer . once ( 'data' , function ( message ) {
501535 t . equal ( value , message . value , 'invalid message value' ) ;
502536 t . equal ( key , message . key , 'invalid message key' ) ;
537+ consumer . unsubscribe ( ) ;
503538 done ( ) ;
504539 } ) ;
505540
@@ -525,7 +560,7 @@ describe('Consumer/Producer', function() {
525560
526561 beforeEach ( function ( done ) {
527562 consumer = new Kafka . KafkaConsumer ( consumerOpts , {
528- 'auto.offset.reset' : 'largest ' ,
563+ 'auto.offset.reset' : 'smallest ' ,
529564 } ) ;
530565
531566 consumer . connect ( { } , function ( err , d ) {
@@ -569,6 +604,7 @@ describe('Consumer/Producer', function() {
569604 } ) ;
570605
571606 consumer . subscribe ( [ topic ] ) ;
607+ consumer . setDefaultConsumeTimeout ( 4000 ) ;
572608 consumer . consume ( ) ;
573609
574610 setTimeout ( function ( ) {
@@ -612,14 +648,15 @@ describe('Consumer/Producer', function() {
612648 }
613649 } ;
614650 consumer = new Kafka . KafkaConsumer ( consumerOpts , {
615- 'auto.offset.reset' : 'largest ' ,
651+ 'auto.offset.reset' : 'smallest ' ,
616652 } ) ;
617653 eventListener ( consumer ) ;
618654
619655 consumer . connect ( { } , function ( err , d ) {
620656 t . ifError ( err ) ;
621657 t . equal ( typeof d , 'object' , 'metadata should be returned' ) ;
622658 consumer . subscribe ( [ topic ] ) ;
659+ consumer . setDefaultConsumeTimeout ( 4000 ) ;
623660 consumer . consume ( ) ;
624661 setTimeout ( function ( ) {
625662 producer . produce ( topic , null , Buffer . from ( '' ) , '' ) ;
@@ -668,6 +705,7 @@ describe('Consumer/Producer', function() {
668705 t . equal ( topic , message . topic , 'invalid message topic' ) ;
669706 t . ok ( message . offset >= 0 , 'invalid message offset' ) ;
670707 assert_headers_match ( headers , message . headers ) ;
708+ consumer . unsubscribe ( ) ;
671709 done ( ) ;
672710 } ) ;
673711
@@ -678,8 +716,6 @@ describe('Consumer/Producer', function() {
678716 var timestamp = new Date ( ) . getTime ( ) ;
679717 producer . produce ( topic , null , buffer , key , timestamp , "" , headers ) ;
680718 } , 2000 ) ;
681-
682719 } ) ;
683720 }
684-
685721} ) ;
0 commit comments