Golang简单实现 分布式缓存+一致性哈希+节点再平衡(gossip + consistent + rebalance)

简介: Golang简单实现 分布式缓存+一致性哈希+节点再平衡(gossip + consistent + rebalance)

前言

Demo源码:Simple-Distributed-Cache

欢迎Star

代码较简单,分析源码仅分析重点

项目介绍

本项目刚开始实现了基于HTTP/REST的内存缓存服务,受限于HTTP协议解析,性能不高。而后实现了一个基于TCP的缓存服务提升性能,对于TCP来说,需要自己定义一套序列化规范来解析缓存的get,set和del三个操作,这里使用的是ABNF协议描述范式。

 项目实现的分布式缓存集群是同构集群,所有节点的功能完全相同,节点之间通过gossip协议来进行节点间通信,节点失效会在有限时间内扩散到整个集群。同时使用一致性哈希来计算负载均衡,当节点总数发生变化是,一致性哈希需要重新映射的key比传统哈希需要映射的key少的多。

 分布式系统的CAP理论中,这里无法满足C,无法保证一致性,前一秒set了键值对,可能下一秒存储该kv的节点发生故障,此时再去访问该key就无法保证一致性。所以在发生节点扩缩容时,我们需要进行节点再平衡,将需要被迁移的key赋值到对应的节点上。

使用介绍

注意我们使用HTTP/REST的时候,增删改查的操作并没有走一致性哈希。保留HTTP服务的定位是管理使用

#----------------基于HTTP----------------#
# 查看状态
curl 127.0.0.1:12345/status
# 插入一个kv
curl -v  127.0.0.1:12345/cache/testkey -XPUT -d testvalue
# 查看key的val
curl 127.0.0.1:12345/cache/testkey
# 查看状态
curl 127.0.0.1:12345/status
# 删除key
curl 127.0.0.1:12345/cache/testkey -XDELETE
# 查看状态
curl 127.0.0.1:12345/status
#----------------基于TCP----------------#
# 插入一个kv
../client/client.exe -c set -k testkey -v testvalue
# 查看key的val
../client/client.exe -c get -k testkey
# 查看状态
curl 127.0.0.1:12345/status
# 删除key
../client/client.exe -c del -k testkey
# 查看状态
curl 127.0.0.1:12345/status
#----------------集群----------------#
# 运行三台
go run main.go -n 10.29.1.1
go run main.go -n 10.29.1.2 -c 10.29.1.1
go run main.go -n 10.29.1.3 -c 10.29.1.2
# 查看集群节点列表
curl 10.29.1.1:12345/cluster
# 任选一台插入key,有些超过插入有些插入失败并返回重定向的地址
../client/client.exe -c set -k keya -v a -h 10.29.1.3
../client/client.exe -c set -k keyb -v b -h 10.29.1.3
../client/client.exe -c set -k keyc -v c -h 10.29.1.3
../client/client.exe -c set -k keyd -v d -h 10.29.1.3
../client/client.exe -c set -k keye -v e -h 10.29.1.3
../client/client.exe -c set -k keyf -v f -h 10.29.1.3
# 如果发生扩缩容,进行节点再平衡
curl 10.29.1.1:12345/rebalance -XPOST
# 只启动一台节点
go run main.go -n 10.29.1.1
# 插入100000条数据,那么这时100000条数据都是给10.29.1.1的
../cache-benchmark/cache-benchmark.exe -type tcp -n 100000 -d 1 --h 10.29.1.1
# 查看状态 count=100000
curl 10.29.1.1:12345/status
# 又启动一台 
go run main.go -n 10.29.1.2 -c 10.29.1.1
# 发送扩缩容,进行数据平衡
curl 10.29.1.1:12345/rebalance -XPOST
# 查看状态 count=50000左右
curl 10.29.1.1:12345/status
# 再启动一台
go run main.go -n 10.29.1.3 -c 10.29.1.2
# 发送扩缩容,进行数据平衡
curl 10.29.1.1:12345/rebalance -XPOST
# 查看状态 count=33000左右
curl 10.29.1.1:12345/status

分析源码

REST接口

