@@ -90,7 +90,6 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
9090 } ) ;
9191
9292 it ( 'is cleared on seek' , async ( ) => {
93- const producer = createProducer ( { } , { 'batch.num.messages' : '1' } ) ;
9493 await consumer . connect ( ) ;
9594 await producer . connect ( ) ;
9695 await consumer . subscribe ( { topic : topicName } ) ;
@@ -135,8 +134,6 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
135134 expect ( messagesConsumed . filter ( m => m . partition === 1 ) . map ( m => m . message . offset ) ) . toEqual ( Array ( 1024 * 3 ) . fill ( ) . map ( ( _ , i ) => `${ i } ` ) ) ;
136135 // partition 2
137136 expect ( messagesConsumed . filter ( m => m . partition === 2 ) . map ( m => m . message . offset ) ) . toEqual ( Array ( 1024 * 3 ) . fill ( ) . map ( ( _ , i ) => `${ i } ` ) ) ;
138-
139- await producer . disconnect ( ) ;
140137 } ) ;
141138
142139 it ( 'is cleared before rebalance' , async ( ) => {
@@ -145,7 +142,7 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
145142 * the consumers are created with the same groupId, we create them here.
146143 * TODO: verify correctness of theory. It's conjecture... which solves flakiness. */
147144 let groupId = `consumer-group-id-${ secureRandom ( ) } ` ;
148- const multiplier = 9 ;
145+ const multiplier = 18 ;
149146 const numMessages = 16 * multiplier ;
150147 consumer = createConsumer ( {
151148 groupId,
@@ -161,7 +158,6 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
161158 autoCommit : isAutoCommit ,
162159 clientId : "consumer2" ,
163160 } ) ;
164- const producer = createProducer ( { } , { 'batch.num.messages' : '1' } ) ;
165161
166162 await consumer . connect ( ) ;
167163 await producer . connect ( ) ;
@@ -181,11 +177,14 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
181177 { topic : event . topic , partition : event . partition , offset : Number ( event . message . offset ) + 1 } ,
182178 ] ) ;
183179
184- await sleep ( 100 ) ;
180+ // Simulate some processing time so we don't poll all messages
181+ // and put them in the cache before consumer2 joins.
182+ if ( messagesConsumedConsumer2 . length === 0 )
183+ await sleep ( 100 ) ;
185184 }
186185 } ) ;
187186
188- /* Evenly distribute 1024*9 messages across 3 partitions */
187+ /* Evenly distribute numMessages messages across 3 partitions */
189188 let i = 0 ;
190189 const messages = Array ( numMessages )
191190 . fill ( )
@@ -226,7 +225,6 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
226225 expect ( messagesConsumedConsumer2 . length ) . toBeGreaterThan ( 0 ) ;
227226
228227 await consumer2 . disconnect ( ) ;
229- await producer . disconnect ( ) ;
230228 } , 60000 ) ;
231229
232230 it ( 'does not hold up polling for non-message events' , async ( ) => {
0 commit comments