Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.1.3
Describe the bug
This is the sequence of events observed:
1. A batch of records is consumed and acknowledged, but the offset commit fails because a rebalance is in progress. The offsets are saved so that the commit can be retried.
2. The commit of the saved offsets is retried during the next poll but fails again because the rebalance is still in progress. The same offsets are saved again for retry. (This step shouldn't matter, but we always see it, so it is included for completeness.)
3. A subsequent batch of records is consumed and acknowledged, and this time the offset commit succeeds. These offsets are higher than the original ones.
4. One or more partitions are revoked from the consumer, which triggers it to retry the commit of its saved offsets.
As a result, the last committed offsets are lower than the highest offsets committed earlier. The consumer that takes over the revoked partitions therefore reconsumes every record in those partitions whose offset is higher than the last committed offset.
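The sequence above can be modelled with a small, self-contained sketch. This is a toy model and not Spring for Apache Kafka code: the class `StaleCommitDemo`, its methods, and the in-memory maps standing in for the broker and for the container's saved offsets are all hypothetical. The offsets are taken from the log messages in this report.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the reported sequence; all names here are hypothetical. */
class StaleCommitDemo {

    /** Stand-in for the offsets committed on the broker, keyed by partition. */
    private final Map<String, Long> committed = new HashMap<>();

    /** Offsets saved after a failed commit so that the commit can be retried. */
    private final Map<String, Long> savedForRetry = new HashMap<>();

    /** Commits offsets; while a rebalance is in progress the commit fails and is saved. */
    void commit(Map<String, Long> offsets, boolean rebalanceInProgress) {
        if (rebalanceInProgress) {
            savedForRetry.putAll(offsets);
            return;
        }
        committed.putAll(offsets);
        // The saved offsets are not pruned here, which models the reported problem.
    }

    /** Retries the saved offsets, as on the next poll or on partition revocation. */
    void retrySaved(boolean rebalanceInProgress) {
        if (!rebalanceInProgress) {
            committed.putAll(savedForRetry);
            savedForRetry.clear();
        }
    }

    /** Runs steps 1 to 4 and returns the offsets that end up committed. */
    static Map<String, Long> run() {
        StaleCommitDemo consumer = new StaleCommitDemo();
        Map<String, Long> firstBatch = Map.of(
                "messages.xms.mt.output-8", 63630534L,
                "messages.xms.mt.output-11", 69789114L);
        Map<String, Long> secondBatch = Map.of(
                "messages.xms.mt.output-8", 63630621L,
                "messages.xms.mt.output-11", 69789242L);

        consumer.commit(firstBatch, true);    // step 1: commit fails, offsets saved
        consumer.retrySaved(true);            // step 2: retry fails, offsets stay saved
        consumer.commit(secondBatch, false);  // step 3: higher offsets committed
        consumer.retrySaved(false);           // step 4: revocation retries the stale offsets
        return consumer.committed;
    }

    public static void main(String[] args) {
        Map<String, Long> result = run();
        System.out.println("output-8 committed at " + result.get("messages.xms.mt.output-8"));
        System.out.println("output-11 committed at " + result.get("messages.xms.mt.output-11"));
    }
}
```

In this model the committed offsets end at the values from the first batch, even though the second batch was committed successfully in between.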
Log messages from a test run when this happened:
2024-04-10 08:37:21,370 DEBUG [noBeanNameSet-2-C-1] o.s.k.l.KafkaMessageListenerContainer [LogAccessor.java:313] Committing: {messages.xms.mt.output-2=OffsetAndMetadata{offset=139259990, leaderEpoch=null, metadata=''}, messages.xms.mt.output-5=OffsetAndMetadata{offset=118910413, leaderEpoch=null, metadata=''}, messages.xms.mt.output-11=OffsetAndMetadata{offset=69789114, leaderEpoch=null, metadata=''}, messages.xms.mt.output-8=OffsetAndMetadata{offset=63630534, leaderEpoch=null, metadata=''}}
2024-04-10 08:37:21,371 DEBUG [noBeanNameSet-2-C-1] o.s.k.l.KafkaMessageListenerContainer [LogAccessor.java:200] Non-fatal commit failure
2024-04-10 08:37:21,372 DEBUG [noBeanNameSet-2-C-1] o.s.k.l.KafkaMessageListenerContainer [LogAccessor.java:313] Commit list: {messages.xms.mt.output-2=OffsetAndMetadata{offset=139259990, leaderEpoch=null, metadata=''}, messages.xms.mt.output-5=OffsetAndMetadata{offset=118910413, leaderEpoch=null, metadata=''}, messages.xms.mt.output-11=OffsetAndMetadata{offset=69789114, leaderEpoch=null, metadata=''}, messages.xms.mt.output-8=OffsetAndMetadata{offset=63630534, leaderEpoch=null, metadata=''}}
2024-04-10 08:37:21,372 DEBUG [noBeanNameSet-2-C-1] o.s.k.l.KafkaMessageListenerContainer [LogAccessor.java:200] Non-fatal commit failure
2024-04-10 08:37:22,655 DEBUG [noBeanNameSet-2-C-1] o.s.k.l.KafkaMessageListenerContainer [LogAccessor.java:313] Committing: {messages.xms.mt.output-2=OffsetAndMetadata{offset=139260117, leaderEpoch=null, metadata=''}, messages.xms.mt.output-5=OffsetAndMetadata{offset=118910501, leaderEpoch=null, metadata=''}, messages.xms.mt.output-11=OffsetAndMetadata{offset=69789242, leaderEpoch=null, metadata=''}, messages.xms.mt.output-8=OffsetAndMetadata{offset=63630621, leaderEpoch=null, metadata=''}}
2024-04-10 08:37:22,659 INFO [noBeanNameSet-2-C-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:343] [Consumer clientId=consumer-messages.xms.mt.output-3, groupId=messages.xms.mt.output] Revoke previously assigned partitions messages.xms.mt.output-8, messages.xms.mt.output-11
2024-04-10 08:37:22,659 DEBUG [noBeanNameSet-2-C-1] o.s.k.l.KafkaMessageListenerContainer [LogAccessor.java:313] Commit list: {messages.xms.mt.output-2=OffsetAndMetadata{offset=139259990, leaderEpoch=null, metadata=''}, messages.xms.mt.output-5=OffsetAndMetadata{offset=118910413, leaderEpoch=null, metadata=''}, messages.xms.mt.output-11=OffsetAndMetadata{offset=69789114, leaderEpoch=null, metadata=''}, messages.xms.mt.output-8=OffsetAndMetadata{offset=63630534, leaderEpoch=null, metadata=''}}
To Reproduce
This is reproducible in our test environment when an application instance with a consumer is restarted under high load. When it comes up again, the bug is triggered when partitions are revoked from a consumer on another application instance. We are not sure what other factors are at play, but the window in which an offset commit can succeed, between the rebalance being in progress and the partitions being revoked, appears to be a key factor. The bug first appeared when we upgraded kafka-clients from 3.3.1 to 3.4.1, but the change in kafka-clients behaviour does not in itself appear to be a bug.
Expected behavior
In step 3 above, the expected behaviour would be to remove the partitions included in the successful offset commit from the saved offsets. In the example above, that would have left no offsets to commit when the partitions were revoked.
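A minimal sketch of that expected behaviour, assuming the saved offsets are held in a map keyed by partition. The class `PrunedRetryOffsets` and its methods are hypothetical and only illustrate the pruning rule; they are not the container's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for offsets awaiting a commit retry. */
class PrunedRetryOffsets {

    private final Map<String, Long> savedForRetry = new HashMap<>();

    /** Records offsets whose commit failed so that the commit can be retried. */
    void commitFailed(Map<String, Long> offsets) {
        savedForRetry.putAll(offsets);
    }

    /** After a successful commit, drops saved offsets for the partitions it covered. */
    void commitSucceeded(Map<String, Long> offsets) {
        savedForRetry.keySet().removeAll(offsets.keySet());
    }

    /** Returns a copy of the offsets that would still be retried on revocation. */
    Map<String, Long> pending() {
        return new HashMap<>(savedForRetry);
    }

    public static void main(String[] args) {
        PrunedRetryOffsets saved = new PrunedRetryOffsets();
        // Steps 1 and 2: the commit fails, so the offsets are saved.
        saved.commitFailed(Map.of(
                "messages.xms.mt.output-8", 63630534L,
                "messages.xms.mt.output-11", 69789114L));
        // Step 3: a later commit for the same partitions succeeds.
        saved.commitSucceeded(Map.of(
                "messages.xms.mt.output-8", 63630621L,
                "messages.xms.mt.output-11", 69789242L));
        // Step 4: nothing stale is left to commit on revocation.
        System.out.println("pending retries: " + saved.pending().size());
    }
}
```

Pruning by partition rather than clearing the whole map keeps saved offsets for any partition that the successful commit did not cover.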
Sample
This may not be easily reproducible with a small example, because the behaviour is expected to be timing-dependent.