RocketMQ 消息存储

简介: 前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中消息存储部分的整体实现思路。

引言

前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中消息存储部分的整体实现思路,更多关于 RocketMQ 的文章均收录于<RocketMQ系列文章>;

消息存储

通过前面的知识,我们已经知道了topic是如何分配到Broker的,以及消息发送方是如何决定把消息发送给哪个Broker的,接下来我们看一看Broker介绍到消息后,是怎么存储消息的。

RocketMQ主要存储的文件包括CommitLog文件、ConsumeQueue文件、IndexFile文件。RocketMQ将所有主题的消息存储在同一个文件中,确保消息发送时顺序写文件,尽最大的能力确保消息发送的高性能与高吞吐量。但由于消息中间件一般是基于消息主题的订阅机制,这样便给按照消息主题检索消息带来了极大的不便。为了提高消息消费的效率,RocketMQ引入了ConsumeQueue消息队列文件,每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件。IndexFile索引文件,其主要设计理念就是为了加速消息的检索性能,根据消息的属性快速从CommitLog文件中检索消息。

磁盘有时候会比你想象的快很多,有时候也会比你想象的慢很多,关键在如何使用,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s,超过了一般网卡的传输速度,这是磁盘比想象的快的地方。但是磁盘随机写的速度只有大概1OOKB/s, 和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。

存储方案

store-architecture
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,消息真正的物理存储文件是CommitLog, ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

CommitLog以物理文件的方式存放,每台Broker上的CommitLog被本机器所有ConsumeQueue共享。在CommitLog中,一个消息的存储长度是不固定的,RocketMQ采取一些机制,尽量向CommitLog中顺序写,但是随机读。ConsumeQueue的内容也会被写到磁盘里作持久存储,只不过是通过异步刷盘的方式进行。

这样设计的优点:

  1. CommitLog顺序写,可以大大提高写入效率。接收到消息时,只有CommitLog是需要同步刷盘的(根据配置,可能也不需要同步刷盘),其他文件都是异步保存,如果发生了宕机,RocketMQ可以根据CommitLog恢复ConsumeQueue文件和IndexFile。
  2. 虽然CommitLog是随机读,但是利用操作系统的page cache机制,可以批量地从磁盘读取,cache存到内存中之后,加速后续的读取速度。
  3. 为了保证完全的顺序写,需要ConsumeQueue这个中间结构,因为ConsumeQueue里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的ConsumeQueue能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证CommitLog和ConsumeQueue的一致性,CommitLog里存储了Consume Queues、Message keys、Tags等所有信息,即使ConsumeQueue丢失,也可以通过commitLog完全恢复出来。

下图是一个Broker在文件系统中存储的各个文件。我们可以看到CommitLog文件夹、ConsumeQueue文件夹,还有在config文件夹中Topic、Consumer的相关信息。最下面那个文件夹index存的是索引文件,这个文件用来加快消息查询的速度。
store-files

  • commitlog:消息存储目录。
  • config:运行期间一些配置信息,主要包括下列信息。

    • consumerFilter.json:主题消息过滤信息。
    • consumerOffset.json:集群消费模式消息消费进度。
    • delayOffset.json:延时消息队列拉取进度。
    • subscriptionGroup:消息消费组配置信息。
    • topic.json:topic配置属性。
  • consumequeue:消息消费队列存储目录。
  • index:消息索引文件存储目录。
  • abort:如果存在 abort文件说明Broker非正常关闭,该文件默认启动时创建,正常退出之前删除。
  • checkpoint:文件检测点存储commitlog文件最后一次刷盘时间戳、consumequeue最后一次刷盘时间、index索引文件最后一次刷盘时间戳。

