Java实现一致性哈希算法,并搭建环境测试其负载均衡特性(二)

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 实现负载均衡是后端领域一个重要的话题,一致性哈希算法是实现服务器负载均衡的方法之一,你很可能已在一些远程服务框架中使用过它。下面我们尝试一下自己实现一致性哈希算法。

II. 创建服务器集群, 提供RPC远程调用服务

  1. 首先创建一个服务器项目(使用Maven), 添加 zookeeper依赖

  2. 创建常量接口, 用于存储连接 zookeeper 的信息

  3. public interface Constant {
  4.    //zookeeper集群的地址
  5.    String ZK_HOST = "192.168.117.129:2181,192.168.117.129:2182,192.168.117.129:2183";
  6.    //连接zookeeper的超时时间
  7.    int ZK_TIME_OUT = 5000;
  8.    //服务器所发布的远程服务在zookeeper中的注册地址, 也就是说这个节点中保存了各个服务器提供的接口
  9.    String ZK_REGISTRY = "/provider";
  10.    //zookeeper集群中注册服务的url地址的瞬时节点
  11.    String ZK_RMI = ZK_REGISTRY + "/rmi";
  12. }

3.封装操作 zookeeper和发布远程服务的接口供自己调用, 本案例中发布远程服务使用Java自身提供的 rmi包完成, 如果没有了解过可以参考这篇

  1. public class ServiceProvider {

  2.    private CountDownLatch latch = new CountDownLatch(1);

  3.    /**
  4.     * 连接zookeeper集群
  5.     */
  6.    public ZooKeeper connectToZK(){
  7.        ZooKeeper zk = null;
  8.        try {
  9.            zk = new ZooKeeper(Constant.ZK_HOST, Constant.ZK_TIME_OUT, new Watcher() {
  10.                @Override
  11.                public void process(WatchedEvent watchedEvent) {
  12.                    //如果连接上了就唤醒当前线程.
  13.                    latch.countDown();
  14.                }
  15.            });
  16.            latch.await();//还没连接上时当前线程等待
  17.        } catch (Exception e) {
  18.            e.printStackTrace();
  19.        }
  20.        return zk;
  21.    }

  22.    /**
  23.     * 创建znode节点
  24.     * @param zk
  25.     * @param url 节点中写入的数据
  26.     */
  27.    public void createNode(ZooKeeper zk, String url){
  28.        try{
  29.            //要把写入的数据转化为字节数组
  30.            byte[] data = url.getBytes();
  31.            zk.create(Constant.ZK_RMI, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  32.        } catch (Exception e) {
  33.            e.printStackTrace();
  34.        }
  35.    }

  36.    /**
  37.     * 发布rmi服务
  38.     */
  39.    public String publishService(Remote remote, String host, int port){
  40.        String url = null;
  41.        try{
  42.            LocateRegistry.createRegistry(port);
  43.            url = "rmi://" + host + ":" + port + "/rmiService";
  44.            Naming.bind(url, remote);
  45.        } catch (Exception e) {
  46.            e.printStackTrace();
  47.        }
  48.        return url;
  49.    }

  50.    /**
  51.     * 发布rmi服务, 并且将服务的url注册到zookeeper集群中
  52.     */
  53.    public void publish(Remote remote, String host, int port){
  54.        //调用publishService, 得到服务的url地址
  55.        String url = publishService(remote, host, port);
  56.        if(null != url){
  57.            ZooKeeper zk = connectToZK();//连接到zookeeper
  58.            if(null != zk){
  59.                createNode(zk, url);
  60.            }
  61.        }
  62.    }
  63. }
  64. 自定义远程服务. 服务提供一个简单的方法: 客户端发来一个字符串, 服务器在字符串前面添加上 Hello, 并返回字符串。
  65. //UserService
  66. public interface UserService extends Remote {
  67.    public String helloRmi(String name) throws RemoteException;
  68. }
  69. //UserServiceImpl
  70. public class UserServiceImpl implements UserService {

  71.    public UserServiceImpl() throws RemoteException{
  72.        super();
  73.    }

  74.    @Override
  75.    public String helloRmi(String name) throws RemoteException {
  76.        return "Hello " + name + "!";
  77.    }
  78. }
  79. 修改端口号, 启动多个java虚拟机, 模拟服务器集群. 为了方便演示, 自定义7777, 8888, 9999端口开启3个服务器进程, 到时会模拟7777端口的服务器宕机和修复重连。
  80. public static void main(String[] args) throws RemoteException {
  81.    //创建工具类对象
  82.    ServiceProvider sp = new ServiceProvider();
  83.    //创建远程服务对象
  84.    UserService userService = new UserServiceImpl();
  85.    //完成发布
  86.    sp.publish(userService, "localhost", 9999);
  87. }

8.jpg

III. 编写客户端程序(运用一致性哈希算法实现负载均衡

  1. 封装客户端接口:
  2. public class ServiceConsumer {
  3.    /**
  4.     * 提供远程服务的服务器列表, 只记录远程服务的url
  5.     */
  6.    private volatile List<String> urls = new LinkedList<>();
  7.    /**
  8.     * 远程服务对应的虚拟节点集合
  9.     */
  10.    private static TreeMap<Integer, String> virtualNodes = new TreeMap<>();

  11.    public ServiceConsumer(){
  12.        ZooKeeper zk = connectToZK();//客户端连接到zookeeper
  13.        if(null != zk){
  14.            //连接上后关注zookeeper中的节点变化(服务器变化)
  15.            watchNode(zk);
  16.        }
  17.    }

  18.    private void watchNode(final ZooKeeper zk) {
  19.        try{
  20.            //观察/provider节点下的子节点是否有变化(是否有服务器登入或登出)
  21.            List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY, new Watcher() {
  22.                @Override
  23.                public void process(WatchedEvent watchedEvent) {
  24.                    //如果服务器节点有变化就重新获取
  25.                    if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
  26.                        System.out.println("服务器端有变化, 可能有旧服务器宕机或者新服务器加入集群...");
  27.                        watchNode(zk);
  28.                    }
  29.                }
  30.            });
  31.            //将获取到的服务器节点数据保存到集合中, 也就是获得了远程服务的访问url地址
  32.            List<String> dataList = new LinkedList<>();
  33.            TreeMap<Integer, String> newVirtualNodesList = new TreeMap<>();
  34.            for(String nodeStr : nodeList){
  35.                byte[] data = zk.getData(Constants.ZK_REGISTRY + "/" + nodeStr, false, null);
  36.                //放入服务器列表的url
  37.                String url = new String(data);
  38.                //为每个服务器分配虚拟节点, 为了方便模拟, 默认开启在9999端口的服务器性能较差, 只分配300个虚拟节点, 其他分配1000个.
  39.                if(url.contains("9999")){
  40.                    for(int i = 1; i <= 300; i++){
  41.                        newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
  42.                    }
  43.                }else{
  44.                    for(int i = 1; i <= 1000; i++){
  45.                        newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
  46.                    }
  47.                }
  48.                dataList.add(url);
  49.            }
  50.            urls = dataList;
  51.            virtualNodes = newVirtualNodesList;
  52.            dataList = null;//好让垃圾回收器尽快收集
  53.            newVirtualNodesList = null;
  54.        } catch (Exception e) {
  55.            e.printStackTrace();
  56.        }
  57.    }

  58.    /**
  59.     * 根据url获得远程服务对象
  60.     */
  61.    public <T> T lookUpService(String url){
  62.        T remote = null;
  63.        try{
  64.            remote = (T)Naming.lookup(url);
  65.        } catch (Exception e) {
  66.            //如果该url连接不上, 很有可能是该服务器挂了, 这时使用服务器列表中的第一个服务器url重新获取远程对象.
  67.            if(e instanceof ConnectException){
  68.                if (urls.size() != 0){
  69.                    url = urls.get(0);
  70.                    return lookUpService(url);
  71.                }
  72.            }
  73.        }
  74.        return remote;
  75.    }

  76.    /**
  77.     * 通过一致性哈希算法, 选取一个url, 最后返回一个远程服务对象
  78.     */
  79.    public <T extends Remote> T lookUp(){
  80.        T service = null;
  81.        //随机计算一个哈希值
  82.        int hash = FVNHash(Math.random() * 10000 + "");
  83.        //得到大于该哈希值的所有map集合
  84.        SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
  85.        //找到比该值大的第一个虚拟节点, 如果没有比它大的虚拟节点, 根据哈希环, 则返回第一个节点.
  86.        Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
  87.        //通过该虚拟节点获得服务器url
  88.        String virtualNodeName = virtualNodes.get(targetKey);
  89.        String url = virtualNodeName.split("@")[0];
  90.        //根据服务器url获取远程服务对象
  91.        service = lookUpService(url);
  92.        System.out.print("提供本次服务的地址为: " + url + ", 返回结果: ");
  93.        return service;
  94.    }

  95.    private CountDownLatch latch = new CountDownLatch(1);

  96.    public ZooKeeper connectToZK(){
  97.        ZooKeeper zk = null;
  98.        try {
  99.            zk = new ZooKeeper(Constants.ZK_HOST, Constants.ZK_TIME_OUT, new Watcher() {
  100.                @Override
  101.                public void process(WatchedEvent watchedEvent) {
  102.                    //判断是否连接zk集群
  103.                    latch.countDown();//唤醒处于等待状态的当前线程
  104.                }
  105.            });
  106.            latch.await();//没有连接上的时候当前线程处于等待状态.
  107.        } catch (IOException e) {
  108.            e.printStackTrace();
  109.        } catch (InterruptedException e) {
  110.            e.printStackTrace();
  111.        }
  112.        return zk;
  113.    }


  114.    public static int FVNHash(String data){
  115.        final int p = 16777619;
  116.        int hash = (int)2166136261L;
  117.        for(int i = 0; i < data.length(); i++)
  118.            hash = (hash ^ data.charAt(i)) * p;
  119.        hash += hash << 13;
  120.        hash ^= hash >> 7;
  121.        hash += hash << 3;
  122.        hash ^= hash >> 17;
  123.        hash += hash << 5;
  124.        return hash < 0 ? Math.abs(hash) : hash;
  125.    }
  126. }
  127. 启动客户端进行测试:
  128. public static void main(String[] args){
  129.    ServiceConsumer sc = new ServiceConsumer();//创建工具类对象
  130.    while(true){
  131.        //获得rmi远程服务对象
  132.        UserService userService = sc.lookUp();
  133.        try{
  134.            //调用远程方法
  135.            String result = userService.helloRmi("炭烧生蚝");
  136.            System.out.println(result);
  137.            Thread.sleep(100);
  138.        }catch(Exception e){
  139.            e.printStackTrace();
  140.        }
  141.    }
  142. }
  143. 客户端跑起来后, 在显示台不断进行打印...下面将对数据进行统计。

