pmq再学习二

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 首先启动的过程中,会去获取消费组中的配置信息,拿到消费组中的配置信息后,执行注册消费组操作,而执行注册消费组操作中,会首先注册消费者,然后执行消费组操作,然后执行启动消费者轮询服务,执行mq检查服务启动,mq提交服务启动。完成后,执行监控服务配置操作。这里面最为重要的是启动长轮询服务操作。因为长轮询服务涉及到执行重平衡操作和执行更新元数据操作。更新元数据操作涉及到更新队列元数据操作,此时不可避免的涉及到对偏移量的更新操作。

前面我们知道pmq的生产者和消费者需要进行生产和消费,需要有基础数据的支撑,也即元数据的支撑。这个过程首先需要有主题,然后创建消费组,在消费组中,我们根据我们的需要进行消息的订阅,订阅中也即绑定了消费组和主题的关系。

但是这样是不够的,因为想要进行消息的生产和消费,前面已经说到,需要首先注册消费组到MqClient中。以spring方式启动为例MqStartProcessor,其优先级比MqClientBootstrapListener要高,因此会先启动:

//重写postProcessBeanFactory,这里做了mq客户端启动的初始化@OverridepublicvoidpostProcessBeanFactory(ConfigurableListableBeanFactorybeanFactory) throwsBeansException {
if (environment!=null) { 
if (initFlag.compareAndSet(false, true)) {  
logger.info("消息客户端开始初始化!");
MqClient.setSubscriberResolver(newSubscriberResolver());
MqClientStartup.init(environment);
//MqClientStartup.start();// statService.start();logger.info("消息客户端初始化完成!");
            }
        }
    }

那启动做了什么事情?做了下面的事情:

1.设置订阅解析器2.创建MqConfig对象,同时进行属性填充2.执行mq客户端初始化,同时更新配置,可以看到执行初始化之后,需要执行一个初始化完整操作,由于此时需要的出了将消费者进行填充外,还有消费组需要执行注册操作,元数据、重平衡、队列偏移量需要关注。会执行fireInitEvent->getInitCompleted。3.MqClientBootstrapListener会执行onApplicationEvent,其主要执行两个操作:mq客户端启动->消费组注册,监控配置。

注册的过程如图所示:

微信图片_20221214033523.png


做消费组注册,首先需要有消费组的信息,我们才能注册。也即此时必然要做一个xml解析的工作,将xml下配置的消费组配置文件进行解析。因为它不属于spring,spring不会帮助我们解析这个xml文件,需要我们自己进行解析。也即对自定义xml文件的解析,这里并没有用spring的自定义标签文件进行解析,作者采用一个自己写的工具类进行自己标签元素内容的解析,然后将其放入到了map中。首先通过配置助手拿到resources下配置信息consumer,然后填充到map中=>localConfig。

当然,如果想将相关的bean信息交给spring管理的话,则可以使用spring的自定义标签。如果采用自定义标签的方式对标签进行解析的话,继承NamespaceHandler,同时实现BeanDefinitionParser,然后进行标签的解析工作,从而达到获取的目的。而在dubbo中,其实使用的就是采用这种解析的方式。

1.解析消费组中的xml配置

可以看到下面的解析方式还是非常像spring解析bean的默认标签的。当然解析xml的工具Sax也可以解析。

/*** 获取消费者配置* * @param element* @return*/privateMap<String, ConsumerGroupVo>getConsumerConfig(finalElementelement) {
if (element==null||!element.hasChildNodes()) {
thrownewIllegalArgumentException("配置文件没有子节点异常");
    }
// 获取消费者节点,拿到节点后进行解析NodeListnodeList=element.getElementsByTagName("consumer");
if (nodeList==null||nodeList.getLength() <1) {
nodeList=element.getElementsByTagName("consumers");
// 可以看到resource下,只有配置了messageQueue.xml或者mq.xml的话,才会找到对应的consumerGroup/*** <messageQueue> *   <consumer groupName="test1Sub" alarmEmails="987773698@qq.com"> *     <topics>*        <topic name="test1" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>*        <topic name="test4" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic> *    </topics> *   </consumer> * </messageQueue>*/if (nodeList==null||nodeList.getLength() <1) {
thrownewIllegalArgumentException("messageQueue.xml or mq.xml 文件中没有找到consumer节点");
        }
nodeList= ((Element)nodeList.item(0)).getElementsByTagName("consumer");
    }
