Java实现一致性哈希算法,并搭建环境测试其负载均衡特性(二)

本文涉及的产品
应用型负载均衡 ALB,每月750个小时 15LCU
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 实现负载均衡是后端领域一个重要的话题,一致性哈希算法是实现服务器负载均衡的方法之一,你很可能已在一些远程服务框架中使用过它。下面我们尝试一下自己实现一致性哈希算法。

II. 创建服务器集群, 提供RPC远程调用服务

  1. 首先创建一个服务器项目(使用Maven), 添加 zookeeper依赖

  2. 创建常量接口, 用于存储连接 zookeeper 的信息

  3. public interface Constant {
  4.    //zookeeper集群的地址
  5.    String ZK_HOST = "192.168.117.129:2181,192.168.117.129:2182,192.168.117.129:2183";
  6.    //连接zookeeper的超时时间
  7.    int ZK_TIME_OUT = 5000;
  8.    //服务器所发布的远程服务在zookeeper中的注册地址, 也就是说这个节点中保存了各个服务器提供的接口
  9.    String ZK_REGISTRY = "/provider";
  10.    //zookeeper集群中注册服务的url地址的瞬时节点
  11.    String ZK_RMI = ZK_REGISTRY + "/rmi";
  12. }

3.封装操作 zookeeper和发布远程服务的接口供自己调用, 本案例中发布远程服务使用Java自身提供的 rmi包完成, 如果没有了解过可以参考这篇

  1. public class ServiceProvider {

  2.    private CountDownLatch latch = new CountDownLatch(1);

  3.    /**
  4.     * 连接zookeeper集群
  5.     */
  6.    public ZooKeeper connectToZK(){
  7.        ZooKeeper zk = null;
  8.        try {
  9.            zk = new ZooKeeper(Constant.ZK_HOST, Constant.ZK_TIME_OUT, new Watcher() {
  10.                @Override
  11.                public void process(WatchedEvent watchedEvent) {
  12.                    //如果连接上了就唤醒当前线程.
  13.                    latch.countDown();
  14.                }
  15.            });
  16.            latch.await();//还没连接上时当前线程等待
  17.        } catch (Exception e) {
  18.            e.printStackTrace();
  19.        }
  20.        return zk;
  21.    }

  22.    /**
  23.     * 创建znode节点
  24.     * @param zk
  25.     * @param url 节点中写入的数据
  26.     */
  27.    public void createNode(ZooKeeper zk, String url){
  28.        try{
  29.            //要把写入的数据转化为字节数组
  30.            byte[] data = url.getBytes();
  31.            zk.create(Constant.ZK_RMI, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  32.        } catch (Exception e) {
  33.            e.printStackTrace();
  34.        }
  35.    }

  36.    /**
  37.     * 发布rmi服务
  38.     */
  39.    public String publishService(Remote remote, String host, int port){
  40.        String url = null;
  41.        try{
  42.            LocateRegistry.createRegistry(port);
  43.            url = "rmi://" + host + ":" + port + "/rmiService";
  44.            Naming.bind(url, remote);
  45.        } catch (Exception e) {
  46.            e.printStackTrace();
  47.        }
  48.        return url;
  49.    }

  50.    /**
  51.     * 发布rmi服务, 并且将服务的url注册到zookeeper集群中
  52.     */
  53.    public void publish(Remote remote, String host, int port){
  54.        //调用publishService, 得到服务的url地址
  55.        String url = publishService(remote, host, port);
  56.        if(null != url){
  57.            ZooKeeper zk = connectToZK();//连接到zookeeper
  58.            if(null != zk){
  59.                createNode(zk, url);
  60.            }
  61.        }
  62.    }
  63. }
  64. 自定义远程服务. 服务提供一个简单的方法: 客户端发来一个字符串, 服务器在字符串前面添加上 Hello, 并返回字符串。
  65. //UserService
  66. public interface UserService extends Remote {
  67.    public String helloRmi(String name) throws RemoteException;
  68. }
  69. //UserServiceImpl
  70. public class UserServiceImpl implements UserService {

  71.    public UserServiceImpl() throws RemoteException{
  72.        super();
  73.    }

  74.    @Override
  75.    public String helloRmi(String name) throws RemoteException {
  76.        return "Hello " + name + "!";
  77.    }
  78. }
  79. 修改端口号, 启动多个java虚拟机, 模拟服务器集群. 为了方便演示, 自定义7777, 8888, 9999端口开启3个服务器进程, 到时会模拟7777端口的服务器宕机和修复重连。
  80. public static void main(String[] args) throws RemoteException {
  81.    //创建工具类对象
  82.    ServiceProvider sp = new ServiceProvider();
  83.    //创建远程服务对象
  84.    UserService userService = new UserServiceImpl();
  85.    //完成发布
  86.    sp.publish(userService, "localhost", 9999);
  87. }

8.jpg

III. 编写客户端程序(运用一致性哈希算法实现负载均衡

  1. 封装客户端接口:
  2. public class ServiceConsumer {
  3.    /**
  4.     * 提供远程服务的服务器列表, 只记录远程服务的url
  5.     */
  6.    private volatile List<String> urls = new LinkedList<>();
  7.    /**
  8.     * 远程服务对应的虚拟节点集合
  9.     */
  10.    private static TreeMap<Integer, String> virtualNodes = new TreeMap<>();

  11.    public ServiceConsumer(){
  12.        ZooKeeper zk = connectToZK();//客户端连接到zookeeper
  13.        if(null != zk){
  14.            //连接上后关注zookeeper中的节点变化(服务器变化)
  15.            watchNode(zk);
  16.        }
  17.    }

  18.    private void watchNode(final ZooKeeper zk) {
  19.        try{
  20.            //观察/provider节点下的子节点是否有变化(是否有服务器登入或登出)
  21.            List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY, new Watcher() {
  22.                @Override
  23.                public void process(WatchedEvent watchedEvent) {
  24.                    //如果服务器节点有变化就重新获取
  25.                    if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
  26.                        System.out.println("服务器端有变化, 可能有旧服务器宕机或者新服务器加入集群...");
  27.                        watchNode(zk);
  28.                    }
  29.                }
  30.            });
  31.            //将获取到的服务器节点数据保存到集合中, 也就是获得了远程服务的访问url地址
  32.            List<String> dataList = new LinkedList<>();
  33.            TreeMap<Integer, String> newVirtualNodesList = new TreeMap<>();
  34.            for(String nodeStr : nodeList){
  35.                byte[] data = zk.getData(Constants.ZK_REGISTRY + "/" + nodeStr, false, null);
  36.                //放入服务器列表的url
  37.                String url = new String(data);
  38.                //为每个服务器分配虚拟节点, 为了方便模拟, 默认开启在9999端口的服务器性能较差, 只分配300个虚拟节点, 其他分配1000个.
  39.                if(url.contains("9999")){
  40.                    for(int i = 1; i <= 300; i++){
  41.                        newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
  42.                    }
  43.                }else{
  44.                    for(int i = 1; i <= 1000; i++){
  45.                        newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
  46.                    }
  47.                }
  48.                dataList.add(url);
  49.            }
  50.            urls = dataList;
  51.            virtualNodes = newVirtualNodesList;
  52.            dataList = null;//好让垃圾回收器尽快收集
  53.            newVirtualNodesList = null;
  54.        } catch (Exception e) {
  55.            e.printStackTrace();
  56.        }
  57.    }

  58.    /**
  59.     * 根据url获得远程服务对象
  60.     */
  61.    public <T> T lookUpService(String url){
  62.        T remote = null;
  63.        try{
  64.            remote = (T)Naming.lookup(url);
  65.        } catch (Exception e) {
  66.            //如果该url连接不上, 很有可能是该服务器挂了, 这时使用服务器列表中的第一个服务器url重新获取远程对象.
  67.            if(e instanceof ConnectException){
  68.                if (urls.size() != 0){
  69.                    url = urls.get(0);
  70.                    return lookUpService(url);
  71.                }
  72.            }
  73.        }
  74.        return remote;
  75.    }

  76.    /**
  77.     * 通过一致性哈希算法, 选取一个url, 最后返回一个远程服务对象
  78.     */
  79.    public <T extends Remote> T lookUp(){
  80.        T service = null;
  81.        //随机计算一个哈希值
  82.        int hash = FVNHash(Math.random() * 10000 + "");
  83.        //得到大于该哈希值的所有map集合
  84.        SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
  85.        //找到比该值大的第一个虚拟节点, 如果没有比它大的虚拟节点, 根据哈希环, 则返回第一个节点.
  86.        Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
  87.        //通过该虚拟节点获得服务器url
  88.        String virtualNodeName = virtualNodes.get(targetKey);
  89.        String url = virtualNodeName.split("@")[0];
  90.        //根据服务器url获取远程服务对象
  91.        service = lookUpService(url);
  92.        System.out.print("提供本次服务的地址为: " + url + ", 返回结果: ");
  93.        return service;
  94.    }

  95.    private CountDownLatch latch = new CountDownLatch(1);

  96.    public ZooKeeper connectToZK(){
  97.        ZooKeeper zk = null;
  98.        try {
  99.            zk = new ZooKeeper(Constants.ZK_HOST, Constants.ZK_TIME_OUT, new Watcher() {
  100.                @Override
  101.                public void process(WatchedEvent watchedEvent) {
  102.                    //判断是否连接zk集群
  103.                    latch.countDown();//唤醒处于等待状态的当前线程
  104.                }
  105.            });
  106.            latch.await();//没有连接上的时候当前线程处于等待状态.
  107.        } catch (IOException e) {
  108.            e.printStackTrace();
  109.        } catch (InterruptedException e) {
  110.            e.printStackTrace();
  111.        }
  112.        return zk;
  113.    }


  114.    public static int FVNHash(String data){
  115.        final int p = 16777619;
  116.        int hash = (int)2166136261L;
  117.        for(int i = 0; i < data.length(); i++)
  118.            hash = (hash ^ data.charAt(i)) * p;
  119.        hash += hash << 13;
  120.        hash ^= hash >> 7;
  121.        hash += hash << 3;
  122.        hash ^= hash >> 17;
  123.        hash += hash << 5;
  124.        return hash < 0 ? Math.abs(hash) : hash;
  125.    }
  126. }
  127. 启动客户端进行测试:
  128. public static void main(String[] args){
  129.    ServiceConsumer sc = new ServiceConsumer();//创建工具类对象
  130.    while(true){
  131.        //获得rmi远程服务对象
  132.        UserService userService = sc.lookUp();
  133.        try{
  134.            //调用远程方法
  135.            String result = userService.helloRmi("炭烧生蚝");
  136.            System.out.println(result);
  137.            Thread.sleep(100);
  138.        }catch(Exception e){
  139.            e.printStackTrace();
  140.        }
  141.    }
  142. }
  143. 客户端跑起来后, 在显示台不断进行打印...下面将对数据进行统计。

9.jpg

10.png

IV. 对服务器调用数据进行统计分析

重温一遍模拟的过程: 首先分别在7777, 8888, 9999端口启动了3台服务器. 然后启动客户端进行访问. 7777, 8888端口的两台服务器设置性能指数为1000, 而9999端口的服务器性能指数设置为300。

在客户端运行期间, 我手动关闭了8888端口的服务器, 客户端正常打印出服务器变化信息。此时理论上不会有访问被路由到8888端口的服务器。当我重新启动8888端口服务器时, 客户端打印出服务器变化信息, 访问能正常到达8888端口服务器。

下面对各服务器的访问量进行统计, 看是否实现了负载均衡。

测试程序如下:

  1. public class DataStatistics {
  2.    private static float ReqToPort7777 = 0;
  3.    private static float ReqToPort8888 = 0;
  4.    private static float ReqToPort9999 = 0;

  5.    public static void main(String[] args) {
  6.        BufferedReader br = null;
  7.        try {
  8.            br = new BufferedReader(new FileReader("C://test.txt"));
  9.            String line = null;
  10.            while(null != (line = br.readLine())){
  11.                if(line.contains("7777")){
  12.                    ReqToPort7777++;
  13.                }else if(line.contains("8888")){
  14.                    ReqToPort8888++;
  15.                }else if(line.contains("9999")){
  16.                    ReqToPort9999++;
  17.                }else{
  18.                    print(false);
  19.                }
  20.            }
  21.            print(true);
  22.        } catch (Exception e) {
  23.            e.printStackTrace();
  24.        }finally {
  25.            if(null != br){
  26.                try {
  27.                    br.close();
  28.                } catch (IOException e) {
  29.                    e.printStackTrace();
  30.                }
  31.                br = null;
  32.            }
  33.        }
  34.    }

  35.    private static void print(boolean isEnd){
  36.        if(!isEnd){
  37.            System.out.println("------------- 服务器集群发生变化 -------------");
  38.        }else{
  39.            System.out.println("------------- 最后一次统计 -------------");
  40.        }
  41.        System.out.println("截取自上次服务器变化到现在: ");
  42.        float total = ReqToPort7777 + ReqToPort8888 + ReqToPort9999;
  43.        System.out.println("7777端口服务器访问量为: " + ReqToPort7777 + ", 占比" + (ReqToPort7777 / total));
  44.        System.out.println("8888端口服务器访问量为: " + ReqToPort8888 + ", 占比" + (ReqToPort8888 / total));
  45.        System.out.println("9999端口服务器访问量为: " + ReqToPort9999 + ", 占比" + (ReqToPort9999 / total));
  46.        ReqToPort7777 = 0;
  47.        ReqToPort8888 = 0;
  48.        ReqToPort9999 = 0;
  49.    }
  50. }

  51. /* 以下是输出结果
  52. ------------- 服务器集群发生变化 -------------
  53. 截取自上次服务器变化到现在:
  54. 7777端口服务器访问量为: 198.0, 占比0.4419643
  55. 8888端口服务器访问量为: 184.0, 占比0.4107143
  56. 9999端口服务器访问量为: 66.0, 占比0.14732143
  57. ------------- 服务器集群发生变化 -------------
  58. 截取自上次服务器变化到现在:
  59. 7777端口服务器访问量为: 510.0, 占比0.7589286
  60. 8888端口服务器访问量为: 1.0, 占比0.0014880953
  61. 9999端口服务器访问量为: 161.0, 占比0.23958333
  62. ------------- 最后一次统计 -------------
  63. 截取自上次服务器变化到现在:
  64. 7777端口服务器访问量为: 410.0, 占比0.43248945
  65. 8888端口服务器访问量为: 398.0, 占比0.41983122
  66. 9999端口服务器访问量为: 140.0, 占比0.14767933
  67. */

V. 结果

从测试数据可以看出, 不管是8888端口服务器宕机之前, 还是宕机之后, 三台服务器接收的访问量和性能指数成正比,成功地验证了一致性哈希算法的负载均衡作用。

四. 扩展思考

初识一致性哈希算法的时候, 对这种奇特的思路佩服得五体投地。但是一致性哈希算法除了能够让后端服务器实现负载均衡, 还有一个特点可能是其他负载均衡算法所不具备的。

这个特点是基于哈希函数的, 我们知道通过哈希函数, 固定的输入能够产生固定的输出. 换句话说, 同样的请求会路由到相同的服务器. 这点就很牛逼了, 我们可以结合一致性哈希算法和缓存机制提供后端服务器的性能。

比如说在一个分布式系统中, 有一个服务器集群提供查询用户信息的方法, 每个请求将会带着用户的 uid到达, 我们可以通过哈希函数进行处理(从上面的演示代码可以看到, 这点是可以轻松实现的), 使同样的 uid路由到某个独定的服务器. 这样我们就可以在服务器上对该的 uid背后的用户信息进行缓存, 从而减少对数据库或其他中间件的操作, 从而提高系统效率。

当然如果使用该策略的话, 你可能还要考虑缓存更新等操作, 但作为一种优良的策略, 我们可以考虑在适当的场合灵活运用。