9.jpg

10.png

IV. 对服务器调用数据进行统计分析

重温一遍模拟的过程: 首先分别在7777, 8888, 9999端口启动了3台服务器. 然后启动客户端进行访问. 7777, 8888端口的两台服务器设置性能指数为1000, 而9999端口的服务器性能指数设置为300。

在客户端运行期间, 我手动关闭了8888端口的服务器, 客户端正常打印出服务器变化信息。此时理论上不会有访问被路由到8888端口的服务器。当我重新启动8888端口服务器时, 客户端打印出服务器变化信息, 访问能正常到达8888端口服务器。

下面对各服务器的访问量进行统计, 看是否实现了负载均衡。

测试程序如下:

  1. public class DataStatistics {
  2.    private static float ReqToPort7777 = 0;
  3.    private static float ReqToPort8888 = 0;
  4.    private static float ReqToPort9999 = 0;

  5.    public static void main(String[] args) {
  6.        BufferedReader br = null;
  7.        try {
  8.            br = new BufferedReader(new FileReader("C://test.txt"));
  9.            String line = null;
  10.            while(null != (line = br.readLine())){
  11.                if(line.contains("7777")){
  12.                    ReqToPort7777++;
  13.                }else if(line.contains("8888")){
  14.                    ReqToPort8888++;
  15.                }else if(line.contains("9999")){
  16.                    ReqToPort9999++;
  17.                }else{
  18.                    print(false);
  19.                }
  20.            }
  21.            print(true);
  22.        } catch (Exception e) {
  23.            e.printStackTrace();
  24.        }finally {
  25.            if(null != br){
  26.                try {
  27.                    br.close();
  28.                } catch (IOException e) {
  29.                    e.printStackTrace();
  30.                }
  31.                br = null;
  32.            }
  33.        }
  34.    }

  35.    private static void print(boolean isEnd){
  36.        if(!isEnd){
  37.            System.out.println("------------- 服务器集群发生变化 -------------");
  38.        }else{
  39.            System.out.println("------------- 最后一次统计 -------------");
  40.        }
  41.        System.out.println("截取自上次服务器变化到现在: ");
  42.        float total = ReqToPort7777 + ReqToPort8888 + ReqToPort9999;
  43.        System.out.println("7777端口服务器访问量为: " + ReqToPort7777 + ", 占比" + (ReqToPort7777 / total));
  44.        System.out.println("8888端口服务器访问量为: " + ReqToPort8888 + ", 占比" + (ReqToPort8888 / total));
  45.        System.out.println("9999端口服务器访问量为: " + ReqToPort9999 + ", 占比" + (ReqToPort9999 / total));
  46.        ReqToPort7777 = 0;
  47.        ReqToPort8888 = 0;
  48.        ReqToPort9999 = 0;
  49.    }
  50. }

  51. /* 以下是输出结果
  52. ------------- 服务器集群发生变化 -------------
  53. 截取自上次服务器变化到现在:
  54. 7777端口服务器访问量为: 198.0, 占比0.4419643
  55. 8888端口服务器访问量为: 184.0, 占比0.4107143
  56. 9999端口服务器访问量为: 66.0, 占比0.14732143
  57. ------------- 服务器集群发生变化 -------------
  58. 截取自上次服务器变化到现在:
  59. 7777端口服务器访问量为: 510.0, 占比0.7589286
  60. 8888端口服务器访问量为: 1.0, 占比0.0014880953
  61. 9999端口服务器访问量为: 161.0, 占比0.23958333
  62. ------------- 最后一次统计 -------------
  63. 截取自上次服务器变化到现在:
  64. 7777端口服务器访问量为: 410.0, 占比0.43248945
  65. 8888端口服务器访问量为: 398.0, 占比0.41983122
  66. 9999端口服务器访问量为: 140.0, 占比0.14767933
  67. */

