分布式系统设计模式 - 预写日志(Write Ahead Log)(下)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 分布式系统设计模式 - 预写日志(Write Ahead Log)(下)

当消息来时,写入文件的核心方法是MappedFileappendMessagesInner方法:

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;
    //获取当前写入位置
    int currentPos = this.wrotePosition.get();
    //如果当前写入位置小于文件大小则尝试写入
    if (currentPos < this.fileSize) {
        //mappedByteBuffer是公用的,在这里不能修改其position影响读取
        //mappedByteBuffer是文件映射内存抽象出来的文件的内存ByteBuffer
        //对这个buffer的写入,就相当于对文件的写入
        //所以通过slice方法生成一个共享原有相同内存的新byteBuffer,设置position
        //如果writeBuffer不为空,则证明启用了TransientStorePool,使用其中缓存的内存写入
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        //分单条消息还有批量消息的情况
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        //增加写入大小
        this.wrotePosition.addAndGet(result.getWroteBytes());
        //更新最新消息保存时间
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}


RocketMQ 将消息存储在 Commitlog 文件后,异步更新 ConsumeQueue 还有 Index 文件。这个 ConsumeQueue 还有 Index 文件可以理解为存储状态,CommitLog 在这里扮演的就是 WAL 日志的角色:只有写入到 ConsumeQueue 的消息才会被消费者消费,只有 Index 文件中存在的记录才能被读取定位到。如果消息成功写入 CommitLog 但是异步更新还没执行,RocketMQ 进程挂掉了,这样就存在了不一致。所以在 RocketMQ 启动的时候,会通过如下机制保证 Commitlog 与 ConsumeQueue 还有 Index 的最终一致性.


入口是DefaultMessageStoreload方法:

