开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):异步刷盘说明】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12489
异步刷盘说明
异步刷盘
异步刷盘指的是在消息追加到内存后,立即返回给消息发送端。如果开启 transientStorePoolEnable , RocketMQ 会单独申请一个与目标物理文件(commitLog)同样大小的对外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到物理文件的内存映射中,然后刷写到磁盘。如果末开启 transientStorePoolEnable , 消息直接追加到物理文件直接映射文件中,然后刷写到磁盘中。
针对刚才的流程,如下图进行说明。 ByteBuffer(Direct) 就是堆外内存的空间, MappedByteBuffer 是物流文件所对应的映射文件。
执行步骤,第一步是将消息直接追加到 ByteBuffer (堆外内存)。第二步 CommitRealTimeService 线程每隔 200ms 将 ByteBuffer 新追加内容提交到 MappedByteBuffer 中。
第三步 MappedByteBuffer 在内存中追加提交的内容, wrotePosition 指针向后移动,说明追加成功。
第四步 commit 操作成功返回,将 committedPosition 位置恢复。
第五步 FlushRealTimeService 线程默认每 500ms 将 MappedByteBuffer 中新追加的内存刷写到磁盘。
通过流程分析,这里面有两个服务类非常重要,第一个是 CommitRealTimeService ,第二个是 FlushRealTimeService ,一个是用来提交堆外的数据到内存映射文件当中去,另外一个是负责将内存映射文件刷到磁盘中去。
找到刷盘方法
else 这里执行的是 if 刷盘。 if 刷盘首先在这里做了一个判断,如果 isTransientStorePoolEnable 它没有打开就直接刷,如果打开就先进行 commitLogService.wakeup 提交这个操作。
else{
if(!This.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()){
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
进入下图所示位置,
在这里会每隔 200ms 去进行一次数据的提交工作
private int commitIntervalCommitLog = 200;
数据的提交工作,首先先去获得了数据一次提交的至少页数,数据大于页数之后才去真正进行提交。
下面还设置了一个两次真实提交的最大时间间隔,默认200ms,意思是如果上次提交的时间超过最大的时间,下一次提交就会直接去提交不会去等待。
//间隔时间,默认200msint interval =
CommitLog. this. defaultMessagestore. getMessageStoreconfig(). getCommitInterva1commitLog();
//一次提交的至少页数int commitDataLeastPages =
CommitLog. this. defaultMessagestore. getMessagestoreConfig() . getcommi tCommi tLogLeastPages();
//两次真实提交的最大间隔,默认200msint commitDataThoroughInterval =
CommitLog. this. defau1tMessagestore. detMessagestoreconfig(). getcommi tCommitLogThoroughInterval();
//上次提交间隔超过 commitDataThoroughInterval ,则忽略提交 commitDataThoroughInterval 参数,直接提交long begin = system. currentTimeMillis();
if (begin >= (this. 1astcommitTimestamp + commi tDataThoroughInterva1)) {
this. astcommitTimestamp = begin;
commitDataLeastPages = 0;
}
mappedFileQueue 就是正在进行提交工作,就是把它提交到物理文件所对应的内存映射文件当中去。提交完之后,进行了 flushCommitLogService 操作。
//执行提交操作 ,将待提交数据提交到物理文件的内存映射区boolean result = CommitLog. this .mappedFileQueue. commit (commi tDataLeastPages);
Long end = system currentTimeMillis());
if (!resu1t) {
this. lastcommitTimestamp = end; //result = false means some data commi tted.
//now wake up flush thread.
//唤醒刷盘线程
flushcommitLogService . wakeup());
}
if (end- begin > 500) {
1og. info("Commit data to fi1e costs {} ms", end - begin);
}
this . waitForRunni ng(interva1);
目的是唤醒刷盘的线程。
flushCommitLogService.wakeup();
让刷盘的线程负责进行刷盘的处理,FlushRealTimeService 就是负责刷盘。
如何进行刷盘,刷盘的方式和同步刷盘的基本流程是差不多的,只不过一个需要阻塞,一个不需要阻塞。线程的时间间隔,一次刷写任务至少包含的页数,设置了两次真实刷写任务的最大时间间隔,同样是判断,如果上次刷的时间太过长,那么下一次就不会等待这么长时间,直接刷就可以。
//表示 wait 方法等待,默认 falseboolean flushcommitLogTimed =
CommitLog. this. defaultMessagestore. getMessagestoreconfig().isFlushcommi tLogTimed();
//线程执行时间间隔int interval =
CommitLog. this. defaultMessagestore. getMessagestoreconfi g(). getFlushInterva commitLog();
//一次刷写任务至少包含页数int flushPhysicQueueLeastPages =
CommitLog. this. defaultMessagestore . getMessagestoreconfig(). getFlushCommitLogLeastPages ();
//两次真实刷写任务最大间隔int flushPhysicQueueThoroughInterval =
CommitLog. this . defaultMessagestore . getMessagestoreConfig(). getFlushCommi tLogThoroughInterval();
...
//距离上次提交间隔超过 flushPhysicQueueThoroughInterval ,则本次刷盘任务将忽略 flushPhysicQueueLeastPages ,直接提交1ong currentTimeMi11is = system. currentTimeMillis();
if (currentTimeMillis >= (this.1astF1ushTimestamp + flushPhysicQueueThoroughInterva1)) {
this. lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;
pri ntFlushProgress = (printTimes++ % 10) == 0;
}
...
//执行一次刷盘前,先等待指定时间间隔if (flushCommitLogTimed) {
Thread. sleep(interval);
} else {
this. waitForRunni ng(interva1);
}
...
long begin = system. currentTimeMillis();
//刷写磁盘Commitlog. this . mappedFilequeue. flush(flushphysicQueueLeastpages);
long storeTimestamp = Commi tLog. this. mappedFilequeue . getStoreTimestamp();
if (storeTimestamp > 0) {
//更新存储监测点文件的时间戳Commi tlog. this . defaultMessagestore. getstorecheckpoint (). setPhysicMsgTimestamp(storeTimestamp);
然后执行一次刷盘之前,一般先让线程睡眠一会,就开始进行刷盘。CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
将内存映射文件当中的数据给它刷写到物理文件当中去。刷完之后又更新了一下文件的刷盘点。把刷盘的指针去更新,更新到 config 中。
异步刷盘中有两个服务类非常重要,一个是 CommitRealTimeService 负责数据提交,第二个是 FlushRealTimeService 负责刷盘操作的。其中第一个线程每隔200ms执行一次,第二个线程每隔500ms 执行一次。