pmq再学习二

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 首先启动的过程中,会去获取消费组中的配置信息,拿到消费组中的配置信息后,执行注册消费组操作,而执行注册消费组操作中,会首先注册消费者,然后执行消费组操作,然后执行启动消费者轮询服务,执行mq检查服务启动,mq提交服务启动。完成后,执行监控服务配置操作。这里面最为重要的是启动长轮询服务操作。因为长轮询服务涉及到执行重平衡操作和执行更新元数据操作。更新元数据操作涉及到更新队列元数据操作,此时不可避免的涉及到对偏移量的更新操作。

前面我们知道pmq的生产者和消费者需要进行生产和消费,需要有基础数据的支撑,也即元数据的支撑。这个过程首先需要有主题,然后创建消费组,在消费组中,我们根据我们的需要进行消息的订阅,订阅中也即绑定了消费组和主题的关系。

但是这样是不够的,因为想要进行消息的生产和消费,前面已经说到,需要首先注册消费组到MqClient中。以spring方式启动为例MqStartProcessor,其优先级比MqClientBootstrapListener要高,因此会先启动:

//重写postProcessBeanFactory,这里做了mq客户端启动的初始化@OverridepublicvoidpostProcessBeanFactory(ConfigurableListableBeanFactorybeanFactory) throwsBeansException {
if (environment!=null) { 
if (initFlag.compareAndSet(false, true)) {  
logger.info("消息客户端开始初始化!");
MqClient.setSubscriberResolver(newSubscriberResolver());
MqClientStartup.init(environment);
//MqClientStartup.start();// statService.start();logger.info("消息客户端初始化完成!");
            }
        }
    }

那启动做了什么事情?做了下面的事情:

1.设置订阅解析器2.创建MqConfig对象,同时进行属性填充2.执行mq客户端初始化,同时更新配置,可以看到执行初始化之后,需要执行一个初始化完整操作,由于此时需要的出了将消费者进行填充外,还有消费组需要执行注册操作,元数据、重平衡、队列偏移量需要关注。会执行fireInitEvent->getInitCompleted。3.MqClientBootstrapListener会执行onApplicationEvent,其主要执行两个操作:mq客户端启动->消费组注册,监控配置。

注册的过程如图所示:

微信图片_20221214033523.png


做消费组注册,首先需要有消费组的信息,我们才能注册。也即此时必然要做一个xml解析的工作,将xml下配置的消费组配置文件进行解析。因为它不属于spring,spring不会帮助我们解析这个xml文件,需要我们自己进行解析。也即对自定义xml文件的解析,这里并没有用spring的自定义标签文件进行解析,作者采用一个自己写的工具类进行自己标签元素内容的解析,然后将其放入到了map中。首先通过配置助手拿到resources下配置信息consumer,然后填充到map中=>localConfig。

当然,如果想将相关的bean信息交给spring管理的话,则可以使用spring的自定义标签。如果采用自定义标签的方式对标签进行解析的话,继承NamespaceHandler,同时实现BeanDefinitionParser,然后进行标签的解析工作,从而达到获取的目的。而在dubbo中,其实使用的就是采用这种解析的方式。

1.解析消费组中的xml配置

可以看到下面的解析方式还是非常像spring解析bean的默认标签的。当然解析xml的工具Sax也可以解析。

/*** 获取消费者配置* * @param element* @return*/privateMap<String, ConsumerGroupVo>getConsumerConfig(finalElementelement) {
if (element==null||!element.hasChildNodes()) {
thrownewIllegalArgumentException("配置文件没有子节点异常");
    }
// 获取消费者节点,拿到节点后进行解析NodeListnodeList=element.getElementsByTagName("consumer");
if (nodeList==null||nodeList.getLength() <1) {
nodeList=element.getElementsByTagName("consumers");
// 可以看到resource下,只有配置了messageQueue.xml或者mq.xml的话,才会找到对应的consumerGroup/*** <messageQueue> *   <consumer groupName="test1Sub" alarmEmails="987773698@qq.com"> *     <topics>*        <topic name="test1" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>*        <topic name="test4" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic> *    </topics> *   </consumer> * </messageQueue>*/if (nodeList==null||nodeList.getLength() <1) {
thrownewIllegalArgumentException("messageQueue.xml or mq.xml 文件中没有找到consumer节点");
        }
nodeList= ((Element)nodeList.item(0)).getElementsByTagName("consumer");
    }
