3分钟白话RocketMQ系列—— 如何存储消息

简介: 3分钟白话RocketMQ系列—— 如何存储消息

白话3分钟,快速了解RocketMQ如何存储消息。

看完如果不了解,欢迎来打我。

我们知道RocketMQ主要分为消息 生产、存储(消息堆积)、消费 三大块领域。

那接下来,我们白话一下,RocketMQ是如何存储消息的,揭秘消息存储全过程。

注意,如果白话中不小心提到相关代码配置与类名,请参考RocketMQ 4.9.4版本


关键字摘要


  • 存储模型与存储类型
  • 如何保证存储消息不丢失
  • 如何提高写入性能
  • 如何清理过期消息


存储模型是什么?有哪些存储类型?


RocketMQ使用了一种基于日志的存储方式,将消息以顺序写入的方式追加到文件中,从而实现高性能的消息存储和读取。

RocketMQ的消息存储方式可以分为两个类型:CommitLogConsumeQueue

640.png

还有一个文件类型是indexfile,主要用于控制台消息检索,不影响消息的写入与消费,我们就不展开了。


CommitLog


CommitLog文件存储了Producer端写入的消息主体内容,它以追加写入的方式将消息存储到磁盘上的文件中。

单个文件大小默认1G ,文件名长度为20位(左边补零,剩余为起始偏移量),当文件满了,写入下一个文件。

比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。

它的主要特点是:顺序写,但是随机读(被ConsumeQueue读取)。

虽然是随机读,但是利用package机制,可以批量地从磁盘读取,作为cache存到内存中,加速后续的读取速度。

Broker单个实例下所有的队列共用一个日志数据文件CommitLog来存储。而Kafka采用的是独立型的存储结构,每个队列一个文件。


ConsumeQueue


ConsumeQueue文件是用于支持消息消费的存储结构。保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。

消费者 通过 顺序读取ConsumeQueue文件,可以快速定位到消息在CommitLog中的物理存储位置,从而实现快速消息的拉取和消费。

从实际物理存储的角度来看,每个主题Topic下的每个队列Queue对应一个ConsumeQueue文件。

生产者端的消息是顺序写入CommitLog,消费者端是顺序读取ConsumeQueue

但是根据ConsumeQueue的起始物理位置偏移量offset读取消息真实内容,实际是随机读取CommitLog

实现了 消息生产与消息消费数据存储和数据索引 相互分离。


怎么保证存储消息不丢失?


刷盘机制


Broker在把消息写入日志文件的过程中,如果在刚收到消息时,Broker异常宕机了,那么内存中尚未写入磁盘的消息就会丢失了。

因此,RocketMQ持久化消息分为两种:同步刷盘和异步刷盘(默认配置)。

异步刷盘是指Broker收到消息后先存储到PageCache,然后立即通知Producer消息已存储成功,可以继续处理业务逻辑。

此后,Broker会启动一个异步线程将消息持久化到磁盘。然而,如果Broker在持久化到磁盘之前发生故障,消息将会丢失。

## 刷盘策略配置
flushDiskType = ASYNC_FLUSH

注意,写入PageCache后,应用服务宕机消息不丢失,只有机器断电或宕机会有少量消息丢失。

相比之下,同步刷盘的方式是在消息存储到缓存后不立即通知Producer,而是等待消息被持久化到磁盘后再通知Producer。

这种方式确保了消息不会丢失,但性能不如异步刷盘高。一般用于金融业务。

## 刷盘策略配置
flushDiskType = SYNC_FLUSH

在选择刷盘方式时,需要根据业务场景进行权衡。


主从同步机制


即使Broker采用同步刷盘策略,但如果刷盘完成后磁盘损坏,会导致所有存储在磁盘上的消息丢失。

即使采用了主从复制,如果主节点在刷盘完成后还没有来得及将数据同步给从节点就发生了磁盘故障,同样会导致数据丢失。

所以我们可以配置同步机制,等待从节点复制完成主节点的消息后,才去通知Producer完成了消息存储。

## 主从同步策略配置
brokerRole=SYNC_MASTER



怎么提高存储写入性能?


零拷贝技术


RocketMQ通过使用内存映射文件(包括CommitLog、 ConsumeQueue等文件)来提高IO访问性能,也就是我们常说的零拷贝技术。

Java在NIO包里,引入了sendFile(FileChannel类)和MMAP(MappedByteBuffer类)两种实现方式的零拷贝技术。

主流的MQ都会使用零拷贝技术,来提升IO:

  • Kafka:record 的读和写都是基于 FileChannel。index 的读写则基于 MMAP。
  • RocketMQ:读取数据基于 MMAP,写入数据默认使用 MMAP。但可以通过修改配置transientStorePoolEnable参数将其配置为使用 FileChannel。作者之所以这样设计,是为了避免 PageCache 的锁竞争,并通过两层架构实现读写分离。


