1.1:Zookeeper API(原生)
把上次写的一些命令放到客户端来操作:比如增删改查,判断节点是否存在等等的一些操作。
Zookeeper的几个状态:
1.1.1:首先添加zookeeper相应的依赖:
1. <dependency> 2. <groupId>org.springframework.cloud</groupId> 3. <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId> 4. </dependency>
1.1.2:创建zookeeper的对象:
第一个参数是连接zookeeper的字符串,第二个参数连接 zookeeper的超时时间(时间以毫秒值为单位)。第三个参数是监听器 Watcher
1.1.3:创建Zookeeper的会话
1. package com.weizhaoyang.zkclient.zkstatus; 2. import org.apache.zookeeper.ZooKeeper; 3. import java.io.IOException; 4. publicclassZkStatus{ 5. publicstaticvoid main(String[] args){ 6. try{ 7. //这样的一个会话就建立起来了 8. ZooKeeper zookeeper= 9. newZooKeeper("172.20.10.9:2181",3000,null); 10. }catch(IOException e){ 11. e.printStackTrace(); 12. } 13. } 14. }
1.1.4:现在启动下主函数:这就连到了zookeeper上。
1.1.5:创建一个zookeeper的监视器,来监听Zookeeper的状态,实现Watcher的接口,或者写内部类,我这里写实现Watcher接口。
里面有一个方法process:当zookeeper进行连接的时候,有对应的监视器,这个监听器代表当前的ZkStatus的这个对象,然后就会回调这个process方法,这里的监听方法只回调一次。
1. package com.example.zkstatus; 2. 3. import org.apache.zookeeper.WatchedEvent; 4. import org.apache.zookeeper.Watcher; 5. import org.apache.zookeeper.ZooKeeper; 6. 7. import java.io.IOException; 8. 9. publicclassZkStatusimplementsWatcher{ 10. publicstaticvoid main(String[] args){ 11. try{ 12. ZooKeeper zooKeeper =newZooKeeper("172.20.10.9:2181",3000,newZkStatus()); 13. }catch(IOException e){ 14. e.printStackTrace(); 15. } 16. } 17. 18. @Override 19. publicvoid process(WatchedEvent watchedEvent){ 20. System.out.println("---------"); 21. 22. } 23. }
但是运行的时候没有打印:因为主线程和调用监听器的线程是两个不同的线程。当监听器的线程还没有运行完,主线程就终止了,他们是异步的。这个监听器主要用来监听zookeeper的状态,以及和某个节点的状态。
1.1.6:解决上面的问题可以有两种方法:
a:让主线程沉睡代码如下:
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
再次运行主函数的话,输出的结果如下:
b:闭锁的方式:
上面的方法不怎么好,因为你也不知道它沉睡多久,做法应该当连到zookeeper客户端的时候就应该把沉睡给去掉,去让它执行下面的操作。可以用闭锁的方式代码如下:也就是说监听器的线程还没有执行完,就让主线程在那里等待,直到监听器的线程执行完,主线程在往下执行。
1.1.7:如何去确定zookeeper是否连上了呢?
通过process方法可以得到当前事件的状态:会话过期,断开连接,权限失败,同步连接 当第一次连接不上的话,这里没有重试,也就这个是它的缺点。
在开发中涉及到会话重连:当断开连接的时候,还想用上面的会话Zookeeper的话,第一次连接的时候会产生一个会话的id和会话的密码,只要把会话的id和会话的密码拿到,就能够实现会话的重连,否则就会报会话过期。
1.1.8:验证下会话过期的代码如下:标注的代码是实现会话重连的代码:运行的结果如下:
打印出了会话的重连的效果。
连接成功
10:32:52.674 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=172.20.10.9:2181 sessionTimeout=3000 watcher=com.example.zkstatus.ZkStatus@6979e8cb sessionId=1000c4e4706000d sessionPasswd=<hidden>
10:32:52.674 [main] INFO org.apache.zookeeper.ClientCnxnSocket - jute.maxbuffer value is 4194304 Bytes
继续执行---------
1.1.8.:现在改下会话的id,代码和运行结果如下:主方法的代码修改下,运行结果如下:
public static void main(String[] args) throws InterruptedException {
try {
ZooKeeper zooKeeper =new ZooKeeper("172.20.10.9:2181",5000,new ZkStatus());
//countDownLatch.await();
//获取sessionid和密码
long sessionId = zooKeeper.getSessionId();
byte[] sessionPasswd = zooKeeper.getSessionPasswd();
ZooKeeper zooKeeper1 =new ZooKeeper("172.20.10.9:2181",5000,new ZkStatus(),0x2222,sessionPasswd);
System.out.println("继续执行---------");
} catch (Exception e) {
e.printStackTrace();
}
TimeUnit.SECONDS.sleep(2);
}
总结:会话过期,当会话sessionid不是同一个的时候会导致第二次会话过期。
1.1.9:演示下连接断开:把zookeeper的客户端给断开,断开以后控制台打印的结果如下:这里可以监测到连接断开。
再次的把服务给启动起来,它可以动态的去获取,就是当这个会话的时间还没有结束的时候,它是可以监测到当前的zookeeper的状态的。断开之前的会话id和重连之后的会话id是一样的,因为整个会话的过程还没有结束,只是和服务端的通信而断开了。
总结:这也就是zookeeper在微服务中做服务发现组件的原因,当你重连的时候会触发这个监听的事件,触发这个事件的话,就会在这个事件里可以做一些事情,比如每次连接进来都不一样,就可以组成当前在线的服务列表,从而在线的服务列表里面去写上对应的负载均衡机制。
1.10:在连接成功的里面,还有小的事件:
比如在zookeeper里面创建了一个节点,然后去监测新创建的节点,勾起来的代码是多加几个事件类型,可以监测到当前节点的变化。
更改下主函数的代码如下:
public static void main(String[] args) throws InterruptedException {
try {
ZooKeeper zooKeeper =new ZooKeeper("192.168.124.241:2181",5000,new ZkStatus());
countDownLatch.await();
/* //获取sessionid和密码
long sessionId = zooKeeper.getSessionId();
byte[] sessionPasswd = zooKeeper.getSessionPasswd();
ZooKeeper zooKeeper1 =new ZooKeeper("172.20.10.9:2181",5000,new ZkStatus(),0x2222,sessionPasswd);*/
//ZooDefs.Ids.OPEN_ACL_UNSAFE:代表的是创建的schema类型为world:anyone:cdrwa的权限的节点
//CreateMode.PERSISTENT:创建的是永久的节点
//注册监听
zooKeeper.exists("/test1",true);
zooKeeper.create("/test1","test-date".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
TimeUnit.SECONDS.sleep(1);
//修改节点的值:version为-1代表不根据任何的版本号来修改,只要有/test这样的节点,它就会去修改数据
zooKeeper.exists("/test1",true);
zooKeeper.setData("/test1","test-date".getBytes(),-1);
TimeUnit.SECONDS.sleep(1);
//删除节点
zooKeeper.exists("/test1",true);
zooKeeper.delete("/test1",-1);
TimeUnit.SECONDS.sleep(1);
logger.warn("继续执行");
} catch (Exception e) {
e.printStackTrace();
}
}
运行的结果如下: