@@ -283,6 +283,7 @@ public GroupConsumeInfo queryGroup(String consumerGroup, String address) {
283
283
284
284
@ Override
285
285
public List <TopicConsumerInfo > queryConsumeStatsListByGroupName (String groupName , String address ) {
286
+ groupName = getConsumerGroup (groupName );
286
287
List <ConsumeStats > consumeStatses = new ArrayList <>();
287
288
String topic = null ;
288
289
try {
@@ -295,16 +296,18 @@ public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName
295
296
throw new RuntimeException (e );
296
297
}
297
298
List <TopicConsumerInfo > res = new ArrayList <>();
299
+ String finalGroupName = groupName ;
298
300
consumeStatses .forEach (consumeStats -> {
299
301
if (consumeStats != null && consumeStats .getOffsetTable () != null && !consumeStats .getOffsetTable ().isEmpty ()) {
300
- res .addAll (toTopicConsumerInfoList (topic , consumeStats , groupName ));
302
+ res .addAll (toTopicConsumerInfoList (topic , consumeStats , finalGroupName ));
301
303
}
302
304
});
303
305
return res ;
304
306
}
305
307
306
308
@ Override
307
309
public List <TopicConsumerInfo > queryConsumeStatsList (final String topic , String groupName ) {
310
+ groupName = getConsumerGroup (groupName );
308
311
ConsumeStats consumeStats = null ;
309
312
try {
310
313
consumeStats = mqAdminExt .examineConsumeStats (groupName , topic );
@@ -316,6 +319,7 @@ public List<TopicConsumerInfo> queryConsumeStatsList(final String topic, String
316
319
}
317
320
318
321
private List <TopicConsumerInfo > toTopicConsumerInfoList (String topic , ConsumeStats consumeStats , String groupName ) {
322
+ groupName = getConsumerGroup (groupName );
319
323
List <MessageQueue > mqList = Lists .newArrayList (Iterables .filter (consumeStats .getOffsetTable ().keySet (), new Predicate <MessageQueue >() {
320
324
@ Override
321
325
public boolean apply (MessageQueue o ) {
@@ -339,6 +343,7 @@ public boolean apply(MessageQueue o) {
339
343
}
340
344
341
345
private Map <MessageQueue , String > getClientConnection (String groupName ) {
346
+ groupName = getConsumerGroup (groupName );
342
347
Map <MessageQueue , String > results = Maps .newHashMap ();
343
348
try {
344
349
ConsumerConnection consumerConnection = mqAdminExt .examineConsumerConnectionInfo (groupName );
@@ -417,17 +422,18 @@ public Map<String, ConsumerGroupRollBackStat> resetOffset(ResetOffsetRequest res
417
422
}
418
423
419
424
@ Override
420
- public List <ConsumerConfigInfo > examineSubscriptionGroupConfig (String group ) {
425
+ public List <ConsumerConfigInfo > examineSubscriptionGroupConfig (String consumerGroup ) {
426
+ consumerGroup = getConsumerGroup (consumerGroup );
421
427
List <ConsumerConfigInfo > consumerConfigInfoList = Lists .newArrayList ();
422
428
try {
423
429
ClusterInfo clusterInfo = clusterInfoService .get ();
424
430
for (String brokerName : clusterInfo .getBrokerAddrTable ().keySet ()) { //foreach brokerName
425
431
String brokerAddress = clusterInfo .getBrokerAddrTable ().get (brokerName ).selectBrokerAddr ();
426
432
SubscriptionGroupConfig subscriptionGroupConfig = null ;
427
433
try {
428
- subscriptionGroupConfig = mqAdminExt .examineSubscriptionGroupConfig (brokerAddress , group );
434
+ subscriptionGroupConfig = mqAdminExt .examineSubscriptionGroupConfig (brokerAddress , consumerGroup );
429
435
} catch (Exception e ) {
430
- logger .warn ("op=examineSubscriptionGroupConfig_error brokerName={} group={}" , brokerName , group );
436
+ logger .warn ("op=examineSubscriptionGroupConfig_error brokerName={} group={}" , brokerName , consumerGroup );
431
437
}
432
438
if (subscriptionGroupConfig == null ) {
433
439
continue ;
@@ -480,6 +486,7 @@ private void deleteResources(String topic, String brokerName, ClusterInfo cluste
480
486
481
487
@ Override
482
488
public boolean createAndUpdateSubscriptionGroupConfig (ConsumerConfigInfo consumerConfigInfo ) {
489
+ consumerConfigInfo .getSubscriptionGroupConfig ().setGroupName (getConsumerGroup (consumerConfigInfo .getSubscriptionGroupConfig ().getGroupName ()));
483
490
try {
484
491
ClusterInfo clusterInfo = clusterInfoService .get ();
485
492
for (String brokerName : changeToBrokerNameSet (clusterInfo .getClusterAddrTable (),
@@ -495,6 +502,7 @@ public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consume
495
502
496
503
@ Override
497
504
public Set <String > fetchBrokerNameSetBySubscriptionGroup (String group ) {
505
+ group = getConsumerGroup (group );
498
506
Set <String > brokerNameSet = Sets .newHashSet ();
499
507
try {
500
508
List <ConsumerConfigInfo > consumerConfigInfoList = examineSubscriptionGroupConfig (group );
@@ -511,6 +519,7 @@ public Set<String> fetchBrokerNameSetBySubscriptionGroup(String group) {
511
519
512
520
@ Override
513
521
public ConsumerConnection getConsumerConnection (String consumerGroup , String address ) {
522
+ consumerGroup = getConsumerGroup (consumerGroup );
514
523
try {
515
524
String [] addresses = address .split ("," );
516
525
String addr = addresses [0 ];
@@ -523,6 +532,7 @@ public ConsumerConnection getConsumerConnection(String consumerGroup, String add
523
532
524
533
@ Override
525
534
public ConsumerRunningInfo getConsumerRunningInfo (String consumerGroup , String clientId , boolean jstack ) {
535
+ consumerGroup = getConsumerGroup (consumerGroup );
526
536
try {
527
537
return mqAdminExt .getConsumerRunningInfo (consumerGroup , clientId , jstack );
528
538
} catch (Exception e ) {
@@ -533,15 +543,14 @@ public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String c
533
543
534
544
@ Override
535
545
public GroupConsumeInfo refreshGroup (String address , String consumerGroup ) {
536
-
537
546
if (isCacheBeingBuilt || cacheConsumeInfoList .isEmpty ()) {
538
547
throw new RuntimeException ("Cache is being built or empty, please try again later" );
539
548
}
540
549
synchronized (cacheConsumeInfoList ) {
541
550
for (int i = 0 ; i < cacheConsumeInfoList .size (); i ++) {
542
551
GroupConsumeInfo groupConsumeInfo = cacheConsumeInfoList .get (i );
543
552
if (groupConsumeInfo .getGroup ().equals (consumerGroup )) {
544
- GroupConsumeInfo updatedInfo = queryGroup (consumerGroup , "" );
553
+ GroupConsumeInfo updatedInfo = queryGroup (consumerGroup , address );
545
554
updatedInfo .setUpdateTime (new Date ());
546
555
updatedInfo .setGroup (consumerGroup );
547
556
updatedInfo .setAddress (consumerGroupMap .get (consumerGroup ));
@@ -559,4 +568,11 @@ public List<GroupConsumeInfo> refreshAllGroup(String address) {
559
568
consumerGroupMap .clear ();
560
569
return queryGroupList (false , address );
561
570
}
571
+
572
+ public String getConsumerGroup (String consumerGroup ) {
573
+ if (consumerGroup != null && consumerGroup .startsWith ("%SYS%" )) {
574
+ return consumerGroup .substring (5 ); // Remove "%SYS%" prefix
575
+ }
576
+ return consumerGroup ;
577
+ }
562
578
}
0 commit comments