V. 结果

从测试数据可以看出, 不管是8888端口服务器宕机之前, 还是宕机之后, 三台服务器接收的访问量和性能指数成正比,成功地验证了一致性哈希算法的负载均衡作用。

四. 扩展思考

初识一致性哈希算法的时候, 对这种奇特的思路佩服得五体投地。但是一致性哈希算法除了能够让后端服务器实现负载均衡, 还有一个特点可能是其他负载均衡算法所不具备的。

这个特点是基于哈希函数的, 我们知道通过哈希函数, 固定的输入能够产生固定的输出. 换句话说, 同样的请求会路由到相同的服务器. 这点就很牛逼了, 我们可以结合一致性哈希算法和缓存机制提供后端服务器的性能。

比如说在一个分布式系统中, 有一个服务器集群提供查询用户信息的方法, 每个请求将会带着用户的 uid到达, 我们可以通过哈希函数进行处理(从上面的演示代码可以看到, 这点是可以轻松实现的), 使同样的 uid路由到某个独定的服务器. 这样我们就可以在服务器上对该的 uid背后的用户信息进行缓存, 从而减少对数据库或其他中间件的操作, 从而提高系统效率。

当然如果使用该策略的话, 你可能还要考虑缓存更新等操作, 但作为一种优良的策略, 我们可以考虑在适当的场合灵活运用。

