@@ -3,7 +3,7 @@ const admin = require('../../lib/admin');
33
44const  bootstrapServers  =  'localhost:9092' ; 
55
6- function  adminFromProducer ( )  { 
6+ function  adminFromProducer ( callback )  { 
77    const  producer  =  new  Kafka . Producer ( { 
88        'bootstrap.servers' : bootstrapServers , 
99        'dr_msg_cb' : true , 
@@ -60,15 +60,13 @@ function adminFromProducer() {
6060
6161    producer . on ( 'event.error' ,  ( err )  =>  { 
6262        console . error ( err ) ; 
63+         producer . disconnect ( callback ) ; 
6364    } ) ; 
6465
6566    producer . on ( 'delivery-report' ,  ( err ,  report )  =>  { 
6667        console . log ( "Delivery report received:" ,  report ) ; 
68+         producer . disconnect ( callback ) ; 
6769    } ) ; 
68- 
69-     setTimeout ( ( )  =>  { 
70-         producer . disconnect ( ) ; 
71-     } ,  30000 ) ; 
7270} 
7371
7472function  adminFromConsumer ( )  { 
@@ -108,17 +106,15 @@ function adminFromConsumer() {
108106    } ) ; 
109107
110108    consumer . on ( 'data' ,  ( data )  =>  { 
109+         // Quit after receiving a message. 
111110        console . log ( "Consumer:data" ,  data ) ; 
111+         consumer . disconnect ( ) ; 
112112    } ) ; 
113113
114114    consumer . on ( 'event.error' ,  ( err )  =>  { 
115115        console . error ( "Consumer:error" ,  err ) ; 
116-     } ) ; 
117- 
118-     setTimeout ( ( )  =>  { 
119116        consumer . disconnect ( ) ; 
120-     } ,   30000 ) ; 
117+     } ) ; 
121118} 
122119
123- adminFromProducer ( ) ; 
124- setTimeout ( ( )  =>  adminFromConsumer ( ) ,  35000 ) ; 
120+ adminFromProducer ( ( )  =>  adminFromConsumer ( ) ) ; 
0 commit comments