消息的写入和读取流程

简介: 接之前几篇消息中间件组件的模块划分,本篇内容讲述消息的写入和读取流程。 消息的队列模型 队列模型 在描述消息的写入和读取流程之前,首先要弄清楚消息队列的模型是怎么样的,包括消息是怎么存储的。

接之前几篇消息中间件组件的模块划分,本篇内容讲述消息的写入和读取流程。

消息的队列模型

队列模型

在描述消息的写入和读取流程之前,首先要弄清楚消息队列的模型是怎么样的,包括消息是怎么存储的。

简化的队列模型大致如上图所示。

  • 一个Topic由多个Partition组成,每个Partition是一个队列,即一个Partition内部的消息是先进先出的(顺序的)

  • Producer会写多个Partition,Consumer也会去读多个Partition

  • Producer不断的向Partition末尾追加消息,Consumer从队列的头开始一直向后读取消息

存储模型

为了便于理解,上面的队列模型中仅仅将消息的一个写入队列抽象成一个Topic Partition,但在实践中这是不够的。

消息有用户产生并写入消息队列,每一条消息都是不一样的,在实践中这样“一层”的结构是无法满足要求的。

如上图,直接采用“一层”的结构存储消息。那么:

  • 每个Consumer需要维护自己读取的进度,这个进度是一个偏移量且是不连续的,无法支持调整消费进度之类的操作;

  • 无法确认队列中的消息数量;

  • 需要多次读取队列才能获取一条消息(先按照存储协议读取数消息大小,再按照偏移量+大小读,读取一条完整的消息);

  • ......

“一层”的存储模型在实践中是无法使用的。实践中对消息存储的模型往往是分为索引+存储的两层结构,RocketMQ也是这种实现。

消息的存储模型分为两层,其中:

  1. Storage Queue为存储队列,存储实际的消息(完成的消息,包含各种属性和内容)

  2. Index Queue是消息的索引队列,元素长度是固定的,比如元素内容为消息位置和消息大小(这样12个字节可以完整的定位出一条消息)

这样做的优势:

  • Consumer维护的进度是一个连续的值(索引队列的读取进度),可以进行调整控制

  • 索引队列包含的元素数量即为消息数量

  • 存储队列可以是共享的,这样全局消息对磁盘来说都是顺序写

  • ......

消息的写入流程

有了消息队列模型的认识之后,来梳理消息写入流程会清晰的多。

几点共识:

  1. Topic是有多分区的,一条消息只会落到一个分区中,所以这里包含了一个路由策略;

  2. 消息存储包含了索引队列和存储队列,所以写入一条消息时除了存储消息本身,还需要构建消息索引

  3. 消息是写到Broker的磁盘上的,会涉及到刷盘操作

消息的写入流程大致如下:

(流程中忽略了非核心的步骤和错误的处理,比如消息合法性的验证、元数据获取失败的处理等)

其中1-4步为Producer上的操作;5-8步为服务端流程。

具体步骤和一些实现相关的说明:

  1. 元数据通过NameServer获取。元数据中需要包含Topic分区的分布情况,即Topic有多少个分区,每个分区落在哪台服务器上

  2. 路由部分实现了消息和分区的对应关系。因为消息队列只会保证分区内数据的顺序性,所以当一些消息需要保证顺序时,我们需要将这些消息写入到同一个分区,路由策略需要保证这一点

  3. 序列化和网络包的处理包含了消息存储协议的内容和网络相关协议的内容,这块看Kafka和RcoektMQ都是自定义协议,之后会专门抽篇幅讲怎么设计这块的协议

  4. 对Producer而言,写入消息就是向Broker发送一个请求,对Producer而言,这里需要支持异步写入和同步写入两种操作

  5. 接收写入请求和反序列就是按照上面的自定义协议获取到消息内容进行验证、写入等后续处理(这里会有很多优化,比如减少内存拷贝、减少内存开销等)

  6. 消息是写磁盘的,所以这里会写pagecache之后刷盘(这部分之后也会单独展开讲)

  7. 消息是需要持久化之后才能响应客户端写入完成的,所以这里需要等待刷盘;等待刷盘和索引队列是可以同时进行的,从Producer的角度来说它并不关心索引是否构建,只需要数据写入存储成功即可

消息的消费流程

Consumer相对Producer来说会多一个协同工作的部分,所以会有一个分区分配的过程(类似Producer的写入消息的分区路由)。另外,Consumer会涉及到消费语义(most-once、least-once、exactly-once),还有消息的获取模式(pull、push、long-polling、pull-push),不过这这部分的消费流程中不会展开讨论这些内容,而是简要的描述流程,现有大致的认识。

