本章概括
同步刷盘
整个同步刷盘策略由 FlushCommitLogService
与 GroupCommitService
实现。
FlushCommitLogService
是 GroupCommitService
刷盘服务的父类,它是一个抽象类,本身没有实现,只是一个标记类,刷盘策略由子类负责完成。
同步刷盘时,只有消息被真正持久化到磁盘才会响应 ACK,可靠性非常高,但是性能会受到较大影响,适用于金融业务。(可参考图一)
由下列参数决定刷盘策略
FlushDiskType:SYNC_FLUSH(同步刷盘)
图一同步刷盘流程图
Broker 接收到消息后,会将消息交给 CommitLog 负责存储。CommitLog 先定位到最新的 MappedFile,然后将消息按照固定格式追加到其中。(可参考图二)
图二CommitLog构造函数加载流程
CommitLog
是一个构造函数
GroupCommitRequest
代表一个刷盘的请求
图三GroupCommitRequest函数
putRequest
通过加锁对每一个写请求都加到 requestWrite
集合
图四putRequest函数
swapRequests
每次提交完刷盘之后,需要对读写请求数据交换。
**刷盘请求为啥还要分读写两个列表呢?**这是用来做读写分离用的,Producer 发送消息的请求量是非常大的,GroupCommitService 的刷盘操作是同步的,刷盘期间仍然会有大量的刷盘请求被提交进来,拆分成两个读写列表,请求提交到写列表,刷盘时处理读列表,刷盘结束交换列表,循环往复,两者可以同时进行。
图五swapRequests函数
当有同步请求被提交进来,线程就会被唤醒,然后执行doCommit
方法,刷盘的核心是调用了 MappedFileQueue 的flush
方法。flush 方法需要传flushLeastPages
参数,它代表刷盘的最小页数,对于同步刷盘来说,不允许消息丢失,只要写入数据就要刷盘,所以页数为 0。
doCommit
图六doCommit函数
flush
刷盘函数。flush 方法会根据刷盘的偏移量定位到对应的 MappedFile,然后调用flush
开始刷盘,最后更新刷盘偏移量。
图七flush函数
isAbleToFlush
计算是否需要刷盘。只有新写入的数据超过指定页才刷盘,避免频繁无意义的刷盘。
刷盘最核心的方法自然是 MappedFile 的flush
方法了,它会先根据flushLeastPages
计算是否需要刷盘。例如最小刷盘页为 4 时,最小刷盘数据就是 16KB,如果写入的数据不足 16KB 就会跳过刷盘
图八isAbleToFlush函数
run
GroupCommitService子实现类的入口函数
图九run函数
总结
由CommitLog构造函数结构图可得知
- 如果是同步刷盘策略,会初始化
GroupCommitService
子实现类。最终继承的是Runnable接口,会先启动run线程。 - run线程主要做的一件事情就是不断监测当前服务是否停止,只要不停止就会一直执行,也就是每10毫秒会执行
doCommit
函数。 - doCommit函数主要处理的是,只要读请求不为空就一直处理,首先会判断文件刷新到的offset是否大于等于刷盘offset。并且考虑到一条消息可能存在两个文件中,因此最多可能存在两次刷盘。
- 成立之后会调用
flush
函数,因为同步刷盘的规则是来一条就刷一条,所以是flush(0)
- 刷新完毕后,唤醒用户线程,(通知其他线程,继续工作了)
- 收尾工作(刷新消息物理落盘时间,交互读写请求)
- 如果文件刷盘的偏移量 < 请求的下一个偏移量,则说明还没有刷新完,还需要继续刷新
异步刷盘
RocketMQ默认是采用异步刷盘的策略,因为异步刷盘既兼顾了性能,也兼顾了可靠性。
异步刷盘时,消息写入 PageCache 就会响应 ACK,然后由后台线程异步将 PageCache 里的内容持久化到磁盘,降低了读写延迟,提高了性能和吞吐量。服务宕机消息不丢失,机器断电少量消息丢失。
由下列参数决定刷盘策略
FlushDiskType:ASYNC_FLUSH(异步刷盘)
整个异步刷盘策略由 FlushCommitLogService
与 FlushRealTimeService
实现。
FlushCommitLogService
是CommitLog 刷盘服务的父类,它是一个抽象类,本身没有实现,只是一个标记类,刷盘策略由子类负责完成。
对于异步刷盘,没有提交刷盘请求一说。它不像同步刷盘,只要有消息写入 CommitLog 就要执行刷盘操作,因为异步刷盘是定时执行的。
异步刷盘时,仅仅需要调用 wakeup
方法唤醒线程即可。所以,我们重点看它的run
方法。
run
flush
刷盘函数。flush 方法会根据刷盘的偏移量定位到对应的 MappedFile,然后调用flush
开始刷盘,最后更新刷盘偏移量。
图十一flush函数
isAbleToFlush
计算是否需要刷盘。只有新写入的数据超过指定页才刷盘,避免频繁无意义的刷盘。
刷盘最核心的方法自然是 MappedFile 的flush
方法了,它会先根据flushLeastPages
计算是否需要刷盘。例如最小刷盘页为 4 时,最小刷盘数据就是 16KB,如果写入的数据不足 16KB 就会跳过刷盘
图十二isAbleToFlush函数
waitForRunning
总结
- 如果是异步刷盘策略,会初始化
FlushRealTimeService
子实现类。最终继承的是Runnable接口,会先启动run线程。 - run线程会一直执行,直到服务被迫停止,或者人为干预停止
- 首先获取RocketMQ当前配置是否定时刷新日志,如果是,sleep睡眠自定义的时间。如果不是,执行
waitForRunning
函数等待自定义的时间。 - 根据自定义的刷盘页数进行
flush
刷盘。更新后修改消息落盘时间。 - 第五点主要关键,为了保证所有的数据一致性,如果服务停止,那么把剩余的没有刷新到磁盘的消息刷盘,重复次数为10次
- 获取是否定时刷新日志的设定
- 获取刷新到磁盘的时间间隔
- 获取一次刷新到磁盘的最少页数
- 获取刷新CommitLog的频率
异步刷盘+缓冲区
异步刷盘+缓冲区,消息先写入直接内存缓冲区,然后由后台线程异步将缓冲区里的内容持久化到磁盘,性能最好。但是最不可靠,服务宕机和机器断电都会丢失消息。
整个异步刷盘策略由 FlushCommitLogService
与 CommitRealTimeService
实现。
FlushCommitLogService
是CommitLog 刷盘服务的父类,它是一个抽象类,本身没有实现,只是一个标记类,刷盘策略由子类负责完成。
这是最激进的一种刷盘策略,性能最好,但是也最不可靠。同样没有刷盘请求一说,只需要唤醒线程即可开始工作。
和 FlushRealTimeService 流程差不多,区别仅仅是将flush
换成commit
了,先将直接内存缓冲区的数据写入 FileChannel,然后唤醒 FlushRealTimeService 对 FIleChannel 做持久化。
run
commit
关键点在 MappedFileQueue 的commit
方法,它会根据 Commit 偏移量定位到 MappedFile,然后调用它的commit0
方法。它仅仅是将直接内存缓冲区的数据写入 FileChannel,此时数据并没有真正持久化到磁盘。
commit0
总结
- 从Broker发送给CommitLog存储时,就已经创建了
CommitRealTimeService
实现类,只是用不用的问题,如果用到了就是执行开始处理run函数里的逻辑 - 它跟异步刷盘的不同点是,异步刷盘是定时
flush
。 这里没有进行flush,而且通过先按照文件提交的offset查找数据。然后提交 - 这里的提交分两类,一类是没有使用临时存储池。使用的是mappedByteBuffer也就是内存映射的方式。直接写到映射区域中的,那么这个时候就不需要写入的fileChannel了。直接返回写入的位置作为已经提交的位置。
- 另一类是临时存储池,使用的堆外内存,那么这个时候需要先把信息提交到fileChannel中
结尾
RocketMQ 针对 CommitLog 文件支持三种持久化策略。
- 同步刷盘时,每次消息写入都会提交刷盘请求给 GroupCommitService,调用 MappedByteBuffer 的
force
方法将内核缓冲区的数据强制刷新到磁盘,成功才响应 ACK。 - 异步刷盘时,消息写入 PageCache 立即响应 ACK,由 FlushRealTimeService 线程每隔 500ms 对 CommitLog 文件进行一次刷盘操作,流程和上述一样。
- 异步刷盘且开启缓冲区时,RocketMQ 申请一块直接内存用作数据缓冲区,消息先写入缓冲区,然后由 CommitRealTimeService 线程定时将缓冲区数据写入 FileChannel,再唤醒 FlushRealTimeService 将 FileChannel 缓冲区数据强制刷新到磁盘。
开启缓冲区有什么用?类似在内存层面做了读写分离,写数据走直接内存,读数据走 PageCache,最大程度的消除了 PageCache 锁竞争,避免 PageCache 被交换到 Swap 分区,导致服务响应耗时出现毛刺。
有些不懂的地方或者不对的地方,麻烦各位指出,一定修改优化!