Golang简单实现 分布式缓存+一致性哈希+节点再平衡(gossip + consistent + rebalance)

简介: Golang简单实现 分布式缓存+一致性哈希+节点再平衡(gossip + consistent + rebalance)

前言

Demo源码:Simple-Distributed-Cache

欢迎Star

代码较简单,分析源码仅分析重点

项目介绍

本项目刚开始实现了基于HTTP/REST的内存缓存服务,受限于HTTP协议解析,性能不高。而后实现了一个基于TCP的缓存服务提升性能,对于TCP来说,需要自己定义一套序列化规范来解析缓存的get,set和del三个操作,这里使用的是ABNF协议描述范式。

 项目实现的分布式缓存集群是同构集群,所有节点的功能完全相同,节点之间通过gossip协议来进行节点间通信,节点失效会在有限时间内扩散到整个集群。同时使用一致性哈希来计算负载均衡,当节点总数发生变化是,一致性哈希需要重新映射的key比传统哈希需要映射的key少的多。

 分布式系统的CAP理论中,这里无法满足C,无法保证一致性,前一秒set了键值对,可能下一秒存储该kv的节点发生故障,此时再去访问该key就无法保证一致性。所以在发生节点扩缩容时,我们需要进行节点再平衡,将需要被迁移的key赋值到对应的节点上。

使用介绍

注意我们使用HTTP/REST的时候,增删改查的操作并没有走一致性哈希。保留HTTP服务的定位是管理使用

#----------------基于HTTP----------------#
# 查看状态
curl 127.0.0.1:12345/status
# 插入一个kv
curl -v  127.0.0.1:12345/cache/testkey -XPUT -d testvalue
# 查看key的val
curl 127.0.0.1:12345/cache/testkey
# 查看状态
curl 127.0.0.1:12345/status
# 删除key
curl 127.0.0.1:12345/cache/testkey -XDELETE
# 查看状态
curl 127.0.0.1:12345/status
#----------------基于TCP----------------#
# 插入一个kv
../client/client.exe -c set -k testkey -v testvalue
# 查看key的val
../client/client.exe -c get -k testkey
# 查看状态
curl 127.0.0.1:12345/status
# 删除key
../client/client.exe -c del -k testkey
# 查看状态
curl 127.0.0.1:12345/status
#----------------集群----------------#
# 运行三台
go run main.go -n 10.29.1.1
go run main.go -n 10.29.1.2 -c 10.29.1.1
go run main.go -n 10.29.1.3 -c 10.29.1.2
# 查看集群节点列表
curl 10.29.1.1:12345/cluster
# 任选一台插入key,有些超过插入有些插入失败并返回重定向的地址
../client/client.exe -c set -k keya -v a -h 10.29.1.3
../client/client.exe -c set -k keyb -v b -h 10.29.1.3
../client/client.exe -c set -k keyc -v c -h 10.29.1.3
../client/client.exe -c set -k keyd -v d -h 10.29.1.3
../client/client.exe -c set -k keye -v e -h 10.29.1.3
../client/client.exe -c set -k keyf -v f -h 10.29.1.3
# 如果发生扩缩容,进行节点再平衡
curl 10.29.1.1:12345/rebalance -XPOST
# 只启动一台节点
go run main.go -n 10.29.1.1
# 插入100000条数据,那么这时100000条数据都是给10.29.1.1的
../cache-benchmark/cache-benchmark.exe -type tcp -n 100000 -d 1 --h 10.29.1.1
# 查看状态 count=100000
curl 10.29.1.1:12345/status
# 又启动一台 
go run main.go -n 10.29.1.2 -c 10.29.1.1
# 发送扩缩容,进行数据平衡
curl 10.29.1.1:12345/rebalance -XPOST
# 查看状态 count=50000左右
curl 10.29.1.1:12345/status
# 再启动一台
go run main.go -n 10.29.1.3 -c 10.29.1.2
# 发送扩缩容,进行数据平衡
curl 10.29.1.1:12345/rebalance -XPOST
# 查看状态 count=33000左右
curl 10.29.1.1:12345/status

