上一篇博客《分布式协调工具Zookeeper(介绍&安装&配置详解)》讲到了分布式协调工具Zookeeper,主要讲解Zookeeper的概念、应用场景以及安装配置。基于上一篇博客安装的环境,本文讲解如何使用Java操作Zookeeper?
代码已提交至Gtihub,有兴趣的同学可以下载看看(git版本号:e9d27b6df05095bb50c3666a1e8965102c85bb01
):https://github.com/ylw-github/Zookeeper-Demo.git
本文目录结构:
1. 基本概念
1.1 创建节点(znode)方法
Zookeeper提供了两套创建节点的方法,同步
和异步
创建节点方式。
其中同步的方式,有几个节点需要注意:
- 节点路径(名称) InodeName: (不允许递归创建节点,也就是说在父节点不存在
的情况下,不允许创建子节点) - 节点内容: 要求类型是字节数组(也就是说,不支持序列化方式,如果需要实现序
列化,可使用java相关序列化框架,如Hessian、Kryo框架) - 节点权限: 使用Ids.OPEN_ACL_UNSAFE开放权限即可。(这个参数一般在权展
没有太高要求的场景下,没必要关注) - 节点类型: 创建节点的类型: CreateMode,提供四种首点象型,如下:
名称 | 解析 |
PERSISTENT | 持久化节点 |
PERSISTENT_SEQUENTIAL | 顺序自动编号持久化节点,这种节点会根据当前已存在的节点数自动加 1 |
EPHEMERAL | 临时节点, 客户端session超时这类节点就会被自动删除 |
EPHEMERAL_SEQUENTIAL | 临时自动编号节点 |
1.2 Watcher
在ZooKeeper中,接口类Watcher用于表示一个标准的事件处理器,其定义了事件通知相关的逻辑,包含KeeperState和EventType两个枚举类,分别代表了通知状态和事件类型,同时定义了事件的回调方法:process(WatchedEvent event)
。
什么是watcher接口?
同一个事件类型在不同的通知状态中代表的含义有所不同,下表例举了常见的通知状态和事件类型:
KeeperState | EventType | 触发条件 | 说明 |
None(-1) | 客户端与服务端成功建立连接 | ||
SyncConnected(0) | NodeCreated(1) | Watcher监听的对应数据节点被创建 | |
NodeDeleted(2) | Watcher监听的对应数据节点被删除 | 此时客户端和服务器处于连接状态 | |
NodeDataChanged(3) | Watcher监听的对应数据节点的数据内容发生变更 | ||
NodeChildChanged(4) | Wather监听的对应数据节点的子节点列表发生变更 | ||
Disconnected(0) | None(-1) | 客户端与ZooKeeper服务器断开连接 | 此时客户端和服务器处于断开连接状态 |
Expired(-112) | Node(-1) | 会话超时 | 此时客户端会话失效,通常同时也会受到SessionExpiredException异常 |
AuthFailed(4) | None(-1) | 通常有两种情况,1:使用错误的schema进行权限检查 2:SASL权限检查失败 | 通常同时也会收到AuthFailedException异常 |
回调方法process():
- process方法是Watcher接口中的一个回调方法,当ZooKeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理,process方法的定义如下:
abstract public void process(WatchedEvent event);
这个回调方法的定义非常简单,我们重点看下方法的参数定义WatchedEvent:
- WatchedEvent包含了每一个事件的三个基本属性:通知状态(keeperState),事件类型(EventType)和节点路径(path)。ZooKeeper使用WatchedEvent对象来封装服务端事件并传递给Watcher,从而方便回调方法process对服务端事件进行处理。
- 提到WatchedEvent,两者表示的是同一个事物,都是对一个服务端事件的封装。不同的是,WatchedEvent是一个逻辑事件,用于服务端和客户端程序执行过程中所需的逻辑对象,而WatcherEvent因为实现了序列化接口,因此可以用于网络传输。
- 服务端在生成WatchedEvent事件之后,会调用getWrapper方法将自己包装成一个可序列化的WatcherEvent事件,以便通过网络传输到客户端。客户端在接收到服务端的这个事件对象后,首先会将WatcherEvent还原成一个WatchedEvent事件,并传递给process方法处理,回调方法process根据入参就能够解析出完整的服务端事件了。
- 需要注意的一点是,无论是WatchedEvent还是WatcherEvent,其对ZooKeeper服务端事件的封装都是机及其简单的。
2.Java操作Zookeeper
1.创建项目
2.添加maven依赖
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency>
3.Zookeeper客户端连接
package com.ylw.zookeeper.Test; import org.apache.zookeeper.*; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class Test { //连接地址 private static final String ADDRES = "192.168.162.131:2181"; //session 会话 private static final int SESSION_OUTTIME = 2000; //信号量,阻塞程序执行,用户等待zookeeper连接成功,发送成功信号, private static final CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper(ADDRES, SESSION_OUTTIME, new Watcher() { public void process(WatchedEvent event) { // 获取事件状态 Event.KeeperState keeperState = event.getState(); // 获取事件类型 Event.EventType eventType = event.getType(); if (Event.KeeperState.SyncConnected == keeperState) { if (Event.EventType.None == eventType) { countDownLatch.countDown(); System.out.println("zk 启动连接..."); } } } }); // 进行阻塞 countDownLatch.await(); String result = zk.create("/ylw_Lasting", "Lasting".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("result - >" + result); zk.close(); } }
注意创建节点的两种方式:
1.创建持久节点,并且允许任何服务器可以操作 String result = zk.create("/ylw_Lasting", "Lasting".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 2.创建临时节点 String result = zk.create("/ylw_temp", "temp".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
4.Zookeeper客户端连接
package com.ylw.zookeeper.Test; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; public class ZkClientWatcher implements Watcher { // 集群连接地址 private static final String CONNECT_ADDRES = "192.168.162.131:2181,192.168.162.131:2182,192.168.162.131:2183"; // 会话超时时间 private static final int SESSIONTIME = 2000; // 信号量,让zk在连接之前等待,连接成功后才能往下走. private static final CountDownLatch countDownLatch = new CountDownLatch(1); private static String LOG_MAIN = "【main】 "; private ZooKeeper zk; public void createConnection(String connectAddres, int sessionTimeOut) { try { zk = new ZooKeeper(connectAddres, sessionTimeOut, this); System.out.println(LOG_MAIN + "zk 开始启动连接服务器...."); countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } } public boolean createPath(String path, String data) { try { this.exists(path, true); this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(LOG_MAIN + "节点创建成功, Path:" + path + ",data:" + data); } catch (Exception e) { e.printStackTrace(); return false; } return true; } /** * 判断指定节点是否存在 * * @param path 节点路径 */ public Stat exists(String path, boolean needWatch) { try { return this.zk.exists(path, needWatch); } catch (Exception e) { e.printStackTrace(); return null; } } public boolean updateNode(String path, String data) throws KeeperException, InterruptedException { exists(path, true); this.zk.setData(path, data.getBytes(), -1); return false; } public void process(WatchedEvent watchedEvent) { // 获取事件状态 Event.KeeperState keeperState = watchedEvent.getState(); // 获取事件类型 Event.EventType eventType = watchedEvent.getType(); // zk 路径 String path = watchedEvent.getPath(); System.out.println("进入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path); // 判断是否建立连接 if (Event.KeeperState.SyncConnected == keeperState) { if (Event.EventType.None == eventType) { // 如果建立建立成功,让后程序往下走 System.out.println(LOG_MAIN + "zk 建立连接成功!"); countDownLatch.countDown(); } else if (Event.EventType.NodeCreated == eventType) { System.out.println(LOG_MAIN + "事件通知,新增node节点" + path); } else if (Event.EventType.NodeDataChanged == eventType) { System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被修改...."); } else if (Event.EventType.NodeDeleted == eventType) { System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被删除...."); } } System.out.println("--------------------------------------------------------"); } public static void main(String[] args) throws KeeperException, InterruptedException { ZkClientWatcher zkClientWatcher = new ZkClientWatcher(); zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME); boolean createResult = zkClientWatcher.createPath("/del.do", "http://www.xxx001.delete"); //zkClientWatcher.updateNode("/del.do", "http://www.xxx002.delete"); } }
运行结果:
更新节点:
//boolean createResult = zkClientWatcher.createPath("/del.do", "http://www.xxx001.delete"); zkClientWatcher.updateNode("/del.do", "http://www.xxx002.delete");
运行后:
3.总结