异步刷盘说明|学习笔记

简介: 快速学习异步刷盘说明

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)异步刷盘说明】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12489


异步刷盘说明


异步刷盘

异步刷盘指的是在消息追加到内存后,立即返回给消息发送端。如果开启  transientStorePoolEnable ,  RocketMQ 会单独申请一个与目标物理文件(commitLog)同样大小的对外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到物理文件的内存映射中,然后刷写到磁盘。如果末开启 transientStorePoolEnable , 消息直接追加到物理文件直接映射文件中,然后刷写到磁盘中。

针对刚才的流程,如下图进行说明。 ByteBuffer(Direct) 就是堆外内存的空间, MappedByteBuffer 是物流文件所对应的映射文件。

image.png执行步骤,第一步是将消息直接追加到 ByteBuffer (堆外内存)。第二步 CommitRealTimeService 线程每隔 200ms 将 ByteBuffer 新追加内容提交到 MappedByteBuffer 中。

第三步 MappedByteBuffer 在内存中追加提交的内容, wrotePosition 指针向后移动,说明追加成功。

第四步 commit 操作成功返回,将 committedPosition 位置恢复。

第五步 FlushRealTimeService 线程默认每 500ms 将 MappedByteBuffer 中新追加的内存刷写到磁盘。

通过流程分析,这里面有两个服务类非常重要,第一个是 CommitRealTimeService ,第二个是 FlushRealTimeService ,一个是用来提交堆外的数据到内存映射文件当中去,另外一个是负责将内存映射文件刷到磁盘中去。

找到刷盘方法

else 这里执行的是 if 刷盘。 if 刷盘首先在这里做了一个判断,如果 isTransientStorePoolEnable 它没有打开就直接刷,如果打开就先进行 commitLogService.wakeup 提交这个操作。

else{if(!This.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()){

flushCommitLogService.wakeup();

} else {

commitLogService.wakeup();

}

}

进入下图所示位置,

image.png在这里会每隔 200ms 去进行一次数据的提交工作

private int commitIntervalCommitLog = 200;

数据的提交工作,首先先去获得了数据一次提交的至少页数,数据大于页数之后才去真正进行提交。

下面还设置了一个两次真实提交的最大时间间隔,默认200ms,意思是如果上次提交的时间超过最大的时间,下一次提交就会直接去提交不会去等待。

//间隔时间,默认200ms
int interval =
CommitLog. this. defaultMessagestore. getMessageStoreconfig(). getCommitInterva1commitLog();

//一次提交的至少页数
int commitDataLeastPages =
CommitLog. this. defaultMessagestore. getMessagestoreConfig() . getcommi tCommi tLogLeastPages();
//两次真实提交的最大间隔,默认200ms
int commitDataThoroughInterval =
CommitLog. this. defau1tMessagestore. detMessagestoreconfig(). getcommi tCommitLogThoroughInterval();

//上次提交间隔超过 commitDataThoroughInterval ,则忽略提交 commitDataThoroughInterval 参数,直接提交long begin = system. currentTimeMillis();
if (begin >= (this. 1astcommitTimestamp + commi tDataThoroughInterva1)) {
this. astcommitTimestamp = begin;
commitDataLeastPages = 0;
}

mappedFileQueue 就是正在进行提交工作,就是把它提交到物理文件所对应的内存映射文件当中去。提交完之后,进行了 flushCommitLogService 操作。

//执行提交操作 ,将待提交数据提交到物理文件的内存映射区
boolean result = CommitLog. this .mappedFileQueue. commit (commi tDataLeastPages);
Long end = system currentTimeMillis());
if (!resu1t) {
this. lastcommitTimestamp = end; //result = false means some data commi tted.
//now wake up flush thread.

//唤醒刷盘线程

flushcommitLogService . wakeup());

}
if (end- begin > 500) {
1og. info("Commit data to fi1e costs {} ms", end - begin);
}
this . waitForRunni ng(interva1);

目的是唤醒刷盘的线程。

flushCommitLogService.wakeup();

让刷盘的线程负责进行刷盘的处理,FlushRealTimeService 就是负责刷盘。

image.png如何进行刷盘,刷盘的方式和同步刷盘的基本流程是差不多的,只不过一个需要阻塞,一个不需要阻塞。线程的时间间隔,一次刷写任务至少包含的页数,设置了两次真实刷写任务的最大时间间隔,同样是判断,如果上次刷的时间太过长,那么下一次就不会等待这么长时间,直接刷就可以。

//表示 wait 方法等待,默认 false
boolean flushcommitLogTimed =
CommitLog. this. defaultMessagestore. getMessagestoreconfig().isFlushcommi tLogTimed();
//线程执行时间间隔
int interval =
CommitLog. this. defaultMessagestore. getMessagestoreconfi g(). getFlushInterva commitLog();
//一次刷写任务至少包含页数
int flushPhysicQueueLeastPages =
CommitLog. this. defaultMessagestore . getMessagestoreconfig(). getFlushCommitLogLeastPages ();
//两次真实刷写任务最大间隔
int flushPhysicQueueThoroughInterval =
CommitLog. this . defaultMessagestore . getMessagestoreConfig(). getFlushCommi tLogThoroughInterval();
...

//距离上次提交间隔超过 flushPhysicQueueThoroughInterval ,则本次刷盘任务将忽略 flushPhysicQueueLeastPages ,直接提交
1ong currentTimeMi11is = system. currentTimeMillis();
if (currentTimeMillis >= (this.1astF1ushTimestamp + flushPhysicQueueThoroughInterva1)) {
this. lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;
pri ntFlushProgress = (printTimes++ % 10) == 0;

}

...

//执行一次刷盘前,先等待指定时间间隔
if (flushCommitLogTimed) {
Thread. sleep(interval);
} else {
this. waitForRunni ng(interva1);
}

...
long begin = system. currentTimeMillis();
//刷写磁盘
Commitlog. this . mappedFilequeue. flush(flushphysicQueueLeastpages);
long storeTimestamp = Commi tLog. this. mappedFilequeue . getStoreTimestamp();
if (storeTimestamp > 0) {
//更新存储监测点文件的时间戳
Commi tlog. this . defaultMessagestore. getstorecheckpoint (). setPhysicMsgTimestamp(storeTimestamp);

然后执行一次刷盘之前,一般先让线程睡眠一会,就开始进行刷盘。CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);

将内存映射文件当中的数据给它刷写到物理文件当中去。刷完之后又更新了一下文件的刷盘点。把刷盘的指针去更新,更新到 config 中。

image.png

异步刷盘中有两个服务类非常重要,一个是 CommitRealTimeService 负责数据提交,第二个是 FlushRealTimeService 负责刷盘操作的。其中第一个线程每隔200ms执行一次,第二个线程每隔500ms 执行一次。

相关文章
|
前端开发 JavaScript UED
|
前端开发
异步转同步的几种方法
在循环等待中,我们可以使用一个变量来指示异步操作是否已完成。然后,我们可以在循环中检查该变量,如果它指示异步操作已完成,则退出循环。
552 0
|
6月前
|
负载均衡 算法 前端开发
同步和异步
同步和异步
86 0
|
6月前
|
Java 数据处理 调度
异步、半同步、同步
异步、半同步、同步
138 0
|
6月前
|
存储 JavaScript 前端开发
|
C#
C#异步详解
c#异步编程原理,await asnyc的使用方法
61 0
|
API
59 # 异步串行和异步并发删除目录
59 # 异步串行和异步并发删除目录
46 0
|
消息中间件 缓存 前端开发
同步转异步处理|学习笔记
快速学习同步转异步处理
344 0
同步转异步处理|学习笔记
|
消息中间件 缓存 前端开发
同步转异步处理|学习笔记
快速学习同步转异步处理
同步转异步处理|学习笔记
|
消息中间件 存储 Java
同步刷盘分析|学习笔记
快速学习同步刷盘分析
同步刷盘分析|学习笔记