II. 创建服务器集群, 提供RPC远程调用服务
- 首先创建一个服务器项目(使用Maven), 添加
zookeeper依赖 - 创建常量接口, 用于存储连接
zookeeper的信息 public interface Constant {//zookeeper集群的地址String ZK_HOST = "192.168.117.129:2181,192.168.117.129:2182,192.168.117.129:2183";//连接zookeeper的超时时间int ZK_TIME_OUT = 5000;//服务器所发布的远程服务在zookeeper中的注册地址, 也就是说这个节点中保存了各个服务器提供的接口String ZK_REGISTRY = "/provider";//zookeeper集群中注册服务的url地址的瞬时节点String ZK_RMI = ZK_REGISTRY + "/rmi";}
3.封装操作 zookeeper和发布远程服务的接口供自己调用, 本案例中发布远程服务使用Java自身提供的 rmi包完成, 如果没有了解过可以参考这篇
public class ServiceProvider {private CountDownLatch latch = new CountDownLatch(1);/*** 连接zookeeper集群*/public ZooKeeper connectToZK(){ZooKeeper zk = null;try {zk = new ZooKeeper(Constant.ZK_HOST, Constant.ZK_TIME_OUT, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//如果连接上了就唤醒当前线程.latch.countDown();}});latch.await();//还没连接上时当前线程等待} catch (Exception e) {e.printStackTrace();}return zk;}/*** 创建znode节点* @param zk* @param url 节点中写入的数据*/public void createNode(ZooKeeper zk, String url){try{//要把写入的数据转化为字节数组byte[] data = url.getBytes();zk.create(Constant.ZK_RMI, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);} catch (Exception e) {e.printStackTrace();}}/*** 发布rmi服务*/public String publishService(Remote remote, String host, int port){String url = null;try{LocateRegistry.createRegistry(port);url = "rmi://" + host + ":" + port + "/rmiService";Naming.bind(url, remote);} catch (Exception e) {e.printStackTrace();}return url;}/*** 发布rmi服务, 并且将服务的url注册到zookeeper集群中*/public void publish(Remote remote, String host, int port){//调用publishService, 得到服务的url地址String url = publishService(remote, host, port);if(null != url){ZooKeeper zk = connectToZK();//连接到zookeeperif(null != zk){createNode(zk, url);}}}}- 自定义远程服务. 服务提供一个简单的方法: 客户端发来一个字符串, 服务器在字符串前面添加上
Hello, 并返回字符串。 //UserServicepublic interface UserService extends Remote {public String helloRmi(String name) throws RemoteException;}//UserServiceImplpublic class UserServiceImpl implements UserService {public UserServiceImpl() throws RemoteException{super();}@Overridepublic String helloRmi(String name) throws RemoteException {return "Hello " + name + "!";}}- 修改端口号, 启动多个java虚拟机, 模拟服务器集群. 为了方便演示, 自定义7777, 8888, 9999端口开启3个服务器进程, 到时会模拟7777端口的服务器宕机和修复重连。
public static void main(String[] args) throws RemoteException {//创建工具类对象ServiceProvider sp = new ServiceProvider();//创建远程服务对象UserService userService = new UserServiceImpl();//完成发布sp.publish(userService, "localhost", 9999);}
III. 编写客户端程序(运用一致性哈希算法实现负载均衡
- 封装客户端接口:
public class ServiceConsumer {/*** 提供远程服务的服务器列表, 只记录远程服务的url*/private volatile List<String> urls = new LinkedList<>();/*** 远程服务对应的虚拟节点集合*/private static TreeMap<Integer, String> virtualNodes = new TreeMap<>();public ServiceConsumer(){ZooKeeper zk = connectToZK();//客户端连接到zookeeperif(null != zk){//连接上后关注zookeeper中的节点变化(服务器变化)watchNode(zk);}}private void watchNode(final ZooKeeper zk) {try{//观察/provider节点下的子节点是否有变化(是否有服务器登入或登出)List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//如果服务器节点有变化就重新获取if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){System.out.println("服务器端有变化, 可能有旧服务器宕机或者新服务器加入集群...");watchNode(zk);}}});//将获取到的服务器节点数据保存到集合中, 也就是获得了远程服务的访问url地址List<String> dataList = new LinkedList<>();TreeMap<Integer, String> newVirtualNodesList = new TreeMap<>();for(String nodeStr : nodeList){byte[] data = zk.getData(Constants.ZK_REGISTRY + "/" + nodeStr, false, null);//放入服务器列表的urlString url = new String(data);//为每个服务器分配虚拟节点, 为了方便模拟, 默认开启在9999端口的服务器性能较差, 只分配300个虚拟节点, 其他分配1000个.if(url.contains("9999")){for(int i = 1; i <= 300; i++){newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);}}else{for(int i = 1; i <= 1000; i++){newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);}}dataList.add(url);}urls = dataList;virtualNodes = newVirtualNodesList;dataList = null;//好让垃圾回收器尽快收集newVirtualNodesList = null;} catch (Exception e) {e.printStackTrace();}}/*** 根据url获得远程服务对象*/public <T> T lookUpService(String url){T remote = null;try{remote = (T)Naming.lookup(url);} catch (Exception e) {//如果该url连接不上, 很有可能是该服务器挂了, 这时使用服务器列表中的第一个服务器url重新获取远程对象.if(e instanceof ConnectException){if (urls.size() != 0){url = urls.get(0);return lookUpService(url);}}}return remote;}/*** 通过一致性哈希算法, 选取一个url, 最后返回一个远程服务对象*/public <T extends Remote> T lookUp(){T service = null;//随机计算一个哈希值int hash = FVNHash(Math.random() * 10000 + "");//得到大于该哈希值的所有map集合SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);//找到比该值大的第一个虚拟节点, 如果没有比它大的虚拟节点, 根据哈希环, 则返回第一个节点.Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();//通过该虚拟节点获得服务器urlString virtualNodeName = virtualNodes.get(targetKey);String url = virtualNodeName.split("@")[0];//根据服务器url获取远程服务对象service = lookUpService(url);System.out.print("提供本次服务的地址为: " + url + ", 返回结果: ");return service;}private CountDownLatch latch = new CountDownLatch(1);public ZooKeeper connectToZK(){ZooKeeper zk = null;try {zk = new ZooKeeper(Constants.ZK_HOST, Constants.ZK_TIME_OUT, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//判断是否连接zk集群latch.countDown();//唤醒处于等待状态的当前线程}});latch.await();//没有连接上的时候当前线程处于等待状态.} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}return zk;}public static int FVNHash(String data){final int p = 16777619;int hash = (int)2166136261L;for(int i = 0; i < data.length(); i++)hash = (hash ^ data.charAt(i)) * p;hash += hash << 13;hash ^= hash >> 7;hash += hash << 3;hash ^= hash >> 17;hash += hash << 5;return hash < 0 ? Math.abs(hash) : hash;}}- 启动客户端进行测试:
public static void main(String[] args){ServiceConsumer sc = new ServiceConsumer();//创建工具类对象while(true){//获得rmi远程服务对象UserService userService = sc.lookUp();try{//调用远程方法String result = userService.helloRmi("炭烧生蚝");System.out.println(result);Thread.sleep(100);}catch(Exception e){e.printStackTrace();}}}- 客户端跑起来后, 在显示台不断进行打印...下面将对数据进行统计。
IV. 对服务器调用数据进行统计分析
重温一遍模拟的过程: 首先分别在7777, 8888, 9999端口启动了3台服务器. 然后启动客户端进行访问. 7777, 8888端口的两台服务器设置性能指数为1000, 而9999端口的服务器性能指数设置为300。
在客户端运行期间, 我手动关闭了8888端口的服务器, 客户端正常打印出服务器变化信息。此时理论上不会有访问被路由到8888端口的服务器。当我重新启动8888端口服务器时, 客户端打印出服务器变化信息, 访问能正常到达8888端口服务器。
下面对各服务器的访问量进行统计, 看是否实现了负载均衡。
测试程序如下:
public class DataStatistics {private static float ReqToPort7777 = 0;private static float ReqToPort8888 = 0;private static float ReqToPort9999 = 0;public static void main(String[] args) {BufferedReader br = null;try {br = new BufferedReader(new FileReader("C://test.txt"));String line = null;while(null != (line = br.readLine())){if(line.contains("7777")){ReqToPort7777++;}else if(line.contains("8888")){ReqToPort8888++;}else if(line.contains("9999")){ReqToPort9999++;}else{print(false);}}print(true);} catch (Exception e) {e.printStackTrace();}finally {if(null != br){try {br.close();} catch (IOException e) {e.printStackTrace();}br = null;}}}private static void print(boolean isEnd){if(!isEnd){System.out.println("------------- 服务器集群发生变化 -------------");}else{System.out.println("------------- 最后一次统计 -------------");}System.out.println("截取自上次服务器变化到现在: ");float total = ReqToPort7777 + ReqToPort8888 + ReqToPort9999;System.out.println("7777端口服务器访问量为: " + ReqToPort7777 + ", 占比" + (ReqToPort7777 / total));System.out.println("8888端口服务器访问量为: " + ReqToPort8888 + ", 占比" + (ReqToPort8888 / total));System.out.println("9999端口服务器访问量为: " + ReqToPort9999 + ", 占比" + (ReqToPort9999 / total));ReqToPort7777 = 0;ReqToPort8888 = 0;ReqToPort9999 = 0;}}/* 以下是输出结果------------- 服务器集群发生变化 -------------截取自上次服务器变化到现在:7777端口服务器访问量为: 198.0, 占比0.44196438888端口服务器访问量为: 184.0, 占比0.41071439999端口服务器访问量为: 66.0, 占比0.14732143------------- 服务器集群发生变化 -------------截取自上次服务器变化到现在:7777端口服务器访问量为: 510.0, 占比0.75892868888端口服务器访问量为: 1.0, 占比0.00148809539999端口服务器访问量为: 161.0, 占比0.23958333------------- 最后一次统计 -------------截取自上次服务器变化到现在:7777端口服务器访问量为: 410.0, 占比0.432489458888端口服务器访问量为: 398.0, 占比0.419831229999端口服务器访问量为: 140.0, 占比0.14767933*/
V. 结果
从测试数据可以看出, 不管是8888端口服务器宕机之前, 还是宕机之后, 三台服务器接收的访问量和性能指数成正比,成功地验证了一致性哈希算法的负载均衡作用。
四. 扩展思考
初识一致性哈希算法的时候, 对这种奇特的思路佩服得五体投地。但是一致性哈希算法除了能够让后端服务器实现负载均衡, 还有一个特点可能是其他负载均衡算法所不具备的。
这个特点是基于哈希函数的, 我们知道通过哈希函数, 固定的输入能够产生固定的输出. 换句话说, 同样的请求会路由到相同的服务器. 这点就很牛逼了, 我们可以结合一致性哈希算法和缓存机制提供后端服务器的性能。
比如说在一个分布式系统中, 有一个服务器集群提供查询用户信息的方法, 每个请求将会带着用户的 uid到达, 我们可以通过哈希函数进行处理(从上面的演示代码可以看到, 这点是可以轻松实现的), 使同样的 uid路由到某个独定的服务器. 这样我们就可以在服务器上对该的 uid背后的用户信息进行缓存, 从而减少对数据库或其他中间件的操作, 从而提高系统效率。
当然如果使用该策略的话, 你可能还要考虑缓存更新等操作, 但作为一种优良的策略, 我们可以考虑在适当的场合灵活运用。
以上思考受启发于 Dubbo框架中对其实现的四种负载均衡策略的描述。


