请问你知道分布式系统的预写日志设计模式么?(下)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 请问你知道分布式系统的预写日志设计模式么?(下)

RocketMQ 将消息存储在 Commitlog 文件后,异步更新 ConsumeQueue 还有 Index 文件。这个 ConsumeQueue 还有 Index 文件可以理解为存储状态,CommitLog 在这里扮演的就是 WAL 日志的角色:只有写入到 ConsumeQueue 的消息才会被消费者消费,只有 Index 文件中存在的记录才能被读取定位到。如果消息成功写入 CommitLog 但是异步更新还没执行,RocketMQ 进程挂掉了,这样就存在了不一致。所以在 RocketMQ 启动的时候,会通过如下机制保证 Commitlog 与 ConsumeQueue 还有 Index 的最终一致性.

入口是DefaultMessageStoreload方法:

public boolean load() {
    boolean result = true;
    try {
        //RocketMQ Broker启动时会创建${ROCKET_HOME}/store/abort文件,并添加JVM shutdownhook删除这个文件
        //通过这个文件是否存判断是否为正常退出
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
        //加载延迟队列消息,这里先忽略
        if (null != scheduleMessageService) {
            result = result && this.scheduleMessageService.load();
        }
        //加载 Commit Log 文件
        result = result && this.commitLog.load();
        //加载 Consume Queue 文件
        result = result && this.loadConsumeQueue();
        if (result) {
            //加载存储检查点
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            //加载 index,如果不是正常退出,销毁所有索引上次刷盘时间小于索引文件最大消息时间戳的文件
            this.indexService.load(lastExitOK);
            //进行 recover 恢复之前状态
            this.recover(lastExitOK);
            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        result = false;
    }
    if (!result) {
        this.allocateMappedFileService.shutdown();
    }
    return result;
}

进行恢复是DefaultMessageStorerecover方法:

private void recover(final boolean lastExitOK) {
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
    //根据上次是否正常退出,采用不同的恢复方式
    if (lastExitOK) {
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } else {
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }
    this.recoverTopicQueueTable();
}

当上次正常退出时:

public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        //只扫描最后三个文件
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            //检验存储消息是否有效
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            //如果有效,添加这个偏移
            if (dispatchRequest.isSuccess() && size > 0) {
                mappedFileOffset += size;
            }
            //如果有效,但是大小是0,代表到了文件末尾,切换文件
            else if (dispatchRequest.isSuccess() && size == 0) {
                index++;
                if (index >= mappedFiles.size()) {
                    // Current branch can not happen
                    log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
            }
            //只有有无效的消息,就在这里停止,之后会丢弃掉这个消息之后的所有内容
            else if (!dispatchRequest.isSuccess()) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        //根据有效偏移量,删除这个偏移量以后的所有文件,以及所有文件(正常是只有最后一个有效文件,而不是所有文件)中大于这个偏移量的部分
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        //根据 commit log 中的有效偏移量,清理 consume queue
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    } else {
        //所有commit log都删除了,那么偏移量就从0开始
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}


当上次没有正常退出时:

public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // 从最后一个文件开始,向前寻找第一个正常的可以恢复消息的文件
        // 从这个文件开始恢复消息,因为里面的消息有成功写入过 consumer queue 以及 index 的,所以从这里恢复一定能保证最终一致性
        // 但是会造成某些已经写入过 consumer queue 的消息再次写入,也就是重复消费。
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            //寻找第一个有正常消息的文件
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                log.info("recover from this mapped file " + mappedFile.getFileName());
                break;
            }
        }
        //如果小于0,就恢复所有 commit log,或者代表没有 commit log
        if (index < 0) {
            index = 0;
            mappedFile = mappedFiles.get(index);
        }
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            //验证消息有效性
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            //如果消息有效
            if (dispatchRequest.isSuccess()) {
                if (size > 0) {
                    mappedFileOffset += size;
                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                        //如果允许消息重复转发,则需要判断当前消息是否消息偏移小于已确认的偏移,只有小于的进行重新分发
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                            //重新分发消息,也就是更新 consume queue 和 index
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else {
                        //重新分发消息,也就是更新 consume queue 和 index
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                }
                //大小为0代表已经读完,切换下一个文件
                else if (size == 0) {
                    index++;
                    if (index >= mappedFiles.size()) {
                        // The current branch under normal circumstances should
                        // not happen
                        log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
            } else {
                log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
                break;
            }
        }
        //更新偏移
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        //清理
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    }
    // Commitlog case files are deleted
    else {
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}

总结起来就是:

  • 首先,根据 abort 文件是否存在判断上次是否正常退出。
  • 对于正常退出的:
  • 扫描倒数三个文件,记录有效消息的偏移
  • 扫描到某个无效消息结束,或者扫描完整个文件
  • 设置最新偏移,同时根据这个偏移量清理 commit log 和 consume queue


  • 对于没有正常退出的:
  • 从最后一个文件开始,向前寻找第一个正常的可以恢复消息的文件
  • 从这个文件开始恢复并重发消息,因为里面的消息有成功写入过 consumer queue 以及 index 的,所以从这里恢复一定能保证最终一致性。但是会造成某些已经写入过 consumer queue 的消息再次写入,也就是重复消费。
  • 更新偏移,清理



数据库


基本上所有的数据库都会有 WAL 类似的设计,例如 MySQL 的 Innodb redo log 等等。


微信图片_20220625123541.jpg


微信图片_20220625123544.jpg


一致性存储


例如 ZK 还有 ETCD 这样的一致性中间件。


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
19天前
|
设计模式 存储 算法
分布式系统架构5:限流设计模式
本文是小卷关于分布式系统架构学习的第5篇,重点介绍限流器及4种常见的限流设计模式:流量计数器、滑动窗口、漏桶和令牌桶。限流旨在保护系统免受超额流量冲击,确保资源合理分配。流量计数器简单但存在边界问题;滑动窗口更精细地控制流量;漏桶平滑流量但配置复杂;令牌桶允许突发流量。此外,还简要介绍了分布式限流的概念及实现方式,强调了限流的代价与收益权衡。
60 11
|
21天前
|
设计模式 监控 Java
分布式系统架构4:容错设计模式
这是小卷对分布式系统架构学习的第4篇文章,重点介绍了三种常见的容错设计模式:断路器模式、舱壁隔离模式和重试模式。断路器模式防止服务故障蔓延,舱壁隔离模式通过资源隔离避免全局影响,重试模式提升短期故障下的调用成功率。文章还对比了这些模式的优缺点及适用场景,并解释了服务熔断与服务降级的区别。尽管技术文章阅读量不高,但小卷坚持每日更新以促进个人成长。
45 11
|
1月前
|
存储 运维 数据可视化
如何为微服务实现分布式日志记录
如何为微服务实现分布式日志记录
70 1
|
3月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
66 1
|
4月前
|
设计模式 SQL 安全
PHP中的设计模式:单例模式的深入探索与实践在PHP的编程实践中,设计模式是解决常见软件设计问题的最佳实践。单例模式作为设计模式中的一种,确保一个类只有一个实例,并提供全局访问点,广泛应用于配置管理、日志记录和测试框架等场景。本文将深入探讨单例模式的原理、实现方式及其在PHP中的应用,帮助开发者更好地理解和运用这一设计模式。
在PHP开发中,单例模式通过确保类仅有一个实例并提供一个全局访问点,有效管理和访问共享资源。本文详细介绍了单例模式的概念、PHP实现方式及应用场景,并通过具体代码示例展示如何在PHP中实现单例模式以及如何在实际项目中正确使用它来优化代码结构和性能。
61 2
|
4月前
|
运维 NoSQL Java
SpringBoot接入轻量级分布式日志框架GrayLog技术分享
在当今的软件开发环境中,日志管理扮演着至关重要的角色,尤其是在微服务架构下,分布式日志的统一收集、分析和展示成为了开发者和运维人员必须面对的问题。GrayLog作为一个轻量级的分布式日志框架,以其简洁、高效和易部署的特性,逐渐受到广大开发者的青睐。本文将详细介绍如何在SpringBoot项目中接入GrayLog,以实现日志的集中管理和分析。
322 1
|
5月前
|
消息中间件 JSON 自然语言处理
Python多进程日志以及分布式日志的实现方式
python日志模块logging支持多线程,但是在多进程下写入日志文件容易出现下面的问题: PermissionError: [WinError 32] 另一个程序正在使用此文件,进程无法访问。 也就是日志文件被占用的情况,原因是多个进程的文件handler对日志文件进行操作产生的。
|
5月前
|
存储 监控 数据可视化
性能监控之JMeter分布式压测轻量日志解决方案
【8月更文挑战第11天】性能监控之JMeter分布式压测轻量日志解决方案
112 0
性能监控之JMeter分布式压测轻量日志解决方案
|
5月前
|
存储 消息中间件 缓存
Waltz 一种分布式预写日志系统
Waltz 一种分布式预写日志系统
56 1
|
6月前
|
消息中间件 JSON 自然语言处理
python多进程日志以及分布式日志的实现方式
python日志在多进程环境下的问题 python日志模块logging支持多线程,但是在多进程下写入日志文件容易出现下面的问题: PermissionError: [WinError 32] 另一个程序正在使用此文件,进程无法访问。 也就是日志文件被占用的情况,原因是多个进程的文件handler对日志文件进行操作产生的。