pmq再学习一

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 这个过程首先是创建主题,有了主题,创建消费组,然后基于消费组这个大前提,执行订阅操作,订阅需要进行消费的主题信息,然后在订阅的基础上,进行队列的分配。在这个过程中会执行元数据的变更和重平衡操作。而这些可以从审计日志中获取打印日志中可以看到很清楚。下一篇,我们来看生产者和消费者的细节。生产者和消费者在执行操作前会执行一个操作init初始化操作,而这个初始化操作会将信息注册到将信息注册到MqClient中,因为只有将其进行统一管理的时候,在创建客户端对象的时候才会方便管理,同时方便调用,此时会启动心跳服务,此时少不了还有一个重要的操作就是注册消费组,同时需要关注一个点就是长轮询操作。

首先基础数据的来源和怎么产生联系的?创建主题,有了主题,创建消费组,然后基于消费组这个大前提,执行订阅操作,订阅需要进行消费的主题信息,然后在订阅的基础上,进行队列的分配,而分配过程中,首先会去找到可分配的数据节点,然后根据条件进行匹配,然后进行返回。在这个过程中会执行元数据的变更和重平衡操作。特别的,这里需要重点关注队列是怎样产生和分配的。

1.准备工作:pmq相关服务启动

pmq中,启动mq-rest模块的时候,会根据观察者模式启动定时任务、启动broker定时任务、注册时间、注册report,同时在这个过程中,我们可以看到它使用了类似于spi的思想,使用

SpringUtil.getBeans(BrokerTimerService.class)
SpringUtil.getBeans(ConsumerGroupChangedListener.class)
SpringUtil.getBeans(TimerService.class)
...

使用spring工具类可以启动需要启动的服务,为什么要启动,这是因为需要在初始化的过程中需要启动服务,方便服务线程执行任务,比如定时任务。在这个过程中,如果需要在构造函数后需要执行启动,此时可以使用@ConstructPost来执行初始化。

2.mq-ui启动

启动mq-ui,可以看到ui界面中的信息:

微信图片_20221214033321.png


在发送消费、消息存储、消息消费前,需要填充数据,从而提供元数据信息。从审计日志中,我们可以看到pmq的初始化和消息生产和消费的过程。填充数据过程中,首先需要创建数据节点、消息主题、消息组,通过消息组进行消息订阅消息主题,从而初始化队列信息,进行队列分配。而在这个过程中,我们可以看到这个过程会执行一个操作就是首先从缓存map中获取信息,如果缓存map中不存在缓存信息,则执行查询操作,然后将其放入到缓存map中,下一次获取,则直接走缓存获取。但是这个过程走的就是强制刷新缓存操作,同时更新缓存过程为了防止出现问题,添加了cat监控。

也即你会看到在queue、consumerGroup、topic操作中,我们都会看到一个套路就是有一个run方法,在服务启动的时候就会执行run方法进行运行。而在运行的过程中,会执行更新缓存操作,这个过程中会检查是否需要进行更新,如果需要,则执行更新缓存操作,会经过cas进行比较,然后执行更新。在记录日志的过程中,可以看到在mq-biz中,使用了aop切面做全局controller的aop拦截处理增强操作,在输出日志的时候,方便查看,这个过程体现在ControllerAspect中。

同时为了日志的排查方便,可以看到在logback.xml还是LogFilter中,使用了slf4j的mdc来自定义日志的格式。

如果你留心查看pmq的数据表,可以看到其使用了mysql中提出的mvcc中版本号的概念,使用版本号执行更新操作。比如consumer_group中,我们可以看到三个版本号:

`rb_version`bigint(20) NOTNULLCOMMENT'重平衡版本号,每次发生重平衡的时候版本会进行升级,同时客户端提交的时候需要进行版本比对如果版本比对不成功说明发生重平衡了',
`meta_version`bigint(20) DEFAULT'0'COMMENT'元数据信息变更版本号',
`version`bigint(20) DEFAULT'0'COMMENT'为了操作方便,引入一个总的version 版本,eb_version 和meta_version发送变更 都会引发version版本变更,拿到相关信息后自行判断是发生了什么类型版本变化',

而在rb重平衡操作的时候,会插入一条消息notifyMessage操作时,会有一个messageType的概念,而从代码中,我们可以知道messageType有重平衡或者元数据变更操作都会有这个字段。

同时队列queue中有一个字段lock_version,也有一个版本号。这个是为了解决并发时出现的并发问题。

3.创建主题

从官方文档可以看到首先在ui界面上填充数据节点,然后填充主题信息,而填充主题信息的过程,我们可以看到其是TopicController主题控制层中的创建主题控制层这个方法。

