Skip to content

Commit 5c72de3

Browse files
authored
[ISSUE apache#2732] Fix message loss problem when rebalance with LitePullConsumer (apache#2832)
* [ISSUE apache#2732] Fix message loss problem when rebalance with LitePullConsumer * Fix message loss problem when rebalance with LitePullConsumer, update 2
1 parent 1ffebf6 commit 5c72de3

File tree

2 files changed

+13
-5
lines changed

2 files changed

+13
-5
lines changed

client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,12 @@ public long getPullOffset(MessageQueue messageQueue) {
8383
return -1;
8484
}
8585

86-
public void updatePullOffset(MessageQueue messageQueue, long offset) {
86+
public void updatePullOffset(MessageQueue messageQueue, long offset, ProcessQueue processQueue) {
8787
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
8888
if (messageQueueState != null) {
89+
if (messageQueueState.getProcessQueue() != processQueue) {
90+
return;
91+
}
8992
messageQueueState.setPullOffset(offset);
9093
}
9194
}

client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -612,9 +612,9 @@ public synchronized void commitAll() {
612612
}
613613
}
614614

615-
private void updatePullOffset(MessageQueue messageQueue, long nextPullOffset) {
615+
private void updatePullOffset(MessageQueue messageQueue, long nextPullOffset, ProcessQueue processQueue) {
616616
if (assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
617-
assignedMessageQueue.updatePullOffset(messageQueue, nextPullOffset);
617+
assignedMessageQueue.updatePullOffset(messageQueue, nextPullOffset, processQueue);
618618
}
619619
}
620620

@@ -740,6 +740,9 @@ public void run() {
740740
}
741741

742742
long offset = nextPullOffset(messageQueue);
743+
if (this.isCancelled() || processQueue.isDropped()) {
744+
return;
745+
}
743746
long pullDelayTimeMills = 0;
744747
try {
745748
SubscriptionData subscriptionData;
@@ -752,7 +755,9 @@ public void run() {
752755
}
753756

754757
PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
755-
758+
if (this.isCancelled() || processQueue.isDropped()) {
759+
return;
760+
}
756761
switch (pullResult.getPullStatus()) {
757762
case FOUND:
758763
final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
@@ -769,7 +774,7 @@ public void run() {
769774
default:
770775
break;
771776
}
772-
updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
777+
updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
773778
} catch (Throwable e) {
774779
pullDelayTimeMills = pullTimeDelayMillsWhenException;
775780
log.error("An error occurred in pull message process.", e);

0 commit comments

Comments
 (0)