SOFAJRaft-RheaKV MULTI-RAFT-GROUP 实现分析 | SOFAJRaft 实现原理

简介: 《剖析 | SOFAJRaft 实现原理》第五篇,本文将从 RheaKV 是如何利用 MULTI-RAFT-GROUP 的方式实现 RheaKV 的高性能及容量的可扩展性的,从而进行全面的源码、实例剖析。欢迎阅读~

SOFAStack
Scalable Open Financial  Architecture Stack
是蚂蚁金服自主研发的金融级分布式架构,包含了构建金融级云原生架构所需的各个组件,是在金融场景里锤炼出来的最佳实践。

SOFAJRaft#5

SOFAJRaft 是一个基于 Raft 一致性算法的生产级高性能 Java 实现,支持 MULTI-RAFT-GROUP,适用于高负载低延迟的场景。

本文为《剖析 | SOFAJRaft 实现原理》第五篇,本篇作者袖扣,来自蚂蚁金服。

《剖析 | SOFAJRaft 实现原理》系列由 SOFA 团队和源码爱好者们出品,项目代号:<SOFA:JRaftLab/>,文章尾部有参与方式,欢迎同样对源码热情的你加入。

SOFAJRaft :https://github.com/sofastack/sofa-jraft

前言

RheaKV 是首个以 JRaft 为基础实现的一个原生支持分布式的嵌入式键值(key、value)数据库,现在本文将从 RheaKV 是如何利用 MULTI-RAFT-GROUP 的方式实现 RheaKV 的高性能及容量的可扩展性的,从而进行全面的源码、实例剖析。

MULTI-RAFT-GROUP

通过对 Raft 协议的描述我们知道:用户在对一组 Raft 系统进行更新操作时必须先经过 Leader,再由 Leader 同步给大多数 Follower。而在实际运用中,一组 Raft 的 Leader 往往存在单点的流量瓶颈,流量高便无法承载,同时每个节点都是全量数据,所以会受到节点的存储限制而导致容量瓶颈,无法扩展。

MULTI-RAFT-GROUP 正是通过把整个数据从横向做切分,分为多个 Region 来解决磁盘瓶颈,然后每个 Region 都对应有独立的 Leader 和一个或多个 Follower 的 Raft 组进行横向扩展,此时系统便有多个写入的节点,从而分担写入压力,图如下:

multi-raft-group-开始图

此时磁盘及 I/O 瓶颈解决了,那多个 Raft Group 是如何协作的呢,我们接着往下看。

选举及复制

RheaKV 主要由 3 个角色组成:PlacementDriver(以下成为 PD) 、Store、Region。由于 RheaKV 支持多组 Raft,所以比单组场景多出一个 PD 角色,用来调度以及收集每个 Store 及 Region 的基础信息。

raft 相关图

PlacementDriver

PD 负责整个集群的管理调度、Region ID 生成等。此组件非必须的,如果不使用 PD,设置 PlacementDriverOptions 的 fake 属性为 true 即可。PD 一般通过 Region 的心跳返回信息进行对 Region 调度,Region 处理完后,PD 则会在下一个心跳返回中收到 Region 的变更信息来更新路由及状态表。

Store

通常一个 Node 负责一个 Store,Store 可以被看作是 Region 的容器,里面存储着多个分片数据。Store 会向 PD 主动上报 StoreHeartbeatRequest 心跳,心跳交由 PD 的 handleStoreHeartbeat 处理,里面包含该 Store 的基本信息,比如,包含多少 Region,有哪些 Region 的 Leader  在该 Store 等。

Region

Region 是数据存储、搬迁的最小单元,对应的是 Store 里某个实际的数据区间。每个 Region 会有多个副本,每个副本存储在不同的 Store,一起组成一个Raft Group。Region 中的 Leader 会向 PD 主动上报 RegionHeartbeatRequest 心跳,交由 PD 的 handleRegionHeartbeat 处理,而 PD 是通过 Region 的 Epoch 感知 Region 是否有变化。

RegionRouteTable 路由表组件

Muti-Raft-Group 的多 Region 是通过 RegionRouteTable 路由表组件进行管理的,可通过 addOrUpdateRegion、removeRegion 进行添加、更新、移除 Region,也包括 Region 的拆分。目前暂时还未实现 Region 的聚合,后面会考虑实现。

分区逻辑与算法 Shard

分区逻辑与算法 Shard

“让每组 Raft 负责一部分数据。”