以上思考受启发于 Dubbo框架中对其实现的四种负载均衡策略的描述。


相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
7天前
|
监控 算法 网络协议
Java 实现局域网电脑屏幕监控算法揭秘
在数字化办公环境中,局域网电脑屏幕监控至关重要。本文介绍用Java实现这一功能的算法,涵盖图像采集、数据传输和监控端显示三个关键环节。通过Java的AWT/Swing库和Robot类抓取屏幕图像,使用Socket进行TCP/IP通信传输图像数据,并利用ImageIO类在监控端展示图像。整个过程确保高效、实时和准确,为提升数字化管理提供了技术基础。
40 15
|
3月前
|
存储 安全 Java
Java Map新玩法:探索HashMap和TreeMap的高级特性,让你的代码更强大!
【10月更文挑战第17天】Java Map新玩法:探索HashMap和TreeMap的高级特性,让你的代码更强大!
82 2
|
13天前
|
缓存 算法 搜索推荐
Java中的算法优化与复杂度分析
在Java开发中,理解和优化算法的时间复杂度和空间复杂度是提升程序性能的关键。通过合理选择数据结构、避免重复计算、应用分治法等策略,可以显著提高算法效率。在实际开发中,应该根据具体需求和场景,选择合适的优化方法,从而编写出高效、可靠的代码。
25 6
|
2月前
|
机器学习/深度学习 人工智能 算法
BALROG:基准测试工具,用于评估 LLMs 和 VLMs 在复杂动态环境中的推理能力
BALROG 是一款用于评估大型语言模型(LLMs)和视觉语言模型(VLMs)在复杂动态环境中推理能力的基准测试工具。它通过一系列挑战性的游戏环境,如 NetHack,测试模型的规划、空间推理和探索能力。BALROG 提供了一个开放且细粒度的评估框架,推动了自主代理研究的进展。
46 3
BALROG:基准测试工具,用于评估 LLMs 和 VLMs 在复杂动态环境中的推理能力
|
26天前
|
存储 Java 开发者
什么是java的Compact Strings特性,什么情况下使用
Java 9引入了紧凑字符串特性,优化了字符串的内存使用。它通过将字符串从UTF-16字符数组改为字节数组存储,根据内容选择更节省内存的编码方式,通常能节省10%至15%的内存。
|
1月前
|
存储 Java 数据挖掘
Java 8 新特性之 Stream API:函数式编程风格的数据处理范式
Java 8 引入的 Stream API 提供了一种新的数据处理方式,支持函数式编程风格,能够高效、简洁地处理集合数据,实现过滤、映射、聚合等操作。
57 6
|
1月前
|
负载均衡 网络协议 算法
Docker容器环境中服务发现与负载均衡的技术与方法,涵盖环境变量、DNS、集中式服务发现系统等方式
本文探讨了Docker容器环境中服务发现与负载均衡的技术与方法,涵盖环境变量、DNS、集中式服务发现系统等方式,以及软件负载均衡器、云服务负载均衡、容器编排工具等实现手段,强调两者结合的重要性及面临挑战的应对措施。
78 3
|
2月前
|
缓存 Ubuntu Linux
Linux环境下测试服务器的DDR5内存性能
通过使用 `memtester`和 `sysbench`等工具,可以有效地测试Linux环境下服务器的DDR5内存性能。这些工具不仅可以评估内存的读写速度,还可以检测内存中的潜在问题,帮助确保系统的稳定性和性能。通过合理配置和使用这些工具,系统管理员可以深入了解服务器内存的性能状况,为系统优化提供数据支持。
46 4
|
2月前
|
分布式计算 Java API
Java 8引入了流处理和函数式编程两大新特性
Java 8引入了流处理和函数式编程两大新特性。流处理提供了一种声明式的数据处理方式,使代码更简洁易读;函数式编程通过Lambda表达式和函数式接口,简化了代码书写,提高了灵活性。此外,Java 8还引入了Optional类、新的日期时间API等,进一步增强了编程能力。这些新特性使开发者能够编写更高效、更清晰的代码。
39 4
|
2月前
|
机器学习/深度学习 自然语言处理 前端开发
前端神经网络入门:Brain.js - 详细介绍和对比不同的实现 - CNN、RNN、DNN、FFNN -无需准备环境打开浏览器即可测试运行-支持WebGPU加速
本文介绍了如何使用 JavaScript 神经网络库 **Brain.js** 实现不同类型的神经网络,包括前馈神经网络(FFNN)、深度神经网络(DNN)和循环神经网络(RNN)。通过简单的示例和代码,帮助前端开发者快速入门并理解神经网络的基本概念。文章还对比了各类神经网络的特点和适用场景,并简要介绍了卷积神经网络(CNN)的替代方案。
187 1