实时更新消息消费队列与索引文件流程说明|学习笔记

简介: 快速学习实时更新消息消费队列与索引文件流程说明

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)实时更新消息消费队列与索引文件流程说明】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12483


实时更新消息消费队列与索引文件流程说明

 

内容介绍:

一.消息数据分发流程

二.流程回顾

 

一.消息数据分发流程

实时更新消息消费队列与索引文件相关的知识,消息被发送到MQ 之后,首先会将消息存储到CommitLog, CommitLog 再负责将消息分发到 Consumerqueue 以及 indexFile 中,如果在消息分发时出现延迟,会造成当前的消费者不能够及时消费消息,根据indexFile 不会立即能够查找到对应的消息,RocketMQ 为了提高消息分发的准确性,专门有一个线程进行处理,通过ReputMessageService 这个专门的线程来处理消息数据的分发。

image.png

该线程的开启随着 DefaultMessageStore 这个类的启动会被开启。

Public void statrc() throws Exception{

This.reputMessageService.start();

image.png在 DefaultMessageStore 中消息存储 start方法被打开调用之后,有一个reportMessageService,这个其实是一个线程,开启了线程 start 方法本质上在调用线程中的 run 方法,run。中会每隔一毫秒执行数据分发,数据分发的业务逻辑都在 doReport 中。

image.png在 doReport 中首先根据当前 report FromOffset 转发的位移量开始查找全部有效的数据,先将数据从 CommitLog 中查找出来,对应代码就是如下一行:

DefaultMessageStore.this.comitlog.getData(report FromOffset );

根据 report FromOffset 从 CommitLog 中找到对应的数据,数据就是一条条的消息,消息有许多条,所以做一个循环遍历 result

For(int readSize = 0; readSize < result.getSize() && doNext;)

遍历完成之后进行分发时,分发到两个位置 Consumerqueue 以及 indexFile,所以有一个关键地方如下图

image.png

这个地方开始进行数据的分发的处理。在 doDispatch 方法中,又是一个遍历,该遍历中 CommitLog Dispatcher。是一个接口,它的实现类有两个,一个是CommitLogDispatcherBuildConsumeQueue(专门给 Consumerqueue 分发的实现类)一个是给 indexFile 专门进行分发的实现类

image.png

这两个类具体分发的业务对应构建消息队列然后调用CommitLogDispatcherBuildConsumeQueue 给Consumerqueue 分发,然后调用CommitLogDispatcherBuildIndex 给 index 索引文件进行分发处理。

 

二.流程回顾

数据分发基本流程如上。整个数据分发的服务在当前DefaultMessageStore 方法中有一个开启线程的 start 方法,然后在 start。中reputMessageService 是一个线程,会调用线程的 run 方法,在 run 中通过doReport 进行数据分发。在数据分发时第一步是查找数据,查找出来之后,遍历每一条消息,调用接口中的 doDispatch 方法进行分发。接口在调用时使用的是接口的实现类,实现类只有有两个CommitLogDispatcherBuildConsumeQueue 以及 CommitLogDispatcherBuildIndex。

相关文章
|
3月前
|
消息中间件 存储 Kafka
几种 MQ 顺序消息的实现方式
几种 MQ 顺序消息的实现方式
|
消息中间件 关系型数据库 MySQL
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
|
12月前
|
消息中间件 存储 网络协议
大厂都是如何处理重复消息的?
消息消费失败,很多框架会自动执行重试,而重试就产生了重复消息。 MQTT协议给出三种传递消息时能够提供的
226 0
|
消息中间件 存储 缓存
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
|
存储 消息中间件 Linux
多类型业务消息专题-顺序消息 | 学习笔记
快速学习多类型业务消息专题-顺序消息
62 0
多类型业务消息专题-顺序消息 | 学习笔记
|
消息中间件 存储 算法
多类型业务消息专题-定时消息| 学习笔记
快速学习多类型业务消息专题-定时消息
138 0
多类型业务消息专题-定时消息| 学习笔记
|
消息中间件 存储 RocketMQ
【视频】顺序消息| 学习笔记
快速学习【视频】顺序消息
63 0
【视频】顺序消息| 学习笔记
|
消息中间件 Java 开发者
消息类型-顺序消息|学习笔记
快速学习消息类型-顺序消息
90 0
消息类型-顺序消息|学习笔记
|
消息中间件 RocketMQ 开发者
顺序消息分析|学习笔记
快速学习顺序消息分析
70 0
顺序消息分析|学习笔记
|
消息中间件 RocketMQ 开发者
顺序消息消费者|学习笔记
快速学习顺序消息消费者
51 0