存储流程

  1. 如果当前Broker停止工作或Broker为SLAVE角色或当前Broker不支持写入则拒绝消息写入;如果消息主题长度超过256个字符、消息属性长度超过65536个字符将拒绝该消息写入。
  2. 如果消息的延迟级别大于0,将消息的原主题名称与原消息队列ID存入消息属性中,用延迟消息主题SCHEDULE_TOPIC、消息队列ID更新原先消息的主题与队列。
  3. 获取当前可以写入的CommitLog文件
  4. 在写入CommitLog之前,先申请putMessageLock,也就是将消息存储到CommitLog文件中是串行的
  5. 设置消息的存储时间,如果CommitLog文件不存在就需要创建新的文件
  6. 创建全局唯一消息ID
  7. 获取该消息在消息队列的偏移量
  8. 计算消息总长度,并写入CommitLog
  9. 如果计算发现CommitLog无法存储所有内容,则创建新的CommitLog,文件名为即将插入消息的偏移
  10. 将消息内容写入CommitLog文件后根据配置进行同步或者异步刷盘
  11. 更新逻辑偏移量,并释放putMessageLock
  12. 根据CommitLog偏移量,消息存储大小,tag的hash值插入一条Message到ConsumeQueue
  13. 根据Key的hash值,CommitLog偏移量,插入一条数据到IndexFile
  14. ConsumerQueue每隔一段时间自动刷盘、IndexFile在每次创建新indexFile时刷盘之前的索引文件、checkpoint文件在刷盘CommotLog,ConsumeQueue和IndexFile时进行更新

store-architecture2

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1]《RocketMQ技术内幕》
[2]《RocketMQ实战与原理解析》
[3] 老生常谈——利用消息队列处理分布式事务
[4] RocketMQ架构解析
[5] MappedByteBuffer VS FileChannel 孰强孰弱?
[6] 海量数据处理之Bloom Filter详解
[7] rocketmq GitHub Wiki

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 存储 数据库
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
88625 14
|
7月前
|
存储 消息中间件 缓存
RocketMQ 5.0 分级存储背后技术优化包含那几个方面
RocketMQ 5.0 分级存储背后的技术优化是一个综合性的系统工程,需要考虑多个方面,包括存储介质的选择、数据读写策略的设计、数据压缩和解压缩技术的引入、自动的数据分级和迁移机制的实现,以及高可用性和容错性的保证等。这些技术优化的目的是为了实现存储成本和性能的最优平衡,提高系统的可靠性和可用性,满足大规模数据处理的需求。
190 1
|
7月前
|
消息中间件 存储 运维
|
4月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
7月前
|
存储 消息中间件 对象存储
谈谈 RocketMQ 5.0 分级存储背后一些有挑战的技术优化
谈谈 RocketMQ 5.0 分级存储背后一些有挑战的技术优化
132796 247
|
7月前
|
消息中间件 存储 数据库
深度剖析 RocketMQ 5.0,流存储:流场景的诉求是什么?
本文将从使用的角度出发,来更详细的展示一下流存储的场景,看看它和业务消息的场景有哪些区别。 RocketMQ 5.0 面向流存储的场景,提供了哪些特性。再结合两个数据集成的案例,来帮助大家了解流存储的用法。
3522 2
|
7月前
|
存储 消息中间件 负载均衡
RocketMQ 5.0 分级存储背后的技术优化与挑战
RocketMQ 5.0 分级存储背后的技术优化与挑战
|
7月前
|
存储 消息中间件 Apache
谈谈 RocketMQ 5.0 分级存储背后一些有挑战的技术优化
谈谈 RocketMQ 5.0 分级存储背后一些有挑战的技术优化
|
消息中间件 存储 NoSQL
SpringBoot项目整合Redis,Rabbitmq发送、消费、存储邮件
SpringBoot项目整合Redis,Rabbitmq发送、消费、存储邮件
671 10
|
7月前
|
消息中间件 存储 Java
RocketMQ-初体验RocketMQ(06)-使用API操作RocketMQ ,理解RocketMQ的存储结构
RocketMQ-初体验RocketMQ(06)-使用API操作RocketMQ ,理解RocketMQ的存储结构
198 0