数据分区或者分片算法通常就是 Range 和 Hash,RheaKV 是通过 Range 进行数据分片的,分成一个个 Raft Group,也称为 Region。这里为何要设计成 Range 呢?原因是 Range 切分是按照对 Key 进行字节排序后再做每段每段切分,像类似 scan 等操作对相近 key 的查询会尽可能集中在某个 Region,这个是 Hash 无法支持的,就算遇到单个 Region 的拆分也会更好处理一些,只用修改部分元数据,不会涉及到大范围的数据挪动。

当然 Range 也会有一个问题那就是,可能会存在某个 Region 被频繁操作成为热点 Region。不过也有一些优化方案,比如 PD 调度热点 Region 到更空闲的机器上,或者提供 Follower 分担读的压力等。

Region 和 RegionEpoch 结构如下:

class Region {
        long              id;            // region id
    // Region key range [startKey, endKey)
        byte[]            startKey;      // inclusive
        byte[]            endKey;        // exclusive
        RegionEpoch       regionEpoch;   // region term
        List<Peer>        peers;         // all peers in the region
}
class RegionEpoch {
        // Conf change version, auto increment when add or remove peer
      long              confVer;
      // Region version, auto increment when split or merge
      long              version;
}
class Peer {
        long              id;
    long              storeId;
    Endpoint          endpoint;
}

Region.id:为 Region 的唯一标识,通过 PD 全局唯一分配。

Region.startKey、Region.endKey:这个表示的是 Region 的 key 的区间范围 [startKey, endKey),特别值得注意的是针对最开始 Region 的 startKey,和最后 Region 的 endKey 都为空。

Region.regionEpoch:当 Region 添加和删除 Peer,或者 split 等,此时 regionEpoch 就会发生变化,其中 confVer 会在配置修改后递增,version 则是每次有 split 、merge(还未实现)等操作时递增。

Region.peers:peers 则指的是当前 Region 所包含的节点信息,Peer.id 也是由 PD 全局分配的,Peer.storeId 代表的是 Peer 当前所处的 Store。

读与写 Read / Write

由于数据被拆分到不同 Region 上,所以在进行多 key 的读、写、更新操作时需要操作多个 Region,这时操作前我们需要得到具体的 Region,然后再单独对不同 Region 进行操作。我们以在多 Region上 scan 操作为例, 目标是返回某个 key 区间的所有数据: 

  1. 我们首先看 scan 方法的核心调用方法 internalScan 的异步实现:

例如:com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore#scan(byte[], byte[], boolean, boolean)

internalScan 的异步实现

我们很容易看到,在调用 scan 首先让 PD Client 通过 RegionRouteTable.findRegionsByKeyRange 检索 startKey、endKey 所覆盖的 Region,最后返回的可能为多个 Region,具体 Region 覆盖检索方法如下:

具体 Region 覆盖检索方法

检索相关变量定义如下:

检索相关变量定义

我们可以看到整个 RheaKV 的 range 路由表是通过 TreeMap 的进行存储的,正呼应我们前面讲过所有的 key 是通过对应字节进行排序存储。对应的 Value 为该 Region 的 RegionId,随后我们通过 Region 路由 regionTable 查出即可。

现在我们得到 scan 覆盖到的所有 Region:List<Region> 在循环查询中我们看到有一个“retryCause -> {}”的 Lambda 表达式很容易看出这里是加持异常重试处理,后面我们会讲到,接下来会通过 internalRegionScan 查询每个 Region 的结果。具体源码如下:

查询每个 Region 的结果

这里也同样有一个重试处理,可以看到代码中根据当前是否为 Region 节点来决定是本机查询还是通过RPC进行查询,如果是本机则调用 rawKVStore.scan() 进行本地直接查询,反之通过 rheaKVRpcService 进行 RPC 远程节点查询。最后每个 Region 查询都返回为一个 future,通过 FutureHelper.joinList 工具类 CompletableFuture.allOf 异步并发返回结果 List<KVEntry>

  1. 我们再看看写入具体流程。相比 scan 读,put 写相对比较简单,只需要针对 key 计算出对应 Region 再进行存储即可,我们可以看一个异步 put 的例子。

例如:com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore#put(java.lang.String, byte[])

异步 put

我们可以发现 put 基础方法是支持 batch 的,即可成批提交。如未使用 batch 即直接提交,具体逻辑如下:

未使用 batch 即直接提交

通过 pdClinet 查询对应存储的 Region,并且通过 regionId 拿到 RegionEngine,再通过对应存储引擎 KVStore 进行 put,整个过程同样支持重试机制。我们再回过去看看 batch 的实现,很容易发现利用到了 Disruptor 的 RingBuffer 环形缓冲区,无锁队列为性能提供了保障,代码现场如下:

无锁队列为性能提供了保障

Split / Merge

  1. 什么时候 Region 会拆分?

