代码实现
下面我们就来演示如何使用代码来实现ZooKeeper的配置
首先我们需要引入ZK的jar
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.3</version> </dependency>
配置类
既然我们要做的是分布式配置,首先我们需要模拟一个配置,这个配置用来同步服务的地址
/** * @program: mxnzookeeper * @ClassName MyConf * @description: 配置类 * @author: muxiaonong * @create: 2021-10-19 22:18 * @Version 1.0 **/ public class MyConfig { private String conf ; public String getConf() { return conf; } public void setConf(String conf) { this.conf = conf; } }
Watcher
创建ZooKeeper的时候,我们需要一个Watcher进行监听,后续对Znode节点操作的时候,我们也需要使用到Watcher,但是这两类的功能不一样,所以我们需要定义一个自己的watcher类,如下所示:
import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.concurrent.CountDownLatch; /** * @program: mxnzookeeper * @ClassName DefaultWatch * @description: * @author: muxiaonong * @create: 2021-10-19 22:02 * @Version 1.0 **/ public class DefaultWatch implements Watcher { CountDownLatch cc; public void setCc(CountDownLatch cc) { this.cc = cc; } @Override public void process(WatchedEvent event) { System.out.println(event.toString()); switch (event.getState()) { case Unknown: break; case Disconnected: break; case NoSyncConnected: break; case SyncConnected: System.out.println("连接成功。。。。。"); //连接成功后,执行countDown,此时便可以拿zk对象使用了 cc.countDown(); break; case AuthFailed: break; case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; case Closed: break; } } }
由于是异步进行操作的,我们创建一个ZooKeeper对象之后,如果不进行阻塞操作的话,有可能还没有连接完成就执行后续的操作,所以这里我们用 CountDownLatch进行阻塞操作,当监测连接成功后,进行 countDown放行,执行后续的ZK的动作。
当我们连接成功 ZooKeeper 之后,我们需要通过 exists判断是否存在节点,存在就进行 getData操作。这里我们创建一个 WatchCallBack因为exists和getData都需要一个callback,所以除了实现Watcher以外还需要实现节点状态:AsyncCallback.StatCallback 数据监听:AsyncCallback.DataCallback
import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; /** * @program: mxnzookeeper * @ClassName WatchCallBack * @description: * @author: muxiaonong * @create: 2021-10-19 22:13 * @Version 1.0 **/ public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { ZooKeeper zk ; MyConfig conf ; CountDownLatch cc = new CountDownLatch(1); public MyConfig getConf() { return conf; } public void setConf(MyConfig conf) { this.conf = conf; } public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } public void aWait(){ //exists的异步实现版本 zk.exists(ZKConstants.ZK_NODE,this,this ,"exists watch"); try { cc.await(); } catch (InterruptedException e) { e.printStackTrace(); } } /** @Author mxn * @Description //TODO 此回调用于检索节点的stat * @Date 21:24 2021/10/20 * @param rc 调用返回的code或结果 * @param path 传递给异步调用的路径 * @param ctx 传递给异步调用的上下文对象 * @param stat 指定路径上节点的Stat对象 * @return **/ @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(stat != null){ //getData的异步实现版本 zk.getData(ZKConstants.ZK_NODE,this,this,"status"); } } /** @Author mxn * @Description //TODO 此回调用于检索节点的数据和stat * @Date 21:23 2021/10/20 * @param rc 调用返回的code或结果 * @param path 传递给异步调用的路径 * @param ctx 传递给异步调用的上下文对象 * @param data 节点的数据 * @param stat 指定节点的Stat对象 * @return **/ @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if(data != null ){ String s = new String(data); conf.setConf(s); cc.countDown(); } } /** @Author mxn * @Description //TODO Watcher接口的实现。 * Watcher接口指定事件处理程序类必须实现的公共接口。 * ZooKeeper客户机将从它连接到的ZooKeeper服务器获取各种事件。 * 使用这种客户机的应用程序通过向客户机注册回调对象来处理这些事件。 * 回调对象应该是实现监视器接口的类的实例。 * @Date 21:24 2021/10/20 * @Param watchedEvent WatchedEvent表示监视者能够响应的ZooKeeper上的更改。 * WatchedEvent包含发生了什么, * ZooKeeper的当前状态,以及事件中涉及的znode的路径。 * @return **/ @Override public void process(WatchedEvent event) { switch (event.getType()) { case None: break; case NodeCreated: //当一个node被创建后,获取node //getData中又会触发StatCallback的回调processResult zk.getData(ZKConstants.ZK_NODE,this,this,"sdfs"); break; case NodeDeleted: //节点删除 conf.setConf(""); //重新开启CountDownLatch cc = new CountDownLatch(1); break; case NodeDataChanged: //节点数据被改变了 //触发DataCallback的回调 zk.getData(ZKConstants.ZK_NODE,this,this,"sdfs"); break; //子节点发生变化的时候 case NodeChildrenChanged: break; } } }
当前面准备好了之后,我们可以编写测试用例了:
ZKUtils 工具类
import org.apache.zookeeper.ZooKeeper; import java.util.concurrent.CountDownLatch; /** * @program: mxnzookeeper * @ClassName ZKUtils * @description: * @author: muxiaonong * @create: 2021-10-19 21:59 * @Version 1.0 **/ public class ZKUtils { private static ZooKeeper zk; //192.168.5.130:2181/mxn 这个后面/mxn,表示客户端如果成功建立了到zk集群的连接, // 那么默认该客户端工作的根path就是/mxn,如果不带/mxn,默认根path是/ //当然我们要保证/mxn这个节点在ZK上是存在的 private static String address ="192.18.5.129:2181,192.168.5.130:2181,192.168.5.130:2181/mxn"; private static DefaultWatch watch = new DefaultWatch(); private static CountDownLatch init = new CountDownLatch(1); public static ZooKeeper getZK(){ try { //因为是异步的,所以要await,等到连接上zk集群之后再进行后续操作 zk = new ZooKeeper(address,1000,watch); watch.setCc(init); init.await(); } catch (Exception e) { e.printStackTrace(); } return zk; } }
测试类:
import org.apache.zookeeper.ZooKeeper; import org.junit.Before; import org.junit.Test; /** * @program: mxnzookeeper * @ClassName TestConfig * @description: * @author: muxiaonong * @create: 2021-10-19 22:04 * @Version 1.0 **/ public class TestConfig { ZooKeeper zk; @Before public void conn(){ zk = ZKUtils.getZK(); } /** @Author mxn * @Description //TODO 关闭ZK * @Date 21:16 2021/10/20 * @Param * @return **/ public void close(){ try { zk.close(); }catch (Exception e){ e.printStackTrace(); } } @Test public void getConf(){ WatchCallBack watchCallBack = new WatchCallBack(); watchCallBack.setZk(zk); MyConfig myConfig = new MyConfig(); watchCallBack.setConf(myConfig); //阻塞等待 watchCallBack.aWait(); while(true){ if(myConfig.getConf().equals("")){ System.out.println("zk node 节点丢失了 ......"); watchCallBack.aWait(); }else{ System.out.println(myConfig.getConf()); } // try { //每隔500毫秒打印一次 Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }