一致性hash算法与server列表维护

简介:

  考虑到不用重复造轮子,特此转载好文,出处http://shift-alt-ctrl.iteye.com/blog/1963244

    普通的hash算法有个很大的问题:当hash的"模数"发生变化时,整个hash数据结构就需要重新hash,重新hash之后的数据分布一定会和hash之前的不同;在很多场景下,"模数"的变化时必然的,但是这种"数据分布"的巨大变化却会带来一些麻烦.所以,就有了"一致性hash",当然学术界对"一致性hash"的阐述,还远远不止这些.

    在编程应用方面,最直观的例子就是"分布式缓存",一个cache集群中,有N台物理server,为了提升单台server的支撑能力,可能会考虑将数据通过hash的方式相对均匀的分布在每个server上.

    判定方式: location = hashcode(key) % N;事实上,由于需要,N可能会被增加或者削减,不管程序上是否能够妥善的支持N的变更,单从"数据迁移"的面积而言,也是非常大的.

    一致性Hash很巧妙的简化了这个问题,同时再使用"虚拟节点"的方式来细分数据的分布.



 

F1

    图示中表名,有2个物理server节点,每个物理server节点有多个Snode虚拟节点,server1和server2的虚拟节点互相"穿插"且依次排列,每个snode都有一个code,它表示接受数据的hashcode起始值(或终止值),比如上述图示中第一个snode.code为500,即当一个数据的hashcode值在[0,500]时则会被存储在它上.

    引入虚拟节点之后,事情就会好很多;假如KEY1分布在Snode3上,snode3事实为物理server1,当server1故障后,snode1也将被移除,那么KEY1将会被分布在"临近的"虚拟节点上--snode2(或者snode4,由实现而定);无论是存取,下一次对KEY1的操作将会有snode2(或snode4)来服务.

    1) 一个物理server在逻辑上分成N个虚拟节点(本例中为256个)

    2) 多个物理server的虚拟节点需要散列分布,即互相"穿插".

    3) 所有的虚拟节点,在逻辑上形成一个链表

    4) 每个虚拟节点,负责一定区间的hashcode值.