Map<String, ConsumerGroupVo>data=newConcurrentHashMap<>();
intcount=nodeList.getLength();
// 遍历对应的节点,拿到消费组,将其放入到map中for (inti=0; i<count; i++) {
Elementitem= (Element)nodeList.item(i);
ConsumerGroupVoconsumerGroupVo=getConfig(item);
data.put(consumerGroupVo.getMeta().getName(), consumerGroupVo);
    }
returndata;
}

当然,除了这种方式,还有一种我们能想到的方式是采用配置中心,不管是采用springboot还是使用配置中心,都可以看到springboot的自动配置。

有了消费组的数据,就可以执行注册操作了。

2.消费组注册

执行注册消费组操作的过程:

1.首先做校验,检验完成后,做版本号匹配,如果包含,则进行提示已经订阅。2.拿到topic,如果不为空,则将其放入到消费组中,因为从上面我们可以看到一个消费组中可以订阅多个主题,因此可以看到它是一个Map<String,List<String>>的数据结构,进行填充,如果存在,当然消费组也可以有多个。3.而这个注册过程可以在mqClient中看到,首先会执行消费者的注册操作,然后执行消费组的注册操作,而这个过程的体现是在ConsumerController中register操作和consumerGroupRegister,这个过程会触发重平衡和更新元数据。4.监控配置执行更新操作,设置重平衡等待时间4s、发送失败重试次数10次、异步队列容量2000、元数据模式是否是元数据模式、拉增量时间、消息执行超时告警时间、异步提交偏移间隔、异步发送最大等待时间

我们来看执行消费组注册这个操作,因为它包含了消费者注册、消费组注册,消费组注册成功后,则执行启动消费组轮询服务和执行mq检查服务、mq提交服务启动。

消费者注册

此时会执行注册消费者操作,然后启动mq心跳服务,激活注册事件。注册的过程中,会checkVaild执行检查,主要是为了检查元数据中消费者的属性信息是否为空,并进行对应的提示,如果通过校验,则将消费者的对应信息插入到数据库中,方便后续的查操作,如果不存在的话,后续流程就执行不下去了。

2.1消费组注册

更新消费者:更新消费者,注册消费组消费者,对消费组消费者进行注册。这个过程中会从缓存中拿到消费组,同时对其进行审计日志记录,同时执行notifyRb操作(更新重平衡版本,通知消息服务实体执行批量插入notifyMessage信息)

2.2消费者轮询服务

执行长轮询,创建获取消费组请求对象中填充消费者id、消费组版本号,执行消费组处理。如果响应不为空,则设置broker模式为broker元数据模式,同时设置mq环境。而在处理组方法中,我们可以看到消费组中拿到消费者,进行遍历,如果mq执行器中不包含当前消费者的key,则将其放入到mq执行器中,同时可以看到value为mq工厂创建的mq组执行器服务。如果包含,则说明当前的mq执行器存在,则会执行重平衡或者更新操作。然后将当前的消费组版本放入对应的key和版本号,放入到mqContext中。而这里面,最主要的是包含时的处理,也即如果包含消费组,则需要做的处理是怎样的呢?此时会执行rbOrUpdate操作。

rbOrUpdate操作:

如果需要执行重平衡或者更新操作,此时首先会填充消费组信息,同时与本地消费组map进行比较。如果本地消费组map为空,则说明当前是第一次接收服务端接收到初始化数据,此时会设置版本号计数为0如果当前本地消费组map不为空,则是如果大于的情况,则需要执行更新操作,这里包含了前面说到表中两个版本号,一个是元数据版本号,一个是重平衡版本号,此时会执行更新操作,执行doRb和updateMeta操作。

doRb:

当重平衡版本号不一致的时候,需要先停止当前的任务。降低消息重复消费的概率,此时会执行更新重平衡版本号。设置为当前传入的版本号。同时设置当前的队列为本地消费组map中的队列信息。并提示从服务端收到重平衡数据。

updateMeta:

如果拿到的消费组map中填充元数据信息,同时如果当前传入的消费组中的队列属性不为空,而本地消费组map中的队列为空,则执行遍历,同时进行填充为当前的队列信息。防止此时queue定时服务还没有启动,更新执行线程的元数据信息,由具体的执行类来更新相关信息updateQueueMeta。

updateQueueMeta:

更新线程数,同时更新元数据为当前的元数据信息,更新元数据的前台是当前的队列的偏移量和拿到的消费者队列偏移量相等。否则数据当前的偏移量和拿到的队列偏移量不同,此时需要做偏移量更新操作。那此时必然需要考虑的是清掉消息,同时对慢消息进行清理。确保更新拉取消息的起始值,为偏移重置的值,加锁是防止拉取与重置同时操作,将当前的偏移量设置为偏移量,将上一次的偏移量设置为当前的偏移量。如果当前是运行状态,同时消费队列停止的标识是1,则此时将停止标识设置为0,同时执行提交操作doCommit提交偏移量。

