前面我们知道pmq的生产者和消费者需要进行生产和消费,需要有基础数据的支撑,也即元数据的支撑。这个过程首先需要有主题,然后创建消费组,在消费组中,我们根据我们的需要进行消息的订阅,订阅中也即绑定了消费组和主题的关系。
但是这样是不够的,因为想要进行消息的生产和消费,前面已经说到,需要首先注册消费组到MqClient中。以spring方式启动为例MqStartProcessor,其优先级比MqClientBootstrapListener要高,因此会先启动:
//重写postProcessBeanFactory,这里做了mq客户端启动的初始化publicvoidpostProcessBeanFactory(ConfigurableListableBeanFactorybeanFactory) throwsBeansException { if (environment!=null) { if (initFlag.compareAndSet(false, true)) { logger.info("消息客户端开始初始化!"); MqClient.setSubscriberResolver(newSubscriberResolver()); MqClientStartup.init(environment); //MqClientStartup.start();// statService.start();logger.info("消息客户端初始化完成!"); } } }
那启动做了什么事情?做了下面的事情:
1.设置订阅解析器2.创建MqConfig对象,同时进行属性填充2.执行mq客户端初始化,同时更新配置,可以看到执行初始化之后,需要执行一个初始化完整操作,由于此时需要的出了将消费者进行填充外,还有消费组需要执行注册操作,元数据、重平衡、队列偏移量需要关注。会执行fireInitEvent->getInitCompleted。3.MqClientBootstrapListener会执行onApplicationEvent,其主要执行两个操作:mq客户端启动->消费组注册,监控配置。
注册的过程如图所示:
做消费组注册,首先需要有消费组的信息,我们才能注册。也即此时必然要做一个xml解析的工作,将xml下配置的消费组配置文件进行解析。因为它不属于spring,spring不会帮助我们解析这个xml文件,需要我们自己进行解析。也即对自定义xml文件的解析,这里并没有用spring的自定义标签文件进行解析,作者采用一个自己写的工具类进行自己标签元素内容的解析,然后将其放入到了map中。首先通过配置助手拿到resources下配置信息consumer,然后填充到map中=>localConfig。
当然,如果想将相关的bean信息交给spring管理的话,则可以使用spring的自定义标签。如果采用自定义标签的方式对标签进行解析的话,继承NamespaceHandler,同时实现BeanDefinitionParser,然后进行标签的解析工作,从而达到获取的目的。而在dubbo中,其实使用的就是采用这种解析的方式。
1.解析消费组中的xml配置
可以看到下面的解析方式还是非常像spring解析bean的默认标签的。当然解析xml的工具Sax也可以解析。
/*** 获取消费者配置* * @param element* @return*/privateMap<String, ConsumerGroupVo>getConsumerConfig(finalElementelement) { if (element==null||!element.hasChildNodes()) { thrownewIllegalArgumentException("配置文件没有子节点异常"); } // 获取消费者节点,拿到节点后进行解析NodeListnodeList=element.getElementsByTagName("consumer"); if (nodeList==null||nodeList.getLength() <1) { nodeList=element.getElementsByTagName("consumers"); // 可以看到resource下,只有配置了messageQueue.xml或者mq.xml的话,才会找到对应的consumerGroup/*** <messageQueue> * <consumer groupName="test1Sub" alarmEmails="987773698@qq.com"> * <topics>* <topic name="test1" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>* <topic name="test4" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic> * </topics> * </consumer> * </messageQueue>*/if (nodeList==null||nodeList.getLength() <1) { thrownewIllegalArgumentException("messageQueue.xml or mq.xml 文件中没有找到consumer节点"); } nodeList= ((Element)nodeList.item(0)).getElementsByTagName("consumer"); } Map<String, ConsumerGroupVo>data=newConcurrentHashMap<>(); intcount=nodeList.getLength(); // 遍历对应的节点,拿到消费组,将其放入到map中for (inti=0; i<count; i++) { Elementitem= (Element)nodeList.item(i); ConsumerGroupVoconsumerGroupVo=getConfig(item); data.put(consumerGroupVo.getMeta().getName(), consumerGroupVo); } returndata; }
当然,除了这种方式,还有一种我们能想到的方式是采用配置中心,不管是采用springboot还是使用配置中心,都可以看到springboot的自动配置。
有了消费组的数据,就可以执行注册操作了。
2.消费组注册
执行注册消费组操作的过程:
1.首先做校验,检验完成后,做版本号匹配,如果包含,则进行提示已经订阅。2.拿到topic,如果不为空,则将其放入到消费组中,因为从上面我们可以看到一个消费组中可以订阅多个主题,因此可以看到它是一个Map<String,List<String>>的数据结构,进行填充,如果存在,当然消费组也可以有多个。3.而这个注册过程可以在mqClient中看到,首先会执行消费者的注册操作,然后执行消费组的注册操作,而这个过程的体现是在ConsumerController中register操作和consumerGroupRegister,这个过程会触发重平衡和更新元数据。4.监控配置执行更新操作,设置重平衡等待时间4s、发送失败重试次数10次、异步队列容量2000、元数据模式是否是元数据模式、拉增量时间、消息执行超时告警时间、异步提交偏移间隔、异步发送最大等待时间
我们来看执行消费组注册这个操作,因为它包含了消费者注册、消费组注册,消费组注册成功后,则执行启动消费组轮询服务和执行mq检查服务、mq提交服务启动。
消费者注册
此时会执行注册消费者操作,然后启动mq心跳服务,激活注册事件。注册的过程中,会checkVaild执行检查,主要是为了检查元数据中消费者的属性信息是否为空,并进行对应的提示,如果通过校验,则将消费者的对应信息插入到数据库中,方便后续的查操作,如果不存在的话,后续流程就执行不下去了。
2.1消费组注册
更新消费者:更新消费者,注册消费组消费者,对消费组消费者进行注册。这个过程中会从缓存中拿到消费组,同时对其进行审计日志记录,同时执行notifyRb操作(更新重平衡版本,通知消息服务实体执行批量插入notifyMessage信息)。
2.2消费者轮询服务
执行长轮询,创建获取消费组请求对象中填充消费者id、消费组版本号,执行消费组处理。如果响应不为空,则设置broker模式为broker元数据模式,同时设置mq环境。而在处理组方法中,我们可以看到消费组中拿到消费者,进行遍历,如果mq执行器中不包含当前消费者的key,则将其放入到mq执行器中,同时可以看到value为mq工厂创建的mq组执行器服务。如果包含,则说明当前的mq执行器存在,则会执行重平衡或者更新操作。然后将当前的消费组版本放入对应的key和版本号,放入到mqContext中。而这里面,最主要的是包含时的处理,也即如果包含消费组,则需要做的处理是怎样的呢?此时会执行rbOrUpdate操作。
rbOrUpdate操作:
如果需要执行重平衡或者更新操作,此时首先会填充消费组信息,同时与本地消费组map进行比较。如果本地消费组map为空,则说明当前是第一次接收服务端接收到初始化数据,此时会设置版本号计数为0如果当前本地消费组map不为空,则是如果大于的情况,则需要执行更新操作,这里包含了前面说到表中两个版本号,一个是元数据版本号,一个是重平衡版本号,此时会执行更新操作,执行doRb和updateMeta操作。
doRb:
当重平衡版本号不一致的时候,需要先停止当前的任务。降低消息重复消费的概率,此时会执行更新重平衡版本号。设置为当前传入的版本号。同时设置当前的队列为本地消费组map中的队列信息。并提示从服务端收到重平衡数据。
updateMeta:
如果拿到的消费组map中填充元数据信息,同时如果当前传入的消费组中的队列属性不为空,而本地消费组map中的队列为空,则执行遍历,同时进行填充为当前的队列信息。防止此时queue定时服务还没有启动,更新执行线程的元数据信息,由具体的执行类来更新相关信息updateQueueMeta。
updateQueueMeta:
更新线程数,同时更新元数据为当前的元数据信息,更新元数据的前台是当前的队列的偏移量和拿到的消费者队列偏移量相等。否则数据当前的偏移量和拿到的队列偏移量不同,此时需要做偏移量更新操作。那此时必然需要考虑的是清掉消息,同时对慢消息进行清理。确保更新拉取消息的起始值,为偏移重置的值,加锁是防止拉取与重置同时操作,将当前的偏移量设置为偏移量,将上一次的偏移量设置为当前的偏移量。如果当前是运行状态,同时消费队列停止的标识是1,则此时将停止标识设置为0,同时执行提交操作doCommit提交偏移量。
doCommit->commitOffset:
如果提交偏移量不为空,同时拿到的队列偏移量不为空,则对队列偏移量信息进行遍历拿到消费队列版本对象,如果temp不为空,则使用doublecheck。如果flag1为true,同时当前版本小于拿到的版本号,则清掉老数据,同时将当前的放入到map中。否者如果当前版本等于拿到的,同时当前的偏移量小于拿到的,则清理老数据,同时放入当前的到map中。如果flag=1,则执行提交和更新操作commitAndUpdate。
提交和更新操作commitAndUpdate:
获取偏移量版本实体map,遍历提交偏移量。将消费组名称添加到HashSet<consumerGroupName>中,通过消费组服务调用通知元数据更新元数据通过消费组名称,notifyMeta插入元数据操作。这里面最重要的就是提交偏移量doCommitOffset。
doCommitOffset:
如果flag为1,则执行提交和更新版本号操作,如果提交成功,则执行更新当前的偏移量版本。否者直接提交偏移量。如果rs成功,则执行更新偏移量和偏移量版本操作。
2.3mq检查服务启动
执行检查数据,检查版本号map,检查消费组元数据版本号、重平衡版本号、总版本号是否为空,同时和本地消费组中的元数据版本号、重平衡版本、总版本号是否相等。
2.4mq提交服务启动
执行提交偏移量commitOffset->commitAndUpdate->doCommitOffset
3.监控配置
主要的配置信息:这些配置信息可以在mqConfig中看到
rbTimes重平衡等待时间、pbRetryTimes发送失败重试次数、readTimeOut消息发送拉取超时时间、pullDeltaTime数据拉取没有数据时,递增值、metaMode元数据模式、publishAsynTimeout异步发送最大等待时间、warnTimeout消息执行超时告警时间、aynCommitInterval异步提交偏移间隔、synCommit标识是否是同步提交偏移
总结一下:
首先启动的过程中,会去获取消费组中的配置信息,拿到消费组中的配置信息后,执行注册消费组操作,而执行注册消费组操作中,会首先注册消费者,然后执行消费组操作,然后执行启动消费者轮询服务,执行mq检查服务启动,mq提交服务启动。完成后,执行监控服务配置操作。
这里面最为重要的是启动长轮询服务操作。因为长轮询服务涉及到执行重平衡操作和执行更新元数据操作。更新元数据操作涉及到更新队列元数据操作,此时不可避免的涉及到对偏移量的更新操作。
下一篇我们来看看生产者和消费者执行操作的流程。