Java实现一致性哈希算法,并搭建环境测试其负载均衡特性(二)

本文涉及的产品
应用型负载均衡 ALB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
云原生网关 MSE Higress,422元/月
简介: 实现负载均衡是后端领域一个重要的话题,一致性哈希算法是实现服务器负载均衡的方法之一,你很可能已在一些远程服务框架中使用过它。下面我们尝试一下自己实现一致性哈希算法。

II. 创建服务器集群, 提供RPC远程调用服务

  1. 首先创建一个服务器项目(使用Maven), 添加 zookeeper依赖

  2. 创建常量接口, 用于存储连接 zookeeper 的信息

  3. public interface Constant {
  4.    //zookeeper集群的地址
  5.    String ZK_HOST = "192.168.117.129:2181,192.168.117.129:2182,192.168.117.129:2183";
  6.    //连接zookeeper的超时时间
  7.    int ZK_TIME_OUT = 5000;
  8.    //服务器所发布的远程服务在zookeeper中的注册地址, 也就是说这个节点中保存了各个服务器提供的接口
  9.    String ZK_REGISTRY = "/provider";
  10.    //zookeeper集群中注册服务的url地址的瞬时节点
  11.    String ZK_RMI = ZK_REGISTRY + "/rmi";
  12. }

3.封装操作 zookeeper和发布远程服务的接口供自己调用, 本案例中发布远程服务使用Java自身提供的 rmi包完成, 如果没有了解过可以参考这篇

  1. public class ServiceProvider {

  2.    private CountDownLatch latch = new CountDownLatch(1);

  3.    /**
  4.     * 连接zookeeper集群
  5.     */
  6.    public ZooKeeper connectToZK(){
  7.        ZooKeeper zk = null;
  8.        try {
  9.            zk = new ZooKeeper(Constant.ZK_HOST, Constant.ZK_TIME_OUT, new Watcher() {
  10.                @Override
  11.                public void process(WatchedEvent watchedEvent) {
  12.                    //如果连接上了就唤醒当前线程.
  13.                    latch.countDown();
  14.                }
  15.            });
  16.            latch.await();//还没连接上时当前线程等待
  17.        } catch (Exception e) {
  18.            e.printStackTrace();
  19.        }
  20.        return zk;
  21.    }

  22.    /**
  23.     * 创建znode节点
  24.     * @param zk
  25.     * @param url 节点中写入的数据
  26.     */
  27.    public void createNode(ZooKeeper zk, String url){
  28.        try{
  29.            //要把写入的数据转化为字节数组
  30.            byte[] data = url.getBytes();
  31.            zk.create(Constant.ZK_RMI, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  32.        } catch (Exception e) {
  33.            e.printStackTrace();
  34.        }
  35.    }

  36.    /**
  37.     * 发布rmi服务
  38.     */
  39.    public String publishService(Remote remote, String host, int port){
  40.        String url = null;
  41.        try{
  42.            LocateRegistry.createRegistry(port);
  43.            url = "rmi://" + host + ":" + port + "/rmiService";
  44.            Naming.bind(url, remote);
  45.        } catch (Exception e) {
  46.            e.printStackTrace();
  47.        }
  48.        return url;
  49.    }

  50.    /**
  51.     * 发布rmi服务, 并且将服务的url注册到zookeeper集群中
  52.     */
  53.    public void publish(Remote remote, String host, int port){
  54.        //调用publishService, 得到服务的url地址
  55.        String url = publishService(remote, host, port);
  56.        if(null != url){
  57.            ZooKeeper zk = connectToZK();//连接到zookeeper
  58.            if(null != zk){
  59.                createNode(zk, url);
  60.            }
  61.        }
  62.    }
  63. }
  64. 自定义远程服务. 服务提供一个简单的方法: 客户端发来一个字符串, 服务器在字符串前面添加上 Hello, 并返回字符串。
  65. //UserService
  66. public interface UserService extends Remote {
  67.    public String helloRmi(String name) throws RemoteException;
  68. }
  69. //UserServiceImpl
  70. public class UserServiceImpl implements UserService {

  71.    public UserServiceImpl() throws RemoteException{
  72.        super();
  73.    }

  74.    @Override
  75.    public String helloRmi(String name) throws RemoteException {
  76.        return "Hello " + name + "!";
  77.    }
  78. }
  79. 修改端口号, 启动多个java虚拟机, 模拟服务器集群. 为了方便演示, 自定义7777, 8888, 9999端口开启3个服务器进程, 到时会模拟7777端口的服务器宕机和修复重连。
  80. public static void main(String[] args) throws RemoteException {
  81.    //创建工具类对象
  82.    ServiceProvider sp = new ServiceProvider();
  83.    //创建远程服务对象
  84.    UserService userService = new UserServiceImpl();
  85.    //完成发布
  86.    sp.publish(userService, "localhost", 9999);
  87. }

8.jpg

III. 编写客户端程序(运用一致性哈希算法实现负载均衡

  1. 封装客户端接口:
  2. public class ServiceConsumer {
  3.    /**
  4.     * 提供远程服务的服务器列表, 只记录远程服务的url
  5.     */
  6.    private volatile List<String> urls = new LinkedList<>();
  7.    /**
  8.     * 远程服务对应的虚拟节点集合
  9.     */
  10.    private static TreeMap<Integer, String> virtualNodes = new TreeMap<>();

  11.    public ServiceConsumer(){
  12.        ZooKeeper zk = connectToZK();//客户端连接到zookeeper
  13.        if(null != zk){
  14.            //连接上后关注zookeeper中的节点变化(服务器变化)
  15.            watchNode(zk);
  16.        }
  17.    }

  18.    private void watchNode(final ZooKeeper zk) {
  19.        try{
  20.            //观察/provider节点下的子节点是否有变化(是否有服务器登入或登出)
  21.            List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY, new Watcher() {
  22.                @Override
  23.                public void process(WatchedEvent watchedEvent) {
  24.                    //如果服务器节点有变化就重新获取
  25.                    if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
  26.                        System.out.println("服务器端有变化, 可能有旧服务器宕机或者新服务器加入集群...");
  27.                        watchNode(zk);
  28.                    }
  29.                }
  30.            });
  31.            //将获取到的服务器节点数据保存到集合中, 也就是获得了远程服务的访问url地址
  32.            List<String> dataList = new LinkedList<>();
  33.            TreeMap<Integer, String> newVirtualNodesList = new TreeMap<>();
  34.            for(String nodeStr : nodeList){
  35.                byte[] data = zk.getData(Constants.ZK_REGISTRY + "/" + nodeStr, false, null);
  36.                //放入服务器列表的url
  37.                String url = new String(data);
  38.                //为每个服务器分配虚拟节点, 为了方便模拟, 默认开启在9999端口的服务器性能较差, 只分配300个虚拟节点, 其他分配1000个.
  39.                if(url.contains("9999")){
  40.                    for(int i = 1; i <= 300; i++){
  41.                        newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
  42.                    }
  43.                }else{
  44.                    for(int i = 1; i <= 1000; i++){
  45.                        newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
  46.                    }
  47.                }
  48.                dataList.add(url);
  49.            }
  50.            urls = dataList;
  51.            virtualNodes = newVirtualNodesList;
  52.            dataList = null;//好让垃圾回收器尽快收集
  53.            newVirtualNodesList = null;
  54.        } catch (Exception e) {
  55.            e.printStackTrace();
  56.        }
  57.    }

  58.    /**
  59.     * 根据url获得远程服务对象
  60.     */
  61.    public <T> T lookUpService(String url){
  62.        T remote = null;
  63.        try{
  64.            remote = (T)Naming.lookup(url);
  65.        } catch (Exception e) {
  66.            //如果该url连接不上, 很有可能是该服务器挂了, 这时使用服务器列表中的第一个服务器url重新获取远程对象.
  67.            if(e instanceof ConnectException){
  68.                if (urls.size() != 0){
  69.                    url = urls.get(0);
  70.                    return lookUpService(url);
  71.                }
  72.            }
  73.        }
  74.        return remote;
  75.    }

  76.    /**
  77.     * 通过一致性哈希算法, 选取一个url, 最后返回一个远程服务对象
  78.     */
  79.    public <T extends Remote> T lookUp(){
  80.        T service = null;
  81.        //随机计算一个哈希值
  82.        int hash = FVNHash(Math.random() * 10000 + "");
  83.        //得到大于该哈希值的所有map集合
  84.        SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
  85.        //找到比该值大的第一个虚拟节点, 如果没有比它大的虚拟节点, 根据哈希环, 则返回第一个节点.
  86.        Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
  87.        //通过该虚拟节点获得服务器url
  88.        String virtualNodeName = virtualNodes.get(targetKey);
  89.        String url = virtualNodeName.split("@")[0];
  90.        //根据服务器url获取远程服务对象
  91.        service = lookUpService(url);
  92.        System.out.print("提供本次服务的地址为: " + url + ", 返回结果: ");
  93.        return service;
  94.    }

  95.    private CountDownLatch latch = new CountDownLatch(1);

  96.    public ZooKeeper connectToZK(){
  97.        ZooKeeper zk = null;
  98.        try {
  99.            zk = new ZooKeeper(Constants.ZK_HOST, Constants.ZK_TIME_OUT, new Watcher() {
  100.                @Override
  101.                public void process(WatchedEvent watchedEvent) {
  102.                    //判断是否连接zk集群
  103.                    latch.countDown();//唤醒处于等待状态的当前线程
  104.                }
  105.            });
  106.            latch.await();//没有连接上的时候当前线程处于等待状态.
  107.        } catch (IOException e) {
  108.            e.printStackTrace();
  109.        } catch (InterruptedException e) {
  110.            e.printStackTrace();
  111.        }
  112.        return zk;
  113.    }


  114.    public static int FVNHash(String data){
  115.        final int p = 16777619;
  116.        int hash = (int)2166136261L;
  117.        for(int i = 0; i < data.length(); i++)
  118.            hash = (hash ^ data.charAt(i)) * p;
  119.        hash += hash << 13;
  120.        hash ^= hash >> 7;
  121.        hash += hash << 3;
  122.        hash ^= hash >> 17;
  123.        hash += hash << 5;
  124.        return hash < 0 ? Math.abs(hash) : hash;
  125.    }
  126. }
  127. 启动客户端进行测试:
  128. public static void main(String[] args){
  129.    ServiceConsumer sc = new ServiceConsumer();//创建工具类对象
  130.    while(true){
  131.        //获得rmi远程服务对象
  132.        UserService userService = sc.lookUp();
  133.        try{
  134.            //调用远程方法
  135.            String result = userService.helloRmi("炭烧生蚝");
  136.            System.out.println(result);
  137.            Thread.sleep(100);
  138.        }catch(Exception e){
  139.            e.printStackTrace();
  140.        }
  141.    }
  142. }
  143. 客户端跑起来后, 在显示台不断进行打印...下面将对数据进行统计。

9.jpg

10.png

IV. 对服务器调用数据进行统计分析

重温一遍模拟的过程: 首先分别在7777, 8888, 9999端口启动了3台服务器. 然后启动客户端进行访问. 7777, 8888端口的两台服务器设置性能指数为1000, 而9999端口的服务器性能指数设置为300。

在客户端运行期间, 我手动关闭了8888端口的服务器, 客户端正常打印出服务器变化信息。此时理论上不会有访问被路由到8888端口的服务器。当我重新启动8888端口服务器时, 客户端打印出服务器变化信息, 访问能正常到达8888端口服务器。

下面对各服务器的访问量进行统计, 看是否实现了负载均衡。

测试程序如下:

  1. public class DataStatistics {
  2.    private static float ReqToPort7777 = 0;
  3.    private static float ReqToPort8888 = 0;
  4.    private static float ReqToPort9999 = 0;

  5.    public static void main(String[] args) {
  6.        BufferedReader br = null;
  7.        try {
  8.            br = new BufferedReader(new FileReader("C://test.txt"));
  9.            String line = null;
  10.            while(null != (line = br.readLine())){
  11.                if(line.contains("7777")){
  12.                    ReqToPort7777++;
  13.                }else if(line.contains("8888")){
  14.                    ReqToPort8888++;
  15.                }else if(line.contains("9999")){
  16.                    ReqToPort9999++;
  17.                }else{
  18.                    print(false);
  19.                }
  20.            }
  21.            print(true);
  22.        } catch (Exception e) {
  23.            e.printStackTrace();
  24.        }finally {
  25.            if(null != br){
  26.                try {
  27.                    br.close();
  28.                } catch (IOException e) {
  29.                    e.printStackTrace();
  30.                }
  31.                br = null;
  32.            }
  33.        }
  34.    }

  35.    private static void print(boolean isEnd){
  36.        if(!isEnd){
  37.            System.out.println("------------- 服务器集群发生变化 -------------");
  38.        }else{
  39.            System.out.println("------------- 最后一次统计 -------------");
  40.        }
  41.        System.out.println("截取自上次服务器变化到现在: ");
  42.        float total = ReqToPort7777 + ReqToPort8888 + ReqToPort9999;
  43.        System.out.println("7777端口服务器访问量为: " + ReqToPort7777 + ", 占比" + (ReqToPort7777 / total));
  44.        System.out.println("8888端口服务器访问量为: " + ReqToPort8888 + ", 占比" + (ReqToPort8888 / total));
  45.        System.out.println("9999端口服务器访问量为: " + ReqToPort9999 + ", 占比" + (ReqToPort9999 / total));
  46.        ReqToPort7777 = 0;
  47.        ReqToPort8888 = 0;
  48.        ReqToPort9999 = 0;
  49.    }
  50. }

  51. /* 以下是输出结果
  52. ------------- 服务器集群发生变化 -------------
  53. 截取自上次服务器变化到现在:
  54. 7777端口服务器访问量为: 198.0, 占比0.4419643
  55. 8888端口服务器访问量为: 184.0, 占比0.4107143
  56. 9999端口服务器访问量为: 66.0, 占比0.14732143
  57. ------------- 服务器集群发生变化 -------------
  58. 截取自上次服务器变化到现在:
  59. 7777端口服务器访问量为: 510.0, 占比0.7589286
  60. 8888端口服务器访问量为: 1.0, 占比0.0014880953
  61. 9999端口服务器访问量为: 161.0, 占比0.23958333
  62. ------------- 最后一次统计 -------------
  63. 截取自上次服务器变化到现在:
  64. 7777端口服务器访问量为: 410.0, 占比0.43248945
  65. 8888端口服务器访问量为: 398.0, 占比0.41983122
  66. 9999端口服务器访问量为: 140.0, 占比0.14767933
  67. */

V. 结果

从测试数据可以看出, 不管是8888端口服务器宕机之前, 还是宕机之后, 三台服务器接收的访问量和性能指数成正比,成功地验证了一致性哈希算法的负载均衡作用。

四. 扩展思考

初识一致性哈希算法的时候, 对这种奇特的思路佩服得五体投地。但是一致性哈希算法除了能够让后端服务器实现负载均衡, 还有一个特点可能是其他负载均衡算法所不具备的。

这个特点是基于哈希函数的, 我们知道通过哈希函数, 固定的输入能够产生固定的输出. 换句话说, 同样的请求会路由到相同的服务器. 这点就很牛逼了, 我们可以结合一致性哈希算法和缓存机制提供后端服务器的性能。

比如说在一个分布式系统中, 有一个服务器集群提供查询用户信息的方法, 每个请求将会带着用户的 uid到达, 我们可以通过哈希函数进行处理(从上面的演示代码可以看到, 这点是可以轻松实现的), 使同样的 uid路由到某个独定的服务器. 这样我们就可以在服务器上对该的 uid背后的用户信息进行缓存, 从而减少对数据库或其他中间件的操作, 从而提高系统效率。

当然如果使用该策略的话, 你可能还要考虑缓存更新等操作, 但作为一种优良的策略, 我们可以考虑在适当的场合灵活运用。

