开发者社区> 牧小农> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

ZooKeeper分布式配置——看这篇就够了(2)

简介: ZooKeeper分布式配置——看这篇就够了
+关注继续查看

代码实现


下面我们就来演示如何使用代码来实现ZooKeeper的配置


首先我们需要引入ZK的jar


<dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.6.3</version>
    </dependency>


配置类


既然我们要做的是分布式配置,首先我们需要模拟一个配置,这个配置用来同步服务的地址

/**
 * @program: mxnzookeeper
 * @ClassName MyConf
 * @description: 配置类
 * @author: muxiaonong
 * @create: 2021-10-19 22:18
 * @Version 1.0
 **/
public class MyConfig {

    private String conf ;

    public String getConf() {
        return conf;
    }

    public void setConf(String conf) {
        this.conf = conf;
    }

}

Watcher

创建ZooKeeper的时候,我们需要一个Watcher进行监听,后续对Znode节点操作的时候,我们也需要使用到Watcher,但是这两类的功能不一样,所以我们需要定义一个自己的watcher类,如下所示:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.concurrent.CountDownLatch;

/**
 * @program: mxnzookeeper
 * @ClassName DefaultWatch
 * @description:
 * @author: muxiaonong
 * @create: 2021-10-19 22:02
 * @Version 1.0
 **/
public class DefaultWatch implements Watcher {

    CountDownLatch cc;
    
    public void setCc(CountDownLatch cc) {
        this.cc = cc;
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println(event.toString());

        switch (event.getState()) {
            case Unknown:
                break;
            case Disconnected:
                break;
            case NoSyncConnected:
                break;
            case SyncConnected:
                System.out.println("连接成功。。。。。");
                //连接成功后,执行countDown,此时便可以拿zk对象使用了
                cc.countDown();
                break;
            case AuthFailed:
                break;
            case ConnectedReadOnly:
                break;
            case SaslAuthenticated:
                break;
            case Expired:
                break;
            case Closed:
                break;
        }

    }
}

由于是异步进行操作的,我们创建一个ZooKeeper对象之后,如果不进行阻塞操作的话,有可能还没有连接完成就执行后续的操作,所以这里我们用 CountDownLatch进行阻塞操作,当监测连接成功后,进行 countDown放行,执行后续的ZK的动作。


当我们连接成功 ZooKeeper 之后,我们需要通过 exists判断是否存在节点,存在就进行 getData操作。这里我们创建一个 WatchCallBack因为exists和getData都需要一个callback,所以除了实现Watcher以外还需要实现节点状态:AsyncCallback.StatCallback 数据监听:AsyncCallback.DataCallback


import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

/**
 * @program: mxnzookeeper
 * @ClassName WatchCallBack
 * @description:
 * @author: muxiaonong
 * @create: 2021-10-19 22:13
 * @Version 1.0
 **/
public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {

    ZooKeeper zk ;
    MyConfig conf ;
    CountDownLatch cc = new CountDownLatch(1);

    public MyConfig getConf() {
        return conf;
    }

    public void setConf(MyConfig conf) {
        this.conf = conf;
    }

    public ZooKeeper getZk() {
        return zk;
    }

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }


    public void aWait(){
        //exists的异步实现版本
        zk.exists(ZKConstants.ZK_NODE,this,this ,"exists watch");
        try {
            cc.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** @Author mxn
     * @Description //TODO 此回调用于检索节点的stat
     * @Date 21:24 2021/10/20
     * @param rc 调用返回的code或结果
     * @param path 传递给异步调用的路径
     * @param ctx 传递给异步调用的上下文对象
     * @param stat 指定路径上节点的Stat对象
     * @return 
     **/
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        if(stat != null){
            //getData的异步实现版本
            zk.getData(ZKConstants.ZK_NODE,this,this,"status");
        }
    }


    /** @Author mxn
     * @Description //TODO  此回调用于检索节点的数据和stat
     * @Date 21:23 2021/10/20
     * @param rc 调用返回的code或结果
     * @param path 传递给异步调用的路径
     * @param ctx 传递给异步调用的上下文对象
     * @param data 节点的数据
     * @param stat 指定节点的Stat对象
     * @return
     **/
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        if(data != null ){
            String s = new String(data);
            conf.setConf(s);
            cc.countDown();
        }
    }

    /** @Author mxn
     * @Description //TODO Watcher接口的实现。
     *                      Watcher接口指定事件处理程序类必须实现的公共接口。
     *                      ZooKeeper客户机将从它连接到的ZooKeeper服务器获取各种事件。
     *                      使用这种客户机的应用程序通过向客户机注册回调对象来处理这些事件。
     *                      回调对象应该是实现监视器接口的类的实例。
     * @Date 21:24 2021/10/20
     * @Param  watchedEvent WatchedEvent表示监视者能够响应的ZooKeeper上的更改。
     *          WatchedEvent包含发生了什么,
     *          ZooKeeper的当前状态,以及事件中涉及的znode的路径。
     * @return 
     **/
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                //当一个node被创建后,获取node
                //getData中又会触发StatCallback的回调processResult
                zk.getData(ZKConstants.ZK_NODE,this,this,"sdfs");
                break;
            case NodeDeleted:
                //节点删除
                conf.setConf("");
                //重新开启CountDownLatch
                cc = new CountDownLatch(1);
                break;
            case NodeDataChanged:
                //节点数据被改变了
                //触发DataCallback的回调
                zk.getData(ZKConstants.ZK_NODE,this,this,"sdfs");
                break;
                //子节点发生变化的时候
            case NodeChildrenChanged:
                break;
        }


    }
}

