请问你知道分布式系统的预写日志设计模式么?(下)

简介: 请问你知道分布式系统的预写日志设计模式么?(下)

RocketMQ 将消息存储在 Commitlog 文件后,异步更新 ConsumeQueue 还有 Index 文件。这个 ConsumeQueue 还有 Index 文件可以理解为存储状态,CommitLog 在这里扮演的就是 WAL 日志的角色:只有写入到 ConsumeQueue 的消息才会被消费者消费,只有 Index 文件中存在的记录才能被读取定位到。如果消息成功写入 CommitLog 但是异步更新还没执行,RocketMQ 进程挂掉了,这样就存在了不一致。所以在 RocketMQ 启动的时候,会通过如下机制保证 Commitlog 与 ConsumeQueue 还有 Index 的最终一致性.

入口是DefaultMessageStoreload方法:

public boolean load() {
    boolean result = true;
    try {
        //RocketMQ Broker启动时会创建${ROCKET_HOME}/store/abort文件,并添加JVM shutdownhook删除这个文件
        //通过这个文件是否存判断是否为正常退出
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
        //加载延迟队列消息,这里先忽略
        if (null != scheduleMessageService) {
            result = result && this.scheduleMessageService.load();
        }
        //加载 Commit Log 文件
        result = result && this.commitLog.load();
        //加载 Consume Queue 文件
        result = result && this.loadConsumeQueue();
        if (result) {
            //加载存储检查点
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            //加载 index,如果不是正常退出,销毁所有索引上次刷盘时间小于索引文件最大消息时间戳的文件
            this.indexService.load(lastExitOK);
            //进行 recover 恢复之前状态
            this.recover(lastExitOK);
            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        result = false;
    }
    if (!result) {
        this.allocateMappedFileService.shutdown();
    }
    return result;
}

进行恢复是DefaultMessageStorerecover方法:

private void recover(final boolean lastExitOK) {
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
    //根据上次是否正常退出,采用不同的恢复方式
    if (lastExitOK) {
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } else {
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }
    this.recoverTopicQueueTable();
}

当上次正常退出时:

public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        //只扫描最后三个文件
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            //检验存储消息是否有效
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            //如果有效,添加这个偏移
            if (dispatchRequest.isSuccess() && size > 0) {
                mappedFileOffset += size;
            }
            //如果有效,但是大小是0,代表到了文件末尾,切换文件
            else if (dispatchRequest.isSuccess() && size == 0) {
                index++;
                if (index >= mappedFiles.size()) {
                    // Current branch can not happen
                    log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
            }
            //只有有无效的消息,就在这里停止,之后会丢弃掉这个消息之后的所有内容
            else if (!dispatchRequest.isSuccess()) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        //根据有效偏移量,删除这个偏移量以后的所有文件,以及所有文件(正常是只有最后一个有效文件,而不是所有文件)中大于这个偏移量的部分
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        //根据 commit log 中的有效偏移量,清理 consume queue
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    } else {
        //所有commit log都删除了,那么偏移量就从0开始
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}


当上次没有正常退出时:

public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // 从最后一个文件开始,向前寻找第一个正常的可以恢复消息的文件
        // 从这个文件开始恢复消息,因为里面的消息有成功写入过 consumer queue 以及 index 的,所以从这里恢复一定能保证最终一致性
        // 但是会造成某些已经写入过 consumer queue 的消息再次写入,也就是重复消费。
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            //寻找第一个有正常消息的文件
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                log.info("recover from this mapped file " + mappedFile.getFileName());
                break;
            }
        }
        //如果小于0,就恢复所有 commit log,或者代表没有 commit log
        if (index < 0) {
            index = 0;
            mappedFile = mappedFiles.get(index);
        }
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            //验证消息有效性
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            //如果消息有效
            if (dispatchRequest.isSuccess()) {
                if (size > 0) {
                    mappedFileOffset += size;
                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                        //如果允许消息重复转发,则需要判断当前消息是否消息偏移小于已确认的偏移,只有小于的进行重新分发
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                            //重新分发消息,也就是更新 consume queue 和 index
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else {
                        //重新分发消息,也就是更新 consume queue 和 index
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                }
                //大小为0代表已经读完,切换下一个文件
                else if (size == 0) {
                    index++;
                    if (index >= mappedFiles.size()) {
                        // The current branch under normal circumstances should
                        // not happen
                        log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
            } else {
                log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
                break;
            }
        }
        //更新偏移
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        //清理
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    }
    // Commitlog case files are deleted
    else {
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}

总结起来就是:

  • 首先,根据 abort 文件是否存在判断上次是否正常退出。
  • 对于正常退出的:
  • 扫描倒数三个文件,记录有效消息的偏移
  • 扫描到某个无效消息结束,或者扫描完整个文件
  • 设置最新偏移,同时根据这个偏移量清理 commit log 和 consume queue


  • 对于没有正常退出的:
  • 从最后一个文件开始,向前寻找第一个正常的可以恢复消息的文件
  • 从这个文件开始恢复并重发消息,因为里面的消息有成功写入过 consumer queue 以及 index 的,所以从这里恢复一定能保证最终一致性。但是会造成某些已经写入过 consumer queue 的消息再次写入,也就是重复消费。
  • 更新偏移,清理



数据库


基本上所有的数据库都会有 WAL 类似的设计,例如 MySQL 的 Innodb redo log 等等。


微信图片_20220625123541.jpg


微信图片_20220625123544.jpg


一致性存储


例如 ZK 还有 ETCD 这样的一致性中间件。


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
存储 设计模式 NoSQL
分布式系统设计模式,你用过哪些?(2)
分布式系统设计模式,你用过哪些?
103 0
|
消息中间件 机器学习/深度学习 设计模式
分布式系统设计模式,你用过哪些?(1)
分布式系统设计模式,你用过哪些?
109 0
|
设计模式 存储 消息中间件
请问你知道分布式系统设计模式的最低水位线思想么?
请问你知道分布式系统设计模式的最低水位线思想么?
|
存储 消息中间件 设计模式
请问你知道分布式系统设计模式的分割日志思想么?
请问你知道分布式系统设计模式的分割日志思想么?
请问你知道分布式系统设计模式的分割日志思想么?
|
存储 设计模式 消息中间件
请问你知道分布式系统的预写日志设计模式么?(上)
请问你知道分布式系统的预写日志设计模式么?(上)
请问你知道分布式系统的预写日志设计模式么?(上)
|
消息中间件 存储 设计模式
分布式系统设计模式 - 预写日志(Write Ahead Log)(下)
分布式系统设计模式 - 预写日志(Write Ahead Log)(下)
分布式系统设计模式 - 预写日志(Write Ahead Log)(下)
|
22天前
|
安全 Linux 网络安全
/var/log/secure日志详解
Linux系统的 `/var/log/secure` 文件记录安全相关消息,包括身份验证和授权尝试。它涵盖用户登录(成功或失败)、`sudo` 使用、账户锁定解锁及其他安全事件和PAM错误。例如,SSH登录成功会显示&quot;Accepted password&quot;,失败则显示&quot;Failed password&quot;。查看此文件可使用 `tail -f /var/log/secure`,但通常只有root用户有权访问。
67 4
|
24天前
|
监控 Linux 网络安全
/var/log/auth.log日志说明
`/var/log/auth.log`是Linux系统记录身份验证和授权事件的日志文件,包括登录尝试、SSH连接、sudo操作等。系统管理员可通过它监控用户登录、检查失败尝试、跟踪SSH活动、查看sudo/su操作及PAM活动。日志内容可能因系统配置而异,可能存在于其他日志文件中。分析这些日志可使用`tail`、`grep`等命令或专用日志分析工具。了解系统和其服务详情有助于提取有用信息。
38 2
|
1天前
|
XML Java Maven
Springboot整合与使用log4j2日志框架【详解版】
该文介绍了如何在Spring Boot中切换默认的LogBack日志系统至Log4j2。首先,需要在Maven依赖中排除`spring-boot-starter-logging`并引入`spring-boot-starter-log4j2`。其次,创建`log4j2-spring.xml`配置文件放在`src/main/resources`下,配置包括控制台和文件的日志输出、日志格式和文件切分策略。此外,可通过在不同环境的`application.yml`中指定不同的log4j2配置文件。最后,文章提到通过示例代码解释了日志格式中的各种占位符含义。
|
1天前
|
运维 监控 Go
Golang深入浅出之-Go语言中的日志记录:log与logrus库
【4月更文挑战第27天】本文比较了Go语言中标准库`log`与第三方库`logrus`的日志功能。`log`简单但不支持日志级别配置和多样化格式,而`logrus`提供更丰富的功能,如日志级别控制、自定义格式和钩子。文章指出了使用`logrus`时可能遇到的问题,如全局logger滥用、日志级别设置不当和过度依赖字段,并给出了避免错误的建议,强调理解日志级别、合理利用结构化日志、模块化日志管理和定期审查日志配置的重要性。通过这些实践,开发者能提高应用监控和故障排查能力。
8 1