深入源码聊聊RocketMQ刷盘机制

简介: 大家好,我是Leo。今天聊一下RocketMQ的三种刷盘机制。

本章概括

image.png


同步刷盘


整个同步刷盘策略由 FlushCommitLogServiceGroupCommitService 实现。

FlushCommitLogServiceGroupCommitService 刷盘服务的父类,它是一个抽象类,本身没有实现,只是一个标记类,刷盘策略由子类负责完成。

同步刷盘时,只有消息被真正持久化到磁盘才会响应 ACK,可靠性非常高,但是性能会受到较大影响,适用于金融业务。(可参考图一)

由下列参数决定刷盘策略

FlushDiskType:SYNC_FLUSH(同步刷盘)

image.png

图一同步刷盘流程图

Broker 接收到消息后,会将消息交给 CommitLog 负责存储。CommitLog 先定位到最新的 MappedFile,然后将消息按照固定格式追加到其中。(可参考图二)

image.png

图二CommitLog构造函数加载流程


CommitLog

是一个构造函数

image.png

GroupCommitRequest


代表一个刷盘的请求

image.png

图三GroupCommitRequest函数


putRequest

通过加锁对每一个写请求都加到 requestWrite 集合

image.png

图四putRequest函数

swapRequests

每次提交完刷盘之后,需要对读写请求数据交换。

**刷盘请求为啥还要分读写两个列表呢?**这是用来做读写分离用的,Producer 发送消息的请求量是非常大的,GroupCommitService 的刷盘操作是同步的,刷盘期间仍然会有大量的刷盘请求被提交进来,拆分成两个读写列表,请求提交到写列表,刷盘时处理读列表,刷盘结束交换列表,循环往复,两者可以同时进行。

image.png

图五swapRequests函数

当有同步请求被提交进来,线程就会被唤醒,然后执行doCommit方法,刷盘的核心是调用了 MappedFileQueue 的flush方法。flush 方法需要传flushLeastPages参数,它代表刷盘的最小页数,对于同步刷盘来说,不允许消息丢失,只要写入数据就要刷盘,所以页数为 0。

doCommit

image.png

图六doCommit函数


flush

刷盘函数。flush 方法会根据刷盘的偏移量定位到对应的 MappedFile,然后调用flush开始刷盘,最后更新刷盘偏移量。

image.png

图七flush函数


isAbleToFlush


计算是否需要刷盘。只有新写入的数据超过指定页才刷盘,避免频繁无意义的刷盘。

刷盘最核心的方法自然是 MappedFile 的flush方法了,它会先根据flushLeastPages计算是否需要刷盘。例如最小刷盘页为 4 时,最小刷盘数据就是 16KB,如果写入的数据不足 16KB 就会跳过刷盘

image.png

图八isAbleToFlush函数


run

GroupCommitService子实现类的入口函数

image.png

图九run函数


总结


由CommitLog构造函数结构图可得知

  1. 如果是同步刷盘策略,会初始化 GroupCommitService 子实现类。最终继承的是Runnable接口,会先启动run线程。
  2. run线程主要做的一件事情就是不断监测当前服务是否停止,只要不停止就会一直执行,也就是每10毫秒会执行 doCommit 函数。
  3. doCommit函数主要处理的是,只要读请求不为空就一直处理,首先会判断文件刷新到的offset是否大于等于刷盘offset。并且考虑到一条消息可能存在两个文件中,因此最多可能存在两次刷盘。
  4. 成立之后会调用flush 函数,因为同步刷盘的规则是来一条就刷一条,所以是 flush(0)
  5. 刷新完毕后,唤醒用户线程,(通知其他线程,继续工作了)
  6. 收尾工作(刷新消息物理落盘时间,交互读写请求)
  1. 如果文件刷盘的偏移量 < 请求的下一个偏移量,则说明还没有刷新完,还需要继续刷新


异步刷盘


RocketMQ默认是采用异步刷盘的策略,因为异步刷盘既兼顾了性能,也兼顾了可靠性。

异步刷盘时,消息写入 PageCache 就会响应 ACK,然后由后台线程异步将 PageCache 里的内容持久化到磁盘,降低了读写延迟,提高了性能和吞吐量。服务宕机消息不丢失,机器断电少量消息丢失。

由下列参数决定刷盘策略

FlushDiskType:ASYNC_FLUSH(异步刷盘)

整个异步刷盘策略由 FlushCommitLogServiceFlushRealTimeService 实现。

FlushCommitLogService 是CommitLog 刷盘服务的父类,它是一个抽象类,本身没有实现,只是一个标记类,刷盘策略由子类负责完成。

对于异步刷盘,没有提交刷盘请求一说。它不像同步刷盘,只要有消息写入 CommitLog 就要执行刷盘操作,因为异步刷盘是定时执行的。

异步刷盘时,仅仅需要调用 wakeup 方法唤醒线程即可。所以,我们重点看它的run方法。

run

image.png

flush

刷盘函数。flush 方法会根据刷盘的偏移量定位到对应的 MappedFile,然后调用flush开始刷盘,最后更新刷盘偏移量。

