Skip to content

Commit ac9aaa4

Browse files
duhengluckyShadowySpiritspanzhi33panzhi33ArronHuang
authored
Fix conflict,merge develop (#2906)
* [ISSUE #1233] Fix CVE-2011-1473 * fix Multiple instances in the same application share MQClientInstance * [ISSUE #2748] Fix deleteSubscriptionGroup not remove consumer offset * [ISSUE #2745] Changed the support time of the request/reply feature to 4.6.0. Co-authored-by: von gosling <[email protected]> * [ISSUE #2729] Replace with Math.min method call * [ISSUE #2801]Fix NamesrvAddr connot set in Producer * [ISSUE 2800] optimize: the spelling of topicSynFlag Co-authored-by: ph3636 <[email protected]> * [ISSUE #2803] Fix the endpoint cannot get instanceId without http (#2804) * fix the endpoint cannot get instanceId without http * fix the endpoint cannot get instanceId without http * add unit test * add unit test * add unit test Co-authored-by: panzhi33 <[email protected]> * fix messageArrivingListener NPE * [ISSUE #2538]Optimize log output when message trace saving fails * [ISSUE #2811] Fix the wrong topic was consumed in the DefaultMessageStoreTest test program * [ISSUE #2821] Overriding the ServiceThread#shutdown in HAClient class * [ISSUE #2805] remove redundant package imports * [ISSUE #2833] Support trace for TranscationProducer (#2834) * [ISSUE #2732] Fix message loss problem when rebalance with LitePullConsumer (#2832) * [ISSUE #2732] Fix message loss problem when rebalance with LitePullConsumer * Fix message loss problem when rebalance with LitePullConsumer, update 2 * [ISSUE #2846]fix -E might not port to other systems * fix some nonconformity after checkstyle * Support OpenTracing(#2861) * [ISSUE #2872] remove log files created by integration test when mvn clean * [ISSUE #2872] move log files created by integration test to target dir * Change log level to debug: "Half offset {} has been committed/rolled back" * Fix unit test stability Bump mockito-core to 3.10.0, remove powermock dependency, suppress useless logging * [ISSUE #2898] Resolve rocketmq-example project failed during checkstyle execution (#2899) Co-authored-by: SSpirits <[email protected]> Co-authored-by: panzhi33 <[email protected]> Co-authored-by: panzhi <[email protected]> Co-authored-by: ArronHuang <[email protected]> Co-authored-by: von gosling <[email protected]> Co-authored-by: drgnchan <[email protected]> Co-authored-by: zhangjidi2016 <[email protected]> Co-authored-by: ph3636 <[email protected]> Co-authored-by: ph3636 <[email protected]> Co-authored-by: BurningCN <[email protected]> Co-authored-by: francis lee <[email protected]> Co-authored-by: 灼华 <[email protected]> Co-authored-by: yuz10 <[email protected]> Co-authored-by: huangli <[email protected]> Co-authored-by: chenrl <[email protected]> Co-authored-by: ayanamist <[email protected]> Co-authored-by: zhangjidi2016 <[email protected]>
1 parent 7d32c02 commit ac9aaa4

File tree

63 files changed

+2041
-146
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+2041
-146
lines changed

broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,4 +232,20 @@ public void cloneOffset(final String srcGroup, final String destGroup, final Str
232232
}
233233
}
234234

235+
public void removeOffset(final String group) {
236+
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
237+
while (it.hasNext()) {
238+
Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
239+
String topicAtGroup = next.getKey();
240+
if (topicAtGroup.contains(group)) {
241+
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
242+
if (arrays.length == 2 && group.equals(arrays[1])) {
243+
it.remove();
244+
log.warn("clean group offset {}", topicAtGroup);
245+
}
246+
}
247+
}
248+
249+
}
250+
235251
}

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,10 @@ private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx,
714714

715715
this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName());
716716

717+
if (requestHeader.isRemoveOffset()) {
718+
this.brokerController.getConsumerOffsetManager().removeOffset(requestHeader.getGroupName());
719+
}
720+
717721
if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
718722
this.brokerController.getBrokerStatsManager().onGroupDeleted(requestHeader.getGroupName());
719723
}

broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,7 @@ public TopicConfig createTopicInSendMessageMethod(final String topic, final Stri
176176
if (PermName.isInherited(defaultTopicConfig.getPerm())) {
177177
topicConfig = new TopicConfig(topic);
178178

179-
int queueNums =
180-
clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
181-
.getWriteQueueNums() : clientDefaultTopicQueueNums;
179+
int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());
182180

183181
if (queueNums < 0) {
184182
queueNums = 0;

broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public void check(long transactionTimeout, int transactionCheckMax,
164164
break;
165165
}
166166
if (removeMap.containsKey(i)) {
167-
log.info("Half offset {} has been committed/rolled back", i);
167+
log.debug("Half offset {} has been committed/rolled back", i);
168168
Long removedOpOffset = removeMap.remove(i);
169169
doneOpOffset.add(removedOpOffset);
170170
} else {

client/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,18 @@
4747
<groupId>org.apache.commons</groupId>
4848
<artifactId>commons-lang3</artifactId>
4949
</dependency>
50+
<dependency>
51+
<groupId>io.opentracing</groupId>
52+
<artifactId>opentracing-api</artifactId>
53+
<version>0.33.0</version>
54+
<scope>provided</scope>
55+
</dependency>
56+
<dependency>
57+
<groupId>io.opentracing</groupId>
58+
<artifactId>opentracing-mock</artifactId>
59+
<version>0.33.0</version>
60+
<scope>test</scope>
61+
</dependency>
5062
<dependency>
5163
<groupId>org.apache.logging.log4j</groupId>
5264
<artifactId>log4j-core</artifactId>

client/src/main/java/org/apache/rocketmq/client/ClientConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void setInstanceName(String instanceName) {
9494

9595
public void changeInstanceNameToPID() {
9696
if (this.instanceName.equals("DEFAULT")) {
97-
this.instanceName = String.valueOf(UtilAll.getPid());
97+
this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
9898
}
9999
}
100100

@@ -178,8 +178,8 @@ public ClientConfig cloneClientConfig() {
178178
}
179179

180180
public String getNamesrvAddr() {
181-
if (StringUtils.isNotEmpty(namesrvAddr) && NameServerAddressUtils.NAMESRV_ENDPOINT_PATTERN.matcher(namesrvAddr.trim()).matches()) {
182-
return namesrvAddr.substring(NameServerAddressUtils.ENDPOINT_PREFIX.length());
181+
if (StringUtils.isNotEmpty(namesrvAddr) && NameServerAddressUtils.validateInstanceEndpoint(namesrvAddr.trim())) {
182+
return NameServerAddressUtils.getNameSrvAddrFromNamesrvEndpoint(namesrvAddr);
183183
}
184184
return namesrvAddr;
185185
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.client.hook;
18+
19+
import org.apache.rocketmq.client.producer.LocalTransactionState;
20+
import org.apache.rocketmq.common.message.Message;
21+
22+
public class EndTransactionContext {
23+
private String producerGroup;
24+
private Message message;
25+
private String brokerAddr;
26+
private String msgId;
27+
private String transactionId;
28+
private LocalTransactionState transactionState;
29+
private boolean fromTransactionCheck;
30+
31+
public String getProducerGroup() {
32+
return producerGroup;
33+
}
34+
35+
public void setProducerGroup(String producerGroup) {
36+
this.producerGroup = producerGroup;
37+
}
38+
39+
public Message getMessage() {
40+
return message;
41+
}
42+
43+
public void setMessage(Message message) {
44+
this.message = message;
45+
}
46+
47+
public String getBrokerAddr() {
48+
return brokerAddr;
49+
}
50+
51+
public void setBrokerAddr(String brokerAddr) {
52+
this.brokerAddr = brokerAddr;
53+
}
54+
55+
public String getMsgId() {
56+
return msgId;
57+
}
58+
59+
public void setMsgId(String msgId) {
60+
this.msgId = msgId;
61+
}
62+
63+
public String getTransactionId() {
64+
return transactionId;
65+
}
66+
67+
public void setTransactionId(String transactionId) {
68+
this.transactionId = transactionId;
69+
}
70+
71+
public LocalTransactionState getTransactionState() {
72+
return transactionState;
73+
}
74+
75+
public void setTransactionState(LocalTransactionState transactionState) {
76+
this.transactionState = transactionState;
77+
}
78+
79+
public boolean isFromTransactionCheck() {
80+
return fromTransactionCheck;
81+
}
82+
83+
public void setFromTransactionCheck(boolean fromTransactionCheck) {
84+
this.fromTransactionCheck = fromTransactionCheck;
85+
}
86+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.client.hook;
18+
19+
public interface EndTransactionHook {
20+
String hookName();
21+
22+
void endTransaction(final EndTransactionContext context);
23+
}

client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1721,10 +1721,11 @@ public void deleteTopicInNameServer(final String addr, final String topic, final
17211721
throw new MQClientException(response.getCode(), response.getRemark());
17221722
}
17231723

1724-
public void deleteSubscriptionGroup(final String addr, final String groupName, final long timeoutMillis)
1724+
public void deleteSubscriptionGroup(final String addr, final String groupName, final boolean removeOffset, final long timeoutMillis)
17251725
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
17261726
DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader();
17271727
requestHeader.setGroupName(groupName);
1728+
requestHeader.setRemoveOffset(removeOffset);
17281729
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, requestHeader);
17291730

17301731
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),

client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,12 @@ public long getPullOffset(MessageQueue messageQueue) {
8383
return -1;
8484
}
8585

86-
public void updatePullOffset(MessageQueue messageQueue, long offset) {
86+
public void updatePullOffset(MessageQueue messageQueue, long offset, ProcessQueue processQueue) {
8787
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
8888
if (messageQueueState != null) {
89+
if (messageQueueState.getProcessQueue() != processQueue) {
90+
return;
91+
}
8992
messageQueueState.setPullOffset(offset);
9093
}
9194
}

0 commit comments

Comments
 (0)