异步刷盘说明|学习笔记

简介: 快速学习异步刷盘说明

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)异步刷盘说明】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12489


异步刷盘说明


异步刷盘

异步刷盘指的是在消息追加到内存后,立即返回给消息发送端。如果开启  transientStorePoolEnable ,  RocketMQ 会单独申请一个与目标物理文件(commitLog)同样大小的对外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到物理文件的内存映射中,然后刷写到磁盘。如果末开启 transientStorePoolEnable , 消息直接追加到物理文件直接映射文件中,然后刷写到磁盘中。

针对刚才的流程,如下图进行说明。 ByteBuffer(Direct) 就是堆外内存的空间, MappedByteBuffer 是物流文件所对应的映射文件。

image.png执行步骤,第一步是将消息直接追加到 ByteBuffer (堆外内存)。第二步 CommitRealTimeService 线程每隔 200ms 将 ByteBuffer 新追加内容提交到 MappedByteBuffer 中。

第三步 MappedByteBuffer 在内存中追加提交的内容, wrotePosition 指针向后移动,说明追加成功。

第四步 commit 操作成功返回,将 committedPosition 位置恢复。

第五步 FlushRealTimeService 线程默认每 500ms 将 MappedByteBuffer 中新追加的内存刷写到磁盘。

通过流程分析,这里面有两个服务类非常重要,第一个是 CommitRealTimeService ,第二个是 FlushRealTimeService ,一个是用来提交堆外的数据到内存映射文件当中去,另外一个是负责将内存映射文件刷到磁盘中去。

找到刷盘方法

else 这里执行的是 if 刷盘。 if 刷盘首先在这里做了一个判断,如果 isTransientStorePoolEnable 它没有打开就直接刷,如果打开就先进行 commitLogService.wakeup 提交这个操作。

else{if(!This.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()){

flushCommitLogService.wakeup();

} else {

commitLogService.wakeup();

}

}

进入下图所示位置,

image.png在这里会每隔 200ms 去进行一次数据的提交工作

private int commitIntervalCommitLog = 200;

数据的提交工作,首先先去获得了数据一次提交的至少页数,数据大于页数之后才去真正进行提交。

下面还设置了一个两次真实提交的最大时间间隔,默认200ms,意思是如果上次提交的时间超过最大的时间,下一次提交就会直接去提交不会去等待。

//间隔时间,默认200ms
int interval =
CommitLog. this. defaultMessagestore. getMessageStoreconfig(). getCommitInterva1commitLog();

//一次提交的至少页数
int commitDataLeastPages =
CommitLog. this. defaultMessagestore. getMessagestoreConfig() . getcommi tCommi tLogLeastPages();
//两次真实提交的最大间隔,默认200ms
int commitDataThoroughInterval =
CommitLog. this. defau1tMessagestore. detMessagestoreconfig(). getcommi tCommitLogThoroughInterval();

//上次提交间隔超过 commitDataThoroughInterval ,则忽略提交 commitDataThoroughInterval 参数,直接提交long begin = system. currentTimeMillis();
if (begin >= (this. 1astcommitTimestamp + commi tDataThoroughInterva1)) {
this. astcommitTimestamp = begin;
commitDataLeastPages = 0;
}

mappedFileQueue 就是正在进行提交工作,就是把它提交到物理文件所对应的内存映射文件当中去。提交完之后,进行了 flushCommitLogService 操作。

//执行提交操作 ,将待提交数据提交到物理文件的内存映射区
boolean result = CommitLog. this .mappedFileQueue. commit (commi tDataLeastPages);
Long end = system currentTimeMillis());
if (!resu1t) {
this. lastcommitTimestamp = end; //result = false means some data commi tted.
//now wake up flush thread.

//唤醒刷盘线程

flushcommitLogService . wakeup());

}
if (end- begin > 500) {
1og. info("Commit data to fi1e costs {} ms", end - begin);
}
this . waitForRunni ng(interva1);

目的是唤醒刷盘的线程。

flushCommitLogService.wakeup();

让刷盘的线程负责进行刷盘的处理,FlushRealTimeService 就是负责刷盘。

image.png如何进行刷盘,刷盘的方式和同步刷盘的基本流程是差不多的,只不过一个需要阻塞,一个不需要阻塞。线程的时间间隔,一次刷写任务至少包含的页数,设置了两次真实刷写任务的最大时间间隔,同样是判断,如果上次刷的时间太过长,那么下一次就不会等待这么长时间,直接刷就可以。

//表示 wait 方法等待,默认 false
boolean flushcommitLogTimed =
CommitLog. this. defaultMessagestore. getMessagestoreconfig().isFlushcommi tLogTimed();
//线程执行时间间隔
int interval =
CommitLog. this. defaultMessagestore. getMessagestoreconfi g(). getFlushInterva commitLog();
//一次刷写任务至少包含页数
int flushPhysicQueueLeastPages =
CommitLog. this. defaultMessagestore . getMessagestoreconfig(). getFlushCommitLogLeastPages ();
//两次真实刷写任务最大间隔
int flushPhysicQueueThoroughInterval =
CommitLog. this . defaultMessagestore . getMessagestoreConfig(). getFlushCommi tLogThoroughInterval();
...

