RocketMq-dashboard:topic 5min trend 原理和源码分析(一)

简介: RocketMq-dashboard:topic 5min trend 原理和源码分析(一)

本文阅读基础:使用或了解过rocketMq;想了解"topic 5min trend"背后的原理;想了解监控模式如何实现。


RocketMq的dashboard,有运维页面,驾驶舱,集群页面,主题页面,消费者页面,生产者页面,发布管理页面,消息查询页面等,为开发和运维提供了强大的监控功能。


中文文档:https://github.com/apache/rocketmq-dashboard/blob/master/docs/1_0_0/UserGuide_CN.md


这篇文章将介绍驾驶舱-topic 5min trend的实现和原理。



功能:

"topic 5min trend"表示当前时间/或选定时间,25小时内,rocketMq消息发送总量,如果是当天实时消息,可能有1h延迟。



e361b8818e7e0bc3b615b6fc722ca6fe.png


原理:

1.整体分为3个模块:


1、broker起定时任务收集消息-核心;

2、dashboard起定时任务从broker拉取统计内容,汇总并保存到本地;

3、前端拉取消息并在本地创建当天统计文件;


2.代码版本:

broker:4.2.0

dashboard:1.0.1-SNAPSHOT



3.总览:

这是一张非常重

网络异常,图片无法展示
|


dashboard topic 5min trend活动图

4.源码分析:


我们从右往左看这张图,右边是前端调用,左边是数据收集。


整体上,dashboard通过定时任务,从broker拉取统计数据,按天保存到本地。


第6、7泳道,是将数据保存到本地,提供给前端页面展示;


第5条泳道中,生成sum,我们主要看下sum的生成逻辑:先取statsDay,如果没有,再依次取statsHour和statsMinute;



d11dc10a1726127885376d58c06227d3.png


看到这里有点懵,brokerStatsData是从哪来的?又是如何处理的呢? 我们看第3,4条泳道,也就是本文的重点。


3号泳道,broker的StatsItemSet中,通过执行定时任务对写入的消息量采样汇总到StatsItem。


4号泳道,broker的AdminBrokerProcessor分别对StatsItem中的3个成员变量csListMinute,csListHour,csListDay处理,生成3个快照汇总到BrokerStatsData。


下面我们进入源码,看下3号泳道和4号泳道的实现原理。


3号泳道,broker-StatsItemSet 采样汇总



StatsItemSet类在实例化时,执行init方法,启动定时任务

image.png


具体的采样方法:



6f679304033110524ed681ac98206ddb.png


image.png


从图中我们看出,csListMinute,csListHour,csListDay,用来存放采样的数据。


csListMinute存放最近70s的数据快照;csListHour存放最近70min的数据快照;csListDay存放最近25h的数据快照。


同时,我们看到,三个取样器取的总量是来自同一个value:




image.png


value是写入的消息总量;times是写入次数;驾驶舱-topic 5min trend中的曲线表示的是消息总量,所以我们主要看value。

看到这里,大家可能和我有同样的疑惑,都取同一个值,要这么多采样器做什么?我们去找调用方看看怎么处理这些数据。



4号泳道:broker-AdminBrokerProcessor类ViewBrokerStatsData方法:

private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final ViewBrokerStatsDataRequestHeader requestHeader =
            (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
        // 从Map中,根据topic获取statsItem
        StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey());
        if (null == statsItem) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("The stats <%s> <%s> not exist", requestHeader.getStatsName(), requestHeader.getStatsKey()));
            return response;
        }
        BrokerStatsData brokerStatsData = new BrokerStatsData();
        {
            BrokerStatsItem it = new BrokerStatsItem();
            // StatsDataInMinute 有计算方法
            StatsSnapshot ss = statsItem.getStatsDataInMinute();
            it.setSum(ss.getSum());
            it.setTps(ss.getTps());
            it.setAvgpt(ss.getAvgpt());
            brokerStatsData.setStatsMinute(it);
        }
        {
            BrokerStatsItem it = new BrokerStatsItem();
            StatsSnapshot ss = statsItem.getStatsDataInHour();
            it.setSum(ss.getSum());
            it.setTps(ss.getTps());
            it.setAvgpt(ss.getAvgpt());
            brokerStatsData.setStatsHour(it);
        }
        {
            BrokerStatsItem it = new BrokerStatsItem();
            StatsSnapshot ss = statsItem.getStatsDataInDay();
            it.setSum(ss.getSum());
            it.setTps(ss.getTps());
            it.setAvgpt(ss.getAvgpt());
            brokerStatsData.setStatsDay(it);
        }
        response.setBody(brokerStatsData.encode());
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }



dashboard调用broker的ViewBrokerStatsData()方法,获取统计快照BrokerStatsData。


BrokerStatsData类继承了RemotingSerializable类,有3个成员变量:statsMinute,statsHour,statsDay。



eef13b9d7798101f143a20fd02d10685.png


我们看下从哪里对这三个变量赋值的,以statsMinute为例:


image.png


快照ss来自于statsItem.getStatsDataInMinute():


image.png


从2,3两图我们看到,计算statsMinute快照时,computeStatsData的入参是csListMinute,就是我们采样的结果集。


computeStatsData是静态公共的计算方法,分别对csListMinute,csListHour,csListDay处理,计算出分钟级数据快照statsDataInMinute,小时级数据快照statsDataInHour,天级数据快照statsDataInDay;再将这些快照中的值,赋值给BrokerStatsItem和BrokerStatsData。


到这里,我们就梳理清楚了BrokerStatsData中的数据是怎么来的了,以及数据是如何处理的了。


下面我本地debug,结合dashboard来展开说下图3中的数据处理规则。已经明白的朋友可以跳过



5.图表和数据:




image.png


我们看到对csList写入和获取时,都对csList加锁。用于防止多线程请求下,csList动态变化产生数据不一致问题。因为用于监控,最快10s写入一次,调用方30s获取一次,所以可以忽略加锁对性能产生的影响。


计算sum时,是用最后一个节点的value-第一个节点的value得到。


这是我本地发送消息后的一段截图,我们debug看下,3000是怎么来的?



image.png


在文章开始处,代码分析,第5条泳道中,我们说过,sum的生成逻辑:从brokerStatsData中先取statsDay,如果没有,再依次取statsHour和statsMinute;所以,图中的3000,取自statsDay中的sum。



image.png




按照我们上面的推理,statsDay中sum,是用csListDay中的最后一个节点的value-第一个节点的value得到。

我们进入statsItem,看下是不是这样:



image.png



6000-3000=3000;

Q&A:


1、Q:为什么csListDay第一个节点到最后一个节点隔了10个小时左右,却只有5个节点。


A:我猜是我打断点或者电脑休眠导致的。你看,4和3节点之间间隔是1个小时的;按照代码的逻辑,一直运行的话,应该是25个节点。


2、Q:几天前统计的数据,是怎么保存的?


A:在文章开头的活动图中,泳道1中可以看到,收集到的数据会实时保存到本地文件。dashboard请求时,也是请求本地文件。将本地文件中的数据转成Map返回。

public Map<String, List<String>> getTopicCache(String date) {
        // 获取文件目录:/tmp/rocketmq-console/data/dashboard
        String dataLocationPath = configure.getDashboardCollectData();
        // 生成文件:/tmp/rocketmq-console/data/dashboardyyyy-MM-dd_topic.json
        File file = new File(dataLocationPath + date + "_topic" + ".json");
        if (!file.exists()) {
            log.info(String.format("No dashboard data for data: %s", date));
            //throw Throwables.propagate(new ServiceException(1, "This date have't data!"));
            return Maps.newHashMap();
        }
        return jsonDataFile2map(file);
    }

/tmp/rocketmq-console/data目录:


image.png


3、Q:标题叫驾驶舱-topic 5min trend。标题和分析的结果出入很大呢,5min没体现啊。


A:我分析的是4.2版本的代码,逻辑确实就是上面分析的结果,而且我结合dashboard反复验证过。


比如上面我们说,sum的生成逻辑:从brokerStatsData中先取statsDay,如果没有,再依次取statsHour和statsMinute;


也就是说,当statsDay不为空时,1个小时内即使有消息产生,只要没到samplingInHour任务执行的时间,在驾驶舱-topic 5min trend中就不会体现出来。


image.png


image.png


黄线标识的时刻我没有发送消息,从“Broker 5min trend”中可以看出,但“topic 5min trend”却上升了,是因为我本地的samplingInHour执行了,拿到了我在15:02到16:02的sum。在16:05和16:07左右,我分别发送了1000条消息,“主题 5min trend”却没有变化,预计在17:02时,才会上升。


4、Q:我们知道rocketMq可以集群部署,多主多从,一主多从。我们看到消息采集是分布在每个broker主节点上。那dashboard是如何收集到这些消息的呢?