前面我们有讲过,PD 会在 Region 的 heartBeat 里面对 Region 进行调度,当某个 Region 里的 keys 数量超过预设阀值,我们即可对该 Region 进行拆分,Store 的状态机 KVStoreStateMachine 即收到拆分消息进行拆分处理。具体拆分源码如下:

KVStoreStateMachine.doSplit 源码如下:

KVStoreStateMachine.doSplit 源码

StoreEngine.doSplit 源码如下:

StoreEngine.doSplit 源码

我们可以轻易的看到从原始 parentRegion 切分成 region 和 pRegion,并重设了 startKey、endKey 和版本号,并添加到 RegionEngineTable 注册到 RegionKVService,同时调用 pdClient.getRegionRouteTable().splitRegion() 方法进行更新存储在 PD 的 Region 路由表。

  1. 什么时候需要对 Region 进行合并?

既然数据过多需要进行拆分,那 Region 进行合并那就肯定是 2 个或者多个连续的 Region 数据量明显小于绝大多数 Region 容量则我们可以对其进行合并。这一块后面会考虑实现。

RegionKVService 结构及实现分析

StoreEngine

通过上面我们知道,一个 Store 即为一个节点,里面包含着一个或者多个 RegionEngine,一个 StoreEngine 通常通过 PlacementDriverClient 对 PD 进行调用,同时拥有 StoreEngineOptions 配置项,里面配置着存储引擎和节点相关配置。

  1. 我们以默认的 DefaultRheaKVStore 加载 StoreEngine 为例,DefaultRheaKVStore 实现了 RheaKVStore 接口的基础功能,从最开始 init 方法,根据 RheaKVStoreOptions 加载了 pdClinet 实例,随后加载 storeEngine。
  2. 在 StoreEngine 启动的时候,首先会去加载对应的 StoreEngineOptions 配置,构建对应的 Store 配置,并且生成一致性读的线程池 readIndexExecutor、快照线程池 snapshotExecutor、RPC 的线程池 cliRpcExecutor、Raft 的 RPC 线程池 raftRpcExecutor,以及存储 RPC 线程池 kvRpcExecutor、心跳发送器 HeartbeatSender 等,如果打开代码,我们还能看到 metricsReportPeriod,打开配置可以进行性能指标监控。
  3. 在 DefaultRheaKVStore 加载完所有工序之后,便可使用 get、set、scan 等操作,还包含对应同步、异步操作。

在这个过程中里面的 StoreEngine 会记录着 regionKVServiceTable、regionEngineTable,它们分别掌握着具体每个不同的 Region 存储的操作功能,对应的 key 即为 RegionId。

RegionEngine

每个在 Store 里的 Region 副本中,RegionEngine 则是一个执行单元。它里面记录着关联着的 StoreEngine 信息以及对应的 Region 信息。由于它也是一个选举节点,所以也包含着对应状态机 KVStoreStateMachine,以及对应的 RaftGroupService,并启动里面的 RpcServer 进行选举同步。

这个里面有个transferLeadershipTo方法,这个可被调用用于平衡当前节点分区的Leader,避免压力重叠。

DefaultRegionKVService 是 RegionKVService 的默认实现类,主要处理对 Region 的具体操作。

RheaKV FailoverClosure 解读

需要特别讲到的是,在具体的 RheaKV 操作时,FailoverClosure 担任着比较重要的角色,也给整个系统增加了一定的容错性。假如在一次 scan 操作中,如果跨 Store 需要多节点 scan 数据的时候,任何网络抖动都会造成数据不完整或者失败情况,所以允许一定次数的重试有利于提高系统的可用性,但是重试次数不宜过高,如果出现网络堵塞,多次 timeout 级别失败会给系统带来额外的压力。这里只需要在 DefaultRheaKVStore 中,进行配置 failoverRetries 设置次数即可。

RheaKV PD 之 PlacementDriverClient 

PlacementDriverClient 接口主要由 AbstractPlacementDriverClient 实现,然后 FakePlacementDriverClient、RemotePlacementDriverClient 为主要功能。FakePlacementDriverClient 是当系统不需要 PD 的时候进行 PD 对象的模拟,这里主要讲到 RemotePlacementDriverClient。

  1. RemotePlacementDriverClient 通过PlacementDriverOptions 进行加载,并根据基础配置刷新路由表;
  2. RemotePlacementDriverClient 承担着对路由表RegionRouteTable 的管控,例如获取Store、路由、Leader节点信息等;
  3. RemotePlacementDriverClient 还包含着 CliService,通过 CliService 外部可对复制节点进行操作运维,如addReplica、removeReplica、transferLeader。

总结

