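Background
In Apsara Stack Enterprise (专有云企业版) v_3_12, open the Message Queue for RocketMQ console -> Group Management and view the stack information of a single consumer under a Group ID. The expectation is that only stack information related to that Group ID is shown. The scenario below does not meet that expectation.
Scenario
Two consumer instances with different Group IDs are created in the same program. When the stack information of a single consumer under one Group ID is viewed in the console, it contains the stacks of the consumers of both Group IDs, which makes troubleshooting harder.
Sample code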
pom
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.8.3.Final</version>
</dependency>
code
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import com.aliyun.openservices.ons.api.bean.BatchConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class Main {

    public static void main(String[] args) {
        String nameSrvAddr = "xxx";
        String accessKey = "xxx";
        String secretKey = "xxx";

        // First consumer, with its own Group ID
        String groupId1 = "Goup_ID_1";
        String topic1 = "xxx_1";
        String tag1 = "xxx_1";
        BatchMessageListener batchMessageListener1 = (messages, context) -> Action.CommitMessage;
        BatchConsumerBean batchConsumerBean1 = batchConsumerBean(nameSrvAddr, accessKey, secretKey,
                groupId1, topic1, tag1, batchMessageListener1);
        batchConsumerBean1.start();

        // Second consumer in the same JVM, with a different Group ID
        String groupId2 = "Goup_ID_2";
        String topic2 = "xxx_2";
        String tag2 = "xxx_2";
        BatchMessageListener batchMessageListener2 = (messages, context) -> Action.CommitMessage;
        BatchConsumerBean batchConsumerBean2 = batchConsumerBean(nameSrvAddr, accessKey, secretKey,
                groupId2, topic2, tag2, batchMessageListener2);
        batchConsumerBean2.start();
    }

    // Builds a BatchConsumerBean subscribed to a single topic/tag under the given Group ID
    private static BatchConsumerBean batchConsumerBean(String nameSrvAddr, String accessKey, String secretKey,
                                                       String groupId, String topic, String tag,
                                                       BatchMessageListener batchMessageListener) {
        BatchConsumerBean batchConsumerBean = new BatchConsumerBean();

        Properties properties = new Properties();
        properties.put(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        properties.put(PropertyKeyConst.AccessKey, accessKey);
        properties.put(PropertyKeyConst.SecretKey, secretKey);
        properties.put(PropertyKeyConst.GROUP_ID, groupId);
        batchConsumerBean.setProperties(properties);

        Subscription subscription = new Subscription();
        subscription.setTopic(topic);
        subscription.setExpression(tag);

        Map<Subscription, BatchMessageListener> subscriptionTable = new HashMap<>();
        subscriptionTable.put(subscription, batchMessageListener);
        batchConsumerBean.setSubscriptionTable(subscriptionTable);
        return batchConsumerBean;
    }
}
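Analysis
We first look at the objects associated with BatchConsumerBean in the sample code, then at the flow the console follows to display a consumer's stack information, and finally at how the naming of consumer threads changed between versions of the RocketMQ Client SDK.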
BatchConsumerBean
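The sample code creates two BatchConsumerBean instances. The objects associated with a BatchConsumerBean instance are as follows:
As the figure above shows, a BatchConsumerBean instance is fairly heavyweight, so the sample code above could be optimized to create only one BatchConsumerBean instance. That is largely unrelated to this issue, so we set it aside for now.
The objects in the figure that relate directly to this issue are ClientRemotingProcessor, MQClientInstance, DefaultMQPushConsumerImpl, and ConsumerStatsManager. The analysis continues below.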
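Stack information display flow
The following describes the flow when the browser requests the stack information of a single consumer under a Group ID.
Browser requests the console application
When you click a consumer's stack information in the console, the browser sends an HTTP request to the console application. The main request parameters are:
GroupID and ClientId, where each MQClientInstance instance corresponds to one ClientId.
Console application requests the Broker
After receiving the browser's request, the console application mainly does the following: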
String topic = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
if (brokerDatas != null) {
    for (BrokerData brokerData : brokerDatas) {
        String addr = brokerData.selectBrokerAddr();
        if (addr != null) {
            return this.mqClientInstance.getMQClientAPIImpl().getConsumerRunningInfo(
                    addr, consumerGroup, clientId, jstack, timeoutMillis * 3);
        }
    }
}
- Look up the TopicRouteData for the topic %RETRY% + Group ID
- Pick one Broker address from the TopicRouteData and send it a getConsumerRunningInfo request
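The two steps above can be sketched without any RocketMQ dependency. This is only an illustration under stated assumptions: the constant mirrors the value of MixAll.RETRY_GROUP_TOPIC_PREFIX ("%RETRY%"), while the class RetryTopicLookup, the method firstUsableAddr, and the sample address are hypothetical stand-ins for iterating BrokerData and calling selectBrokerAddr().

```java
import java.util.Arrays;
import java.util.List;

final class RetryTopicLookup {

    // Mirrors the value of MixAll.RETRY_GROUP_TOPIC_PREFIX in the RocketMQ client.
    static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";

    /** Builds the retry topic name that is used to locate the Brokers serving a group. */
    static String retryTopic(String consumerGroup) {
        return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
    }

    /**
     * Hypothetical stand-in for the loop over BrokerData:
     * returns the first non-null address, or null if there is none.
     */
    static String firstUsableAddr(List<String> candidateAddrs) {
        if (candidateAddrs == null) {
            return null;
        }
        for (String addr : candidateAddrs) {
            if (addr != null) {
                return addr;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // The group name comes from the sample code; the address is made up.
        System.out.println(retryTopic("Goup_ID_1"));
        System.out.println(firstUsableAddr(Arrays.asList(null, "10.0.0.2:10911")));
    }
}
```

Note that the request goes to the first Broker that yields an address. The Broker is only a relay here: it forwards the request to the consumer identified by GroupID and ClientId.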
Broker requests the Consumer
After receiving the request, the Broker mainly does the following:
ClientChannelInfo clientChannelInfo =
        this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);
RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null);
newRequest.setExtFields(request.getExtFields());
newRequest.setBody(request.getBody());
return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);
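- AdminBrokerProcessor handles the query request
- It finds the channel (socket) of the matching Consumer instance by GroupID and ClientId
- It forwards the request to the Consumer instance over that channel
Consumer processing logic
After receiving the request, the Consumer mainly does the following: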
ConsumerRunningInfo consumerRunningInfo =
        this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
if (requestHeader.isJstackEnable()) {
    Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
    String jstack = UtilAll.jstack(map);
    consumerRunningInfo.setJstack(jstack);
}
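- Through the MQClientInstance instance, call the Consumer instance's consumerRunningInfo method to get the Consumer's runtime information, such as pullRT, pullTPS, consumeRT, consumeOKTPS, and consumeFailedTPS
- Capture the stack traces of all threads in the JVM
- Return the resulting ConsumerRunningInfo to the Broker.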
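Step 2 above, capturing the stack traces of all threads in the JVM, produces the stack information we want to inspect. Because Thread.getAllStackTraces() covers every live thread in the JVM, the dump is not scoped to a single consumer. The console currently shows mainly the threads whose names start with ConsumeMessageThread_ and the RebalanceService thread. The expectation is that it shows only the ConsumeMessageThread_ threads and the Rebalance thread that belong to this consumer, and not the threads of unrelated consumers.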
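The effect can be reproduced with the JDK alone. The sketch below is not RocketMQ code: the class SharedPrefixDemo and its thread factory are hypothetical and only imitate two consumers that each own a thread pool named with the same fixed prefix plus a per-pool counter. It shows that a JVM-wide dump returns the threads of both pools and that their names cannot be told apart.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedPrefixDemo {

    /** Hypothetical factory: names threads prefix + index, with one counter per factory. */
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger index = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + index.incrementAndGet());
            t.setDaemon(true); // do not keep the JVM alive
            return t;
        };
    }

    /** Names of all live JVM threads that start with the prefix, sorted. */
    static List<String> liveThreadNames(String prefix) {
        List<String> names = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().startsWith(prefix)) {
                names.add(t.getName());
            }
        }
        Collections.sort(names);
        return names;
    }

    /** Starts one pool per simulated consumer and returns what a JVM-wide dump sees. */
    static List<String> run() {
        ExecutorService poolForGroup1 = Executors.newFixedThreadPool(1, namedFactory("ConsumeMessageThread_"));
        ExecutorService poolForGroup2 = Executors.newFixedThreadPool(1, namedFactory("ConsumeMessageThread_"));
        CountDownLatch started = new CountDownLatch(2);
        CountDownLatch release = new CountDownLatch(1);
        Runnable task = () -> {
            started.countDown();
            try {
                release.await(); // keep the worker thread busy while the dump is taken
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        poolForGroup1.execute(task);
        poolForGroup2.execute(task);
        try {
            started.await();
            return liveThreadNames("ConsumeMessageThread_");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            poolForGroup1.shutdown();
            poolForGroup2.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Both simulated consumers contribute a thread to the result, and since each pool counts from 1 the two names are identical. A reader of the dump has no way to attribute a thread to a Group ID from its name.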
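Naming of ConsumeMessageThread threads
In the current version, the consumer threads that run business logic are named ConsumeMessageThread_<number>. The relevant code in the ConsumeMessageConcurrentlyService class is: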
// This thread pool runs the business logic
this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));
In newer versions the thread name also includes the Group ID. The relevant code is:
String consumeThreadPrefix = null;
if (consumerGroup.length() > 100) {
    consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_")
            .append(consumerGroup, 0, 100).append("_").toString();
} else {
    consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_")
            .append(consumerGroup).append("_").toString();
}
this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl(consumeThreadPrefix));
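The thread name then takes the form ConsumeMessageThread_<GroupId>_<number>, so each consume thread can be attributed to its Group ID by name. This mitigates the problem above to some extent.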
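With the Group ID in the thread name, a reader (or a tool) can narrow a JVM-wide dump down to one group. The following is a minimal sketch of that idea, not something the console or the SDK is known to do: the class GroupJstackFilter and its methods are hypothetical, and the prefix is rebuilt the same way as in the snippet above, including the 100-character truncation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class GroupJstackFilter {

    /** Rebuilds the consume-thread prefix: the group is truncated to 100 characters. */
    static String consumeThreadPrefix(String consumerGroup) {
        String group = consumerGroup.length() > 100
                ? consumerGroup.substring(0, 100)
                : consumerGroup;
        return "ConsumeMessageThread_" + group + "_";
    }

    /** Keeps only the threads whose name starts with the prefix of the given group. */
    static Map<String, StackTraceElement[]> filterByGroup(
            Map<Thread, StackTraceElement[]> allStacks, String consumerGroup) {
        String prefix = consumeThreadPrefix(consumerGroup);
        Map<String, StackTraceElement[]> result = new TreeMap<>();
        for (Map.Entry<Thread, StackTraceElement[]> entry : allStacks.entrySet()) {
            String name = entry.getKey().getName();
            if (name.startsWith(prefix)) {
                result.put(name, entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hand-built stand-in for Thread.getAllStackTraces(); the threads are never started.
        Map<Thread, StackTraceElement[]> all = new HashMap<>();
        all.put(new Thread(() -> { }, "ConsumeMessageThread_Goup_ID_1_1"), new StackTraceElement[0]);
        all.put(new Thread(() -> { }, "ConsumeMessageThread_Goup_ID_2_1"), new StackTraceElement[0]);
        all.put(new Thread(() -> { }, "RebalanceService"), new StackTraceElement[0]);
        System.out.println(filterByGroup(all, "Goup_ID_1").keySet());
    }
}
```

Prefix matching is only a partial fix, which fits the "to some extent" above: a group named G also matches the threads of a group named G_1, two groups that share their first 100 characters get the same prefix, and threads such as RebalanceService carry no Group ID in their name.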
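Summary
- The ONS SDK wraps the RocketMQ Client and makes it easier for applications to use. Consumer objects are fairly heavyweight, so initialize them in a way that suits the application
- ConsumerStatsManager records statistics about the consumer
- ConsumeMessageConcurrentlyService improved the naming of consumer threads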