以上思考受启发于 Dubbo框架中对其实现的四种负载均衡策略的描述。


相关实践学习
通过ACR快速部署网站应用
本次实验任务是在云上基于ECS部署Docker环境,制作网站镜像并上传至ACR镜像仓库,通过容器镜像运行网站应用,网站运行在Docker容器中、网站业务数据存储在Mariadb数据库中、网站文件数据存储在服务器ECS云盘中,通过公网地址进行访问。
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
1天前
|
Java jenkins 持续交付
Jenkins是开源CI/CD工具,用于自动化Java项目构建、测试和部署。通过配置源码管理、构建触发器、执行Maven目标,实现代码提交即触发构建和测试
【7月更文挑战第1天】Jenkins是开源CI/CD工具,用于自动化Java项目构建、测试和部署。通过配置源码管理、构建触发器、执行Maven目标,实现代码提交即触发构建和测试。成功后,Jenkins执行部署任务,发布到服务器或云环境。使用Jenkins能提升效率,保证软件质量,加速上线,并需维护其稳定运行。
11 0
|
1天前
|
XML 测试技术 数据格式
《手把手教你》系列基础篇(八十三)-java+ selenium自动化测试-框架设计基础-TestNG测试报告-下篇(详解教程)
【7月更文挑战第1天】使用TestNG自定义报告的简要说明: - TestNG提供默认的HTML和XML报告,但可通过实现IReporter接口创建自定义报告。 - 自定义报告器类需扩展`CustomReporter.java`,实现`generateReport()`方法,接收XML套房、测试结果及输出目录作为参数。
10 0
|
1天前
|
存储 NoSQL Java
探索Java分布式锁:在高并发环境下的同步访问实现与优化
【7月更文挑战第1天】在分布式系统中,Java分布式锁解决了多节点共享资源的同步访问问题,确保数据一致性。常见的实现包括Redis的SETNX和过期时间、ZooKeeper的临时有序节点、数据库操作及Java并发库。优化策略涉及锁超时、续期、公平性及性能。选择合适的锁策略对高并发系统的稳定性和性能至关重要。
10 0
|
1天前
|
Java 测试技术 Maven
如何使用Java进行单元测试
如何使用Java进行单元测试
|
1天前
|
Java API 数据处理
Java 8的新特性详解
Java 8的新特性详解
|
1天前
|
Java 测试技术
Java中的测试驱动开发(TDD)实践
Java中的测试驱动开发(TDD)实践
|
1天前
|
Java API 开发者
Java版本对比:特性、升级改动与优势分析
Java版本对比:特性、升级改动与优势分析
9 0
|
2天前
|
负载均衡 Java 测试技术
性能测试与负载均衡:保证Java应用的稳定性
性能测试与负载均衡:保证Java应用的稳定性
|
2天前
|
监控 Java 测试技术
Java性能测试与调优工具使用指南
Java性能测试与调优工具使用指南
|
2天前
|
Java 测试技术 数据库
Java单元测试与集成测试的最佳实践
Java单元测试与集成测试的最佳实践