由于很多传统存储中间件并不原生支持分布式,所以一直少有体感,Raft 协议是一套比较比较好理解的共识协议,SOFAJRaft 通俗易懂是一个非常好的代码和工程范例,同时 RheaKV 也是一套非常轻量化支持多存储结构可分片的嵌入式数据库。写一篇代码分析文章也是一个学习和进步的过程,由此我们也可以窥探到了一些数据库的基础实现,祝愿社区能在 SOFAJRaft / RheaKV 基础上构建更加灵活和自治理的系统和应用。

SOFAJRaft 源码解析系列阅读

目录
相关文章
|
11月前
|
前端开发 安全 Java
2025春招,Spring 面试题汇总
本文详细整理了2025年春招必备的Spring面试题,分为基础和高级两大部分,帮助求职者全面掌握Spring相关知识点,结合实际项目经验,提升面试成功率。内容涉及Spring框架、AOP、事务管理、数据库集成、Spring Boot、Spring Security、微服务架构等,助力你在春招中脱颖而出。
2232 0
|
机器学习/深度学习 人工智能 算法
未来已来:探索量子计算在Web开发中的应用
在这篇文章中,我们将穿越技术的迷雾,一窥未来。量子计算,这一曾经只存在于理论中的技术,正逐渐走近现实,它的革命性潜力正在被探索其在Web开发中的潜在应用。本文将带你了解量子计算的基本概念,以及它可能如何重塑我们构建和交互Web应用的方式。准备好,让我们的想象力随着量子比特一起跳跃。
|
10月前
|
存储 运维 监控
首次!阿里云可观测技术论文登上两大国际顶会
首次!阿里云可观测技术论文登上两大国际顶会
242 1
|
监控 负载均衡 分布式数据库
Region 的分裂和合并是如何影响 Region 性能的
Region 的分裂和合并是如何影响 Region 性能的
|
12月前
|
存储 弹性计算 运维
端到端的ECS可观测性方案,助力云上业务安全稳定
本文介绍了云原生时代保障业务系统可靠性的方法和挑战,重点探讨了阿里云ECS在提升业务稳定性、性能监控及自动化恢复方面的能力。文章分为以下几个部分:首先,阐述了业务可靠性的三个阶段(事前预防、事中处理、事后跟进);其次,分析了云上业务系统面临的困难与挑战,并提出了通过更实时的监测和自动化工具有效规避风险;接着,详细描述了ECS实例稳定性和性能问题的解决方案;然后,介绍了即将发布的ECS Lens产品,它将全面提升云上业务的洞察能力和异常感知能力;最后,通过具体案例展示了如何利用OS自动重启和公网带宽自适应调节等功能确保业务连续性。总结部分强调了ECS致力于增强性能和稳定性的目标。
|
Kubernetes 负载均衡 Docker
【专栏】构建高效微服务架构:Docker和Kubernetes在构建微服务架构中的应用
【4月更文挑战第27天】本文介绍了Docker和Kubernetes在构建微服务架构中的应用。Docker是开源容器引擎,用于打包和分发应用,实现隔离和封装,提升可扩展性和可维护性。Kubernetes是容器编排平台,自动化部署、扩展和管理容器,提供负载均衡和故障转移。二者结合,能高效支持微服务架构。文中通过实例展示了如何将用户、商品和订单服务用Docker打包,再用Kubernetes部署和管理,确保微服务稳定运行。
427 4
|
缓存 负载均衡 网络协议
Linux的TCP连接数量与百万千万并发应对策略
【8月更文挑战第15天】在Linux系统中,关于TCP连接数量的一个常见误解是认为其最大不能超过65535个。这一数字实际上是TCP端口号的上限,而非TCP连接数的直接限制。实际上,Linux服务器能够处理的TCP连接数远远超过这一数字,关键在于理解TCP连接的标识方式、系统配置优化以及应用架构设计。
1333 2
|
人工智能 Cloud Native 调度
阿里云容器服务在AI智算场景的创新与实践
2024年云栖大会,我们总结过往支持AI智算基础底座的实践经验、发现与思考,给出《容器服务在AI智算场景的创新与实践》的演讲。不仅希望将所做所想与客户和社区分享,也期待引出更多云原生AI领域的交流和共建。
|
前端开发 Java API
阿里云百炼模型入门篇-大语言模型
本文主要介绍如何快速的通过阿里云百炼,带你如何快速入门通义千问系列大语言模型。
2684 6
|
消息中间件 缓存 负载均衡
这些年背过的面试题——分布式篇
本文是技术人面试系列分布式篇,面试中关于分布式都需要了解哪些基础?一文带你详细了解,欢迎收藏!
这些年背过的面试题——分布式篇

热门文章

最新文章