@@ -3,7 +3,7 @@ const admin = require('../../lib/admin');
33
44const bootstrapServers = 'localhost:9092' ;
55
6- function adminFromProducer ( ) {
6+ function adminFromProducer ( callback ) {
77 const producer = new Kafka . Producer ( {
88 'bootstrap.servers' : bootstrapServers ,
99 'dr_msg_cb' : true ,
@@ -60,15 +60,13 @@ function adminFromProducer() {
6060
6161 producer . on ( 'event.error' , ( err ) => {
6262 console . error ( err ) ;
63+ producer . disconnect ( callback ) ;
6364 } ) ;
6465
6566 producer . on ( 'delivery-report' , ( err , report ) => {
6667 console . log ( "Delivery report received:" , report ) ;
68+ producer . disconnect ( callback ) ;
6769 } ) ;
68-
69- setTimeout ( ( ) => {
70- producer . disconnect ( ) ;
71- } , 30000 ) ;
7270}
7371
7472function adminFromConsumer ( ) {
@@ -108,17 +106,15 @@ function adminFromConsumer() {
108106 } ) ;
109107
110108 consumer . on ( 'data' , ( data ) => {
109+ // Quit after receiving a message.
111110 console . log ( "Consumer:data" , data ) ;
111+ consumer . disconnect ( ) ;
112112 } ) ;
113113
114114 consumer . on ( 'event.error' , ( err ) => {
115115 console . error ( "Consumer:error" , err ) ;
116- } ) ;
117-
118- setTimeout ( ( ) => {
119116 consumer . disconnect ( ) ;
120- } , 30000 ) ;
117+ } ) ;
121118}
122119
123- adminFromProducer ( ) ;
124- setTimeout ( ( ) => adminFromConsumer ( ) , 35000 ) ;
120+ adminFromProducer ( ( ) => adminFromConsumer ( ) ) ;
0 commit comments