Skip to content

Commit 012215c

Browse files
garyrussellartembilan
authored andcommitted
GH-813: Re-pause a paused consumer after rebalance
Fixes #813 Kafka resumes the consumer(s) after a rebalance, but since the `ListenerConsumer.consumerPaused` is still true, the container starts consuming again and can't be paused without first issuing a resume. On a rebalance, reset the boolean; the consumer will re-pause before the next poll, if the container's `isPaused()` is still true after the rebalance listeners exit. Tested with a stand-alone Boot app (see the referenced issue). **cherry-pick to 2.1.x**
1 parent c5294f1 commit 012215c

File tree

1 file changed

+6
-0
lines changed

1 file changed

+6
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,12 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
566566

567567
@Override
568568
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
569+
if (ListenerConsumer.this.consumerPaused) {
570+
ListenerConsumer.this.consumerPaused = false;
571+
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
572+
+ "the container will pause again before polling, unless the container's "
573+
+ "'paused' property is reset by a custom rebalance listener");
574+
}
569575
ListenerConsumer.this.assignedPartitions = partitions;
570576
if (!ListenerConsumer.this.autoCommit) {
571577
// Commit initial positions - this is generally redundant but

0 commit comments

Comments
 (0)