源码分析 RocketMQ DLedger 多副本存储实现

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 源码分析 RocketMQ DLedger 多副本存储实现

RocketMQ DLedger 的存储实现思路与 RocketMQ 的存储实现思路相似,本文就不再从源码角度详细剖析其实现,只是点出其实现关键点。我们不妨简单回顾一下 CommitLog 文件、ConsumeQueue 文件设计思想。


其文件组成形式如下:

f9329668e66307b3fc687490d5dd18cc.jpg


正如上图所示,多个 commitlog 文件组成一个逻辑上的连续文件,使用 MappedFileQueue 表示,单个 commitlog 文件使用 MappedFile 表示。


温馨提示:如果想详细了解 RocketMQ 关于存储部分的讲解,可以关注笔者的《RocketMQ 技术内幕》一书。

1、DLedger 存储相关类图


14e02191d12f1cca81607dc95408f8fd.jpg


1.1 DLedgerStore


存储抽象类,定义如下核心方法:


  • public abstract DLedgerEntry appendAsLeader(DLedgerEntry entry)
    向主节点追加日志(数据)。
  • public abstract DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String leaderId)
    向从节点同步日志。
  • public abstract DLedgerEntry get(Long index)
    根据日志下标查找日志。
  • public abstract long getCommittedIndex()
    获取已提交的下标。
  • public abstract long getLedgerEndTerm()
    获取 Leader 当前最大的投票轮次。
  • public abstract long getLedgerEndIndex()
    获取 Leader 下一条日志写入的下标。
  • public abstract long getLedgerBeginIndex()
    获取 Leader 第一条消息的下标。
  • public void updateCommittedIndex(long term, long committedIndex)
    更新commitedIndex的值,为空实现,由具体的存储子类实现。
  • protected void updateLedgerEndIndexAndTerm()
    更新 Leader 维护的 ledgerEndIndex 和 ledgerEndTerm 。
  • public void flush()
    刷写,空方法,由具体子类实现。
  • public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId)
    删除日志,空方法,由具体子类实现。
  • public void startup()
    启动存储管理器,空方法,由具体子类实现。
  • public void shutdown()
    关闭存储管理器,空方法,由具体子类实现。


1.2 DLedgerMemoryStore


Dledger 基于内存实现的日志存储。


1.3  DLedgerMmapFileStore


基于文件内存映射机制的存储实现。其核心属性如下:


  • long ledgerBeginIndex =  -1
    日志的起始索引,默认为 -1。
    l- ong ledgerEndIndex = -1
    下一条日志下标,默认为 -1。
  • long committedIndex = -1
    已提交的日志索引。
  • long ledgerEndTerm
    当前最大的投票轮次。
  • DLedgerConfig dLedgerConfig
    DLedger 的配置信息。
  • MemberState memberState
    状态机。
  • MmapFileList dataFileList
    日志文件(数据文件)的内存映射Queue。
  • MmapFileList indexFileList
    索引文件的内存映射文件集合。(可对标 RocketMQ MappedFIleQueue )。
  • ThreadLocal< ByteBuffer> localIndexBuffer
    本地线程变量,用来缓存索引ByteBuffer。
  • ThreadLocal< ByteBuffer> localEntryBuffer
    本地线程变量,用来缓存数据索引ByteBuffer。
  • FlushDataService flushDataService
    数据文件刷盘线程。
  • CleanSpaceService cleanSpaceService
    清除过期日志文件线程。
  • boolean isDiskFull = false
    磁盘是否已满。
  • long lastCheckPointTimeMs
    上一次检测点(时间戳)。
  • AtomicBoolean hasLoaded
    是否已经加载,主要用来避免重复加载(初始化)日志文件。
  • AtomicBoolean hasRecovered
     是否已恢复。


2、DLedger 存储(对标 RocketMQ)


存储部分主要包含存储映射文件、消息存储格式、刷盘、文件加载与文件恢复、过期文件删除等,由于这些内容在 RocketMQ 存储部分都已详细介绍,故本文点到为止,其对应的参考映射如下:


14e02191d12f1cca81607dc95408f8fd.jpg

在 RocketMQ 中使用 MappedFile 来表示一个物理文件,而在 DLedger 中使用 DefaultMmapFIle 来表示一个物理文件。


在 RocketMQ 中使用 MappedFile 来表示多个物理文件(逻辑上连续),而在 DLedger 中则使用MmapFileList。


