同步刷盘分析|学习笔记

简介: 快速学习同步刷盘分析

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)同步刷盘分析】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12488


同步刷盘分析

 

同步刷盘

RocketMQ 的存储是基于 JDK NIO 的内存映射机制(MappedByteBuffer)的,消息存储首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘。再根据刷盘的策略来决定到底是同步刷盘还是异步刷盘。

消息追加的入口在 CommitLog 中有个 putMessages 里。前面做的都是将数据追加到内存里去,再根据不同的刷盘策略,再去将数据刷到磁盘当中。

image.png

handleDiskFlush(result,putMessageResult,messageExtBatch);//刷盘入口

进入到putMessageResult。先判断 MessageStoreConfig 当中刷盘的策略。SYNC_FLUSH 是同步刷盘。

同步刷盘的意思是消息追加到内存后,立即将数据刷写到磁盘文件。同步刷盘的逻辑是这样的。

finalGroupCommitService service = (GroupCommitService) this.flushCommitLogService;

//拿到同步刷盘的服务 GroupCommitService ,

GroupCommitRequest request = new

GroupcommitRequest(nextOffset:result.getWroteOffset() +

result.getWroteBytes());

//创建刷盘的请求对象 GroupCommitRequest

service.putRequest(request); //将请求对象提交到 service ,在 service 当中进行了处理

boolean flushOK =

request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

// waitForFlush 这个请求等待刷盘的结果。

private int syncFlushTimeout =1000 * 5;

// 等待超时时间 getSyncFlushTimeout 为5秒,就是会同步阻塞5秒。

把 request 提交到 service.putRequest 当中, service 用了等待唤醒机制去进行一个处理。

public synchronized void putRequest(final GroupCommitRequest request){

synchronized (this.requestsWrite) {

this.requestsWrite.add(request);

}

if (hasNotified.compareAndSet(expect:false,update:true)){

waitPoint.countDown(); //唤醒线程notify

}

这里首先是把这个请求去附着到一个集合当中,

private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<~>(); //写集合

private volatile List<GroupCommitRequest> requestsRead = new ArrayList<~>(); //读集合

首先将 request 对象放到 requestsWrite 当中去,然后在处理的时候,为了提高读写的效率,将数据会复制到 requestsRead 中去,然后前后分开去处理。我们将请求对象提交到 requestWrite 这个集合当中,这里面唤醒了一下这个线程 notify 。

线程唤醒之后看run 方法,run 方法最核心地方在 doCommit ,处理请求的时候,会间隔10毫秒去处理一次,有请求就会去处理。

while  (!this.isStopped()) {

try {

this.waitForRunning( interval: 10);//处理请求间隔10毫秒

this.doCommit(); //run 方法核心

} catch (Exception e) {

CommitLog.Log.warn(this.getserviceName() + " service has exception. ", e);

}

}

在处理的时候,把 While循环开启,然后在下边去做一个 swapRequests 方式

private void swapRequests() {

List<GroupCommitRequest> tmp = this.requestsWrite;

this.requestsWrite = this.requestsRead;

this.requestsRead = tmp;

}

swapRequests 这个方式就是在交换读集合和写集合。

最终再去处理的时候是从 requestsRead 集合当中去取。

image.png

所以从 run 方法中可以看到,doCommit 被触发了之后,就会取这个数据,如果 requestsRead 这里没有数据,就会去阻塞。

一旦有请求到来之后,它先将两个集合的数据进行一个交换,然后再让 doCommit 去处理。

synchronized (this) {

this.swapRequests();

}

image.png

如上图,首先去遍历当前集合 req 当中所有的请求对象

然后还有一个遍历 flushOK 会遍历两次,

For (int i = 0;i < 2 && !flushOK; i++){

flushOK = CommitLog.this.MappedFileQueue.getFlushedWhere() >= rep.getNextOffset();

If (!flushOK){

CommitLog.this.mappedFileQueue.flush(flushLeastPages:0);

如果文件数据存在跨文件存储,那么它要把前一个文件给它去拿到,再把后一个文件也拿到,然后这两个文件拼接起来之后才是一个完整的消息数据。这里主要考虑到消息的跨文件的问题。

然后就进行刷盘的处理

flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); //进行判断

刷完盘之后会进行一个判断,判断刷盘的位置 getFlushedWhere 如果大于当前写的位置 req.getNextOffset ,那就代表当前刷盘成功,之后正常返回。

CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);

//更新刷盘存储点

最终更新到磁盘中如下图所示位置。

image.png

相关文章
|
6月前
|
存储 关系型数据库 MySQL
数据同步大事务同步延迟
数据同步大事务同步延迟
81 6
|
4月前
|
消息中间件 Kafka 微服务
微服务数据问题之MetaQ设置同步异步刷盘如何解决
微服务数据问题之MetaQ设置同步异步刷盘如何解决
|
6月前
|
Java 数据处理 调度
异步、半同步、同步
异步、半同步、同步
138 0
|
11月前
|
NoSQL Cloud Native Redis
【性能优化下】组织结构同步优化二,全量同步/增量同步,断点续传实现方式
【性能优化下】组织结构同步优化二,全量同步/增量同步,断点续传实现方式
|
算法 小程序 调度
同步
同步
72 0
|
监控 关系型数据库 MySQL
如何避免主从不同步
如何避免主从不同步
113 0
|
存储 数据库
数据复制系统设计(2)-同步复制与异步复制
复制的重要可选项: 同步复制,synchronously 异步复制,asynchronously
188 0
|
算法 编译器 调度
程序并发操作中,解决数据同步的四种方法
程序并发操作中,解决数据同步的四种方法
172 0
程序并发操作中,解决数据同步的四种方法
|
消息中间件 存储 RocketMQ
异步刷盘说明|学习笔记
快速学习异步刷盘说明
异步刷盘说明|学习笔记
|
开发框架 Java 程序员
并发与并行 同步或异步
我们都知道,程序猿是一种逻辑性极强的生物,他们不擅言辞,不擅表达,但是他们能够用一种神秘的语言与机器进行沟通,知道怎么让机器听他们的。
109 0