image.png

图十一flush函数

isAbleToFlush

计算是否需要刷盘。只有新写入的数据超过指定页才刷盘,避免频繁无意义的刷盘。

刷盘最核心的方法自然是 MappedFile 的flush方法了,它会先根据flushLeastPages计算是否需要刷盘。例如最小刷盘页为 4 时,最小刷盘数据就是 16KB,如果写入的数据不足 16KB 就会跳过刷盘

image.png

图十二isAbleToFlush函数

waitForRunning

image.png

总结


  1. 如果是异步刷盘策略,会初始化 FlushRealTimeService 子实现类。最终继承的是Runnable接口,会先启动run线程。
  2. run线程会一直执行,直到服务被迫停止,或者人为干预停止
  3. 首先获取RocketMQ当前配置是否定时刷新日志,如果是,sleep睡眠自定义的时间。如果不是,执行 waitForRunning 函数等待自定义的时间。
  4. 根据自定义的刷盘页数进行 flush 刷盘。更新后修改消息落盘时间。
  5. 第五点主要关键,为了保证所有的数据一致性,如果服务停止,那么把剩余的没有刷新到磁盘的消息刷盘,重复次数为10次
  1. 获取是否定时刷新日志的设定
  2. 获取刷新到磁盘的时间间隔
  3. 获取一次刷新到磁盘的最少页数
  4. 获取刷新CommitLog的频率


异步刷盘+缓冲区


异步刷盘+缓冲区,消息先写入直接内存缓冲区,然后由后台线程异步将缓冲区里的内容持久化到磁盘,性能最好。但是最不可靠,服务宕机和机器断电都会丢失消息。

整个异步刷盘策略由 FlushCommitLogServiceCommitRealTimeService 实现。

FlushCommitLogService 是CommitLog 刷盘服务的父类,它是一个抽象类,本身没有实现,只是一个标记类,刷盘策略由子类负责完成。

这是最激进的一种刷盘策略,性能最好,但是也最不可靠。同样没有刷盘请求一说,只需要唤醒线程即可开始工作。

和 FlushRealTimeService 流程差不多,区别仅仅是将flush换成commit了,先将直接内存缓冲区的数据写入 FileChannel,然后唤醒 FlushRealTimeService 对 FIleChannel 做持久化。

run

image.png

commit

image.png

关键点在 MappedFileQueue 的commit方法,它会根据 Commit 偏移量定位到 MappedFile,然后调用它的commit0方法。它仅仅是将直接内存缓冲区的数据写入 FileChannel,此时数据并没有真正持久化到磁盘。

commit0

image.png

总结


  1. 从Broker发送给CommitLog存储时,就已经创建了 CommitRealTimeService 实现类,只是用不用的问题,如果用到了就是执行开始处理run函数里的逻辑
  2. 它跟异步刷盘的不同点是,异步刷盘是定时 flush 。 这里没有进行flush,而且通过先按照文件提交的offset查找数据。然后提交
  3. 这里的提交分两类,一类是没有使用临时存储池。使用的是mappedByteBuffer也就是内存映射的方式。直接写到映射区域中的,那么这个时候就不需要写入的fileChannel了。直接返回写入的位置作为已经提交的位置。
  4. 另一类是临时存储池,使用的堆外内存,那么这个时候需要先把信息提交到fileChannel中


2022年文章目录整理

RocketMQ性能提升


结尾


RocketMQ 针对 CommitLog 文件支持三种持久化策略。

  • 同步刷盘时,每次消息写入都会提交刷盘请求给 GroupCommitService,调用 MappedByteBuffer 的force方法将内核缓冲区的数据强制刷新到磁盘,成功才响应 ACK。
  • 异步刷盘时,消息写入 PageCache 立即响应 ACK,由 FlushRealTimeService 线程每隔 500ms 对 CommitLog 文件进行一次刷盘操作,流程和上述一样。
  • 异步刷盘且开启缓冲区时,RocketMQ 申请一块直接内存用作数据缓冲区,消息先写入缓冲区,然后由 CommitRealTimeService 线程定时将缓冲区数据写入 FileChannel,再唤醒 FlushRealTimeService 将 FileChannel 缓冲区数据强制刷新到磁盘。

开启缓冲区有什么用?类似在内存层面做了读写分离,写数据走直接内存,读数据走 PageCache,最大程度的消除了 PageCache 锁竞争,避免 PageCache 被交换到 Swap 分区,导致服务响应耗时出现毛刺。

有些不懂的地方或者不对的地方,麻烦各位指出,一定修改优化!


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
打赏
0
0
0
0
10
分享
相关文章
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
118 0
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
89 12
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
59 3
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
49 2
Rabbitmq 搭建使用案例 [附源码]
Rabbitmq 搭建使用案例 [附源码]
58 0
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
63 0
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
119 0
消息队列 MQ使用问题之过期删除机制的触发条件是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之过期删除机制的触发条件是什么
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
91 0
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
85 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等