前言
Demo源码:Simple-Distributed-Cache
欢迎Star
代码较简单,分析源码仅分析重点
项目介绍
本项目刚开始实现了基于HTTP/REST的内存缓存服务,受限于HTTP协议解析,性能不高。而后实现了一个基于TCP的缓存服务提升性能,对于TCP来说,需要自己定义一套序列化规范来解析缓存的get,set和del三个操作,这里使用的是ABNF协议描述范式。
项目实现的分布式缓存集群是同构集群,所有节点的功能完全相同,节点之间通过gossip协议来进行节点间通信,节点失效会在有限时间内扩散到整个集群。同时使用一致性哈希来计算负载均衡,当节点总数发生变化是,一致性哈希需要重新映射的key比传统哈希需要映射的key少的多。
分布式系统的CAP理论中,这里无法满足C,无法保证一致性,前一秒set了键值对,可能下一秒存储该kv的节点发生故障,此时再去访问该key就无法保证一致性。所以在发生节点扩缩容时,我们需要进行节点再平衡,将需要被迁移的key赋值到对应的节点上。
使用介绍
注意我们使用HTTP/REST的时候,增删改查的操作并没有走一致性哈希。保留HTTP服务的定位是管理使用
#----------------基于HTTP----------------# # 查看状态 curl 127.0.0.1:12345/status # 插入一个kv curl -v 127.0.0.1:12345/cache/testkey -XPUT -d testvalue # 查看key的val curl 127.0.0.1:12345/cache/testkey # 查看状态 curl 127.0.0.1:12345/status # 删除key curl 127.0.0.1:12345/cache/testkey -XDELETE # 查看状态 curl 127.0.0.1:12345/status
#----------------基于TCP----------------# # 插入一个kv ../client/client.exe -c set -k testkey -v testvalue # 查看key的val ../client/client.exe -c get -k testkey # 查看状态 curl 127.0.0.1:12345/status # 删除key ../client/client.exe -c del -k testkey # 查看状态 curl 127.0.0.1:12345/status
#----------------集群----------------# # 运行三台 go run main.go -n 10.29.1.1 go run main.go -n 10.29.1.2 -c 10.29.1.1 go run main.go -n 10.29.1.3 -c 10.29.1.2 # 查看集群节点列表 curl 10.29.1.1:12345/cluster # 任选一台插入key,有些超过插入有些插入失败并返回重定向的地址 ../client/client.exe -c set -k keya -v a -h 10.29.1.3 ../client/client.exe -c set -k keyb -v b -h 10.29.1.3 ../client/client.exe -c set -k keyc -v c -h 10.29.1.3 ../client/client.exe -c set -k keyd -v d -h 10.29.1.3 ../client/client.exe -c set -k keye -v e -h 10.29.1.3 ../client/client.exe -c set -k keyf -v f -h 10.29.1.3 # 如果发生扩缩容,进行节点再平衡 curl 10.29.1.1:12345/rebalance -XPOST
# 只启动一台节点 go run main.go -n 10.29.1.1 # 插入100000条数据,那么这时100000条数据都是给10.29.1.1的 ../cache-benchmark/cache-benchmark.exe -type tcp -n 100000 -d 1 --h 10.29.1.1 # 查看状态 count=100000 curl 10.29.1.1:12345/status # 又启动一台 go run main.go -n 10.29.1.2 -c 10.29.1.1 # 发送扩缩容,进行数据平衡 curl 10.29.1.1:12345/rebalance -XPOST # 查看状态 count=50000左右 curl 10.29.1.1:12345/status # 再启动一台 go run main.go -n 10.29.1.3 -c 10.29.1.2 # 发送扩缩容,进行数据平衡 curl 10.29.1.1:12345/rebalance -XPOST # 查看状态 count=33000左右 curl 10.29.1.1:12345/status
分析源码
REST接口
func (s *Server) Listen() { //这里就是对一个map进行增删改查,并维护一个stat http.Handle("/cache/", s.cacheHandler()) //返回上面维护的sata结构体 http.Handle("/status", s.statusHandler()) //consistent实现Members,以切片形式返回所有活跃节点的地址 //m := h.Members() //bytes, err := json.Marshal(m) //w.Write(bytes) http.Handle("/cluster", s.clusterHandler()) //最后介绍 http.Handle("/rebalance", s.rebalanceHandler()) http.ListenAndServe(s.Addr()+":12345", nil) }
TCP字节流
for { conn, err := listener.Accept() if err != nil { log.Println(err) } //对每一个连接都开一个process的协程 go s.process(conn) } //代码中忽略了细节,可能会与源码有出路 func (s *Server) process(conn net.Conn) { r := bufio.NewReader(conn) for { op, err := r.ReadByte() switch op { case 'S': err = s.set(conn, r) case 'G': err = s.get(conn, r) case 'D': err = s.del(conn, r) } } //下面以get为例,set和del原理一样 func (s *Server) get(conn net.Conn, r *bufio.Reader) error { key, err := s.readKey(r) if err != nil { return sendResponse(nil, err, conn) } val, err := s.Get(key) return sendResponse(val, err, conn) } //通过readKey对TCP字节流进行解析 func (s *Server) readKey(r *bufio.Reader) (string, error) { keyLen, err := readLen(r) key := make([]byte, keyLen) _, err = io.ReadFull(r, key) //这里进行一致性哈希运算,判断该key是否映射到该节点上 //如果是则继续运行,如果不是则返回重定向到哪个节点 jumpAddr, ok := s.ShouldProcess(string(key)) if !ok { return "", errors.New("redirect " + jumpAddr) } return string(key), nil } //这里进行一致性哈希运算,判断该key是否映射到该节点上 func (n *node) ShouldProcess(key string) (string, bool) { addr, _ := n.Get(key) return addr, addr == n.addr }
创建新节点加入到集群gossip
这里使用的是第三方库gossip
//创建新节点 node, err := cluster.New(*nodeAddr, *cls) func New(addr, cluster string) (Node, error) { //创建gossip新节点的config config := memberlist.DefaultLANConfig() config.Name = addr config.BindAddr = addr config.LogOutput = ioutil.Discard //创建新节点 mbl, err := memberlist.Create(config) if err != nil { return nil, err } if cluster == "" { cluster = addr } existing := []string{cluster} //连接到集群 _, err = mbl.Join(existing) //创建一致性哈希的节点实例 circle := consistent.New() //设置虚拟节点数量 circle.NumberOfReplicas = 256 go func() { for { //获取集群成员 m := mbl.Members() nodes := make([]string, len(m)) for i, n := range m { nodes[i] = n.Name } //每隔1s将集群节点列表m更新到circle中 circle.Set(nodes) time.Sleep(time.Second) } }() return &node{circle, addr}, nil }
一致性哈希consistent
这里使用的是第三方库consistent
如果想要学习一致性哈希算法,可以参考一致性哈希算法
//创建一致性哈希的节点实例 circle := consistent.New() //设置虚拟节点数量 circle.NumberOfReplicas = 256 //每隔1s将集群节点列表m更新到circle中 circle.Set(nodes) //这里进行一致性哈希运算,判断该key是否映射到该节点上 addr, _ := n.Get(key)
节点再平衡rebalance
func (h *rebalanceHandler) rebalance() { s := h.NewScanner() defer s.Close() client := &http.Client{} //遍历本节点所有的key for s.Scan() { k := s.Key() redirectAddr, ok := h.ShouldProcess(k) if !ok {//如果因为扩缩容,该key不再映射到本节点 //则将key写入到对应的节点去 r, _ := http.NewRequest(http.MethodPut, "http://"+redirectAddr+":12345/cache/"+k, bytes.NewReader(s.Value())) client.Do(r) h.Del(k) } } }
func (c *inMemoryCache) NewScanner() Scanner { pairCh := make(chan *pair) closeCh := make(chan struct{}) go func() { defer close(pairCh) c.mutex.RLock() for k, v := range c.cacheMap { c.mutex.RUnlock() select { case <-closeCh: return //从map中读取一个kv,并写入channel中 case pairCh <- &pair{k, v}: } c.mutex.RLock() } c.mutex.RUnlock() }() return &inMemoryScanner{pair{}, pairCh, closeCh} } func (s *inMemoryScanner) Scan() bool { //从channel中读取一个kv p, ok := <-s.pairCh if ok { s.k, s.v = p.k, p.v } return ok }