public boolean load() {
    boolean result = true;
    try {
        //RocketMQ Broker启动时会创建${ROCKET_HOME}/store/abort文件,并添加JVM shutdownhook删除这个文件
        //通过这个文件是否存判断是否为正常退出
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
        //加载延迟队列消息,这里先忽略
        if (null != scheduleMessageService) {
            result = result && this.scheduleMessageService.load();
        }
        //加载 Commit Log 文件
        result = result && this.commitLog.load();
        //加载 Consume Queue 文件
        result = result && this.loadConsumeQueue();
        if (result) {
            //加载存储检查点
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            //加载 index,如果不是正常退出,销毁所有索引上次刷盘时间小于索引文件最大消息时间戳的文件
            this.indexService.load(lastExitOK);
            //进行 recover 恢复之前状态
            this.recover(lastExitOK);
            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        result = false;
    }
    if (!result) {
        this.allocateMappedFileService.shutdown();
    }
    return result;
}

进行恢复是DefaultMessageStorerecover方法:

private void recover(final boolean lastExitOK) {
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
    //根据上次是否正常退出,采用不同的恢复方式
    if (lastExitOK) {
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } else {
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }
    this.recoverTopicQueueTable();
}

当上次正常退出时:

public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        //只扫描最后三个文件
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            //检验存储消息是否有效
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            //如果有效,添加这个偏移
            if (dispatchRequest.isSuccess() && size > 0) {
                mappedFileOffset += size;
            }
            //如果有效,但是大小是0,代表到了文件末尾,切换文件
            else if (dispatchRequest.isSuccess() && size == 0) {
                index++;
                if (index >= mappedFiles.size()) {
                    // Current branch can not happen
                    log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
            }
            //只有有无效的消息,就在这里停止,之后会丢弃掉这个消息之后的所有内容
            else if (!dispatchRequest.isSuccess()) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        //根据有效偏移量,删除这个偏移量以后的所有文件,以及所有文件(正常是只有最后一个有效文件,而不是所有文件)中大于这个偏移量的部分
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        //根据 commit log 中的有效偏移量,清理 consume queue
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    } else {
        //所有commit log都删除了,那么偏移量就从0开始
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}

当上次没有正常退出时:

public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // 从最后一个文件开始,向前寻找第一个正常的可以恢复消息的文件
        // 从这个文件开始恢复消息,因为里面的消息有成功写入过 consumer queue 以及 index 的,所以从这里恢复一定能保证最终一致性
        // 但是会造成某些已经写入过 consumer queue 的消息再次写入,也就是重复消费。
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            //寻找第一个有正常消息的文件
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                log.info("recover from this mapped file " + mappedFile.getFileName());
                break;
            }
        }
        //如果小于0,就恢复所有 commit log,或者代表没有 commit log
        if (index < 0) {
            index = 0;
            mappedFile = mappedFiles.get(index);
        }
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            //验证消息有效性
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            //如果消息有效
            if (dispatchRequest.isSuccess()) {
                if (size > 0) {
                    mappedFileOffset += size;
                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                        //如果允许消息重复转发,则需要判断当前消息是否消息偏移小于已确认的偏移,只有小于的进行重新分发
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                            //重新分发消息,也就是更新 consume queue 和 index
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else {
                        //重新分发消息,也就是更新 consume queue 和 index
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                }
                //大小为0代表已经读完,切换下一个文件
                else if (size == 0) {
                    index++;
                    if (index >= mappedFiles.size()) {
                        // The current branch under normal circumstances should
                        // not happen
                        log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
            } else {
                log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
                break;
            }
        }
        //更新偏移
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        //清理
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    }
    // Commitlog case files are deleted
    else {
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}

总结起来就是:

  • 首先,根据 abort 文件是否存在判断上次是否正常退出。
  • 对于正常退出的:
  • 扫描倒数三个文件,记录有效消息的偏移
  • 扫描到某个无效消息结束,或者扫描完整个文件
  • 设置最新偏移,同时根据这个偏移量清理 commit log 和 consume queue


  • 对于没有正常退出的:
  • 从最后一个文件开始,向前寻找第一个正常的可以恢复消息的文件
  • 从这个文件开始恢复并重发消息,因为里面的消息有成功写入过 consumer queue 以及 index 的,所以从这里恢复一定能保证最终一致性。但是会造成某些已经写入过 consumer queue 的消息再次写入,也就是重复消费。
  • 更新偏移,清理


数据库


基本上所有的数据库都会有 WAL 类似的设计,例如 MySQL 的 Innodb redo log 等等。


微信图片_20220624194148.jpg


微信图片_20220624194151.jpg


一致性存储


例如 ZK 还有 ETCD 这样的一致性中间件。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
18天前
|
存储 缓存 关系型数据库
图解MySQL【日志】——Redo Log
Redo Log(重做日志)是数据库中用于记录数据页修改的物理日志,确保事务的持久性和一致性。其主要作用包括崩溃恢复、提高性能和保证事务一致性。Redo Log 通过先写日志的方式,在内存中缓存修改操作,并在适当时候刷入磁盘,减少随机写入带来的性能损耗。WAL(Write-Ahead Logging)技术的核心思想是先将修改操作记录到日志文件中,再择机写入磁盘,从而实现高效且安全的数据持久化。Redo Log 的持久化过程涉及 Redo Log Buffer 和不同刷盘时机的控制参数(如 `innodb_flush_log_at_trx_commit`),以平衡性能与数据安全性。
27 5
图解MySQL【日志】——Redo Log
|
4月前
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
1154 31
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
|
3月前
|
监控 安全 Apache
什么是Apache日志?为什么Apache日志分析很重要?
Apache是全球广泛使用的Web服务器软件,支持超过30%的活跃网站。它通过接收和处理HTTP请求,与后端服务器通信,返回响应并记录日志,确保网页请求的快速准确处理。Apache日志分为访问日志和错误日志,对提升用户体验、保障安全及优化性能至关重要。EventLog Analyzer等工具可有效管理和分析这些日志,增强Web服务的安全性和可靠性。
|
1月前
|
存储 SQL 关系型数据库
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log、原理、写入过程;binlog与redolog区别、update语句的执行流程、两阶段提交、主从复制、三种日志的使用场景;查询日志、慢查询日志、错误日志等其他几类日志
105 35
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
|
2月前
|
SQL 关系型数据库 MySQL
MySQL事务日志-Undo Log工作原理分析
事务的持久性是交由Redo Log来保证,原子性则是交由Undo Log来保证。如果事务中的SQL执行到一半出现错误,需要把前面已经执行过的SQL撤销以达到原子性的目的,这个过程也叫做"回滚",所以Undo Log也叫回滚日志。
119 7
MySQL事务日志-Undo Log工作原理分析
|
20天前
|
存储 关系型数据库 MySQL
图解MySQL【日志】——Undo Log
Undo Log(回滚日志)是 MySQL 中用于实现事务原子性和一致性的关键机制。在默认的自动提交模式下,MySQL 隐式开启事务,每条增删改语句都会记录到 Undo Log 中。其主要作用包括:
32 0
|
3月前
|
存储 监控 安全
什么是事件日志管理系统?事件日志管理系统有哪些用处?
事件日志管理系统是IT安全的重要工具,用于集中收集、分析和解释来自组织IT基础设施各组件的事件日志,如防火墙、路由器、交换机等,帮助提升网络安全、实现主动威胁检测和促进合规性。系统支持多种日志类型,包括Windows事件日志、Syslog日志和应用程序日志,通过实时监测、告警及可视化分析,为企业提供强大的安全保障。然而,实施过程中也面临数据量大、日志管理和分析复杂等挑战。EventLog Analyzer作为一款高效工具,不仅提供实时监测与告警、可视化分析和报告功能,还支持多种合规性报告,帮助企业克服挑战,提升网络安全水平。
131 2
|
3月前
|
存储 运维 数据可视化
如何为微服务实现分布式日志记录
如何为微服务实现分布式日志记录
163 1
|
4月前
|
存储 监控 安全
什么是日志管理,如何进行日志管理?
日志管理是对IT系统生成的日志数据进行收集、存储、分析和处理的实践,对维护系统健康、确保安全及获取运营智能至关重要。本文介绍了日志管理的基本概念、常见挑战、工具的主要功能及选择解决方案的方法,强调了定义管理目标、日志收集与分析、警报和报告、持续改进等关键步骤,以及如何应对数据量大、安全问题、警报疲劳等挑战,最终实现日志数据的有效管理和利用。
463 0
|
5月前
|
Python
log日志学习
【10月更文挑战第9天】 python处理log打印模块log的使用和介绍
120 0

热门文章

最新文章