A:非常简单,dashboard从namesrv可以获得到所有注册的broker主节点。从这些主节点挨个获取再加和就可以了。



image.png


总结:


到此,我们就将"topic 5min trend"中主要的实现结合源码,图表展示和数据分析完了。

"topic 5min trend"表示当前时间/或选定时间,25小时内,rocketMq消息发送总量,如果是当天实时消息,可能有1h延迟。

5min 没有体现出来。




















相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
9月前
|
消息中间件 存储 缓存
RocketMQ原理—4.消息读写的性能优化
本文详细解析了RocketMQ消息队列的核心原理与性能优化机制,涵盖Producer消息分发、Broker高并发写入、Consumer拉取消息流程等内容。重点探讨了基于队列的消息分发、Hash有序分发、CommitLog内存写入优化、ConsumeQueue物理存储设计等关键技术点。同时分析了数据丢失场景及解决方案,如同步刷盘与JVM OffHeap缓存分离策略,并总结了写入与读取流程的性能优化方法,为理解和优化分布式消息系统提供了全面指导。
RocketMQ原理—4.消息读写的性能优化
|
9月前
|
消息中间件 存储 设计模式
RocketMQ原理—5.高可用+高并发+高性能架构
本文主要从高可用架构、高并发架构、高性能架构三个方面来介绍RocketMQ的原理。
2977 21
RocketMQ原理—5.高可用+高并发+高性能架构
|
9月前
|
存储 消息中间件 缓存
RocketMQ原理—3.源码设计简单分析下
本文介绍了Producer作为生产者是如何创建出来的、启动时是如何准备好相关资源的、如何从拉取Topic元数据的、如何选择MessageQueue的、与Broker是如何进行网络通信的,Broker收到一条消息后是如何存储的、如何实时更新索引文件的、如何实现同步刷盘以及异步刷盘的、如何清理存储较久的磁盘数据的,Consumer作为消费者是如何创建和启动的、消费者组的多个Consumer会如何分配消息、Consumer会如何从Broker拉取一批消息。
425 11
RocketMQ原理—3.源码设计简单分析下
|
9月前
|
存储 消息中间件 网络协议
RocketMQ原理—1.RocketMQ整体运行原理
本文详细解析了RocketMQ的整体运行原理,涵盖从生产者到消费者的全流程。首先介绍生产者发送消息的机制,包括Topic与MessageQueue的关系及写入策略;接着分析Broker如何通过CommitLog和ConsumeQueue实现消息持久化,并探讨同步与异步刷盘的优缺点。同时,讲解基于DLedger技术的主从同步原理,确保高可用性。消费者部分则重点讨论消费模式(集群 vs 广播)、拉取消息策略及负载均衡机制。网络通信层面,基于Netty的高性能架构通过多线程池分工协作提升并发能力。最后,揭示mmap与PageCache技术优化文件读写的细节,总结了RocketMQ的核心运行机制。
RocketMQ原理—1.RocketMQ整体运行原理
|
9月前
|
消息中间件 Java 数据管理
RocketMQ原理—2.源码设计简单分析上
本文介绍了NameServer的启动脚本、启动时会解析哪些配置、如何初始化Netty网络服务器、如何启动Netty网络服务器,介绍了Broker启动时是如何初始化配置的、BrokerController的创建以及包含的组件、BrokerController的初始化、启动、Broker如何把自己注册到NameServer上、BrokerOuterAPI是如何发送注册请求的,介绍了NameServer如何处理Broker的注册请求、Broker如何发送定时心跳
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
消息中间件 存储 Kafka
RocketMQ 工作原理图解,看这篇就够了!
本文详细解析了 RocketMQ 的核心架构、消息领域模型、关键特性和应用场景,帮助深入理解消息中间件的工作原理。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
RocketMQ 工作原理图解,看这篇就够了!
|
消息中间件 存储 缓存
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
1472 7
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
消息中间件 负载均衡 算法
聊聊 RocketMQ中 Topic,Queue,Consumer,Consumer Group的关系
本文详细解析了RocketMQ中Topic、Queue、Consumer及Consumer Group之间的关系。文中通过图表展示了Topic可包含多个Queue,Queue分布在不同Broker上;Consumer组内多个消费者共享消息;并深入探讨了集群消费与广播消费模式下Queue与Consumer的关系,以及Rebalancing机制在实例增减时如何确保负载均衡。理解这些关系有助于更好地掌握RocketMQ的工作原理,提升系统运维效率。
3111 2