-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Closed
Description
Looks like different partitions are registered for same ConsumerSeekCallback when we have a multi-@KafkaListener scenario.
See AbstractConsumerSeekAwareTests and its TODO.
when we have different groups and concurrency, different listener container instances are going to deal with different partitions.
But looks like we add all of them to the same map entry:
this.callbackToTopics.computeIfAbsent(threadCallback, key -> new LinkedList<>()).add(tp);
This way when we call seek later from the logic, may lead to error like:
2024-07-15 20:30:17,431 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] [org.springframework.kafka.listener.KafkaMessageListenerContainer] - Exception while seeking TopicPartitionOffset{topicPartition=Seek-0, offset=null, relativeToCurrent=false, position=BEGINNING}
java.lang.IllegalStateException: No current assignment for partition Seek-0
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367) ~[kafka-clients-3.7.1.jar:?]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.lambda$requestOffsetReset$3(SubscriptionState.java:650) ~[kafka-clients-3.7.1.jar:?]
at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4966) ~[?:?]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestOffsetReset(SubscriptionState.java:648) ~[kafka-clients-3.7.1.jar:?]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.seekToBeginning(LegacyKafkaConsumer.java:804) ~[kafka-clients-3.7.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1178) ~[kafka-clients-3.7.1.jar:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processSeeks(KafkaMessageListenerContainer.java:3040) [main/:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1397) [main/:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1298) [main/:?]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) [?:?]
at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
2024-07-15 20:30:17,431 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] [org.springframework.kafka.listener.KafkaMessageListenerContainer] - Exception while seeking TopicPartitionOffset{topicPartition=Seek-2, offset=null, relativeToCurrent=false, position=BEGINNING}
java.lang.IllegalStateException: No current assignment for partition Seek-2
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367) ~[kafka-clients-3.7.1.jar:?]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.lambda$requestOffsetReset$3(SubscriptionState.java:650) ~[kafka-clients-3.7.1.jar:?]
at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4966) ~[?:?]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestOffsetReset(SubscriptionState.java:648) ~[kafka-clients-3.7.1.jar:?]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.seekToBeginning(LegacyKafkaConsumer.java:804) ~[kafka-clients-3.7.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1178) ~[kafka-clients-3.7.1.jar:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processSeeks(KafkaMessageListenerContainer.java:3040) [main/:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1397) [main/:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1298) [main/:?]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) [?:?]
at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
2024-07-15 20:30:17,432 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] [org.springframework.kafka.listener.KafkaMessageListenerContainer] - Exception while seeking TopicPartitionOffset{topicPartition=Seek-1, offset=null, relativeToCurrent=false, position=BEGINNING}
java.lang.IllegalStateException: No current assignment for partition Seek-1
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367) ~[kafka-clients-3.7.1.jar:?]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.lambda$requestOffsetReset$3(SubscriptionState.java:650) ~[kafka-clients-3.7.1.jar:?]
at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4966) ~[?:?]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestOffsetReset(SubscriptionState.java:648) ~[kafka-clients-3.7.1.jar:?]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.seekToBeginning(LegacyKafkaConsumer.java:804) ~[kafka-clients-3.7.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1178) ~[kafka-clients-3.7.1.jar:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processSeeks(KafkaMessageListenerContainer.java:3040) [main/:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1397) [main/:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1298) [main/:?]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) [?:?]
at java.base/java.lang.Thread.run(Thread.java:840) [?:?]