概述
Zookeeper提供了简单易用的API,我们利用这些API实现添加、删除、修改、查看ZooKeeper的节点,以及实现对这些节点的监听功能。
API介绍
建议大家养成查阅 官方文档 的习惯,因为官方文档是最权威的,而且英文也不难,基本上大家都能看得懂。
本文以v3.8.0版本的zookeeper演示,org.apache.zookeeper.Zookeeper
是ZooKeeper客户端的主类,除非另有说明,该类的方法是线程安全的。
ZooKeeper构造方法
可以通过构造方法实例化ZooKeeper对象,同时会和到服务器建立连接,服务器就会为客户端分配一个会话ID。客户端将定期向服务器发送心跳,以保持会话的有效性。
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
- connectString: Zookeeper服务器的地址,多个地址用逗号分隔
- sessionTimeout:超时时间
- watcher:设置默认监听器
ZooKeeper常用方法
ZooKeeper的API提供了同步和异步两种方式。同步方法会阻塞,直到服务器响应。异步方法只是对请求进行排队,以便立即发送和返回。它们接受一个回调对象,该对象将在请求成功执行时执行,或在出现错误时执行,并带有指示错误的适当返回码。
方法 | 描述 |
create(String path, byte[] data, List acl, CreateMode createMode) | 同步方式创建节点 |
create(String path, byte[] data, List acl, CreateMode createMode) | 异步方式创建节点 |
delete(String path, int version) | 同步方式删除节点 |
delete(String path, int version, AsyncCallback.VoidCallback cb, Object ctx) | 异步方式删除节点 |
exists(String path, boolean watch) | 返回指定路径的节点状态信息,如果不存在返回null |
getChildren(String path, boolean watch) | 返回指定路径的所有子节点状态信息 |
getData(String path, boolean watch, Stat stat) | 返回指定路径的节点数据和状态信息 |
setData(String path, byte[] data, int version) | 给指定路径和版本的节点设置新值,如版本为-1,即给所有版本设置值 |
环境准备
- 引入对应版本的依赖,本文用3.8.0最新版本演示
<!--zookeeper 依赖包--> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.8.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.24</version> <scope>compile</scope> </dependency> <!--junit测试依赖--> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.5.2</version> <scope>test</scope> </dependency> </dependencies>
- 我们采用junit的方式演示,junit的常用注解作用如下:
@BeforeClass
– 表示在类中的任意public static void方法执行之前执行@AfterClass
– 表示在类中的任意public static void方法执行之后执行@Before
– 表示在任意使用@Test注解标注的public void方法执行之前执行@After
– 表示在任意使用@Test注解标注的public void方法执行之后执行@Test
– 使用该注解标注的public void方法会表示为一个测试方法
测试案例
创建会话和关闭会话
可以通过Zookeeper类的构造函数创建会话,它有10个重载的构造方法。
参数 | 说明 |
connectString | 指定ZooKeeper服务器列表,有英文逗号分隔的host:port字符串组成,如"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"。可以指定客户端连上connectString中服务器后的根目录,如 "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a" ,对ZooKeeper的操作都会基于/app/a这个根目录,即创建路径为"/foo/bar"的节点,实际该节点的路径为"/app/a/foo/bar" |
sessionTimeout | 会话的超时时间,单位毫秒。在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳检测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。 |
watcher | ZooKeeper允许客户端在构造方法中传入一个接口Watcher(org.apache.zookeeper.Watcher)的实现类对象来作为默认的Watch事件通知器。该参数也可以设置为null,表明不需要设置默认的Watch处理器。 |
- 客户端和服务端建立会话是异步的。构造方法会在处理完客户端初始化工作后立即返回,在通常情况下,此时并没有真正建立好一个可用的会话,此时在会话的生命周期中处于“CONNECTING”的状态。当该会话真正创建完毕后,ZooKeeper服务端会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知后,才算真正建立了会话。
- 实例化的ZooKeeper客户端对象将从connectString列举的服务器中随机选择一个服务器,并尝试连接到该服务器。如果建立连接失败,将尝试连接另一个服务器(顺序是不确定的,因为列举的服务器是随机洗牌的),直到建立连接。即客户端连接一个服务器失败,将继续尝试,直到会话显式关闭。
代码:
private static final String ZK_ADDR = "10.100.1.14:2181"; private static final Integer ZK_SESSION_TIMEOUT = 30000; private ZooKeeper zooKeeper = null; @Before public void init() throws IOException, InterruptedException { log.info("********************** start zk .................."); CountDownLatch countDownLatch = new CountDownLatch(1); zooKeeper = new ZooKeeper(ZK_ADDR, ZK_SESSION_TIMEOUT, event -> { log.info("触发了事件:[{}]", event); countDownLatch.countDown(); }); countDownLatch.await(); } @After public void close() throws InterruptedException { zooKeeper.close(); log.info("************************ close zk .................."); }
- init方法和close方法是用来创建和关闭zk会话,加了
@Before
和@After
注解,它会在每个测试用例前后执行。 - 由于客户端和服务端建立会话是异步的,因此做一个阻塞操作,防止还没开启就执行后面的操作,在真正打开了客户端之后,发送一个消息,并解掉阻塞。
创建节点
创建节点有同步和异步两种方式。
create( final String path, byte[] data, List<ACL> acl, CreateMode createMode)
说明:
- 该方法是一个同步创建节点的方法
参数说明:
参数 | 说明 |
path | znode路径。例如,/path, /app/node |
data | 存储到znode路径的数据,byte数组,最大1M |
acl | 要创建的节点的访问控制列表。zookeeper API提供了一个静态接口 ZooDefs.Ids 来获取一些基本的acl列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE 返回打开znode的acl列表。- ZooDefs.Ids.OPEN_ACL_UNSAFE:表示开放权限,所有用户拥有所有权限 |
- ZooDefs.Ids.CREATOR_ALL_ACL:表示使用 auth 权限模式,并且对于满足条件的用户开放所有权限
- ZooDefs.Ids.READ_ACL_UNSAFE:表示对于所有用户,只开放Read权限
- ZooDefs.Ids.ANYONE_ID_UNSAFE:是一个常用的Id对象,表示所有用户
- ZooDefs.Ids.AUTH_IDS:是一个Auth模式的Id对象。
- 我们也可以自己定义权限模式 | | createMode | 节点的类型,是一个枚举。- PERSISTENT:持久节点(也有叫永久节点的),不会随着会话的结束而自动删除。
- PERSISTENT_SEQUENTIAL:带单调递增序号的持久节点,不会随着会话的结束而自动删除。
- EPHEMERAL:临时节点,会随着会话的结束而自动删除。
- EPHEMERAL_SEQUENTIAL:带单调递增序号的临时节点,会随着会话的结束而自动删除。
- CONTAINER:容器节点,用于Leader、Lock等特殊用途,当容器节点不存在任何子节点时,容器将成为服务器在将来某个时候删除的候选节点。
- PERSISTENT_WITH_TTL:带TTL(time-to-live,存活时间)的持久节点,节点在TTL时间之内没有得到更新并且没有子节点,就会被自动删除。
- PERSISTENT_SEQUENTIAL_WITH_TTL:带TTL(time-to-live,存活时间)和单调递增序号的持久节点,节点在TTL时间之内没有得到更新并且没有子节点,就会被自动删除。 |
create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback callBack,Object ctx)
说明:
- 这是一个异步创建节点的方法
参数说明:
其他参数和上面同步创建节点一样.
参数 | 说明 |
callBack | 异步回调接口 |
ctx | 传递上下文参数 |
代码:
@Test public void testCreate() throws InterruptedException, KeeperException { // 创建一个持久节点,对所有用户开放 zooKeeper.create("/node1", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // 创建一个临时的有序节点,权限模式为对指定ip开放 Id ip = new Id("ip", "10.100.1.100"); zooKeeper.create("/user", "u00001".getBytes(), Collections.singletonList(new ACL(ZooDefs.Perms.ALL, ip)), CreateMode.EPHEMERAL_SEQUENTIAL); } // 异步创建节点 @Test public void testCreateAsync() throws InterruptedException { zooKeeper.create("/path2", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new AsyncCallback.StringCallback(){ @Override public void processResult(int rc, String path, Object ctx, String name) { log.info("rc: [{}]", rc); // 0代表成功了 log.info(path); // 传进来的,添加的节点 log.info(name); // 真正查到的节点的名字 log.info(ctx.toString()); // 上下文参数,ctx传进来的东西 log.info("create node success!"); } }, "ctx" ); Thread.sleep(1000); }
创建成功的日志
查看节点
// 同步方式查看节点数据,使用自定义的监听器 byte[] getData(final String path, Watcher watcher, Stat stat) // 同步方式查看节点数据,使用连接时的监听器 byte[] getData(String path, boolean watch, Stat stat) // 异步方式查看节点,使用自定义的监听器 void getData(final String path, Watcher watcher, DataCallback cb, Object ctx) // 异步方式查看节点,使用注册的连接器 void getData(String path, boolean watch, DataCallback cb, Object ctx)
参数说明:
参数 | 说明 |
path | znode路径 |
watcher | 注册一个监听器 |
watch | 是否使用连接对象中注册的监视器 |
stat | 返回znode的元数据 |
callBack | 异步回调接口 |
ctx | 传递上下文参数 |
代码
@Test public void testGet() throws InterruptedException, KeeperException { Stat stat = new Stat(); byte[] data = zooKeeper.getData("/node1", false, stat); log.info("获取到的数据是:" + new String(data)); log.info("当前节点的版本:" + stat.getVersion()); } @Test public void testGetAsync() throws InterruptedException, KeeperException { zooKeeper.getData("/node1", null, new AsyncCallback.DataCallback() { @Override public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat) { log.info("rc: " + rc); log.info(path); log.info(new String(bytes)); log.info("version: " + stat.getVersion()); } }, null); Thread.sleep(1000); }
更新节点
// 同步方式更新节点 Stat setData(final String path, byte[] data, int version) // 异步方式更新节点 void setData(final String path, byte[] data, int version, StatCallback cb, Object ctx)
参数说明:
参数 | 说明 |
path | znode路径 |
data | 更新的数据 |
version | znode的当前版本。值为-1时,表示不需要考虑版本。如果指定版本之后,就可以做成一个乐观锁。 |
callBack | 异步回调接口 |
ctx | 传递上下文参数 |
代码:
@Test public void testSetData() throws InterruptedException, KeeperException { Stat stat = zooKeeper.setData("/node1", "alvin".getBytes(), -1); // 返回状态信息 log.info(stat.toString()); // 将状态信息打印 log.info("当前版本号" + stat.getVersion()); log.info("节点创建时间" + stat.getCtime()); log.info("节点修改时间" + stat.getMtime()); } @Test public void testSetDataAsync() throws InterruptedException, KeeperException { zooKeeper.setData("/node1", "alvin2".getBytes(), 1, new AsyncCallback.StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { log.info("rc" + rc); // 0 代表修改成功 log.info(path); // 输入的节点路径 log.info("version " + stat.getVersion()); // 当前版本 } }, null); // 返回状态信息 Thread.sleep(1000); }
删除节点
// 同步方式删除节点 void delete(final String path, int version) // 异步方式删除节点 void delete(final String path, int version, VoidCallback cb, Object ctx)
参数说明:
参数 | 说明 |
path | znode路径 |
version | znode的当前版本。值为-1时,表示不需要考虑版本。如果指定版本之后,就可以做成一个乐观锁。 |
callBack | 异步回调接口 |
ctx | 传递上下文参数 |
代码:
@Test public void testDelete() throws InterruptedException, KeeperException { zooKeeper.delete("/node1", -1); // 如果节点不存在,会删除失败 zooKeeper.delete("/node5/child1", -1); // 如果节点不存在,会删除失败 } @Test public void testDeleteAsync() { zooKeeper.delete("/node1", -1, new AsyncCallback.VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { log.info("rc:" + rc); log.info(path); } }, "ctx"); }
查看子节点
//同步方式查看子节点,传入监听器 List<String> getChildren(final String path, Watcher watcher) //同步方式查看子节点,是否使用默认的监听器 List<String> getChildren(String path, boolean watch) //异步方式查看子节点,传入监听器 void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx) //异步方式查看子节点,是否使用默认的监听器 void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
代码:
private void syncCreateNode(String path, String data) throws InterruptedException, KeeperException { zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } @Test public void testGetChild() throws InterruptedException, KeeperException { syncCreateNode("/a", "hello"); syncCreateNode("/a/b", "hello"); syncCreateNode("/a/c", "hello"); List<String> children = zooKeeper.getChildren("/a", false); log.info("********* children: [{}]", children); } 复制代码
结果:
检查节点是否存在
// 同步方式检查节点是否存在, 传入监听器 Stat exists(final String path, Watcher watcher) // 同步方式检查节点是否存在, 是否用默认监听器 Stat exists(String path, boolean watch) // 异步方式检查节点是否存在, 传入监听器 void exists(final String path, Watcher watcher, StatCallback cb, Object ctx) // 异步方式检查节点是否存在, 是否用默认监听器 void exists(String path, boolean watch, StatCallback cb, Object ctx)
代码:
@Test public void testExist() throws InterruptedException, KeeperException { syncCreateNode("/alvin", "hello"); Stat stat = zooKeeper.exists("/alvin", false); log.info("stat: [{}]", stat); log.info("delete node /alvin ......"); zooKeeper.delete("/alvin", -1); stat = zooKeeper.exists("/alvin", false); log.info("stat: [{}]", stat); }
监听器代码验证
getData、exist是、getChildren三个方法都可以监听对应节点变化。
验证watch的一次性
- 创建监听执行
private void syncCreateNode(String path, String data) throws InterruptedException, KeeperException { zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } @Test public void testGetWatch() throws InterruptedException, KeeperException { // 创建临时节点 syncCreateNode("/watch", "aaa"); byte[] data = zooKeeper.getData("/watch", true, new Stat()); log.info("getData: [{}]", new String(data)); Thread.sleep(100000L); }
- 多次执行更新节点的操作
// 修改数据 @Test public void testUpdateData() throws InterruptedException, KeeperException { // 创建临时节点 zooKeeper.setData("/watch", "bbb".getBytes(), -1); Thread.sleep(10000L); }
- 查看结果, 日志只打印了一次
通过自定义监听器多次监听
- 通过exists创建自定义监听
@Test public void testExists() throws KeeperException, InterruptedException { // 创建临时节点 syncCreateNode("/watch", "aaa"); // 重复使用,用完再注册一个新的 Stat stat = zooKeeper.exists("/watch", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { switch (watchedEvent.getType()) { case NodeCreated: log.info("{}节点创建了", watchedEvent.getPath()); break; case NodeDataChanged: log.info("{}节点数据被修改了", watchedEvent.getPath()); break; case NodeDeleted: log.info("{}节点被删除了", watchedEvent.getPath()); break; } try { // 重复监听的关键 zooKeeper.exists("/watch", this); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }); if (stat != null) { log.info("version: " + stat.getVersion()); } Thread.sleep(100000); }
- 多次执行更新节点、删除节点、创建节点的操作
- 查看结果,多次响应监听
通过addWatcher方法实现多次监听
- 通过addWatch添加监听器,addWatch方法支持重复监听
@Test public void testAddWatch() throws InterruptedException, KeeperException { // 创建临时节点 syncCreateNode("/watch", "aaa"); zooKeeper.addWatch("/watch", new Watcher() { @Override public void process(WatchedEvent event) { switch (event.getType()) { case NodeCreated: log.info("{}节点创建了", event.getPath()); break; case NodeDataChanged: log.info("{}节点数据被修改了", event.getPath()); break; case NodeDeleted: log.info("{}节点被删除了", event.getPath()); break; } } }, AddWatchMode.PERSISTENT); Thread.sleep(100000); }
- 多次执行更新节点、删除节点、创建节点的操作
- 查看结果
说明:
- zk api提供了addWatch、printwatches、removewatches方法,分别用来添加监听,答应监听器和移除监听器列表。
- addWatch的参数中可以传入监听的两种模式,PERSISTENT和PERSISTENT_RECURSIVE,PERSISTENT模式只监听指定的节点事件,而PERSISTENT_RECURSIVE模式会监听指定节点与它所有子节点的事件。