Skip to content

Commit 5b64ffc

Browse files
authored
[RIP-48] Not commit offset when broker has server-side offset in ack message (#5878)
1 parent 3291934 commit 5b64ffc

File tree

1 file changed

+7
-6
lines changed

1 file changed

+7
-6
lines changed

broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,12 +167,13 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
167167
requestHeader.getQueueId(), requestHeader.getOffset(),
168168
ExtraInfoUtil.getPopTime(extraInfo));
169169
if (nextOffset > -1) {
170-
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
171-
requestHeader.getConsumerGroup(), requestHeader.getTopic(),
172-
requestHeader.getQueueId(),
173-
nextOffset);
174-
this.brokerController.getPopMessageProcessor().notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
175-
requestHeader.getQueueId());
170+
if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
171+
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId())) {
172+
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
173+
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), nextOffset);
174+
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
175+
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId());
176+
}
176177
} else if (nextOffset == -1) {
177178
String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s",
178179
lockKey, oldOffset, requestHeader.getOffset(), nextOffset, channel.remoteAddress());

0 commit comments

Comments
 (0)