func (s *Server) Listen() {
  //这里就是对一个map进行增删改查,并维护一个stat
  http.Handle("/cache/", s.cacheHandler())
  //返回上面维护的sata结构体
  http.Handle("/status", s.statusHandler())
  //consistent实现Members,以切片形式返回所有活跃节点的地址
  //m := h.Members()
  //bytes, err := json.Marshal(m)
  //w.Write(bytes)
  http.Handle("/cluster", s.clusterHandler())
  //最后介绍
  http.Handle("/rebalance", s.rebalanceHandler())
  http.ListenAndServe(s.Addr()+":12345", nil)
}

TCP字节流

for {
  conn, err := listener.Accept()
  if err != nil {
    log.Println(err)
  }
  //对每一个连接都开一个process的协程
  go s.process(conn)
}
//代码中忽略了细节,可能会与源码有出路
func (s *Server) process(conn net.Conn) {
  r := bufio.NewReader(conn)
  for {
    op, err := r.ReadByte()
    switch op {
    case 'S':
      err = s.set(conn, r)
    case 'G':
      err = s.get(conn, r)
    case 'D':
      err = s.del(conn, r)
  }
}
//下面以get为例,set和del原理一样
func (s *Server) get(conn net.Conn, r *bufio.Reader) error {
  key, err := s.readKey(r)
  if err != nil {
    return sendResponse(nil, err, conn)
  }
  val, err := s.Get(key)
  return sendResponse(val, err, conn)
}
//通过readKey对TCP字节流进行解析
func (s *Server) readKey(r *bufio.Reader) (string, error) {
  keyLen, err := readLen(r)
  key := make([]byte, keyLen)
  _, err = io.ReadFull(r, key)
  //这里进行一致性哈希运算,判断该key是否映射到该节点上
  //如果是则继续运行,如果不是则返回重定向到哪个节点
  jumpAddr, ok := s.ShouldProcess(string(key))
  if !ok {
    return "", errors.New("redirect " + jumpAddr)
  }
  return string(key), nil
}
//这里进行一致性哈希运算,判断该key是否映射到该节点上
func (n *node) ShouldProcess(key string) (string, bool) {
  addr, _ := n.Get(key)
  return addr, addr == n.addr
}

创建新节点加入到集群gossip

这里使用的是第三方库gossip

//创建新节点
node, err := cluster.New(*nodeAddr, *cls)
func New(addr, cluster string) (Node, error) {
  //创建gossip新节点的config
  config := memberlist.DefaultLANConfig()
  config.Name = addr
  config.BindAddr = addr
  config.LogOutput = ioutil.Discard
  //创建新节点
  mbl, err := memberlist.Create(config)
  if err != nil {
    return nil, err
  }
  if cluster == "" {
    cluster = addr
  }
  existing := []string{cluster}
  //连接到集群
  _, err = mbl.Join(existing)
  //创建一致性哈希的节点实例
  circle := consistent.New()
  //设置虚拟节点数量
  circle.NumberOfReplicas = 256
  go func() {
    for {
      //获取集群成员
      m := mbl.Members()
      nodes := make([]string, len(m))
      for i, n := range m {
        nodes[i] = n.Name
      }
      //每隔1s将集群节点列表m更新到circle中
      circle.Set(nodes)
      time.Sleep(time.Second)
    }
  }()
  return &node{circle, addr}, nil
}

一致性哈希consistent

这里使用的是第三方库consistent

如果想要学习一致性哈希算法,可以参考一致性哈希算法

//创建一致性哈希的节点实例
circle := consistent.New()
//设置虚拟节点数量
circle.NumberOfReplicas = 256
//每隔1s将集群节点列表m更新到circle中
circle.Set(nodes)
//这里进行一致性哈希运算,判断该key是否映射到该节点上
addr, _ := n.Get(key)

节点再平衡rebalance

func (h *rebalanceHandler) rebalance() {
  s := h.NewScanner()
  defer s.Close()
  client := &http.Client{}
  //遍历本节点所有的key
  for s.Scan() {
    k := s.Key()
    redirectAddr, ok := h.ShouldProcess(k)
    if !ok {//如果因为扩缩容,该key不再映射到本节点
      //则将key写入到对应的节点去
      r, _ := http.NewRequest(http.MethodPut, "http://"+redirectAddr+":12345/cache/"+k, bytes.NewReader(s.Value()))
      client.Do(r)
      h.Del(k)
    }
  }
}
func (c *inMemoryCache) NewScanner() Scanner {
  pairCh := make(chan *pair)
  closeCh := make(chan struct{})
  go func() {
    defer close(pairCh)
    c.mutex.RLock()
    for k, v := range c.cacheMap {
      c.mutex.RUnlock()
      select {
      case <-closeCh:
        return
      //从map中读取一个kv,并写入channel中
      case pairCh <- &pair{k, v}:
      }
      c.mutex.RLock()
    }
    c.mutex.RUnlock()
  }()
  return &inMemoryScanner{pair{}, pairCh, closeCh}
}
func (s *inMemoryScanner) Scan() bool {
  //从channel中读取一个kv
  p, ok := <-s.pairCh
  if ok {
    s.k, s.v = p.k, p.v
  }
  return ok
}


目录
相关文章
|
12天前
|
算法 调度
【孤岛划分】分布式能源接入弹性配电网模型研究【IEEE33节点】(Matlab代码实现)
【孤岛划分】分布式能源接入弹性配电网模型研究【IEEE33节点】(Matlab代码实现)
98 10
|
12天前
|
并行计算 算法 安全
【ADMM、碳排放】基于分布式ADMM算法的考虑碳排放交易的电力系统优化调度研究【IEEE6节点、IEEE30节点、IEEE118节点】(Matlab代码实现)
【ADMM、碳排放】基于分布式ADMM算法的考虑碳排放交易的电力系统优化调度研究【IEEE6节点、IEEE30节点、IEEE118节点】(Matlab代码实现)
|
24天前
|
算法 安全 新能源
基于DistFlow的含分布式电源配电网优化模型【IEEE39节点】(Python代码实现)
基于DistFlow的含分布式电源配电网优化模型【IEEE39节点】(Python代码实现)
|
5月前
|
安全 网络安全 数据库
YashanDB分布式节点间SSL连接配置
本文介绍YashanDB分布式节点间SSL连接配置方法,确保通信安全。需统一为整个集群配置SSL,使用相同根证书签名的服务器证书,否则可能导致连接失败或数据库无法启动。文章详细说明了使用OpenSSL生成根证书、服务器私钥、证书及DH文件的步骤,并指导如何将证书分发至各节点。最后,通过配置数据库参数(如`din_ssl_enable`)并重启集群完成设置。注意,证书过期需重新生成以保障安全性。
|
1月前
|
存储 并行计算 算法
【前推回代法】含有分布式电源的三相不平衡配电网潮流计算【IEEE33节点】(Matlab代码实现)
【前推回代法】含有分布式电源的三相不平衡配电网潮流计算【IEEE33节点】(Matlab代码实现)
|
4月前
|
监控 Linux 应用服务中间件
Linux多节点多硬盘部署MinIO:分布式MinIO集群部署指南搭建高可用架构实践
通过以上步骤,已成功基于已有的 MinIO 服务,扩展为一个 MinIO 集群。该集群具有高可用性和容错性,适合生产环境使用。如果有任何问题,请检查日志或参考MinIO 官方文档。作者联系方式vx:2743642415。
1402 57
|
9月前
|
存储 缓存 负载均衡
一致性哈希:解决分布式难题的神奇密钥
一致哈希是一种特殊的哈希算法,用于分布式系统中实现数据的高效、均衡分布。它通过将节点和数据映射到一个虚拟环上,确保在节点增减时只需重定位少量数据,从而提供良好的负载均衡、高扩展性和容错性。相比传统取模方法,一致性哈希能显著减少数据迁移成本,广泛应用于分布式缓存、存储、数据库及微服务架构中,有效提升系统的稳定性和性能。
577 1
|
11月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
203 1
|
11月前
|
消息中间件 缓存 算法
分布式系列第一弹:分布式一致性!
分布式系列第一弹:分布式一致性!
237 0
|
11月前
|
算法 Java 关系型数据库
漫谈分布式数据复制和一致性!
漫谈分布式数据复制和一致性!
142 0

热门文章

最新文章

推荐镜像

更多