Java代码   收藏代码
  1. import java.net.InetSocketAddress;  
  2. import java.net.Socket;  
  3. import java.net.SocketAddress;  
  4. import java.nio.charset.Charset;  
  5. import java.security.MessageDigest;  
  6. import java.security.NoSuchAlgorithmException;  
  7. import java.util.Map;  
  8. import java.util.NavigableMap;  
  9. import java.util.TreeMap;  
  10.   
  11.   
  12. public class ServerPool {  
  13.   
  14.     private static final int VIRTUAL_NODES_NUMBER = 256;//物理节点对应的虚拟节点的个数  
  15.     private static final String TAG = ".-vtag-.";  
  16.     private NavigableMap<Long, SNode> innerPool = new TreeMap<Long, SNode>();  
  17.     private Hashing hashing = new Hashing();  
  18.   
  19.     /** 
  20.      * 新增物理server地址 
  21.      * @param address 
  22.      * @param  weight 
  23.      * 权重,权重越高,其虚拟节点的个数越高,事实上没命中的概率越高 
  24.      * @throws Exception 
  25.      */  
  26.     public synchronized void addServer(String address,int weight) throws Exception {  
  27.         SNode prev = null;  
  28.         SNode header = null;  
  29.         String[] tmp = address.split(":");  
  30.         InnerServer server = new InnerServer(tmp[0], Integer.parseInt(tmp[1]));  
  31.         server.init();  
  32.         //将一个address下的所有虚拟节点SNode形成链表,可以在removeServer,以及  
  33.         //特殊场景下使用  
  34.         int max = 1;  
  35.         if(weight > 0){  
  36.             max = VIRTUAL_NODES_NUMBER * weight;  
  37.         }  
  38.         for (int i = 0; i < max; i++) {  
  39.             long code = hashing.hash(address + TAG + i);  
  40.             SNode current = new SNode(server, code);  
  41.             if (header == null) {  
  42.                 header = current;  
  43.             }  
  44.             current.setPrev(prev);  
  45.             innerPool.put(code, current);  
  46.             prev = current;  
  47.         }  
  48.     }  
  49.     /** 
  50.      * 删除物理server地址,伴随着虚拟节点的删除 
  51.      * @param address 
  52.      */  
  53.     public synchronized void removeServer(String address) {  
  54.         long code = hashing.hash(address + TAG + (VIRTUAL_NODES_NUMBER - 1));  
  55.         SNode current = innerPool.get(code);  
  56.         if(current == null){  
  57.             return;  
  58.         }  
  59.         if(!current.getAddress().equalsIgnoreCase(address)){  
  60.             return;  
  61.         }  
  62.         current.getServer().close();;  
  63.         while (current != null) {  
  64.             current = innerPool.remove(current.getCode()).getPrev();  
  65.         }  
  66.   
  67.     }  
  68.   
  69.     /** 
  70.      * 根据指定的key,获取此key应该命中的物理server信息 
  71.      * @param key 
  72.      * @return 
  73.      */  
  74.     public InnerServer getServer(String key) {  
  75.         long code = hashing.hash(key);  
  76.         SNode snode = innerPool.lowerEntry(code).getValue();  
  77.         if (snode == null) {  
  78.             snode = innerPool.firstEntry().getValue();  
  79.         }  
  80.         return snode.getServer();  
  81.     }  
  82.   
  83.   
  84.     /** 
  85.      * 虚拟节点描述 
  86.      */  
  87.     class SNode {  
  88.         Long code;  
  89.         InnerServer server;  
  90.         SNode prev;  
  91.   
  92.         SNode(InnerServer server, Long code) {  
  93.             this.server = server;  
  94.             this.code = code;  
  95.         }  
  96.   
  97.         SNode getPrev() {  
  98.             return prev;  
  99.         }  
  100.   
  101.         void setPrev(SNode prev) {  
  102.             this.prev = prev;  
  103.         }  
  104.   
  105.         Long getCode() {  
  106.             return this.code;  
  107.         }  
  108.   
  109.         InnerServer getServer() {  
  110.             return server;  
  111.         }  
  112.         String getAddress(){  
  113.             return server.ip + ":" + server.port;  
  114.         }  
  115.     }  
  116.   
  117.     /** 
  118.      * hashcode生成 
  119.      */  
  120.     class Hashing {  
  121.         //少量优化性能  
  122.         private ThreadLocal<MessageDigest> md5Holder = new ThreadLocal<MessageDigest>();  
  123.         private Charset DEFAULT_CHARSET = Charset.forName("utf-8");  
  124.   
  125.         public long hash(String key) {  
  126.             return hash(key.getBytes(DEFAULT_CHARSET));  
  127.         }  
  128.   
  129.         public long hash(byte[] key) {  
  130.             try {  
  131.                 if (md5Holder.get() == null) {  
  132.                     md5Holder.set(MessageDigest.getInstance("MD5"));  
  133.                 }  
  134.             } catch (NoSuchAlgorithmException e) {  
  135.                 throw new IllegalStateException("no md5 algorythm found");  
  136.             }  
  137.             MessageDigest md5 = md5Holder.get();  
  138.   
  139.             md5.reset();  
  140.             md5.update(key);  
  141.             byte[] bKey = md5.digest();  
  142.             long res = ((long) (bKey[3] & 0xFF) << 24)  
  143.                     | ((long) (bKey[2] & 0xFF) << 16)  
  144.                     | ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF);  
  145.             return res;  
  146.         }  
  147.     }  
  148.   
  149.     /** 
  150.      * 与物理server的TCP链接,用于实际的IO操作 
  151.      */  
  152.     class InnerServer {  
  153.         String ip;  
  154.         int port;  
  155.         Socket socket;  
  156.   
  157.         InnerServer(String ip, int port) {  
  158.             this.ip = ip;  
  159.             this.port = port;  
  160.         }  
  161.   
  162.         synchronized void init() throws Exception {  
  163.             SocketAddress socketAddress = new InetSocketAddress(ip, port);  
  164.             socket = new Socket();  
  165.             socket.connect(socketAddress, 30000);  
  166.         }  
  167.   
  168.         public boolean write(byte[] sources) {  
  169.             //TODO  
  170.             return true;  
  171.         }  
  172.   
  173.         public byte[] read() {  
  174.             //TODO  
  175.             return new byte[]{};  
  176.         }  
  177.   
  178.         public void close(){  
  179.              if(socket == null || socket.isClosed()){  
  180.                  return;  
  181.              }  
  182.             try{  
  183.                 socket.close();  
  184.             } catch (Exception e){  
  185.                 //  
  186.             }  
  187.         }  
  188.     }  
  189. }  

 