以上思考受启发于 Dubbo框架中对其实现的四种负载均衡策略的描述。


相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
相关文章
|
2月前
|
设计模式 算法 搜索推荐
Java 设计模式之策略模式:灵活切换算法的艺术
策略模式通过封装不同算法并实现灵活切换,将算法与使用解耦。以支付为例,微信、支付宝等支付方式作为独立策略,购物车根据选择调用对应支付逻辑,提升代码可维护性与扩展性,避免冗长条件判断,符合开闭原则。
319 35
|
4月前
|
算法 IDE Java
Java 项目实战之实际代码实现与测试调试全过程详解
本文详细讲解了Java项目的实战开发流程,涵盖项目创建、代码实现(如计算器与汉诺塔问题)、单元测试(使用JUnit)及调试技巧(如断点调试与异常排查),帮助开发者掌握从编码到测试调试的完整技能,提升Java开发实战能力。
458 0
|
7月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
|
2月前
|
存储 算法 搜索推荐
《数据之美》:Java数据结构与算法精要
本系列深入探讨数据结构与算法的核心原理及Java实现,涵盖线性与非线性结构、常用算法分类、复杂度分析及集合框架应用,助你提升程序效率,掌握编程底层逻辑。
|
7月前
|
存储 缓存 监控
上网行为监控系统剖析:基于 Java LinkedHashMap 算法的时间序列追踪机制探究
数字化办公蓬勃发展的背景下,上网行为监控系统已成为企业维护信息安全、提升工作效能的关键手段。该系统需实时记录并深入分析员工的网络访问行为,如何高效存储和管理这些处于动态变化中的数据,便成为亟待解决的核心问题。Java 语言中的LinkedHashMap数据结构,凭借其独有的有序性特征以及可灵活配置的淘汰策略,为上网行为监控系统提供了一种兼顾性能与功能需求的数据管理方案。本文将对LinkedHashMap在上网行为监控系统中的应用原理、实现路径及其应用价值展开深入探究。
188 3
|
2月前
|
存储 人工智能 算法
从零掌握贪心算法Java版:LeetCode 10题实战解析(上)
在算法世界里,有一种思想如同生活中的"见好就收"——每次做出当前看来最优的选择,寄希望于通过局部最优达成全局最优。这种思想就是贪心算法,它以其简洁高效的特点,成为解决最优问题的利器。今天我们就来系统学习贪心算法的核心思想,并通过10道LeetCode经典题目实战演练,带你掌握这种"步步为营"的解题思维。
|
7月前
|
人工智能 算法 NoSQL
LRU算法的Java实现
LRU(Least Recently Used)算法用于淘汰最近最少使用的数据,常应用于内存管理策略中。在Redis中,通过`maxmemory-policy`配置实现不同淘汰策略,如`allkeys-lru`和`volatile-lru`等,采用采样方式近似LRU以优化性能。Java中可通过`LinkedHashMap`轻松实现LRUCache,利用其`accessOrder`特性和`removeEldestEntry`方法完成缓存淘汰逻辑,代码简洁高效。
308 0
|
3月前
|
机器学习/深度学习 传感器 算法
基于不变扩展卡尔曼滤波器RI-EKF的同时定位与地图构建SLAM算法的收敛性和一致性特性研究(Matlab代码实现)
基于不变扩展卡尔曼滤波器RI-EKF的同时定位与地图构建SLAM算法的收敛性和一致性特性研究(Matlab代码实现)
117 2
|
9月前
|
缓存 监控 负载均衡
如何提升 API 性能:来自 Java 和测试开发者的优化建议
本文探讨了如何优化API响应时间,提升用户体验。通过缓存(如Redis/Memcached)、减少数据负载(REST过滤字段或GraphQL精确请求)、负载均衡(Nginx/AWS等工具)、数据压缩(Gzip/Brotli)、限流节流、监控性能(Apipost/New Relic等工具)、升级基础设施、减少第三方依赖、优化数据库查询及采用异步处理等方式,可显著提高API速度。快速响应的API不仅让用户满意,还能增强应用整体性能。
|
6月前
|
存储 算法 安全
Java中的对称加密算法的原理与实现
本文详细解析了Java中三种常用对称加密算法(AES、DES、3DES)的实现原理及应用。对称加密使用相同密钥进行加解密,适合数据安全传输与存储。AES作为现代标准,支持128/192/256位密钥,安全性高;DES采用56位密钥,现已不够安全;3DES通过三重加密增强安全性,但性能较低。文章提供了各算法的具体Java代码示例,便于快速上手实现加密解密操作,帮助用户根据需求选择合适的加密方案保护数据安全。
423 58