创建主题的过程:

1.首先更新主题中的缓存信息,使用springUtils提供的工具执行匹配需要更新的主题的缓存数据信息,更新主题和队列的数据信息。2.对主题的名称进行校验,主题名称的长度大于4同时主题名称截取出来的名称转为小写与fail相同的话,此时抛异常进行提示topic名称不能以fail结尾3.创建topic实体对象,并填充topic对象信息,添加用户id,方便权限控制4.查看当前的操作是更新操作还是插入操作,如果是更新操作直接执行更新主题信息操作,同时可以看到会填充token信息在主题对象中,进行更新后,记下审计日志信息5.如果是插入操作,则创建新建的主题信息,创建前执行查询当前的主题是否存在,如果存在,则直接进行提示,否者执行插入操作,新增完成后,记下审计日志,然后计算需要分配的队列数,同时计算队列的数量,执行分配队列操作。6.分配队列操作distributeQueueWithLock:分布式队列通过此时会通过版本号来进行控制,可以看到队列表中有一个字段lock_version.这里可以通过前面创建的信息看到表中:topic中name字段表示的是正常主题名称,而orginName是失败队列的名称。分配队列的过程,我们可以看到其首先会获取可分配的节点,而这个过程中,进行分配,可以看到分配节点类型是成功类型,同时在节点类型枚举中可以看到成功类型和失败类型的code和编码。同时分的成功类型的队列数量为200*10000=200万,也即一个主题分配成功队列200万个,而其中的一个200万/100=2个队列,分配的成功队列数2个,类型是成功队列类型,如果分配失败队列数也是2个。7.获取可分配的节点:根据数据节点列表和队列列表。此时需要留下可读可写的数据节点,因为数据节点有两种,一种是普通节点1,一种是特殊节点0只有手工匹配才能匹配到,因此需要筛选出满足条件的普通节点。数据节点的属性中:可读可写、只读、不可读不可写,因此这里需要筛选出其是可读可写,这里做了一个编码和code的判断。筛选出了满足条件的数据节点之后,进行队列判断,首先队列中的节点id和数据节点id相匹配,同时队列中拿到的主题id为0,此时进行数据节点添加,同时计数大于0的。此时可以从筛选出来的节点中筛掉已分配的节点。剩下的就是没有分配的队列。如果筛选的可分配节点为空,则返回创建对象的列表,如果不为空,则返回正常节点的。8.对可分配节点中剩余队列进行倒序,获取所以可分配节点下的未分配队列。考虑到分配队列可能会出现失败,此时会进行计数,同时进行遍历操作,方便选出可分配的队列。9.分配时,如果出现分配失败,则进行提示。创建成功,则更新队列中的topic字段,同时更新未分配的队列数,方便下一次分配队列。

创建完主题之后,就可以进行消费组的创建,同时进行消费组的订阅操作,订阅主题。

4.创建消费组

执行创建消费组操作:

1.创建消费组的套路和创建主题的套路是一样的,首先去缓存中查看是否有相关信息,如果没有则执行查询,然后放入缓存map中。2.创建消费组实体对象,然后填充消费组信息,然后对请求的环境进行判断,如果没有,则进行默认。3.对消费组的模式进行判断,分为三种模式:集群模式、广播模式、代理模式,如果为广播模式,会对它的原始名称进行处理,如果是其它两种模式不需要做处理,直接添加到消费组对象中即可。4.这里会对ipFlag进行判断,如果是0时,填充的是白名单ip列表,如果是1,则填充的列表是黑名单列表5.如果获取的消费组不为空,则执行更新操作,此时更新消费组,如果是广播模式,需要更新原始消费组和进行消费组。如果修改了appId,则更新失败主题。如果修改了消费模式,则对应的队列偏移量的消费模式也修改了,执行更新。6.修改黑白名单触发重平衡操作7.执行元数据更新,此时可以看到首先会更新元数据版本,然后创建通知消息notifyMessage对象,这里有一个设置消息模式,分为两种,一种是同步数据meta,一种是重平衡rb,这里是meta插入操作8.插入元数据,记录审计日志9.如果id为空,则执行插入操作,执行插入操作后,执行更新元数据操作

5.订阅操作

然后点击界面的订阅,进行订阅操作ConsumerGroupTopicController#create

