通过Consul Raft库打造自己的分布式系统

简介: 通用的CP系统有etcd和consul, 通用的对立面就是专用系统. 所以在某些场合是有这种需求的.然而etcd embed的可用性极差, Windows上面跑会出现各种问题, 而且不能定制协议, 你必须得用etcd定义好的协议和客户端来和etcd集群通讯. 所以这时候的选择:忍着自己实现一个raft算法库, 在这上面做应用有一定的可能性, 起码MIT 6.824可以做出来, 但是和工业应用还是有很大的差距找一个工业级raft库, 然后在这上面做应用

通用的CP系统有etcd和consul, 通用的对立面就是专用系统. 所以在某些场合是有这种需求的.

然而etcd embed的可用性极差, Windows上面跑会出现各种问题, 而且不能定制协议, 你必须得用etcd定义好的协议和客户端来和etcd集群通讯. 所以这时候的选择:


  1. 忍着
  2. 自己实现一个raft算法库, 在这上面做应用

有一定的可能性, 起码MIT 6.824可以做出来, 但是和工业应用还是有很大的差距

  1. 找一个工业级raft库, 然后在这上面做应用


这时候到Raft Consensus Algorithm上面看看就能找到几个可选的Raft算法库, 例如braft, hashicorp/raft, lni/dragonboat.

但是呢, C++代码比较难写的, 所以就pass掉了braft. 就剩下consul raft和dragonboat.

本文就用consul raft做一个简单的KeyValue服务.

首先前端用的gin, 提供put/get/inc/delete几个接口, 三个接口都走raft状态机, 因为要支持多节点, 所以内部非leader节点就需要把请求转发给leader节点.

前端的代码类似于这样:


func (this *ApiService) Start() error {
        //转发请求给leader节点
    this.router.Use(this.proxyHandler())
    this.router.POST("/get", this.Get)
    this.router.POST("/put", this.Put)
    this.router.POST("/delete", this.Delete)
    this.router.POST("/inc", this.Inc)
    address := fmt.Sprintf(":%d", this.port)
    return this.router.Run(address)
}


请求都很简单, 就是直接把命令, 或者叫服务提供的原语塞到Raft状态机里面等候Raft状态Apply, 然后才能拿到结果(future/promise模式), 例如put命令:

func (this *ApiService) Put(ctx *gin.Context) {
    req := &Request{}
    if err := ctx.ShouldBindJSON(req); err != nil {
        ctx.JSON(http.StatusBadRequest, Response{
            Error: err.Error(),
        })
        return
    }
    result, err := this.raft.ApplyCommand(raft.CommandPut, req.Key, req.Value)
    if err != nil {
        ctx.JSON(http.StatusInternalServerError, Response{
            Error: err.Error(),
        })
        return
    }
    ctx.JSON(http.StatusOK, Response{
        Value: result.Value,
    })
}


前端还有一个转发请求到leader节点的拦截器(? 应该叫这个名字, 实际上是pipeline模式的一种)

func (this *ApiService) proxyHandler() gin.HandlerFunc {
    return func(context *gin.Context) {
        if this.raft.IsLeader() {
            context.Next()
        } else {
            leaderServiceAddress := this.raft.GetLeaderServiceAddress()
            if this.leaderServiceAddress != leaderServiceAddress {
                Director := func(req *http.Request) {
                    req.URL.Scheme = "http"
                    req.URL.Host = leaderServiceAddress
                }
                this.leaderProxy = &httputil.ReverseProxy{
                    Director: Director,
                }
                this.leaderServiceAddress = leaderServiceAddress
            }
            this.leaderProxy.ServeHTTP(context.Writer, context.Request)
            context.Abort()
        }
    }
}


下面是对协议的处理:

func (this *FSM) Apply(log *raft.Log) interface{} {
    result := &FSMApplyResult{
        Success: false,
    }
    t, cmd, err := raftLogToCommand(log)
    if err != nil {
        result.Error = err
        return result
    }
    binary.LittleEndian.PutUint64(keyCache, uint64(cmd.Key))
    binary.LittleEndian.PutUint64(valueCache, uint64(cmd.Value))
    switch t {
    case CommandPut:
        result.Success, result.Error = this.add(keyCache, valueCache)
    case CommandDelete:
        result.Success, result.Error = this.delete(keyCache)
    case CommandGet:
        result.Value, result.Error = this.get(keyCache)
    case CommandInc:
        result.Value, result.Error = this.inc(keyCache, cmd.Value)
    }
    return result
}


输入给Raft状态的命令实际上都是序列化好的, Raft状态机会自己把命令保存到Storage里面(可以是内存, 也可以是磁盘/DB等). 所以Apply命令的时候, 先对raft log进行解码, 然后switch去处理.

这边再看看例如inc的处理:

func (this *FSM) inc(key []byte, add int64) (int64, error) {
    var value int64 = 0
    err := this.db.Update(func(tx *bbolt.Tx) error {
        b, err := tx.CreateBucketIfNotExists(BBoltBucket)
        if err != nil {
            return err
        }
        valueBytes := b.Get(key)
        if len(valueBytes) != 8 {
            logging.Errorf("FSM.inc, key:%d, value length:%d, Reset",
                int64(binary.LittleEndian.Uint64(key)), len(valueBytes))
            valueBytes = make([]byte, 8)
        }
        value = int64(binary.LittleEndian.Uint64(valueBytes))
        value += add
        binary.LittleEndian.PutUint64(valueBytes, uint64(value))
        err = b.Put(key, valueBytes)
        return err
    })
    if err != nil {
        return -1, err
    }
    return value, err
}


这个指令稍微复杂一点, 需要先到db里面去找, 找到的话, 再加一个N, 然后存储, 然后返回新的值. 因为raft状态机apply log的时候, 是顺序的, 所以不需要加锁啥的, inc本身就是原子的.

至此一个简单的分布式KeyValue服务就实现, 而且还是一个CP系统.

当然这只是一个demo, 实际的应用远远比这个复杂, 本文只是提供一种思路.

不必非要把自己绑死在Etcd上, 条条大路通罗马. 如果你的系统只需要提供有限的操作原理, 那么是可以考虑Consul Raft或者DragonBoat来制作自定义协议的CP服务. 蚂蚁的SOFARaft也可以干这种事.


相关文章
|
6月前
|
消息中间件 算法 分布式数据库
Raft算法:分布式一致性领域的璀璨明珠
【4月更文挑战第21天】Raft算法是分布式一致性领域的明星,通过领导者选举、日志复制和安全性解决一致性问题。它将复杂问题简化,角色包括领导者、跟随者和候选者。领导者负责日志复制,确保多数节点同步。实现细节涉及超时机制、日志压缩和网络分区处理。广泛应用于分布式数据库、存储系统和消息队列,如Etcd、TiKV。其简洁高效的特点使其在分布式系统中备受青睐。
|
3月前
|
存储 算法 NoSQL
(七)漫谈分布式之一致性算法下篇:一文从根上儿理解大名鼎鼎的Raft共识算法!
Raft通过一致性检查,能在一定程度上保证集群的一致性,但无法保证所有情况下的一致性,毕竟分布式系统各种故障层出不穷,如何在有可能发生各类故障的分布式系统保证集群一致性,这才是Raft等一致性算法要真正解决的问题。
112 11
|
3月前
|
存储 算法 索引
(六)漫谈分布式之一致性算法上篇:用二十六张图一探Raft共识算法奥妙之处!
现如今,大多数分布式存储系统都投向了Raft算法的怀抱,而本文就来聊聊大名鼎鼎的Raft算法/协议!
114 8
|
4月前
|
算法 数据库 OceanBase
共识协议的技术变迁问题之Raft协议对分布式系统有什么贡献
共识协议的技术变迁问题之Raft协议对分布式系统有什么贡献
59 8
|
5月前
|
存储 算法 安全
程序员必知:分布式一致性Raft与JRaft
程序员必知:分布式一致性Raft与JRaft
54 0
|
6月前
|
算法 程序员 分布式数据库
分布式一致性必备:一文读懂Raft算法
Raft算法是一种用于分布式系统中复制日志一致性管理的算法。它通过选举领导者来协调日志复制,确保所有节点数据一致。算法包括心跳机制、选举过程、日志复制和一致性保证。当领导者失效时,节点会重新选举,保证高可用性。Raft易于理解和实现,提供强一致性,常用于分布式数据库和协调服务。作者小米分享了相关知识,鼓励对分布式系统感兴趣的读者进一步探索。
1310 0
|
6月前
|
算法 Go 分布式数据库
构建高可用的分布式数据库集群:使用Go语言与Raft共识算法
随着数据量的爆炸式增长,单一数据库服务器已难以满足高可用性和可扩展性的需求。在本文中,我们将探讨如何使用Go语言结合Raft共识算法来构建一个高可用的分布式数据库集群。我们不仅会介绍Raft算法的基本原理,还会详细阐述如何利用Go语言的并发特性和网络编程能力来实现这一目标。此外,我们还将分析构建过程中可能遇到的挑战和解决方案,为读者提供一个完整的实践指南。
|
6月前
|
存储 算法 前端开发
作者推荐 | 分布式协议之巅 — 揭秘基础Paxos与Raft协议如何实现分布式系统达成一致性(非变种Paxos协议)
作者推荐 | 分布式协议之巅 — 揭秘基础Paxos与Raft协议如何实现分布式系统达成一致性(非变种Paxos协议)
566 0
|
算法
分布式系统中的那些一致性(CAP、BASE、2PC、3PC、Paxos、ZAB、Raft)
本文介绍 CAP、BASE理论的正确理解、Paxos 算法如何保证一致性及死循环问题、ZAB 协议中原子广播及崩溃恢复以及 Raft 算法的动态演示。
273 0
|
6月前
|
人工智能 弹性计算 PyTorch
【Hello AI】安装和使用AIACC-ACSpeed-分布式训练场景的通信优化库
AIACC-ACSpeed专注于分布式训练场景的通信优化库,通过模块化的解耦优化设计,实现了分布式训练在兼容性、适用性和性能加速等方面的升级。本文为您介绍安装和使用AIACC-ACSpeed v1.1.0的方法。