//距离上次提交间隔超过 flushPhysicQueueThoroughInterval ,则本次刷盘任务将忽略 flushPhysicQueueLeastPages ,直接提交
1ong currentTimeMi11is = system. currentTimeMillis();
if (currentTimeMillis >= (this.1astF1ushTimestamp + flushPhysicQueueThoroughInterva1)) {
this. lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;
pri ntFlushProgress = (printTimes++ % 10) == 0;

}

...

//执行一次刷盘前,先等待指定时间间隔
if (flushCommitLogTimed) {
Thread. sleep(interval);
} else {
this. waitForRunni ng(interva1);
}

...
long begin = system. currentTimeMillis();
//刷写磁盘
Commitlog. this . mappedFilequeue. flush(flushphysicQueueLeastpages);
long storeTimestamp = Commi tLog. this. mappedFilequeue . getStoreTimestamp();
if (storeTimestamp > 0) {
//更新存储监测点文件的时间戳
Commi tlog. this . defaultMessagestore. getstorecheckpoint (). setPhysicMsgTimestamp(storeTimestamp);

然后执行一次刷盘之前,一般先让线程睡眠一会,就开始进行刷盘。CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);

将内存映射文件当中的数据给它刷写到物理文件当中去。刷完之后又更新了一下文件的刷盘点。把刷盘的指针去更新,更新到 config 中。

image.png

异步刷盘中有两个服务类非常重要,一个是 CommitRealTimeService 负责数据提交,第二个是 FlushRealTimeService 负责刷盘操作的。其中第一个线程每隔200ms执行一次,第二个线程每隔500ms 执行一次。

相关文章
|
存储 算法 数据库
阿里云InfluxDB®基于TSI索引的数据查询流程
简介 时间序列数据库是目前技术发展最快的数据库类型之一。作为业界最为流行的时序数据库InfluxDB,其部署运行十分简洁方便,支持高性能时序数据的读写,在应用程序监控、物联网(IoT)领域有着广泛的应用。
4120 1
|
存储 NoSQL 数据库
时序数据库连载系列: 时序数据库一哥InfluxDB之存储机制解析
InfluxDB 的存储机制解析 本文介绍了InfluxDB对于时序数据的存储/索引的设计。由于InfluxDB的集群版已在0.12版就不再开源,因此如无特殊说明,本文的介绍对象都是指 InfluxDB 单机版 1. InfluxDB 的存储引擎演进 尽管InfluxDB自发布以来历时三年多,其存储引擎的技术架构已经做过几次重大的改动, 以下将简要介绍一下InfluxDB的存储引擎演进的过程。
7027 0
|
10天前
|
人工智能 数据可视化 机器人
零基础搭建AI应用:Coze与Dify对比指南
Coze和Dify是当前主流的AI应用开发平台,两者定位和特点差异显著。Coze适合快速搭建聊天机器人,尤其适合非技术人员和需要快速集成的场景;Dify则更侧重高度定制和企业级需求,支持私有部署和复杂工作流。选择时应根据项目需求、技术能力及数据控制要求综合考虑,没有绝对优劣,关键看是否契合实际场景。
|
10月前
|
缓存 监控 数据挖掘
C# 一分钟浅谈:性能测试与压力测试
【10月更文挑战第20天】本文介绍了性能测试和压力测试的基础概念、目的、方法及常见问题与解决策略。性能测试关注系统在正常条件下的响应时间和资源利用率,而压力测试则在超出正常条件的情况下测试系统的极限和潜在瓶颈。文章通过具体的C#代码示例,详细探讨了忽视预热阶段、不合理测试数据和缺乏详细监控等常见问题及其解决方案,并提供了如何避免这些问题的建议。
232 7
|
10月前
|
存储 安全 开发工具
CSGHub 开源大模型资产管理平台
CSGHub是一个开源的大模型资产管理平台,支持LLM及其应用的全生命周期管理。用户可通过Web界面、Git命令或Chatbot进行资产的上传、下载、存储、校验和分发。他们刚发不到了阿里云云市场
|
存储 网络协议 算法
TCP的局限性
【8月更文挑战第20天】
482 3
|
11月前
|
机器学习/深度学习 人工智能 自然语言处理
【AI大模型】LLM主流开源大模型介绍
【AI大模型】LLM主流开源大模型介绍
|
11月前
|
缓存 UED 网络架构
网站404该怎么解决
网站404错误通常表示用户尝试访问的网页不存在或无法找到
1593 0
|
11月前
|
机器学习/深度学习 自然语言处理 机器人
深度剖析模型微调与RAG技术的完美融合:从理论到实践,带你全面了解如何利用RAG提升特定领域任务性能并附带代码示例
【10月更文挑战第2天】随着深度学习的发展,预训练模型因通用表示能力和高效性备受关注。模型微调通过在已训练模型基础上进行再训练,使其适应特定任务或数据集,提升性能。RAG(Retrieval-Augmented Generation)结合检索与生成技术,在生成响应前检索相关信息,特别适用于需要背景知识的任务。本文通过构建医学问答机器人的示例,展示如何初始化RAG模型并利用实际数据集进行微调,从而提升生成答案的准确性和可信度。
664 4