开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):消息队列和索引文件恢复】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12486
消息队列和索引文件恢复
为什么要做消息队列和索引文件恢复。因为由于RocketMQ将信息全量存储在 CommitLog 文件中,然后异步生成转发任务更新 ConsumerQueue 和 Index 文件。如果消息成功存储到 CommitLog 文件中,转发任务未成功执行,此时消息服务器Broker由于某个愿意宕机,导致 CommitLog . ConsumerQueue IndexFile 文件数据不一 致。 如果不加以人工修复,会有一部分消息即便在 CommitLog 中文件中存在,但由于没有转发到 ConsumerQueue ,这部分消息将永远复发被消费者消费。所以我们要进行消息队列和索引文件的恢复,其实恢复的主要目的是将 CommitLog 中的数据给转发到 ConsumerQueue 和 IndexFile 。不论上一次是正常退出还是异常退出,我们都得要做这个事,保证这几个数据的同步性。
文件恢复的入口在 DefaultMessageStore 中 public boolean load() 方法这。 在 load 方法中的 boolean lastExitok = !this.isTempFileExist(); 语句中首先判断了一下上一次退出的状态,观察是正常还是异常, isTempFileExist 是如何判断上一次退出的状态?
在 dataDir 中有个 abort 文件。 abort文件在 RocketMQ 启动的时候会把它创建出来,创建出来里面什么都不写是0 KB 。在正常关闭的时候,它就会去删除文件。如果异常关闭,就没有来得及删除文件,所以可以根据 abort 文件它在或者不在去判断上一次 RocketMQ 退出的状态。如果不在,就是正常退出;如果在说明上一次就是异常退出。
拿到一个标识 lastExitOK ,这个标识就会正常去使用。紧接着会加载一系列文件到内存里面:
if (null!= scheduleMessageservice) {
result = result && this.scheduleMessageService.load();
}
//加入延时队列 scheduleMessageService
result = result && this.commitLog. load();
//加载 commitLog
result = result && this.loadConsumeQueue();
//加载 ConsumeQueue
最后加载 StoreCheckpoint 文件监测点。
文件检测点里存放了它上一次所转发的位置。这个文件一定要去加载,就是在程序运行的时候所产生的一些配置信息。
然后紧接着加载索引文件把 dataDir 里面的文件信息全都去加载。
进行文件的恢复。第一种是正常退出的文件恢复this.indexService.load(lastExitOK);
,第二种是异常退出的文件恢复——this.recover(lastExitOK);。文件恢复的入口在 recover ,传了上一次 lastExitOK 这个标志。在这里面进行一个文件恢复的处理。在以下代码位置做了判断:
if (lastExitoK) {
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
//正常退出恢复逻辑
} else {
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
//异常退出恢复逻辑
}
上面是正常退出,下面是异常退出恢复的逻辑。