(同样这里忽略了一些非核心的流程)

Consumer端流程包括1-5及9、10,6-8位Broker端流程。

具体每一步操作的内容如下:

  1. 元数据的获取和Producer类似

  2. Consumer因为需要和其他相同Group的Consumer协同工作,所以需要知道有多少个同组的Consumer存在

  3. Consumer需要“固定”分区消费,这里有一个分配策略需要实现,即根据存在的Consumer实例和Topic的元数据,计算出每个Consumer需要消费的分区,Consumer和分区的对应关系在正常情况下是不应该发生变动的

  4. Consumer在每次获取消息的时候都需要告知Broker从哪个位点开始获取,所以在初始化时需要获取到读取的位置(之后直接从内存获取每次要读取的位置即可)

  5. 这里也是一个交互协议的部分,可以采用自定义协议,也可以采用json之类的协议(可以和元数据操作之类的保持一致)

  6. Consumer提交的消费进度是IndexQueue的序列号,IndexQueue元素是定长的,所以可以直接计算出读取的偏移量,然后读取IndexQueue的元素

  7. IndexQueue的元素包含了消息存储的信息,通过这些信息可以读取到完整的一条消息(这里会一次读取一批消息给客户端,所以会按照IndexQueue的元素顺序读取StorageQueue的内容,然后返回;为了权衡延迟,在读取不到下一条消息的时候也会返回,这里会有很多策略)

  8. 按照协议将读取的StorageQueue的内容返回给Consumer(这里会涉及到Zero Copy的内容来优化性能,之后再讲)

  9. Consumer需要知道存储协议,然后按照协议解析出消息内容

  10. 消费和获取消息是异步的过程,获取消息的线程在获取消息提交到Consumer的Buffer后就可以开始读取下一批消息,而消费线程异步来从Buffer获取消息进行消费(这里消费后需要提交消费进度到Broker,也可以在获取消息的请求中将消费进度带上去)

结语

以上是消息写入和消息读取的简要流程,在写入流程中会涉及到一些内存池、MMAP的技术,读取时会有ZeroCopy等,这些都会在之后进行分析。

在清楚了流程之后,下一步将进行核心实体的定义,下一篇会写《消息中间件核心实体分析》。

 

往期内容:

 

《什么是分布式消息中间件》

《消息中间件的一些概念》

《业务方对消息中间件的需求》

《消息中间件架构讨论》

《Broker模块划分》

《Client模块划分》

《NameServer模块划分》

 

欢迎关注此公众号,将坚持不懈的写MQ相关的技术文章,希望能和更多的朋友交流。 

 

如果本文对您有帮助,点一下右下角的“推荐”
目录
相关文章
|
3月前
|
SQL 存储 数据库连接
【Azure Stream Analystics】流分析服务执行遇见警告错误消息,导致上游数据堆积,下游无任何输出
【Azure Stream Analystics】流分析服务执行遇见警告错误消息,导致上游数据堆积,下游无任何输出
【Azure Stream Analystics】流分析服务执行遇见警告错误消息,导致上游数据堆积,下游无任何输出
|
5月前
|
消息中间件 Java 测试技术
【消息队列开发】 测试MessageFileManager(对硬盘中的消息操作)类
【消息队列开发】 测试MessageFileManager(对硬盘中的消息操作)类
|
5月前
|
消息中间件 存储
【消息队列开发】 实现消息删除逻辑
【消息队列开发】 实现消息删除逻辑
|
消息中间件 关系型数据库 MySQL
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
|
消息中间件 存储 缓存
RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)2
RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)2
70 0
|
消息中间件
RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)1
RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)1
69 0
|
消息中间件 存储 网络协议
大厂都是如何处理重复消息的?
消息消费失败,很多框架会自动执行重试,而重试就产生了重复消息。 MQTT协议给出三种传递消息时能够提供的
283 0
|
消息中间件 存储 网络协议
多类型业务消息专题-事务消息 | 学习笔记
快速学习多类型业务消息专题-事务消息
141 0
多类型业务消息专题-事务消息 | 学习笔记
|
存储 消息中间件 Linux
多类型业务消息专题-顺序消息 | 学习笔记
快速学习多类型业务消息专题-顺序消息
多类型业务消息专题-顺序消息 | 学习笔记
|
消息中间件 存储 RocketMQ
实时更新消息消费队列与索引文件流程说明|学习笔记
快速学习实时更新消息消费队列与索引文件流程说明
实时更新消息消费队列与索引文件流程说明|学习笔记