一、前言
上一篇博客已经介绍了如何使用Zookeeper提供的原生态Java API进行操作,本篇博文主要讲解如何通过开源客户端来进行操作。
二、ZkClient
ZkClient是在Zookeeper原声API接口之上进行了包装,是一个更易用的Zookeeper客户端,其内部还实现了诸如Session超时重连、Watcher反复注册等功能。
2.1 添加依赖
在pom.xml文件中添加如下内容即可。
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.2</version> </dependency>
2.2 创建会话
使用ZkClient可以轻松的创建会话,连接到服务端。
package com.hust.grid.leesf.zkclient.examples; import java.io.IOException; import org.I0Itec.zkclient.ZkClient; public class Create_Session_Sample { public static void main(String[] args) throws IOException, InterruptedException { ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000); System.out.println("ZooKeeper session established."); } }
运行结果:
ZooKeeper session established.
结果表明已经成功创建会话。
2.3 创建节点
ZkClient提供了递归创建节点的接口,即其帮助开发者完成父节点的创建,再创建子节点。
package com.hust.grid.leesf.zkclient.examples; import org.I0Itec.zkclient.ZkClient; public class Create_Node_Sample { public static void main(String[] args) throws Exception { ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000); String path = "/zk-book/c1"; zkClient.createPersistent(path, true); System.out.println("success create znode."); } }
运行结果:
success create znode.
结果表明已经成功创建了节点,值得注意的是,在原生态接口中是无法创建成功的(父节点不存在),但是通过ZkClient可以递归的先创建父节点,再创建子节点。
可以看到确实成功创建了/zk-book和/zk-book/c1两个节点。
2.4 删除节点
ZkClient提供了递归删除节点的接口,即其帮助开发者先删除所有子节点(存在),再删除父节点。
package com.hust.grid.leesf.zkclient.examples; import org.I0Itec.zkclient.ZkClient; public class Del_Data_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book"; ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000); zkClient.createPersistent(path, ""); zkClient.createPersistent(path+"/c1", ""); zkClient.deleteRecursive(path); System.out.println("success delete znode."); } }
运行结果:
success delete znode.
结果表明ZkClient可直接删除带子节点的父节点,因为其底层先删除其所有子节点,然后再删除父节点。
2.5 获取子节点
package com.hust.grid.leesf.zkclient.examples; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; public class Get_Children_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book"; ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000); zkClient.subscribeChildChanges(path, new IZkChildListener() { public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { System.out.println(parentPath + " 's child changed, currentChilds:" + currentChilds); } }); zkClient.createPersistent(path); Thread.sleep(1000); zkClient.createPersistent(path + "/c1"); Thread.sleep(1000); zkClient.delete(path + "/c1"); Thread.sleep(1000); zkClient.delete(path); Thread.sleep(Integer.MAX_VALUE); } }
运行结果:
/zk-book 's child changed, currentChilds:[] /zk-book 's child changed, currentChilds:[c1] /zk-book 's child changed, currentChilds:[] /zk-book 's child changed, currentChilds:null
结果表明:
客户端可以对一个不存在的节点进行子节点变更的监听。
一旦客户端对一个节点注册了子节点列表变更监听之后,那么当该节点的子节点列表发生变更时,服务端都会通知客户端,并将最新的子节点列表发送给客户端
该节点本身的创建或删除也会通知到客户端。
2.6 获取数据
package com.hust.grid.leesf.zkclient.examples; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; public class Get_Data_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book"; ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000); zkClient.createEphemeral(path, "123"); zkClient.subscribeDataChanges(path, new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { System.out.println("Node " + dataPath + " deleted."); } public void handleDataChange(String dataPath, Object data) throws Exception { System.out.println("Node " + dataPath + " changed, new data: " + data); } }); System.out.println(zkClient.readData(path)); zkClient.writeData(path, "456"); Thread.sleep(1000); zkClient.delete(path); Thread.sleep(Integer.MAX_VALUE); } }
运行结果:
123 Node /zk-book changed, new data: 456 Node /zk-book deleted.
结果表明可以成功监听节点数据变化或删除事件。
2.7 检测节点是否存在
package com.hust.grid.leesf.zkclient.examples; import org.I0Itec.zkclient.ZkClient; public class Exist_Node_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book"; ZkClient zkClient = new ZkClient("127.0.0.1:2181", 2000); System.out.println("Node " + path + " exists " + zkClient.exists(path)); } }
运行结果:
Node /zk-book exists false
结果表明,可以通过ZkClient轻易检测节点是否存在,其相比于原生态的接口更易于理解。
三、Curator客户端
Curator解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连,反复注册Watcher和NodeExistsException异常等,现已成为Apache的顶级项目。
3.1 添加依赖
在pom.xml文件中添加如下内容即可。
<!-- https://mvnrepository.com/artifact/org.apache.curator/apache-curator --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.4.2</version> </dependency>
3.2 创建会话
Curator除了使用一般方法创建会话外,还可以使用fluent风格进行创建。
package com.hust.grid.leesf.curator.examples; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; public class Create_Session_Sample { public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 3000, retryPolicy); client.start(); System.out.println("Zookeeper session1 established. "); CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("base").build(); client1.start(); System.out.println("Zookeeper session2 established. "); } }
Zookeeper session1 established. Zookeeper session2 established.
值得注意的是session2会话含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对/base目录进行的,这有利于实现不同的Zookeeper的业务之间的隔离。
3.3 创建节点
通过使用Fluent风格的接口,开发人员可以进行自由组合来完成各种类型节点的创建。
package com.hust.grid.leesf.curator.examples; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class Create_Node_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book/c1"; CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes()); System.out.println("success create znode: " + path); } }
运行结果:
success create znode: /zk-book/c1
其中,也创建了/zk-book/c1的父节点/zk-book节点。
3.4 删除节点
package com.hust.grid.leesf.curator.examples; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; public class Del_Data_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book/c1"; CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes()); Stat stat = new Stat(); System.out.println(new String(client.getData().storingStatIn(stat).forPath(path))); client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path); System.out.println("success delete znode " + path); Thread.sleep(Integer.MAX_VALUE); } }
运行结果:
init success delete znode /zk-book/c1
结果表明成功删除/zk-book/c1节点。
3.5 获取数据
package com.hust.grid.leesf.curator.examples; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; public class Get_Data_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book"; CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes()); Stat stat = new Stat(); System.out.println(new String(client.getData().storingStatIn(stat).forPath(path))); } }
运行结果:
init
结果表明成功获取了节点的数据。
3.6 更新数据
package com.hust.grid.leesf.curator.examples; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; public class Set_Data_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book"; CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes()); Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath(path); System.out.println("Success set node for : " + path + ", new version: " + client.setData().withVersion(stat.getVersion()).forPath(path).getVersion()); try { client.setData().withVersion(stat.getVersion()).forPath(path); } catch (Exception e) { System.out.println("Fail set node due to " + e.getMessage()); } } }
运行结果:
Success set node for : /zk-book, new version: 1 Fail set node due to KeeperErrorCode = BadVersion for /zk-book
结果表明当携带数据版本不一致时,无法完成更新操作。
3.7 异步接口
如同Zookeeper原生API提供了异步接口,Curator也提供了异步接口。在Zookeeper中,所有的异步通知事件处理都是由EventThread这个线程来处理的,EventThread线程用于串行处理所有的事件通知,其可以保证对事件处理的顺序性,但是一旦碰上复杂的处理单元,会消耗过长的处理时间,从而影响其他事件的处理,Curator允许用户传入Executor实例,这样可以将比较复杂的事件处理放到一个专门的线程池中去。
package com.hust.grid.leesf.curator.examples; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class Create_Node_Background_Sample { static String path = "/zk-book"; static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); static CountDownLatch semaphore = new CountDownLatch(2); static ExecutorService tp = Executors.newFixedThreadPool(2); public static void main(String[] args) throws Exception { client.start(); System.out.println("Main thread: " + Thread.currentThread().getName()); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() { public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName()); System.out.println(); semaphore.countDown(); } }, tp).forPath(path, "init".getBytes()); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() { public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName()); semaphore.countDown(); } }).forPath(path, "init".getBytes()); semaphore.await(); tp.shutdown(); } }
运行结果:
Main thread: main event[code: -110, type: CREATE], Thread of processResult: main-EventThread event[code: 0, type: CREATE], Thread of processResult: pool-3-thread-1
其中,创建节点的事件由线程池自己处理,而非默认线程处理。
Curator除了提供很便利的API,还提供了一些典型的应用场景,开发人员可以使用参考更好的理解如何使用Zookeeper客户端,所有的都在recipes包中,只需要在pom.xml中添加如下依赖即可
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.4.2</version> </dependency>