本文阅读基础:使用或了解过rocketMq;想了解"topic 5min trend"背后的原理;想了解监控模式如何实现。
RocketMq的dashboard,有运维页面,驾驶舱,集群页面,主题页面,消费者页面,生产者页面,发布管理页面,消息查询页面等,为开发和运维提供了强大的监控功能。
中文文档:https://github.com/apache/rocketmq-dashboard/blob/master/docs/1_0_0/UserGuide_CN.md
这篇文章将介绍驾驶舱-topic 5min trend的实现和原理。
功能:
"topic 5min trend"表示当前时间/或选定时间,25小时内,rocketMq消息发送总量,如果是当天实时消息,可能有1h延迟。
原理:
1.整体分为3个模块:
1、broker起定时任务收集消息-核心;
2、dashboard起定时任务从broker拉取统计内容,汇总并保存到本地;
3、前端拉取消息并在本地创建当天统计文件;
2.代码版本:
broker:4.2.0
dashboard:1.0.1-SNAPSHOT
3.总览:
这是一张非常重
dashboard topic 5min trend活动图
4.源码分析:
我们从右往左看这张图,右边是前端调用,左边是数据收集。
整体上,dashboard通过定时任务,从broker拉取统计数据,按天保存到本地。
第6、7泳道,是将数据保存到本地,提供给前端页面展示;
第5条泳道中,生成sum,我们主要看下sum的生成逻辑:先取statsDay,如果没有,再依次取statsHour和statsMinute;
看到这里有点懵,brokerStatsData是从哪来的?又是如何处理的呢? 我们看第3,4条泳道,也就是本文的重点。
3号泳道,broker的StatsItemSet中,通过执行定时任务对写入的消息量采样汇总到StatsItem。
4号泳道,broker的AdminBrokerProcessor分别对StatsItem中的3个成员变量csListMinute,csListHour,csListDay处理,生成3个快照汇总到BrokerStatsData。
下面我们进入源码,看下3号泳道和4号泳道的实现原理。
3号泳道,broker-StatsItemSet 采样汇总
StatsItemSet类在实例化时,执行init方法,启动定时任务
具体的采样方法:
从图中我们看出,csListMinute,csListHour,csListDay,用来存放采样的数据。
csListMinute存放最近70s的数据快照;csListHour存放最近70min的数据快照;csListDay存放最近25h的数据快照。
同时,我们看到,三个取样器取的总量是来自同一个value:
value是写入的消息总量;times是写入次数;驾驶舱-topic 5min trend中的曲线表示的是消息总量,所以我们主要看value。
看到这里,大家可能和我有同样的疑惑,都取同一个值,要这么多采样器做什么?我们去找调用方看看怎么处理这些数据。
4号泳道:broker-AdminBrokerProcessor类ViewBrokerStatsData方法:
private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final ViewBrokerStatsDataRequestHeader requestHeader = (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(null); DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); // 从Map中,根据topic获取statsItem StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey()); if (null == statsItem) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("The stats <%s> <%s> not exist", requestHeader.getStatsName(), requestHeader.getStatsKey())); return response; } BrokerStatsData brokerStatsData = new BrokerStatsData(); { BrokerStatsItem it = new BrokerStatsItem(); // StatsDataInMinute 有计算方法 StatsSnapshot ss = statsItem.getStatsDataInMinute(); it.setSum(ss.getSum()); it.setTps(ss.getTps()); it.setAvgpt(ss.getAvgpt()); brokerStatsData.setStatsMinute(it); } { BrokerStatsItem it = new BrokerStatsItem(); StatsSnapshot ss = statsItem.getStatsDataInHour(); it.setSum(ss.getSum()); it.setTps(ss.getTps()); it.setAvgpt(ss.getAvgpt()); brokerStatsData.setStatsHour(it); } { BrokerStatsItem it = new BrokerStatsItem(); StatsSnapshot ss = statsItem.getStatsDataInDay(); it.setSum(ss.getSum()); it.setTps(ss.getTps()); it.setAvgpt(ss.getAvgpt()); brokerStatsData.setStatsDay(it); } response.setBody(brokerStatsData.encode()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
dashboard调用broker的ViewBrokerStatsData()方法,获取统计快照BrokerStatsData。
BrokerStatsData类继承了RemotingSerializable类,有3个成员变量:statsMinute,statsHour,statsDay。
我们看下从哪里对这三个变量赋值的,以statsMinute为例:
快照ss来自于statsItem.getStatsDataInMinute():
从2,3两图我们看到,计算statsMinute快照时,computeStatsData的入参是csListMinute,就是我们采样的结果集。
computeStatsData是静态公共的计算方法,分别对csListMinute,csListHour,csListDay处理,计算出分钟级数据快照statsDataInMinute,小时级数据快照statsDataInHour,天级数据快照statsDataInDay;再将这些快照中的值,赋值给BrokerStatsItem和BrokerStatsData。
到这里,我们就梳理清楚了BrokerStatsData中的数据是怎么来的了,以及数据是如何处理的了。
下面我本地debug,结合dashboard来展开说下图3中的数据处理规则。已经明白的朋友可以跳过
5.图表和数据:
我们看到对csList写入和获取时,都对csList加锁。用于防止多线程请求下,csList动态变化产生数据不一致问题。因为用于监控,最快10s写入一次,调用方30s获取一次,所以可以忽略加锁对性能产生的影响。
计算sum时,是用最后一个节点的value-第一个节点的value得到。
这是我本地发送消息后的一段截图,我们debug看下,3000是怎么来的?
在文章开始处,代码分析,第5条泳道中,我们说过,sum的生成逻辑:从brokerStatsData中先取statsDay,如果没有,再依次取statsHour和statsMinute;所以,图中的3000,取自statsDay中的sum。
按照我们上面的推理,statsDay中sum,是用csListDay中的最后一个节点的value-第一个节点的value得到。
我们进入statsItem,看下是不是这样:
6000-3000=3000;
Q&A:
1、Q:为什么csListDay第一个节点到最后一个节点隔了10个小时左右,却只有5个节点。
A:我猜是我打断点或者电脑休眠导致的。你看,4和3节点之间间隔是1个小时的;按照代码的逻辑,一直运行的话,应该是25个节点。
2、Q:几天前统计的数据,是怎么保存的?
A:在文章开头的活动图中,泳道1中可以看到,收集到的数据会实时保存到本地文件。dashboard请求时,也是请求本地文件。将本地文件中的数据转成Map返回。
public Map<String, List<String>> getTopicCache(String date) { // 获取文件目录:/tmp/rocketmq-console/data/dashboard String dataLocationPath = configure.getDashboardCollectData(); // 生成文件:/tmp/rocketmq-console/data/dashboardyyyy-MM-dd_topic.json File file = new File(dataLocationPath + date + "_topic" + ".json"); if (!file.exists()) { log.info(String.format("No dashboard data for data: %s", date)); //throw Throwables.propagate(new ServiceException(1, "This date have't data!")); return Maps.newHashMap(); } return jsonDataFile2map(file); }
/tmp/rocketmq-console/data目录:
3、Q:标题叫驾驶舱-topic 5min trend。标题和分析的结果出入很大呢,5min没体现啊。
A:我分析的是4.2版本的代码,逻辑确实就是上面分析的结果,而且我结合dashboard反复验证过。
比如上面我们说,sum的生成逻辑:从brokerStatsData中先取statsDay,如果没有,再依次取statsHour和statsMinute;
也就是说,当statsDay不为空时,1个小时内即使有消息产生,只要没到samplingInHour任务执行的时间,在驾驶舱-topic 5min trend中就不会体现出来。
黄线标识的时刻我没有发送消息,从“Broker 5min trend”中可以看出,但“topic 5min trend”却上升了,是因为我本地的samplingInHour执行了,拿到了我在15:02到16:02的sum。在16:05和16:07左右,我分别发送了1000条消息,“主题 5min trend”却没有变化,预计在17:02时,才会上升。
4、Q:我们知道rocketMq可以集群部署,多主多从,一主多从。我们看到消息采集是分布在每个broker主节点上。那dashboard是如何收集到这些消息的呢?
A:非常简单,dashboard从namesrv可以获得到所有注册的broker主节点。从这些主节点挨个获取再加和就可以了。
总结:
到此,我们就将"topic 5min trend"中主要的实现结合源码,图表展示和数据分析完了。
"topic 5min trend"表示当前时间/或选定时间,25小时内,rocketMq消息发送总量,如果是当天实时消息,可能有1h延迟。
5min 没有体现出来。