在 RocketMQ 中使用 DefaultMessageStore 来封装存储逻辑,而在 DLedger 中则使用DLedgerMmapFileStore来封装存储逻辑。


在 RocketMQ 中使用 Commitlog 的内部类 FlushCommitLogService 来实现 commitlog 文件的刷盘,而在 DLedger 中则使用 DLedgerMmapFileStore 其内部类 FlushDataService 来实现文件刷盘。


在 RocketMQ 中使用 DefaultMessageStore 的内部类 CleanCommitlogService 来实现 commitlog 过期文件的删除,而 DLedger 中则使用 DLedgerMmapFileStore$CleanSpaceService 来实现。


由于其实现原理相同,上述部分已经在《RocketMQ 技术内幕》第4章中详细剖析,故这里就不重复分析了。


3、DLedger 数据存储格式


61e02cd9996ef4c6e963efc144e71cd6.png

存储格式字段的含义如下:


  • magic
    魔数,4字节。
  • size
    条目总长度,包含 Header(协议头) + 消息体,占4字节。
  • entryIndex
    当前条目的 index,占8字节。
  • entryTerm
    当前条目所属的 投票轮次,占8字节。
  • pos
    该条目的物理偏移量,类似于 commitlog 文件的物理偏移量,占8字节。
  • channel
    保留字段,当前版本未使用,占4字节。
  • chain crc
    当前版本未使用,占4字节。
  • body crc
    body 的 CRC 校验和,用来区分数据是否损坏,占4字节。
  • body size
    用来存储 body 的长度,占4个字节。
  • body
    具体消息的内容。


源码参考点:DLedgerMmapFileStore#recover、DLedgerEntry、DLedgerEntryCoder。


4、DLedger 索引存储格式


860439b7a493c0dfd12ef4373ba350cd.png

即一个索引条目占32个字节。


5、思考


DLedger 存储相关就介绍到这里,为了与大家增加互动,特提出如下两个思考题,欢迎与作者互动,这些问题将在该系列的后面文章专题探讨。


1、DLedger 如果整合 RocketMQ 中的 commitlog 文件,使之支持多副本?


2、从老版本如何升级到新版本,需要考虑哪些因素呢?


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
6月前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
|
6月前
|
消息中间件 存储 负载均衡
MetaQ/RocketMQ 原理问题之避免重复消费问题如何解决
MetaQ/RocketMQ 原理问题之避免重复消费问题如何解决
146 1
|
8月前
|
消息中间件 Java API
MQ产品使用合集之RocketMQ dledger集群模式的dledgerpeers端口是集群之间通讯吗
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 中间件 Kafka
RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析
**RocketMQ**的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的 ,具体来说Broker端是**Netty服务器**用来负责与客户端的连接请求处理,而Producer/Consumer端是**Netty客户端**用来负责与Netty服务器的通信及请求响应处理。
197 1
|
消息中间件 存储 负载均衡
RocketMQ 源码分析——NameServer
- 编写优雅、高效的代码。RocketMQ作为阿里双十一交易核心链路产品,支撑千万级并发、万亿级数据洪峰。读源码可以积累编写高效、优雅代码的经验。 - 提升微观的架构设计能力,重点在思维和理念。Apache RocketMQ作为Apache顶级项目,它的架构设计是值得大家借鉴的。 - 解决工作中、学习中的各种疑难杂症。在使用RocketMQ过程中遇到消费卡死、卡顿等问题可以通过阅读源码的方式找到问题并给予解决。 - 在BATJ一线互联网公司面试中展现优秀的自己。大厂面试中,尤其是阿里系的公司,你有RocketMQ源码体系化知识,必定是一个很大的加分项。
224 0
|
消息中间件 存储 Kafka
RocketMQ 源码分析——Broker
1. Broker启动流程分析 2. 消息存储设计 3. 消息写入流程 4. 亮点分析:NRS与NRC的功能号设计 5. 亮点分析:同步双写数倍性能提升的CompletableFuture 6. 亮点分析:Commitlog写入时使用可重入锁还是自旋锁? 7. 亮点分析:零拷贝技术之MMAP提升文件读写性能 8. 亮点分析:堆外内存机制
340 0
|
存储 消息中间件 缓存
RocketMQ 多级存储设计与实现
RocketMQ 多级存储设计与实现
511 0
RocketMQ 多级存储设计与实现
|
消息中间件 运维 监控
RocketMq-dashboard:topic 5min trend 原理和源码分析(一)
RocketMq-dashboard:topic 5min trend 原理和源码分析(一)
423 0