Map<String, ConsumerGroupVo>data=newConcurrentHashMap<>();
intcount=nodeList.getLength();
// 遍历对应的节点,拿到消费组,将其放入到map中for (inti=0; i<count; i++) {
Elementitem= (Element)nodeList.item(i);
ConsumerGroupVoconsumerGroupVo=getConfig(item);
data.put(consumerGroupVo.getMeta().getName(), consumerGroupVo);
    }
returndata;
}

当然,除了这种方式,还有一种我们能想到的方式是采用配置中心,不管是采用springboot还是使用配置中心,都可以看到springboot的自动配置。

有了消费组的数据,就可以执行注册操作了。

2.消费组注册

执行注册消费组操作的过程:

1.首先做校验,检验完成后,做版本号匹配,如果包含,则进行提示已经订阅。2.拿到topic,如果不为空,则将其放入到消费组中,因为从上面我们可以看到一个消费组中可以订阅多个主题,因此可以看到它是一个Map<String,List<String>>的数据结构,进行填充,如果存在,当然消费组也可以有多个。3.而这个注册过程可以在mqClient中看到,首先会执行消费者的注册操作,然后执行消费组的注册操作,而这个过程的体现是在ConsumerController中register操作和consumerGroupRegister,这个过程会触发重平衡和更新元数据。4.监控配置执行更新操作,设置重平衡等待时间4s、发送失败重试次数10次、异步队列容量2000、元数据模式是否是元数据模式、拉增量时间、消息执行超时告警时间、异步提交偏移间隔、异步发送最大等待时间

我们来看执行消费组注册这个操作,因为它包含了消费者注册、消费组注册,消费组注册成功后,则执行启动消费组轮询服务和执行mq检查服务、mq提交服务启动。

消费者注册

此时会执行注册消费者操作,然后启动mq心跳服务,激活注册事件。注册的过程中,会checkVaild执行检查,主要是为了检查元数据中消费者的属性信息是否为空,并进行对应的提示,如果通过校验,则将消费者的对应信息插入到数据库中,方便后续的查操作,如果不存在的话,后续流程就执行不下去了。

2.1消费组注册

更新消费者:更新消费者,注册消费组消费者,对消费组消费者进行注册。这个过程中会从缓存中拿到消费组,同时对其进行审计日志记录,同时执行notifyRb操作(更新重平衡版本,通知消息服务实体执行批量插入notifyMessage信息)

2.2消费者轮询服务

执行长轮询,创建获取消费组请求对象中填充消费者id、消费组版本号,执行消费组处理。如果响应不为空,则设置broker模式为broker元数据模式,同时设置mq环境。而在处理组方法中,我们可以看到消费组中拿到消费者,进行遍历,如果mq执行器中不包含当前消费者的key,则将其放入到mq执行器中,同时可以看到value为mq工厂创建的mq组执行器服务。如果包含,则说明当前的mq执行器存在,则会执行重平衡或者更新操作。然后将当前的消费组版本放入对应的key和版本号,放入到mqContext中。而这里面,最主要的是包含时的处理,也即如果包含消费组,则需要做的处理是怎样的呢?此时会执行rbOrUpdate操作。

rbOrUpdate操作:

如果需要执行重平衡或者更新操作,此时首先会填充消费组信息,同时与本地消费组map进行比较。如果本地消费组map为空,则说明当前是第一次接收服务端接收到初始化数据,此时会设置版本号计数为0如果当前本地消费组map不为空,则是如果大于的情况,则需要执行更新操作,这里包含了前面说到表中两个版本号,一个是元数据版本号,一个是重平衡版本号,此时会执行更新操作,执行doRb和updateMeta操作。

doRb:

当重平衡版本号不一致的时候,需要先停止当前的任务。降低消息重复消费的概率,此时会执行更新重平衡版本号。设置为当前传入的版本号。同时设置当前的队列为本地消费组map中的队列信息。并提示从服务端收到重平衡数据。

updateMeta:

如果拿到的消费组map中填充元数据信息,同时如果当前传入的消费组中的队列属性不为空,而本地消费组map中的队列为空,则执行遍历,同时进行填充为当前的队列信息。防止此时queue定时服务还没有启动,更新执行线程的元数据信息,由具体的执行类来更新相关信息updateQueueMeta。

updateQueueMeta:

