开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):正常恢复和异常恢复】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12487
正常恢复和异常恢复
内容介绍:
一.正常恢复
二.异常恢复
一.正常恢复
两种恢复的机制。第一种是正常恢复,在恢复的时候,并没有从第一个文件去恢复。是从倒数第三个开始恢复。正常退出的时候,数据基本上是同步的。从拿到第三文件的索引 index ,就开始进行恢复。在恢复的时候,首先去把 mappedFile 这个文件拿出来,再根据它获得 byteBuffer ,然后拿到上一次处理的位置 processOffset 。pub1ic void recoverNormally(long maxPhyOffsetofConsumeQueue) {
final List<MappedFile> mappedFiles = this . mappedFileQueue . getMappedFiles();
if (!mappedFiles. isEmpty()) {
// Broker 正常停止再重启时,从倒数第三个开始恢复,如果不足3个文件,则从第一个文件开始恢复。int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
MappedFile mappedFile = mappedFiles . get(index);
ByteBuffer bytBuffer = mappedFile. sliceByteBuffer();
long processoffset = mappedFile. getFileFromoffset();
//代表当前已校验通过的 offset 1ong mappedFileOffset = 0;
先去查找 byteBuffer 当中的消息。取出消息的长度,判断消息的长度如果查找的结果为 ture ,并且消息长度大于0,表示消息正确,已经同步过的消息,不需要去进行对应的处理。然后当前消息的偏移量 mappedFileOffset 往后移除当前消息长度就可以了。
如果查找结果为 true 且消息长度等于0,说明已经检测到消息的末尾。如果还有下一个文件就取出下一个文件,重置 processOffset 和 MappedFileOffset 。
while (true) {
//查找消息 DispatchRequest dispatchRequest = this. checkMessageAndReturnsize (byteBuffer,checkCRConRecover);
//消息长度int size = dispatchRequest. getMsgsize();
//查找结果为 true ,并且消息长度大于0,表示消息正确 . mappedFileoffset 向前移动本消息长度if (dispatchRequest. Issuccess() && size > 0) {
mappedFileoffset += size;
}
//如果查找结果为 true 且消息长度等于0,表示已到该文件末尾,如果还有下一个文件,则重置 processoffset 和 MappedFileoffset 重复查找下一个文件,否则跳出循环。
else if (dispatchRequest. IsSuccess() && size== 0) {
index++;
if (index >= mappedFiles.size() {
// Current branch can not happen
break;
} else {
//取出每个文件mappedFile = mappedFiles. get(index);
byteBuffer = mappedFile. sliceByteBuffer();
processoffset = mappedFile. getFileFromoffset();
mappedFileoffset = 0;
}
}
如果查找结果为 false ,说明这个文件还未被填满,直接跳出循环,结束循环。
//查找结果为 false ,表名该文件未填满所有消息,跳出循环,结束循环。
else if (!dispatchRequest. issuccess()) {
1og. info("recover physics file end, " + mappedFile. getFileName());
break;
}
}
更新 MappedFileQueue 的 flushedWhere 和 committedWhere 指针。
//更新 MappedFileQueue 的 flushedWhere 和 committedWhere 指针
processoffset += mappedFileoffset;
this. mappedFileQueue . setFlushedwhere (processoffset);
this. mappedFileQueue . setCommi ttedwhere(processoffset);
整个的这些如果都恢复完了之后,紧接着就是将 offset 之后的所有文件删除。
//删除 offset 之后的所有文件this . mappedFileQueue . truncateDi rtyFiles (processoffset);
if (maxPhyoffsetofconsumeQueue , = processoffset) {
this. defaultMessageStore. truncateDi rtyLogicFiles (processoffset);
}
} else {
this . mappedFileQueue . setFlushedwhere(0);
this . mappedFileQueue . setCommittedwhere(0) ;
this. defaultMessagestore. destroyLogics();
}
}
整个恢复逻辑在 recoverNormally 方法当中。
二、异常恢复
相比正常恢复,异常恢复实现的逻辑就在 recoverAbnormally 里面去处理。异常文件恢复步骤与正常停止文件恢复流程基本相同,主要的差别有两个。首先,正常默认从倒数第三个文件开始恢复,而异常文件停止则需要从最后一个文件往前走,找到第一个消息存储正常的文件。其次,如果 CommitLog 目录没有消息文件,如果消息消费队列目录下存在文件,则需要销毁。
异常恢复所对应的位置
if (!mappedFiles. isEmpty()) {
// Looking beginning to recover from which file
int index = mappedFiles .size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {
mappedFile = mappedFiles. get(index);
发现index - 1 ,说明从最后一个文件去进行恢复处理。
然后判断消息文件是否是一个正确的文件,从正常的文件开始慢慢地去恢复。根据索引取出 mappedFile 文件,去进行一个恢复的处理,
//判断消息文件是否是一个正确的文件if (this. isMappedFileMatchedRecover (mappedFile)){
1og. info("recover from this mapped file " + mappedFile. getFileName());
break;
}
}
//根据索引取出 ma
ppedFile文件if (index < 0) {
index = 0;
mappedFile = mappedFiles . get(index);
}
//...验证消息的合法性,并将消息转发到消息消费队列和索引文件
} else {
//未找到 mappedFile ,重置 flushwhere 、 commi ttedwhe re 都为0,销毁消息队列文件this. mappedFi lequeue. setFlushedwhere(0);
this. mappedFi lequeue. setCommi ttedwhere(0) ;
this. defaultMessagestore. destroyLogics();
}
正常恢复和异常恢复主要的区别,正常恢复是从倒数第三个,异常恢复是从倒数第一个开始去处理。