1. pom 文件(Maven 依赖)
<!-- Native ZooKeeper Java client. -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.8</version>
</dependency>

<!--
  Curator high-level client and its recipes module.
  NOTE(review): Curator 4.0.x is documented as targeting ZooKeeper 3.5.x by default;
  confirm that pairing it with the 3.4.8 client declared above is intended.
-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>
2. ZooKeeper 操作
package com.vince.xq.kafka;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Manual integration tests that exercise a ZooKeeper server at {@code 127.0.0.1:2181},
 * first through the native {@link ZooKeeper} client and then through Curator.
 *
 * <p>Each test opens its own client and always closes it, so a failing operation does
 * not leave a session behind. The tests read, create and delete fixed node paths and
 * therefore depend on the state of the server they run against.
 */
public class ZKTest {

    /** Address of the ZooKeeper server used by every test. */
    private static final String CONNECT_STRING = "127.0.0.1:2181";

    /** Session timeout, in milliseconds, requested from the server. */
    private static final int SESSION_TIMEOUT_MS = 4000;

    /** Upper bound, in milliseconds, on waiting for the initial connection event. */
    private static final long CONNECT_WAIT_MS = 10000L;

    /**
     * Connects with the native client, deletes {@code /CONFIG/timeout} and prints the
     * client state.
     *
     * <p>NOTE(review): despite its name this test deletes a node, and the delete fails
     * if the node does not exist. The node can be seeded beforehand with
     * {@code create("/CONFIG/timeout", "0" bytes, OPEN_ACL_UNSAFE, PERSISTENT)}.
     *
     * @throws Exception if connecting or deleting fails
     */
    @Test
    public void testZkConnect() throws Exception {
        ZooKeeper zooKeeper = connect();
        try {
            // Version -1 matches any version of the node.
            zooKeeper.delete("/CONFIG/timeout", -1);
            System.out.println(zooKeeper.getState());
        } finally {
            zooKeeper.close();
        }
    }

    /**
     * Creates the persistent node {@code /watcher} with the data {@code "123"} and
     * prints the created path followed by the client state.
     *
     * @throws Exception if connecting or creating the node fails
     */
    @Test
    public void testZkCreateNode() throws Exception {
        ZooKeeper zooKeeper = connect();
        try {
            String path = "/watcher";
            String nodePath = zooKeeper.create(
                    path,
                    "123".getBytes(StandardCharsets.UTF_8),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
            System.out.println(nodePath);
            System.out.println(zooKeeper.getState());
        } finally {
            zooKeeper.close();
        }
    }

    /**
     * Deletes the node {@code /watcher} regardless of its version and prints the
     * client state.
     *
     * @throws Exception if connecting or deleting the node fails
     */
    @Test
    public void testZkDeleteNode() throws Exception {
        ZooKeeper zooKeeper = connect();
        try {
            // Version -1 matches any version of the node.
            zooKeeper.delete("/watcher", -1);
            System.out.println(zooKeeper.getState());
        } finally {
            zooKeeper.close();
        }
    }

    /**
     * Reads {@code /CONFIG/timeout} and prints its data, its {@link Stat} and the
     * client state.
     *
     * @throws Exception if connecting or reading the node fails
     */
    @Test
    public void getZkNodeData() throws Exception {
        ZooKeeper zooKeeper = connect();
        try {
            Stat stat = new Stat();
            // getData returns the node's payload and fills `stat` with the node's
            // status information; `true` registers the client's default watcher.
            byte[] data = zooKeeper.getData("/CONFIG/timeout", true, stat);
            System.out.println(new String(data, StandardCharsets.UTF_8));
            System.out.println(stat);
            System.out.println(zooKeeper.getState());
        } finally {
            zooKeeper.close();
        }
    }

    /**
     * Reads the node {@code /runoob} through Curator and prints its data.
     *
     * @throws Exception if the read fails
     */
    @Test
    public void getZkConnect() throws Exception {
        CuratorFramework curatorFramework = startCurator();
        try {
            Stat stat = new Stat();
            // Query the node data; the node's status is stored into `stat`.
            byte[] bytes = curatorFramework.getData().storingStatIn(stat).forPath("/runoob");
            System.out.println(new String(bytes, StandardCharsets.UTF_8));
        } finally {
            curatorFramework.close();
        }
    }

    /**
     * Overwrites the data of the node {@code /watcher} with {@code "123"} through Curator.
     *
     * @throws Exception if the update fails
     */
    @Test
    public void setData() throws Exception {
        CuratorFramework curatorFramework = startCurator();
        try {
            // Update the node data.
            curatorFramework.setData().forPath("/watcher", "123".getBytes(StandardCharsets.UTF_8));
        } finally {
            curatorFramework.close();
        }
    }

    /**
     * Opens a native ZooKeeper client and blocks until the server reports the session
     * as connected.
     *
     * @return a connected client; the caller is responsible for closing it
     * @throws IllegalStateException if no connection event arrives within
     *         {@link #CONNECT_WAIT_MS} milliseconds
     * @throws Exception if the client cannot be created or the wait is interrupted
     */
    private static ZooKeeper connect() throws Exception {
        final CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT_MS, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // A SyncConnected event is the server's acknowledgement of the session.
                if (Event.KeeperState.SyncConnected == event.getState()) {
                    connected.countDown();
                }
            }
        });
        boolean established = false;
        try {
            // Bounded wait: an unreachable server must fail the test, not hang it.
            established = connected.await(CONNECT_WAIT_MS, TimeUnit.MILLISECONDS);
        } finally {
            if (!established) {
                // Covers both the timeout and an interrupted wait.
                zooKeeper.close();
            }
        }
        if (!established) {
            throw new IllegalStateException(
                    "Timed out after " + CONNECT_WAIT_MS + " ms connecting to ZooKeeper at "
                            + CONNECT_STRING);
        }
        return zooKeeper;
    }

    /**
     * Builds and starts a Curator client with an exponential back-off retry policy
     * (base sleep 1000 ms, at most 3 retries) and an empty namespace.
     *
     * @return a started client; the caller is responsible for closing it
     */
    private static CuratorFramework startCurator() {
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .namespace("")
                .build();
        curatorFramework.start();
        return curatorFramework;
    }
}
参考:
https://segmentfault.com/a/1190000012262940