II. 创建服务器集群, 提供RPC远程调用服务
- 首先创建一个服务器项目(使用Maven), 添加
zookeeper
依赖 - 创建常量接口, 用于存储连接
zookeeper
的信息 public interface Constant {
//zookeeper集群的地址
String ZK_HOST = "192.168.117.129:2181,192.168.117.129:2182,192.168.117.129:2183";
//连接zookeeper的超时时间
int ZK_TIME_OUT = 5000;
//服务器所发布的远程服务在zookeeper中的注册地址, 也就是说这个节点中保存了各个服务器提供的接口
String ZK_REGISTRY = "/provider";
//zookeeper集群中注册服务的url地址的瞬时节点
String ZK_RMI = ZK_REGISTRY + "/rmi";
}
3.封装操作 zookeeper
和发布远程服务的接口供自己调用, 本案例中发布远程服务使用Java自身提供的 rmi
包完成, 如果没有了解过可以参考这篇
public class ServiceProvider {
private CountDownLatch latch = new CountDownLatch(1);
/**
* 连接zookeeper集群
*/
public ZooKeeper connectToZK(){
ZooKeeper zk = null;
try {
zk = new ZooKeeper(Constant.ZK_HOST, Constant.ZK_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//如果连接上了就唤醒当前线程.
latch.countDown();
}
});
latch.await();//还没连接上时当前线程等待
} catch (Exception e) {
e.printStackTrace();
}
return zk;
}
/**
* 创建znode节点
* @param zk
* @param url 节点中写入的数据
*/
public void createNode(ZooKeeper zk, String url){
try{
//要把写入的数据转化为字节数组
byte[] data = url.getBytes();
zk.create(Constant.ZK_RMI, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发布rmi服务
*/
public String publishService(Remote remote, String host, int port){
String url = null;
try{
LocateRegistry.createRegistry(port);
url = "rmi://" + host + ":" + port + "/rmiService";
Naming.bind(url, remote);
} catch (Exception e) {
e.printStackTrace();
}
return url;
}
/**
* 发布rmi服务, 并且将服务的url注册到zookeeper集群中
*/
public void publish(Remote remote, String host, int port){
//调用publishService, 得到服务的url地址
String url = publishService(remote, host, port);
if(null != url){
ZooKeeper zk = connectToZK();//连接到zookeeper
if(null != zk){
createNode(zk, url);
}
}
}
}
- 自定义远程服务. 服务提供一个简单的方法: 客户端发来一个字符串, 服务器在字符串前面添加上
Hello
, 并返回字符串。 //UserService
public interface UserService extends Remote {
public String helloRmi(String name) throws RemoteException;
}
//UserServiceImpl
public class UserServiceImpl implements UserService {
public UserServiceImpl() throws RemoteException{
super();
}
@Override
public String helloRmi(String name) throws RemoteException {
return "Hello " + name + "!";
}
}
- 修改端口号, 启动多个java虚拟机, 模拟服务器集群. 为了方便演示, 自定义7777, 8888, 9999端口开启3个服务器进程, 到时会模拟7777端口的服务器宕机和修复重连。
public static void main(String[] args) throws RemoteException {
//创建工具类对象
ServiceProvider sp = new ServiceProvider();
//创建远程服务对象
UserService userService = new UserServiceImpl();
//完成发布
sp.publish(userService, "localhost", 9999);
}
III. 编写客户端程序(运用一致性哈希算法实现负载均衡
- 封装客户端接口:
public class ServiceConsumer {
/**
* 提供远程服务的服务器列表, 只记录远程服务的url
*/
private volatile List<String> urls = new LinkedList<>();
/**
* 远程服务对应的虚拟节点集合
*/
private static TreeMap<Integer, String> virtualNodes = new TreeMap<>();
public ServiceConsumer(){
ZooKeeper zk = connectToZK();//客户端连接到zookeeper
if(null != zk){
//连接上后关注zookeeper中的节点变化(服务器变化)
watchNode(zk);
}
}
private void watchNode(final ZooKeeper zk) {
try{
//观察/provider节点下的子节点是否有变化(是否有服务器登入或登出)
List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//如果服务器节点有变化就重新获取
if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
System.out.println("服务器端有变化, 可能有旧服务器宕机或者新服务器加入集群...");
watchNode(zk);
}
}
});
//将获取到的服务器节点数据保存到集合中, 也就是获得了远程服务的访问url地址
List<String> dataList = new LinkedList<>();
TreeMap<Integer, String> newVirtualNodesList = new TreeMap<>();
for(String nodeStr : nodeList){
byte[] data = zk.getData(Constants.ZK_REGISTRY + "/" + nodeStr, false, null);
//放入服务器列表的url
String url = new String(data);
//为每个服务器分配虚拟节点, 为了方便模拟, 默认开启在9999端口的服务器性能较差, 只分配300个虚拟节点, 其他分配1000个.
if(url.contains("9999")){
for(int i = 1; i <= 300; i++){
newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
}
}else{
for(int i = 1; i <= 1000; i++){
newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
}
}
dataList.add(url);
}
urls = dataList;
virtualNodes = newVirtualNodesList;
dataList = null;//好让垃圾回收器尽快收集
newVirtualNodesList = null;
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 根据url获得远程服务对象
*/
public <T> T lookUpService(String url){
T remote = null;
try{
remote = (T)Naming.lookup(url);
} catch (Exception e) {
//如果该url连接不上, 很有可能是该服务器挂了, 这时使用服务器列表中的第一个服务器url重新获取远程对象.
if(e instanceof ConnectException){
if (urls.size() != 0){
url = urls.get(0);
return lookUpService(url);
}
}
}
return remote;
}
/**
* 通过一致性哈希算法, 选取一个url, 最后返回一个远程服务对象
*/
public <T extends Remote> T lookUp(){
T service = null;
//随机计算一个哈希值
int hash = FVNHash(Math.random() * 10000 + "");
//得到大于该哈希值的所有map集合
SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
//找到比该值大的第一个虚拟节点, 如果没有比它大的虚拟节点, 根据哈希环, 则返回第一个节点.
Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
//通过该虚拟节点获得服务器url
String virtualNodeName = virtualNodes.get(targetKey);
String url = virtualNodeName.split("@")[0];
//根据服务器url获取远程服务对象
service = lookUpService(url);
System.out.print("提供本次服务的地址为: " + url + ", 返回结果: ");
return service;
}
private CountDownLatch latch = new CountDownLatch(1);
public ZooKeeper connectToZK(){
ZooKeeper zk = null;
try {
zk = new ZooKeeper(Constants.ZK_HOST, Constants.ZK_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//判断是否连接zk集群
latch.countDown();//唤醒处于等待状态的当前线程
}
});
latch.await();//没有连接上的时候当前线程处于等待状态.
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return zk;
}
public static int FVNHash(String data){
final int p = 16777619;
int hash = (int)2166136261L;
for(int i = 0; i < data.length(); i++)
hash = (hash ^ data.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
return hash < 0 ? Math.abs(hash) : hash;
}
}
- 启动客户端进行测试:
public static void main(String[] args){
ServiceConsumer sc = new ServiceConsumer();//创建工具类对象
while(true){
//获得rmi远程服务对象
UserService userService = sc.lookUp();
try{
//调用远程方法
String result = userService.helloRmi("炭烧生蚝");
System.out.println(result);
Thread.sleep(100);
}catch(Exception e){
e.printStackTrace();
}
}
}
- 客户端跑起来后, 在显示台不断进行打印...下面将对数据进行统计。
IV. 对服务器调用数据进行统计分析
重温一遍模拟的过程: 首先分别在7777, 8888, 9999端口启动了3台服务器. 然后启动客户端进行访问. 7777, 8888端口的两台服务器设置性能指数为1000, 而9999端口的服务器性能指数设置为300。
在客户端运行期间, 我手动关闭了8888端口的服务器, 客户端正常打印出服务器变化信息。此时理论上不会有访问被路由到8888端口的服务器。当我重新启动8888端口服务器时, 客户端打印出服务器变化信息, 访问能正常到达8888端口服务器。
下面对各服务器的访问量进行统计, 看是否实现了负载均衡。
测试程序如下:
public class DataStatistics {
private static float ReqToPort7777 = 0;
private static float ReqToPort8888 = 0;
private static float ReqToPort9999 = 0;
public static void main(String[] args) {
BufferedReader br = null;
try {
br = new BufferedReader(new FileReader("C://test.txt"));
String line = null;
while(null != (line = br.readLine())){
if(line.contains("7777")){
ReqToPort7777++;
}else if(line.contains("8888")){
ReqToPort8888++;
}else if(line.contains("9999")){
ReqToPort9999++;
}else{
print(false);
}
}
print(true);
} catch (Exception e) {
e.printStackTrace();
}finally {
if(null != br){
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
br = null;
}
}
}
private static void print(boolean isEnd){
if(!isEnd){
System.out.println("------------- 服务器集群发生变化 -------------");
}else{
System.out.println("------------- 最后一次统计 -------------");
}
System.out.println("截取自上次服务器变化到现在: ");
float total = ReqToPort7777 + ReqToPort8888 + ReqToPort9999;
System.out.println("7777端口服务器访问量为: " + ReqToPort7777 + ", 占比" + (ReqToPort7777 / total));
System.out.println("8888端口服务器访问量为: " + ReqToPort8888 + ", 占比" + (ReqToPort8888 / total));
System.out.println("9999端口服务器访问量为: " + ReqToPort9999 + ", 占比" + (ReqToPort9999 / total));
ReqToPort7777 = 0;
ReqToPort8888 = 0;
ReqToPort9999 = 0;
}
}
/* 以下是输出结果
------------- 服务器集群发生变化 -------------
截取自上次服务器变化到现在:
7777端口服务器访问量为: 198.0, 占比0.4419643
8888端口服务器访问量为: 184.0, 占比0.4107143
9999端口服务器访问量为: 66.0, 占比0.14732143
------------- 服务器集群发生变化 -------------
截取自上次服务器变化到现在:
7777端口服务器访问量为: 510.0, 占比0.7589286
8888端口服务器访问量为: 1.0, 占比0.0014880953
9999端口服务器访问量为: 161.0, 占比0.23958333
------------- 最后一次统计 -------------
截取自上次服务器变化到现在:
7777端口服务器访问量为: 410.0, 占比0.43248945
8888端口服务器访问量为: 398.0, 占比0.41983122
9999端口服务器访问量为: 140.0, 占比0.14767933
*/
V. 结果
从测试数据可以看出, 不管是8888端口服务器宕机之前, 还是宕机之后, 三台服务器接收的访问量和性能指数成正比,成功地验证了一致性哈希算法的负载均衡作用。
四. 扩展思考
初识一致性哈希算法的时候, 对这种奇特的思路佩服得五体投地。但是一致性哈希算法除了能够让后端服务器实现负载均衡, 还有一个特点可能是其他负载均衡算法所不具备的。
这个特点是基于哈希函数的, 我们知道通过哈希函数, 固定的输入能够产生固定的输出. 换句话说, 同样的请求会路由到相同的服务器. 这点就很牛逼了, 我们可以结合一致性哈希算法和缓存机制提供后端服务器的性能。
比如说在一个分布式系统中, 有一个服务器集群提供查询用户信息的方法, 每个请求将会带着用户的 uid
到达, 我们可以通过哈希函数进行处理(从上面的演示代码可以看到, 这点是可以轻松实现的), 使同样的 uid
路由到某个独定的服务器. 这样我们就可以在服务器上对该的 uid
背后的用户信息进行缓存, 从而减少对数据库或其他中间件的操作, 从而提高系统效率。
当然如果使用该策略的话, 你可能还要考虑缓存更新等操作, 但作为一种优良的策略, 我们可以考虑在适当的场合灵活运用。
以上思考受启发于 Dubbo
框架中对其实现的四种负载均衡策略的描述。