缓冲池写入增强


在不开启RocketMQ的内存映射增强方案时,RocketMQ的读和写都只会简单直接使用MMAP。

但是,MappedByteBuffer也存在一些缺陷:

  • 使用虚拟内存,超过物理内存会导致内存交换,引起磁盘IO(可能非顺序IO)速度较慢。
  • 虚拟内存交换是受操作系统控制的,所以其他进程活动也会触发RocketMQ内存映射的交换。
  • 文件内存映射写入PageCache时存在锁竞争,直接写入内存可避免竞争,在异步刷盘场景下速度更快。

为此,RocketMQ通过transientStorePoolEnable参数控制,对写入进行了优化。

如果开启了这个参数,会将写入拆分为两步, 写入缓冲区 + 异步刷盘 的增强策略。

## 刷盘策略配置
flushDiskType = ASYNC_FLUSH 
transientStorePoolEnable = true

MappedFile会提前申请一块直接内存用作缓冲区,放弃使用mmap直接写文件。

数据先写入缓冲区,然后异步线程每200ms(且脏数据达到16K,commitCommitLogLeastPages = 4)将缓冲区的数据commit写入FileChannel

再唤醒定时服务(FlushRealTimeService类)将FileChannel里的数据持久化到磁盘。flush函数和commit一样也可以传入一个刷盘页数,当脏页数量达到16K时(flushLeastPages = 4),会进行刷盘操作,调用FileChannelforce将内存中的数据持久化到磁盘。

开启transientStorePoolEnable参数后,性能最好,但是相对来说持久化最不可靠


如何处理消息的过期和删除?


RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉。

需要注意的是,在RocketMQ中,消息存储时长并不能完整控制消息的实际保存时间。

因为消息存储仍然使用本地磁盘,本地磁盘空间不足时,为保证服务稳定性消息仍然会被强制清理,导致消息的实际保存时长小于设置的保存时长。

建议在存储成本可控的前提下,尽可能延长消息存储时长。延长消息存储时长,可以为紧急故障恢复、应急问题排查和消息回溯带来更多的可操作空间。


总结


  • 存储模型与存储类型:commitLog文件存储消息物理文件,consumeQueue文件夹存储逻辑队列索引
  • 如何保证存储消息不丢失:同步&异步刷盘、主从消息同步
  • 如何提高写入性能:零拷贝技术MMAP和FileChannel、缓冲区增强 + 异步刷盘 策略
  • 如何清理过期消息:按存储时长清理消息

3分钟到了吗?应该对RocketMQ如何存储消息有全面了解了吧。

如果还想了解更多,欢迎关注下一期内容。


往期热门笔记合集推荐:

  • HBase原理与实战笔记合集
  • MySQL实战笔记合集
  • Canal/Otter源码与实战笔记合集
  • Java实战技巧笔记合集
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 存储 数据库
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
88634 18
|
7月前
|
存储 消息中间件 缓存
RocketMQ 5.0 分级存储背后技术优化包含那几个方面
RocketMQ 5.0 分级存储背后的技术优化是一个综合性的系统工程,需要考虑多个方面,包括存储介质的选择、数据读写策略的设计、数据压缩和解压缩技术的引入、自动的数据分级和迁移机制的实现,以及高可用性和容错性的保证等。这些技术优化的目的是为了实现存储成本和性能的最优平衡,提高系统的可靠性和可用性,满足大规模数据处理的需求。
194 1
|
消息中间件 存储 canal
3分钟白话RocketMQ系列—— 如何保证消息不丢失
3分钟白话RocketMQ系列—— 如何保证消息不丢失
3870 1
|
消息中间件 存储 缓存
3分钟白话RocketMQ系列—— 如何消费消息
3分钟白话RocketMQ系列—— 如何消费消息
785 0
|
7月前
|
消息中间件 存储 运维
|
4月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
7月前
|
存储 消息中间件 对象存储
谈谈 RocketMQ 5.0 分级存储背后一些有挑战的技术优化
谈谈 RocketMQ 5.0 分级存储背后一些有挑战的技术优化
132825 249
|
消息中间件 存储 canal
3分钟白话RocketMQ系列—— 如何发送消息
3分钟白话RocketMQ系列—— 如何发送消息
305 0
|
7月前
|
消息中间件 存储 数据库
深度剖析 RocketMQ 5.0,流存储:流场景的诉求是什么?
本文将从使用的角度出发,来更详细的展示一下流存储的场景,看看它和业务消息的场景有哪些区别。 RocketMQ 5.0 面向流存储的场景,提供了哪些特性。再结合两个数据集成的案例,来帮助大家了解流存储的用法。
3531 2
|
7月前
|
存储 消息中间件 负载均衡
RocketMQ 5.0 分级存储背后的技术优化与挑战
RocketMQ 5.0 分级存储背后的技术优化与挑战