当前面准备好了之后,我们可以编写测试用例了:

ZKUtils 工具类


import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

/**
 * @program: mxnzookeeper
 * @ClassName ZKUtils
 * @description:
 * @author: muxiaonong
 * @create: 2021-10-19 21:59
 * @Version 1.0
 **/
public class ZKUtils {

    private static ZooKeeper zk;

    //192.168.5.130:2181/mxn 这个后面/mxn,表示客户端如果成功建立了到zk集群的连接,
    // 那么默认该客户端工作的根path就是/mxn,如果不带/mxn,默认根path是/
    //当然我们要保证/mxn这个节点在ZK上是存在的
    private static String address ="192.18.5.129:2181,192.168.5.130:2181,192.168.5.130:2181/mxn";

    private static DefaultWatch watch = new DefaultWatch();

    private static CountDownLatch init = new CountDownLatch(1);

    public static ZooKeeper getZK(){

        try {
            //因为是异步的,所以要await,等到连接上zk集群之后再进行后续操作
            zk = new ZooKeeper(address,1000,watch);
            watch.setCc(init);
            init.await();

        } catch (Exception e) {
            e.printStackTrace();
        }

        return zk;
    }

}

测试类:


import org.apache.zookeeper.ZooKeeper;
import org.junit.Before;
import org.junit.Test;

/**
 * @program: mxnzookeeper
 * @ClassName TestConfig
 * @description:
 * @author: muxiaonong
 * @create: 2021-10-19 22:04
 * @Version 1.0
 **/
public class TestConfig {

    ZooKeeper zk;

    @Before
    public void conn(){
        zk = ZKUtils.getZK();
    }

    /** @Author mxn
     * @Description //TODO 关闭ZK
     * @Date 21:16 2021/10/20
     * @Param
     * @return
     **/
    public void close(){
        try {
            zk.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Test
    public void getConf(){
        WatchCallBack watchCallBack = new WatchCallBack();
        watchCallBack.setZk(zk);
        MyConfig myConfig = new MyConfig();
        watchCallBack.setConf(myConfig);

        //阻塞等待
        watchCallBack.aWait();


        while(true){

            if(myConfig.getConf().equals("")){
                System.out.println("zk node 节点丢失了 ......");
                watchCallBack.aWait();
            }else{
                System.out.println(myConfig.getConf());

            }
//
            try {
                //每隔500毫秒打印一次
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


    }



版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Zookeeper入门看这篇就够了(3)
Zookeeper入门看这篇就够了
29 0
【分布式】Zookeeper会话
 前面分析了Zookeeper客户端的细节,接着继续学习Zookeeper中的一个非常重要的概念:会话
14 0
【分布式】Zookeeper使用--开源客户端(二)
上一篇博客已经介绍了如何使用Zookeeper提供的原生态Java API进行操作,本篇博文主要讲解如何通过开源客户端来进行操作。
17 0
【分布式】Zookeeper在大型分布式系统中的应用
上一篇博文讲解了Zookeeper的典型应用场景,在大数据时代,各种分布式系统层出不穷,其中,有很多系统都直接或间接使用了Zookeeper,用来解决诸如配置管理、分布式通知/协调、集群管理和Master选举等一系列分布式问题。
45 0
【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列
前言上文【从入门到放弃-ZooKeeper】ZooKeeper入门中,我们学习了ZooKeeper的简单安装和cli使用。接下来我们开始基于java API的实战编程。本文先来写一个分布式队列的代码实现。
1693 0
ZooKeeper分布式入门实战(一)-基本安装配置等
1.1 zookeeper 简介 中间件,提供协调服务 作用于分布式系统,发挥其优势,可以为大数据服务 支持 Java, 提供 Java 和 C语言的客户端 API 1.
1188 0
+关注
牧小农
业精于勤荒于嬉,行成于思毁于随。
134
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载