更新线程数,同时更新元数据为当前的元数据信息,更新元数据的前台是当前的队列的偏移量和拿到的消费者队列偏移量相等。否则数据当前的偏移量和拿到的队列偏移量不同,此时需要做偏移量更新操作。那此时必然需要考虑的是清掉消息,同时对慢消息进行清理。确保更新拉取消息的起始值,为偏移重置的值,加锁是防止拉取与重置同时操作,将当前的偏移量设置为偏移量,将上一次的偏移量设置为当前的偏移量。如果当前是运行状态,同时消费队列停止的标识是1,则此时将停止标识设置为0,同时执行提交操作doCommit提交偏移量。

doCommit->commitOffset:

如果提交偏移量不为空,同时拿到的队列偏移量不为空,则对队列偏移量信息进行遍历拿到消费队列版本对象,如果temp不为空,则使用doublecheck。如果flag1为true,同时当前版本小于拿到的版本号,则清掉老数据,同时将当前的放入到map中。否者如果当前版本等于拿到的,同时当前的偏移量小于拿到的,则清理老数据,同时放入当前的到map中。如果flag=1,则执行提交和更新操作commitAndUpdate。

提交和更新操作commitAndUpdate:

获取偏移量版本实体map,遍历提交偏移量。将消费组名称添加到HashSet<consumerGroupName>,通过消费组服务调用通知元数据更新元数据通过消费组名称,notifyMeta插入元数据操作。这里面最重要的就是提交偏移量doCommitOffset。

doCommitOffset:

如果flag为1,则执行提交和更新版本号操作,如果提交成功,则执行更新当前的偏移量版本。否者直接提交偏移量。如果rs成功,则执行更新偏移量和偏移量版本操作。

2.3mq检查服务启动

执行检查数据,检查版本号map,检查消费组元数据版本号、重平衡版本号、总版本号是否为空,同时和本地消费组中的元数据版本号、重平衡版本、总版本号是否相等。

2.4mq提交服务启动

执行提交偏移量commitOffset->commitAndUpdate->doCommitOffset

3.监控配置

主要的配置信息:这些配置信息可以在mqConfig中看到

rbTimes重平衡等待时间、pbRetryTimes发送失败重试次数、readTimeOut消息发送拉取超时时间、pullDeltaTime数据拉取没有数据时,递增值、metaMode元数据模式、publishAsynTimeout异步发送最大等待时间、warnTimeout消息执行超时告警时间、aynCommitInterval异步提交偏移间隔、synCommit标识是否是同步提交偏移

总结一下:

首先启动的过程中,会去获取消费组中的配置信息,拿到消费组中的配置信息后,执行注册消费组操作,而执行注册消费组操作中,会首先注册消费者,然后执行消费组操作,然后执行启动消费者轮询服务,执行mq检查服务启动,mq提交服务启动。完成后,执行监控服务配置操作。

这里面最为重要的是启动长轮询服务操作。因为长轮询服务涉及到执行重平衡操作和执行更新元数据操作。更新元数据操作涉及到更新队列元数据操作,此时不可避免的涉及到对偏移量的更新操作。

下一篇我们来看看生产者和消费者执行操作的流程。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
学习putpixel画点
学习putpixel画点。
60 20
|
2月前
学习使用按位取反~
学习使用按位取反~。
25 6
|
前端开发 NoSQL 算法
需要学习
需要学习
210 0
|
小程序 程序员 Windows
学习情况
一·自我情况,二·使用过程,三·心得体会
|
Java 关系型数据库 MySQL
学习
通过一些简单的入门试题之后,我便成功的领到了为期两周的阿里云Esc服务器。在进行简单的基础配置之后,就开始根据教程进行服务器的操作。这其中的教程十分的详细,根据教程中的指导,我完成了云服务器中Linux系统基础Java环境、MySql环境的相关的环境配置,之后一个简单的springboot项目程序便可以在服务器上成功的跑起来。虽然这期间也遇到了一些小问题,如未打开相应的端口号,导致页面无法正常的进入,但在查阅资料以后都能解决。总体而言,在这个过程中,我体验到了服务器的神奇,也学习到了许多相关的知识,也希望在接下去的时间中继续进行相关的知识学习。
|
Java 自然语言处理 cobar
|
Java
Java底层学习
最近在看几本Java的书,也做了很多笔记,主要是关于Java虚拟机、Java GC、Java 并发编程等方面,参考的主要几本书籍有: 《深入理解Java虚拟机》——周志明 《深入理解Java虚拟机 第二版》——美 Bi...
1665 0