8686 * @author Martin Dam
8787 * @author Artem Bilan
8888 * @author Loic Talhouarne
89+ * @author Tom van den Berge
8990 */
9091public class KafkaMessageListenerContainerTests {
9192
@@ -121,9 +122,13 @@ public class KafkaMessageListenerContainerTests {
121122
122123 private static String topic17 = "testTopic17" ;
123124
125+ private static String topic18 = "testTopic18" ;
126+
127+
124128 @ ClassRule
125129 public static KafkaEmbedded embeddedKafka = new KafkaEmbedded (1 , true , topic3 , topic4 , topic5 ,
126- topic6 , topic7 , topic8 , topic9 , topic10 , topic11 , topic12 , topic13 , topic14 , topic15 , topic16 , topic17 );
130+ topic6 , topic7 , topic8 , topic9 , topic10 , topic11 , topic12 , topic13 , topic14 , topic15 , topic16 , topic17 ,
131+ topic18 );
127132
128133 @ Rule
129134 public TestName testName = new TestName ();
@@ -1435,6 +1440,101 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
14351440 logger .info ("Stop manual ack rebalance" );
14361441 }
14371442
1443+ @ Test
1444+ public void testRebalanceAfterFailedRecord () throws Exception {
1445+ logger .info ("Start rebalance after failed record" );
1446+ Map <String , Object > props = KafkaTestUtils .consumerProps ("test18" , "false" , embeddedKafka );
1447+ DefaultKafkaConsumerFactory <Integer , String > cf = new DefaultKafkaConsumerFactory <>(props );
1448+ ContainerProperties containerProps = new ContainerProperties (topic18 );
1449+ final List <AtomicInteger > counts = new ArrayList <>();
1450+ counts .add (new AtomicInteger ());
1451+ counts .add (new AtomicInteger ());
1452+ containerProps .setMessageListener (new MessageListener <Integer , String >() {
1453+
1454+ @ Override
1455+ public void onMessage (ConsumerRecord <Integer , String > message ) {
1456+ // The 1st message per partition fails
1457+ if (counts .get (message .partition ()).incrementAndGet () < 2 ) {
1458+ throw new RuntimeException ("Failure wile processing message" );
1459+ }
1460+ }
1461+ });
1462+ containerProps .setSyncCommits (true );
1463+ containerProps .setAckMode (AckMode .RECORD );
1464+ final CountDownLatch rebalanceLatch = new CountDownLatch (2 );
1465+ containerProps .setConsumerRebalanceListener (new ConsumerRebalanceListener () {
1466+
1467+ @ Override
1468+ public void onPartitionsRevoked (Collection <TopicPartition > partitions ) {
1469+ }
1470+
1471+ @ Override
1472+ public void onPartitionsAssigned (Collection <TopicPartition > partitions ) {
1473+ logger .info ("manual ack: assigned " + partitions );
1474+ rebalanceLatch .countDown ();
1475+ }
1476+ });
1477+
1478+ CountDownLatch stubbingComplete1 = new CountDownLatch (1 );
1479+ KafkaMessageListenerContainer <Integer , String > container1 =
1480+ spyOnContainer (new KafkaMessageListenerContainer <>(cf , containerProps ), stubbingComplete1 );
1481+ container1 .setBeanName ("testRebalanceAfterFailedRecord" );
1482+ container1 .start ();
1483+ Consumer <?, ?> containerConsumer = spyOnConsumer (container1 );
1484+ final CountDownLatch commitLatch = new CountDownLatch (2 );
1485+ willAnswer (invocation -> {
1486+
1487+ @ SuppressWarnings ({ "unchecked" })
1488+ Map <TopicPartition , OffsetAndMetadata > map = invocation .getArgument (0 );
1489+ try {
1490+ return invocation .callRealMethod ();
1491+ }
1492+ finally {
1493+ for (Entry <TopicPartition , OffsetAndMetadata > entry : map .entrySet ()) {
1494+ // Decrement when the last (successful) has been committed
1495+ if (entry .getValue ().offset () == 2 ) {
1496+ commitLatch .countDown ();
1497+ }
1498+ }
1499+ }
1500+
1501+ }).given (containerConsumer ).commitSync (any ());
1502+ stubbingComplete1 .countDown ();
1503+ ContainerTestUtils .waitForAssignment (container1 , embeddedKafka .getPartitionsPerTopic ());
1504+
1505+ Map <String , Object > senderProps = KafkaTestUtils .producerProps (embeddedKafka );
1506+ ProducerFactory <Integer , String > pf = new DefaultKafkaProducerFactory <>(senderProps );
1507+ KafkaTemplate <Integer , String > template = new KafkaTemplate <>(pf );
1508+ template .setDefaultTopic (topic18 );
1509+ template .sendDefault (0 , 0 , "foo" );
1510+ template .sendDefault (1 , 0 , "baz" );
1511+ template .sendDefault (0 , 0 , "bar" );
1512+ template .sendDefault (1 , 0 , "qux" );
1513+ template .flush ();
1514+
1515+ // Wait until both partitions have committed offset 2 (i.e. the last message)
1516+ assertThat (commitLatch .await (30 , TimeUnit .SECONDS )).isTrue ();
1517+
1518+ // Start a 2nd consumer, triggering a rebalance
1519+ KafkaMessageListenerContainer <Integer , String > container2 =
1520+ new KafkaMessageListenerContainer <>(cf , containerProps );
1521+ container2 .setBeanName ("testRebalanceAfterFailedRecord2" );
1522+ container2 .start ();
1523+ // Wait until both consumers have finished rebalancing
1524+ assertThat (rebalanceLatch .await (60 , TimeUnit .SECONDS )).isTrue ();
1525+
1526+ // Stop both consumers
1527+ container1 .stop ();
1528+ container2 .stop ();
1529+ Consumer <Integer , String > consumer = cf .createConsumer ();
1530+ consumer .assign (Arrays .asList (new TopicPartition (topic18 , 0 ), new TopicPartition (topic18 , 1 )));
1531+
1532+ // Verify that offset of both partitions is the highest committed offset
1533+ assertThat (consumer .position (new TopicPartition (topic18 , 0 ))).isEqualTo (2 );
1534+ assertThat (consumer .position (new TopicPartition (topic18 , 1 ))).isEqualTo (2 );
1535+ consumer .close ();
1536+ logger .info ("Stop rebalance after failed record" );
1537+ }
14381538 private Consumer <?, ?> spyOnConsumer (KafkaMessageListenerContainer <Integer , String > container ) {
14391539 Consumer <?, ?> consumer = spy (
14401540 KafkaTestUtils .getPropertyValue (container , "listenerConsumer.consumer" , Consumer .class ));
@@ -1443,8 +1543,10 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
14431543 return consumer ;
14441544 }
14451545
1446- private KafkaMessageListenerContainer <Integer , String > spyOnContainer (KafkaMessageListenerContainer <Integer , String > container ,
1546+ private KafkaMessageListenerContainer <Integer , String > spyOnContainer (
1547+ KafkaMessageListenerContainer <Integer , String > container ,
14471548 final CountDownLatch stubbingComplete ) {
1549+
14481550 KafkaMessageListenerContainer <Integer , String > spy = spy (container );
14491551 willAnswer (i -> {
14501552 if (stubbingComplete .getCount () > 0 && Thread .currentThread ().getName ().endsWith ("-C-1" )) {
0 commit comments