1.如果是广播模式,并且为原始消费组,则为进行消费组添加订阅。如果消费组map中拿到的value不为空,则创建消费组主题创建请求对象,填充请求对象信息:消费组名称、消费组id、主题id、主题名称、原始主题名称、主题类型、重试次数线程大小、最大堆积量、延迟处理事件、拉取批量大小、超时时间、消费者批次大小。创建消费组主题和失败主题。2.执行创建消费组主题和失败主题。3.创建消费组主题和失败主题。首先获取用户权限,进行权限校验,进行消费组主题校验,如果为空,返回消费组主题创建响应,进行提示。4.拿到正常topic和失败topic实体,然后填充消费组主题创建请求对象。填充消费组实体对象信息:消费组id、消费组名称、主题id、主题名称、原始主题名称、主题类型、最大拉取时间、超时时间。如果重试次数、线程大小、最大lag、tag、延迟处处理事件、拉取批量大小、消费组批量大小不为空,则进行设置。5.如果消费组主题map中key值包含当前的消费组名称_主题名称,则记录审计日志,同时进行value获取,进行返回,记录审计日志6.否者,说明不存在,此时插入到消费组主题中,记录审计日志7.创建正常主题和失败主题的偏移量:首先获取队列通过主题id,获取消费组信息,如果队列列表为空,则进行审计日志进行提示。8.变量队列实体列表,创建队列偏移对象,进行填充,message01服务类中设置dbId,获取队列中的最大id,正常主题的起始偏移量为当前的最大id。主题类型分为正常队列1,失败队列2。如果是正常队列,则设置偏移量为最大id-1,同时开始的偏移量为最大id-1.9.获取是否包含消费组名称_主题名称_队列id,如果不包含,则执行插入到队列偏移量表中。同时记录审计日志。如果包含,则直接记录审计日志,进行返回响应。创建好正常队列和失败主题的队列后,添加主题名称到消费组中。10.获取消费组实体,拿到消费组主题名称,如果mq3的group的订阅关系中,没有该topic则添加。否者进行设置,执行更新消费组操作。11.通知元数据执行更新元数据操作。12.通知重平衡操作,更新消费组中rb的版本,如果是批量,则执行批量操作,插入到notifyMessage表中。

有了这些关系和基础数据元数据信息,接下来就是进行消息的生产和消息的消费了。

总结

这个过程首先是创建主题,有了主题,创建消费组,然后基于消费组这个大前提,执行订阅操作,订阅需要进行消费的主题信息,然后在订阅的基础上,进行队列的分配。在这个过程中会执行元数据的变更和重平衡操作。而这些可以从审计日志中获取打印日志中可以看到很清楚。

下一篇,我们来看生产者和消费者的细节。生产者和消费者在执行操作前会执行一个操作init初始化操作,而这个初始化操作会将信息注册到将信息注册到MqClient中,因为只有将其进行统一管理的时候,在创建客户端对象的时候才会方便管理,同时方便调用,此时会启动心跳服务,此时少不了还有一个重要的操作就是注册消费组,同时需要关注一个点就是长轮询操作。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
学习使用按位取反~
学习使用按位取反~。
25 6
|
设计模式 人工智能
二零二二年十二月学习总结
二零二二年十二月学习总结
85 2
|
算法 Oracle Java
IT学习深入学习必备的技术网站
IT学习深入学习必备的技术网站
91 0
|
存储 Kubernetes 安全
k8s安全学习
k8s安全学习
385 0
|
程序员 编译器 C++
C++学习——前进(三)
C++学习——前进(三)
92 0
C++学习——前进(三)
|
弹性计算 Linux 数据安全/隐私保护
学习
学习
|
弹性计算 Linux 虚拟化
选择正确,不断学习
对于学计算机的,对于我的专业,学习并掌握Linux操作系统是必须的,但是一开始在自己的电脑用VMware在自己的电脑搭建虚拟机学习,但是这样会导致自己的计算机变得很卡,因为会占用主机很大的内存。在我的老师的引荐下,认识了阿里云服务器,而且他推荐我们去参加“飞天加速计划,高校学生在家实践活动”,那样可以先体验阿里云服务器ECS,看看是否适合自己。于是我便去完成了练习和答题拿到了体验资格。
了解自己的学习
关于“学习”的相关总结
509 0
|
弹性计算 Java 关系型数据库
学习介绍
解压tomcat压缩包 tar -zxvf apache-tomcat-8.5.70.tar.gz。 解 在idea将下面打包成war,通过命令传到服务器的tomcat里面,传进去后将会自己解压 通过这次的云服务器ECS的使用,我收获颇丰,第一次将项目放到了服务器上进行访问,在过程中,因为我目前使用的springboot,所以在tomcat上面使用较少,遇到大多数问题是部署到云服务器上端口以及连接的错误和Linux指令的不熟悉,我经常通过网络找各种解决办法,配置端口,删除重新解压一下,最后才能使用,将这次的心得体会写下来,以后再次使用阿里云服务器的时候,我看看通过这个文章,可以更快的部署服务器

相关实验场景

更多