RocketMQ控制台消费者堆栈信息展示优化分析

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
可观测可视化 Grafana 版,10个用户账号 1个月
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: RocketMQ控制台消费者堆栈信息展示优化分析

背景介绍

专有云企业版v_3_12,消息队列RocketMQ控制台->Group管理,查看Group ID下单个消费端堆栈信息,期望只展示与该Group ID相关的堆栈信息,在以下场景与期望不符。

场景介绍

在同一个程序中创建两个不同Group ID的消费端实例,在控制台中查看一个Group ID下单个消费端堆栈信息,堆栈信息中包含了两个Group ID消费端的堆栈信息,给排查问题造成了困扰。

示例代码

pom

<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>ons-client</artifactId>
  <version>1.8.8.3.Final</version>
</dependency>

code

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import com.aliyun.openservices.ons.api.bean.BatchConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class Main {
    public static void main(String[] args){
        String nameSrvAddr = "xxx";
        String accessKey = "xxx";
        String secretKey = "xxx";
        String groupId1 = "Goup_ID_1";
        String topic1 = "xxx_1";
        String tag1 = "xxx_1";
        BatchMessageListener batchMessageListener1 = (messages, context) -> Action.CommitMessage;
        BatchConsumerBean batchConsumerBean1 = batchConsumerBean(nameSrvAddr,accessKey,secretKey,
                                                                 groupId1,topic1,tag1,batchMessageListener1);
        batchConsumerBean1.start();
        String groupId2 = "Goup_ID_2";
        String topic2 = "xxx_2";
        String tag2 = "xxx_2";
        BatchMessageListener batchMessageListener2 = (messages, context) -> Action.CommitMessage;
        BatchConsumerBean batchConsumerBean2 = batchConsumerBean(nameSrvAddr,accessKey,secretKey,
                                                                 groupId2,topic2,tag2,batchMessageListener2);
        batchConsumerBean2.start();
    }
    private static BatchConsumerBean batchConsumerBean(String nameSrvAddr,String accessKey,String secretKey,String groupId,String topic,String tag,BatchMessageListener batchMessageListener){
        BatchConsumerBean batchConsumerBean = new BatchConsumerBean();
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.NAMESRV_ADDR,nameSrvAddr);
        properties.put(PropertyKeyConst.AccessKey,accessKey);
        properties.put(PropertyKeyConst.SecretKey,secretKey);
        properties.put(PropertyKeyConst.GROUP_ID,groupId);
        batchConsumerBean.setProperties(properties);
        Subscription subscription = new Subscription();
        subscription.setTopic(topic);
        subscription.setExpression(tag);
        Map<Subscription, BatchMessageListener> subscriptionTable = new HashMap<>();
        subscriptionTable.put(subscription,batchMessageListener);
        batchConsumerBean.setSubscriptionTable(subscriptionTable);
        return batchConsumerBean;
    }
}

分析过程

首先分析示例代码中与BatchConsumerBean相关联的对象,然后分析控制台展示消费端堆栈信息的流程,最后分析下不同版本的RocketMQ Client SDK对消费端消费线程命名方式的变化。

BatchConsumerBean

示例代码中创建了两个BatchConsumerBean实例,与BatchConsumerBean实例相关联的对象如下:

从上图看,BatchConsumerBean实例是比较重的,所以上面的示例代码可以优化为只创建一个BatchConsumerBean实例,与该问题不太相关,暂时忽略;

上图中与该问题直接相关的是ClientRemotingProcessor、MQClientInstance、DefaultMQPushConsumerImpl、ConsumerStatsManager,下面继续分析。

堆栈信息展示流程

下面描述的是在浏览器请求一个Group ID单个消费端堆栈信息的流程。

浏览器请求控制台应用

当在控制台单机某个消费端堆栈信息的时候,浏览器会向控制台应用发起http请求,主要请求参数是:

GroupID,ClientId,其中每个MQClientInstance实例对应一个ClientId。

控制台应用请求Broker

控制台应用收到浏览器请求后,主要进行以下操作:

String topic = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
if (brokerDatas != null) {
    for (BrokerData brokerData : brokerDatas) {
        String addr = brokerData.selectBrokerAddr();
        if (addr != null) {
            return this.mqClientInstance.getMQClientAPIImpl().getConsumerRunningInfo(addr, consumerGroup, clientId, jstack,timeoutMillis * 3);
        }
    }
}
  1. 根据%RETRY% + GroupIID查找对应的TopicRouteData
  2. 从TopicRouteData中选择一个Broker的地址发送getConsumerRunningInfo请求

Broker请求Consumer

