开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):实时更新消息消费队列与索引文件流程说明】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12483
实时更新消息消费队列与索引文件流程说明
内容介绍:
一.消息数据分发流程
二.流程回顾
一.消息数据分发流程
实时更新消息消费队列与索引文件相关的知识,消息被发送到MQ 之后,首先会将消息存储到CommitLog, CommitLog 再负责将消息分发到 Consumerqueue 以及 indexFile 中,如果在消息分发时出现延迟,会造成当前的消费者不能够及时消费消息,根据indexFile 不会立即能够查找到对应的消息,RocketMQ 为了提高消息分发的准确性,专门有一个线程进行处理,通过ReputMessageService 这个专门的线程来处理消息数据的分发。
该线程的开启随着 DefaultMessageStore 这个类的启动会被开启。
Public void statrc() throws Exception{
This.reputMessageService.start();
在 DefaultMessageStore 中消息存储 start方法被打开调用之后,有一个reportMessageService,这个其实是一个线程,开启了线程 start 方法本质上在调用线程中的 run 方法,run。中会每隔一毫秒执行数据分发,数据分发的业务逻辑都在 doReport 中。
在 doReport 中首先根据当前 report FromOffset 转发的位移量开始查找全部有效的数据,先将数据从 CommitLog 中查找出来,对应代码就是如下一行:
DefaultMessageStore.this.comitlog.getData(report FromOffset );
根据 report FromOffset 从 CommitLog 中找到对应的数据,数据就是一条条的消息,消息有许多条,所以做一个循环遍历 result
For(int readSize = 0; readSize < result.getSize() && doNext;)
遍历完成之后进行分发时,分发到两个位置 Consumerqueue 以及 indexFile,所以有一个关键地方如下图
这个地方开始进行数据的分发的处理。在 doDispatch 方法中,又是一个遍历,该遍历中 CommitLog Dispatcher。是一个接口,它的实现类有两个,一个是CommitLogDispatcherBuildConsumeQueue(专门给 Consumerqueue 分发的实现类)一个是给 indexFile 专门进行分发的实现类
这两个类具体分发的业务对应构建消息队列然后调用CommitLogDispatcherBuildConsumeQueue 给Consumerqueue 分发,然后调用CommitLogDispatcherBuildIndex 给 index 索引文件进行分发处理。
二.流程回顾
数据分发基本流程如上。整个数据分发的服务在当前DefaultMessageStore 方法中有一个开启线程的 start 方法,然后在 start。中reputMessageService 是一个线程,会调用线程的 run 方法,在 run 中通过doReport 进行数据分发。在数据分发时第一步是查找数据,查找出来之后,遍历每一条消息,调用接口中的 doDispatch 方法进行分发。接口在调用时使用的是接口的实现类,实现类只有有两个CommitLogDispatcherBuildConsumeQueue
以及 CommitLogDispatcherBuildIndex。