doCommit->commitOffset:

如果提交偏移量不为空,同时拿到的队列偏移量不为空,则对队列偏移量信息进行遍历拿到消费队列版本对象,如果temp不为空,则使用doublecheck。如果flag1为true,同时当前版本小于拿到的版本号,则清掉老数据,同时将当前的放入到map中。否者如果当前版本等于拿到的,同时当前的偏移量小于拿到的,则清理老数据,同时放入当前的到map中。如果flag=1,则执行提交和更新操作commitAndUpdate。

提交和更新操作commitAndUpdate:

获取偏移量版本实体map,遍历提交偏移量。将消费组名称添加到HashSet<consumerGroupName>,通过消费组服务调用通知元数据更新元数据通过消费组名称,notifyMeta插入元数据操作。这里面最重要的就是提交偏移量doCommitOffset。

doCommitOffset:

如果flag为1,则执行提交和更新版本号操作,如果提交成功,则执行更新当前的偏移量版本。否者直接提交偏移量。如果rs成功,则执行更新偏移量和偏移量版本操作。

2.3mq检查服务启动

执行检查数据,检查版本号map,检查消费组元数据版本号、重平衡版本号、总版本号是否为空,同时和本地消费组中的元数据版本号、重平衡版本、总版本号是否相等。

2.4mq提交服务启动

执行提交偏移量commitOffset->commitAndUpdate->doCommitOffset

3.监控配置

主要的配置信息:这些配置信息可以在mqConfig中看到

rbTimes重平衡等待时间、pbRetryTimes发送失败重试次数、readTimeOut消息发送拉取超时时间、pullDeltaTime数据拉取没有数据时,递增值、metaMode元数据模式、publishAsynTimeout异步发送最大等待时间、warnTimeout消息执行超时告警时间、aynCommitInterval异步提交偏移间隔、synCommit标识是否是同步提交偏移

总结一下:

首先启动的过程中,会去获取消费组中的配置信息,拿到消费组中的配置信息后,执行注册消费组操作,而执行注册消费组操作中,会首先注册消费者,然后执行消费组操作,然后执行启动消费者轮询服务,执行mq检查服务启动,mq提交服务启动。完成后,执行监控服务配置操作。

这里面最为重要的是启动长轮询服务操作。因为长轮询服务涉及到执行重平衡操作和执行更新元数据操作。更新元数据操作涉及到更新队列元数据操作,此时不可避免的涉及到对偏移量的更新操作。

下一篇我们来看看生产者和消费者执行操作的流程。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
存储 Kubernetes 持续交付
k8s学习
【10月更文挑战第1天】
82 4
|
5月前
学习putpixel画点
【6月更文挑战第30天】学习putpixel画点。
33 1
|
分布式计算 架构师 前端开发
IT学习视频
一、架构师: N学教育 N学教育P7架构师|价值14999元课程由一线大厂资深架构师(阿里 P8+)进行主讲。业内首次将分布式架构全部技术点串联,并结合大厂真实案例实践讲解,将后端架构技术全面系统的展现,帮助同学们从全局视角掌握分布式架构设计方法,成为一名合格的架构师。他们用名师+好课来改变世界,打破业界严重缺乏顶层架构思维且仅靠疯狂堆叠知识点常见做法,初心不变,让每个人持续提升职业能力!让每个程序人获得抵御寒冬的能力!
79 0
|
NoSQL Java jenkins
|
弹性计算 Linux 虚拟化
选择正确,不断学习
对于学计算机的,对于我的专业,学习并掌握Linux操作系统是必须的,但是一开始在自己的电脑用VMware在自己的电脑搭建虚拟机学习,但是这样会导致自己的计算机变得很卡,因为会占用主机很大的内存。在我的老师的引荐下,认识了阿里云服务器,而且他推荐我们去参加“飞天加速计划,高校学生在家实践活动”,那样可以先体验阿里云服务器ECS,看看是否适合自己。于是我便去完成了练习和答题拿到了体验资格。
|
存储 缓存 网络协议
学习总结
学习总结
|
小程序 程序员 Windows
学习情况
一·自我情况,二·使用过程,三·心得体会
|
弹性计算 算法 小程序
我是自愿学习的
沉迷学习 日渐消瘦
我是自愿学习的
|
C# Windows 程序员
|
C# 图形学 Windows