Broker收到请求后,主要进行以下操作:

ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);
RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null);
newRequest.setExtFields(request.getExtFields());
newRequest.setBody(request.getBody());
return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);
  1. AdminBrokerProcessor响应查询请求
  2. 根据GroupID和ClientId找到对应Consumer实例的channel socket
  3. 通过channel socket发送请求到Consumer实例

Consumer处理逻辑

Consumer收到请求后,主要进行以下操作:

ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
if (requestHeader.isJstackEnable()) {
  Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
  String jstack = UtilAll.jstack(map);
  consumerRunningInfo.setJstack(jstack);
}
  1. 通过MQClientInstance实例请求Consumer实例的consumerRunningInfo方法获取Consumer运行信息,如:pullRT、pullTPS、consumeRT、consumeOKTPS、consumeFailedTPS等信息
  2. 获取JVM所有线程栈信息
  3. 将获取到的ConsumerRunningInfo返回给Broker。

其中第2步【获取JVM所有线程栈信息】就是我们需要查看的堆栈信息,目前控制台主要展示了以ConsumeMessageThread__开头的线程和RebalanceService线程,这块期望只展示与该消费端相关的ConsumeMessageThread__线程Rebalance线程,不期望将不相关的消费端线程也展示出来。

ConsumeMessageThread线程的命名

在当前版本中处理业务的消费者线程名的形式是:ConsumeMessageThread_数字,ConsumeMessageConcurrentlyService类中相关代码如下:

//该线程池用于处理业务逻辑
this.consumeExecutor = new ThreadPoolExecutor(
  this.defaultMQPushConsumer.getConsumeThreadMin(),
  this.defaultMQPushConsumer.getConsumeThreadMax(),
  1000 * 60,
  TimeUnit.MILLISECONDS,
  this.consumeRequestQueue,
  new ThreadFactoryImpl("ConsumeMessageThread_"));

新版本中线程的命名中增加了GroupId,相关代码如下:

String consumeThreadPrefix = null;
if (consumerGroup.length() > 100) {
    consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup, 0, 100).append("_").toString();
} else {
    consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup).append("_").toString();
}
this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(),
    this.defaultMQPushConsumer.getConsumeThreadMax(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue,
    new ThreadFactoryImpl(consumeThreadPrefix));

线程名形式为:ConsumeMessageThread_GroupId__数字,从一定程度对以上问题进行了优化。

总结

  1. ONS SDK对RocketMQ Client进行了封装,更加方便业务的使用,Consumer对象比较重,需要根据业务采用合理的初始化方式
  2. ConsumerStatsManager记录了消费端的一些统计信息
  3. ConsumeMessageConcurrentlyService对消费端线程命名进行了优化
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
222 2
|
4月前
|
消息中间件 Java 调度
消息队列 MQ使用问题之消费者自动掉线是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
Dubbo IDE Java
dubbo学习二:下载Dubbo-Admin管理控制台,并分析在2.6.1及2.6.1以后版本的变化
这篇文章是关于如何下载和部署Dubbo管理控制台(dubbo-admin)的教程,并分析了2.6.1版本及以后版本的变化。
41 0
dubbo学习二:下载Dubbo-Admin管理控制台,并分析在2.6.1及2.6.1以后版本的变化
|
3月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
3月前
|
消息中间件 存储 数据中心
RocketMQ的长轮询(Long Polling)实现分析
文章深入分析了RocketMQ的长轮询实现机制,长轮询结合了推送(push)和拉取(pull)两种消息消费模式的优点,通过客户端和服务端的配合,确保了消息的实时性同时将主动权保留在客户端。文中首先解释了长轮询的基本概念和实现步骤,然后通过一个简单的实例模拟了长轮询的过程,最后详细介绍了RocketMQ中DefaultMQPushConsumer的长轮询实现方式,包括PullMessage服务、PullMessageProcessor服务和PullCallback回调的工作原理。
115 1
|
3月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
3月前
|
消息中间件 Arthas Java
RocketMQ—一次连接namesvr失败的案例分析
项目组在使用RocketMQ时遇到Consumer连接Name Server失败的问题,异常显示连接特定地址失败。通过Arthas工具逐步分析代码执行路径,定位到创建Channel返回空值导致异常。进一步跟踪发现,问题源于Netty组件在初始化`ByteBufAllocator`时出现错误。分析依赖后确认存在Netty版本冲突。解决方法为排除冲突的Netty包,仅保留兼容版本。
229 0
RocketMQ—一次连接namesvr失败的案例分析
|
3月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
91 0
|
3月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
72 0
|
3月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
58 0

相关产品

  • 云消息队列 MQ