8484 * @author Martin Dam 
8585 * @author Artem Bilan 
8686 * @author Loic Talhouarne 
87+  * @author Tom van den Berge 
8788 */ 
8889public  class  KafkaMessageListenerContainerTests  {
8990
@@ -115,9 +116,11 @@ public class KafkaMessageListenerContainerTests {
115116
116117	private  static  String  topic17  = "testTopic17" ;
117118
119+ 	private  static  String  topic18  = "testTopic18" ;
120+ 
118121	@ ClassRule 
119122	public  static  KafkaEmbedded  embeddedKafka  = new  KafkaEmbedded (1 , true , topic5 ,
120- 			topic6 , topic7 , topic8 , topic9 , topic10 , topic11 , topic12 , topic13 , topic14 , topic15 , topic16 , topic17 );
123+ 			topic6 , topic7 , topic8 , topic9 , topic10 , topic11 , topic12 , topic13 , topic14 , topic15 , topic16 , topic17 ,  topic18 );
121124
122125	@ Rule 
123126	public  TestName  testName  = new  TestName ();
@@ -1225,6 +1228,102 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
12251228		logger .info ("Stop manual ack rebalance" );
12261229	}
12271230
1231+ 	@ Test 
1232+ 	public  void  testRebalanceAfterFailedRecord () throws  Exception  {
1233+ 		logger .info ("Start rebalance after failed record" );
1234+ 		Map <String , Object > props  = KafkaTestUtils .consumerProps ("test18" , "false" , embeddedKafka );
1235+ 		DefaultKafkaConsumerFactory <Integer , String > cf  = new  DefaultKafkaConsumerFactory <>(props );
1236+ 		ContainerProperties  containerProps  = new  ContainerProperties (topic18 );
1237+ 		final  List <AtomicInteger > counts  = new  ArrayList <>();
1238+ 		counts .add (new  AtomicInteger ());
1239+ 		counts .add (new  AtomicInteger ());
1240+ 		containerProps .setMessageListener (new  MessageListener <Integer , String >() {
1241+ 
1242+ 			@ Override 
1243+ 			public  void  onMessage (ConsumerRecord <Integer , String > message ) {
1244+ 				// The 1st message per partition fails 
1245+ 				if  (counts .get (message .partition ()).incrementAndGet () < 2 ) {
1246+ 					throw  new  RuntimeException ("Failure wile processing message" );
1247+ 				}
1248+ 			}
1249+ 		});
1250+ 		containerProps .setSyncCommits (true );
1251+ 		containerProps .setAckMode (AckMode .RECORD );
1252+ 		final  CountDownLatch  rebalanceLatch  = new  CountDownLatch (2 );
1253+ 		containerProps .setConsumerRebalanceListener (new  ConsumerRebalanceListener () {
1254+ 
1255+ 			@ Override 
1256+ 			public  void  onPartitionsRevoked (Collection <TopicPartition > partitions ) {
1257+ 			}
1258+ 
1259+ 			@ Override 
1260+ 			public  void  onPartitionsAssigned (Collection <TopicPartition > partitions ) {
1261+ 				logger .info ("manual ack: assigned "  + partitions );
1262+ 				rebalanceLatch .countDown ();
1263+ 			}
1264+ 		});
1265+ 
1266+ 		CountDownLatch  stubbingComplete1  = new  CountDownLatch (1 );
1267+ 		KafkaMessageListenerContainer <Integer , String > container1  =
1268+ 				spyOnContainer (new  KafkaMessageListenerContainer <>(cf , containerProps ), stubbingComplete1 );
1269+ 		container1 .setBeanName ("testRebalanceAfterFailedRecord" );
1270+ 		container1 .start ();
1271+ 		Consumer <?, ?> containerConsumer  = spyOnConsumer (container1 );
1272+ 		final  CountDownLatch  commitLatch  = new  CountDownLatch (2 );
1273+ 		willAnswer (invocation  -> {
1274+ 
1275+ 			@ SuppressWarnings ({ "unchecked"  })
1276+ 			Map <TopicPartition , OffsetAndMetadata > map  = invocation .getArgumentAt (0 , Map .class );
1277+ 			try  {
1278+ 				return  invocation .callRealMethod ();
1279+ 			}
1280+ 			finally  {
1281+ 				for  (Entry <TopicPartition , OffsetAndMetadata > entry  : map .entrySet ()) {
1282+ 					// Decrement when the last (successful) has been committed 
1283+ 					if  (entry .getValue ().offset () == 2 ) {
1284+ 						commitLatch .countDown ();
1285+ 					}
1286+ 				}
1287+ 			}
1288+ 
1289+ 		}).given (containerConsumer ).commitSync (any ());
1290+ 		stubbingComplete1 .countDown ();
1291+ 		ContainerTestUtils .waitForAssignment (container1 , embeddedKafka .getPartitionsPerTopic ());
1292+ 
1293+ 		Map <String , Object > senderProps  = KafkaTestUtils .producerProps (embeddedKafka );
1294+ 		ProducerFactory <Integer , String > pf  = new  DefaultKafkaProducerFactory <>(senderProps );
1295+ 		KafkaTemplate <Integer , String > template  = new  KafkaTemplate <>(pf );
1296+ 		template .setDefaultTopic (topic18 );
1297+ 		template .sendDefault (0 , 0 , "foo" );
1298+ 		template .sendDefault (1 , 0 , "baz" );
1299+ 		template .sendDefault (0 , 0 , "bar" );
1300+ 		template .sendDefault (1 , 0 , "qux" );
1301+ 		template .flush ();
1302+ 
1303+ 		// Wait until both partitions have committed offset 2 (i.e. the last message) 
1304+ 		assertThat (commitLatch .await (30 , TimeUnit .SECONDS )).isTrue ();
1305+ 
1306+ 		// Start a 2nd consumer, triggering a rebalance 
1307+ 		KafkaMessageListenerContainer <Integer , String > container2  =
1308+ 				new  KafkaMessageListenerContainer <>(cf , containerProps );
1309+ 		container2 .setBeanName ("testRebalanceAfterFailedRecord2" );
1310+ 		container2 .start ();
1311+ 		// Wait until both consumers have finished rebalancing 
1312+ 		assertThat (rebalanceLatch .await (60 , TimeUnit .SECONDS )).isTrue ();
1313+ 
1314+ 		// Stop both consumers 
1315+ 		container1 .stop ();
1316+ 		container2 .stop ();
1317+ 		Consumer <Integer , String > consumer  = cf .createConsumer ();
1318+ 		consumer .assign (Arrays .asList (new  TopicPartition (topic18 , 0 ), new  TopicPartition (topic18 , 1 )));
1319+ 
1320+ 		// Verify that offset of both partitions is the highest committed offset 
1321+ 		assertThat (consumer .position (new  TopicPartition (topic18 , 0 ))).isEqualTo (2 );
1322+ 		assertThat (consumer .position (new  TopicPartition (topic18 , 1 ))).isEqualTo (2 );
1323+ 		consumer .close ();
1324+ 		logger .info ("Stop rebalance after failed record" );
1325+ 	}
1326+ 
12281327	private  Consumer <?, ?> spyOnConsumer (KafkaMessageListenerContainer <Integer , String > container ) {
12291328		Consumer <?, ?> consumer  = spy (
12301329				KafkaTestUtils .getPropertyValue (container , "listenerConsumer.consumer" , Consumer .class ));
@@ -1233,8 +1332,10 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
12331332		return  consumer ;
12341333	}
12351334
1236- 	private  KafkaMessageListenerContainer <Integer , String > spyOnContainer (KafkaMessageListenerContainer <Integer , String > container ,
1335+ 	private  KafkaMessageListenerContainer <Integer , String > spyOnContainer (
1336+ 			KafkaMessageListenerContainer <Integer , String > container ,
12371337			final  CountDownLatch  stubbingComplete ) {
1338+ 
12381339		KafkaMessageListenerContainer <Integer , String > spy  = spy (container );
12391340		willAnswer (i  -> {
12401341			if  (stubbingComplete .getCount () > 0  && Thread .currentThread ().getName ().endsWith ("-C-1" )) {
0 commit comments