原文链接:[http://wely.iteye.com/blog/2275958]

相关文章
|
7月前
|
消息中间件 算法 分布式数据库
Raft算法:分布式一致性领域的璀璨明珠
【4月更文挑战第21天】Raft算法是分布式一致性领域的明星,通过领导者选举、日志复制和安全性解决一致性问题。它将复杂问题简化,角色包括领导者、跟随者和候选者。领导者负责日志复制,确保多数节点同步。实现细节涉及超时机制、日志压缩和网络分区处理。广泛应用于分布式数据库、存储系统和消息队列,如Etcd、TiKV。其简洁高效的特点使其在分布式系统中备受青睐。
|
4月前
|
存储 算法 NoSQL
(七)漫谈分布式之一致性算法下篇:一文从根上儿理解大名鼎鼎的Raft共识算法!
Raft通过一致性检查,能在一定程度上保证集群的一致性,但无法保证所有情况下的一致性,毕竟分布式系统各种故障层出不穷,如何在有可能发生各类故障的分布式系统保证集群一致性,这才是Raft等一致性算法要真正解决的问题。
122 11
|
4月前
|
存储 算法 索引
(六)漫谈分布式之一致性算法上篇:用二十六张图一探Raft共识算法奥妙之处!
现如今,大多数分布式存储系统都投向了Raft算法的怀抱,而本文就来聊聊大名鼎鼎的Raft算法/协议!
131 8
|
4月前
|
存储 算法 Java
(五)漫谈分布式之一致性算法篇:谁说Paxos晦涩难懂?你瞧这不一学就会!
没在时代发展的洪流中泯然于众的道理很简单,是因为它们并不仅是空中楼阁般的高大上理论,而是有着完整落地的思想,它们已然成为构建分布式系统不可或缺的底层基石,而本文则来好好聊聊分布式与一致性思想的落地者:Paxos与Raft协议(算法)。
115 6
|
5月前
|
缓存 负载均衡 算法
(四)网络编程之请求分发篇:负载均衡静态调度算法、平滑轮询加权、一致性哈希、最小活跃数算法实践!
先如今所有的技术栈中,只要一谈关于高可用、高并发处理相关的实现,必然会牵扯到集群这个话题,也就是部署多台服务器共同对外提供服务,从而做到提升系统吞吐量,优化系统的整体性能以及稳定性等目的。
|
7月前
|
缓存 负载均衡 算法
C++如何实现一致性算法
一致性哈希是一种用于分布式系统的负载均衡算法,旨在减少服务器增减导致的数据迁移。当有N台服务器时,通过哈希环将请求均匀分布到每台服务器,每台处理N/1的请求。若使用缓存如Redis,可进一步处理高并发场景。算法将哈希值空间视为环形,服务器和请求哈希后定位到环上,按顺时针方向找到第一台服务器作为负载目标。提供的C++代码实现了MD5哈希函数,以及一致性哈希算法的物理节点、虚拟节点和算法本身,以实现节点的添加、删除和请求映射。
52 1
C++如何实现一致性算法
|
6月前
|
算法 Java
Java中常用hash算法总结
Java中常用hash算法总结
54 0
|
7月前
|
算法 程序员 分布式数据库
分布式一致性必备:一文读懂Raft算法
Raft算法是一种用于分布式系统中复制日志一致性管理的算法。它通过选举领导者来协调日志复制,确保所有节点数据一致。算法包括心跳机制、选举过程、日志复制和一致性保证。当领导者失效时,节点会重新选举,保证高可用性。Raft易于理解和实现,提供强一致性,常用于分布式数据库和协调服务。作者小米分享了相关知识,鼓励对分布式系统感兴趣的读者进一步探索。
1424 0
|
7月前
|
存储 算法 安全
5. raft 一致性算法
5. raft 一致性算法
|
16天前
|
算法
基于WOA算法的SVDD参数寻优matlab仿真
该程序利用鲸鱼优化算法(WOA)对支持向量数据描述(SVDD)模型的参数进行优化,以提高数据分类的准确性。通过MATLAB2022A实现,展示了不同信噪比(SNR)下模型的分类误差。WOA通过模拟鲸鱼捕食行为,动态调整SVDD参数,如惩罚因子C和核函数参数γ,以寻找最优参数组合,增强模型的鲁棒性和泛化能力。
下一篇
DataWorks