分析源码

REST接口

func (s *Server) Listen() {
  //这里就是对一个map进行增删改查,并维护一个stat
  http.Handle("/cache/", s.cacheHandler())
  //返回上面维护的sata结构体
  http.Handle("/status", s.statusHandler())
  //consistent实现Members,以切片形式返回所有活跃节点的地址
  //m := h.Members()
  //bytes, err := json.Marshal(m)
  //w.Write(bytes)
  http.Handle("/cluster", s.clusterHandler())
  //最后介绍
  http.Handle("/rebalance", s.rebalanceHandler())
  http.ListenAndServe(s.Addr()+":12345", nil)
}

TCP字节流

for {
  conn, err := listener.Accept()
  if err != nil {
    log.Println(err)
  }
  //对每一个连接都开一个process的协程
  go s.process(conn)
}
//代码中忽略了细节,可能会与源码有出路
func (s *Server) process(conn net.Conn) {
  r := bufio.NewReader(conn)
  for {
    op, err := r.ReadByte()
    switch op {
    case 'S':
      err = s.set(conn, r)
    case 'G':
      err = s.get(conn, r)
    case 'D':
      err = s.del(conn, r)
  }
}
//下面以get为例,set和del原理一样
func (s *Server) get(conn net.Conn, r *bufio.Reader) error {
  key, err := s.readKey(r)
  if err != nil {
    return sendResponse(nil, err, conn)
  }
  val, err := s.Get(key)
  return sendResponse(val, err, conn)
}
//通过readKey对TCP字节流进行解析
func (s *Server) readKey(r *bufio.Reader) (string, error) {
  keyLen, err := readLen(r)
  key := make([]byte, keyLen)
  _, err = io.ReadFull(r, key)
  //这里进行一致性哈希运算,判断该key是否映射到该节点上
  //如果是则继续运行,如果不是则返回重定向到哪个节点
  jumpAddr, ok := s.ShouldProcess(string(key))
  if !ok {
    return "", errors.New("redirect " + jumpAddr)
  }
  return string(key), nil
}
//这里进行一致性哈希运算,判断该key是否映射到该节点上
func (n *node) ShouldProcess(key string) (string, bool) {
  addr, _ := n.Get(key)
  return addr, addr == n.addr
}

创建新节点加入到集群gossip

这里使用的是第三方库gossip

//创建新节点
node, err := cluster.New(*nodeAddr, *cls)
func New(addr, cluster string) (Node, error) {
  //创建gossip新节点的config
  config := memberlist.DefaultLANConfig()
  config.Name = addr
  config.BindAddr = addr
  config.LogOutput = ioutil.Discard
  //创建新节点
  mbl, err := memberlist.Create(config)
  if err != nil {
    return nil, err
  }
  if cluster == "" {
    cluster = addr
  }
  existing := []string{cluster}
  //连接到集群
  _, err = mbl.Join(existing)
  //创建一致性哈希的节点实例
  circle := consistent.New()
  //设置虚拟节点数量
  circle.NumberOfReplicas = 256
  go func() {
    for {
      //获取集群成员
      m := mbl.Members()
      nodes := make([]string, len(m))
      for i, n := range m {
        nodes[i] = n.Name
      }
      //每隔1s将集群节点列表m更新到circle中
      circle.Set(nodes)
      time.Sleep(time.Second)
    }
  }()
  return &node{circle, addr}, nil
}

一致性哈希consistent

这里使用的是第三方库consistent

如果想要学习一致性哈希算法,可以参考一致性哈希算法

//创建一致性哈希的节点实例
circle := consistent.New()
//设置虚拟节点数量
circle.NumberOfReplicas = 256
//每隔1s将集群节点列表m更新到circle中
circle.Set(nodes)
//这里进行一致性哈希运算,判断该key是否映射到该节点上
addr, _ := n.Get(key)

节点再平衡rebalance

func (h *rebalanceHandler) rebalance() {
  s := h.NewScanner()
  defer s.Close()
  client := &http.Client{}
  //遍历本节点所有的key
  for s.Scan() {
    k := s.Key()
    redirectAddr, ok := h.ShouldProcess(k)
    if !ok {//如果因为扩缩容,该key不再映射到本节点
      //则将key写入到对应的节点去
      r, _ := http.NewRequest(http.MethodPut, "http://"+redirectAddr+":12345/cache/"+k, bytes.NewReader(s.Value()))
      client.Do(r)
      h.Del(k)
    }
  }
}
func (c *inMemoryCache) NewScanner() Scanner {
  pairCh := make(chan *pair)
  closeCh := make(chan struct{})
  go func() {
    defer close(pairCh)
    c.mutex.RLock()
    for k, v := range c.cacheMap {
      c.mutex.RUnlock()
      select {
      case <-closeCh:
        return
      //从map中读取一个kv,并写入channel中
      case pairCh <- &pair{k, v}:
      }
      c.mutex.RLock()
    }
    c.mutex.RUnlock()
  }()
  return &inMemoryScanner{pair{}, pairCh, closeCh}
}
func (s *inMemoryScanner) Scan() bool {
  //从channel中读取一个kv
  p, ok := <-s.pairCh
  if ok {
    s.k, s.v = p.k, p.v
  }
  return ok
}


目录
相关文章
|
4月前
|
缓存 算法 NoSQL
【分布式详解】一致性算法、全局唯一ID、分布式锁、分布式事务、 分布式缓存、分布式任务、分布式会话
分布式系统通过副本控制协议,使得从系统外部读取系统内部各个副本的数据在一定的约束条件下相同,称之为副本一致性(consistency)。副本一致性是针对分布式系统而言的,不是针对某一个副本而言。强一致性(strong consistency):任何时刻任何用户或节点都可以读到最近一次成功更新的副本数据。强一致性是程度最高的一致性要求,也是实践中最难以实现的一致性。单调一致性(monotonic consistency):任何时刻,任何用户一旦读到某个数据在某次更新后的值,这个用户不会再读到比这个值更旧的值。
407 0
|
3月前
|
存储 缓存 算法
Golang高性能内存缓存库BigCache设计与分析
【2月更文挑战第4天】分析Golang高性能内存缓存库BigCache设计
73 0
|
3月前
|
缓存 负载均衡 算法
分布式系统设计理论之一致性哈希
分布式系统设计理论之一致性哈希
21 1
|
4月前
|
Java Go C++
Golang每日一练(leetDay0113) 奇偶链表、链表随机节点
Golang每日一练(leetDay0113) 奇偶链表、链表随机节点
33 0
Golang每日一练(leetDay0113) 奇偶链表、链表随机节点
|
4月前
|
Java Go C++
Golang每日一练(leetDay0096) 添加运算符、移动零
Golang每日一练(leetDay0096) 添加运算符、移动零
45 0
Golang每日一练(leetDay0096) 添加运算符、移动零
|
4月前
|
Java Go C++
Golang每日一练(leetDay0086) 回文链表、删除链表节点
Golang每日一练(leetDay0086) 回文链表、删除链表节点
22 0
Golang每日一练(leetDay0086) 回文链表、删除链表节点
|
4月前
|
算法 C++ Python
Golang每日一练(leetDay0043) 单词接龙、最长连续序列、根节点到叶节点数字之和
Golang每日一练(leetDay0043) 单词接龙、最长连续序列、根节点到叶节点数字之和
29 0
Golang每日一练(leetDay0043) 单词接龙、最长连续序列、根节点到叶节点数字之和
|
4月前
|
Go
golang力扣leetcode 2049.统计最高分的节点数目
golang力扣leetcode 2049.统计最高分的节点数目
19 0
|
4月前
|
Go
golang力扣leetcode 450.删除二叉搜索树中的节点
golang力扣leetcode 450.删除二叉搜索树中的节点
18 0
|
4月前
|
Go
golang力扣leetcode 24.两两交换链表中的节点
golang力扣leetcode 24.两两交换链表中的节点
15 0