ZooKeeper分布式配置——看这篇就够了(2)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: ZooKeeper分布式配置——看这篇就够了

代码实现


下面我们就来演示如何使用代码来实现ZooKeeper的配置


首先我们需要引入ZK的jar


<dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.6.3</version>
    </dependency>


配置类


既然我们要做的是分布式配置,首先我们需要模拟一个配置,这个配置用来同步服务的地址

/**
 * @program: mxnzookeeper
 * @ClassName MyConf
 * @description: 配置类
 * @author: muxiaonong
 * @create: 2021-10-19 22:18
 * @Version 1.0
 **/
public class MyConfig {
    private String conf ;
    public String getConf() {
        return conf;
    }
    public void setConf(String conf) {
        this.conf = conf;
    }
}

Watcher

创建ZooKeeper的时候,我们需要一个Watcher进行监听,后续对Znode节点操作的时候,我们也需要使用到Watcher,但是这两类的功能不一样,所以我们需要定义一个自己的watcher类,如下所示:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CountDownLatch;
/**
 * @program: mxnzookeeper
 * @ClassName DefaultWatch
 * @description:
 * @author: muxiaonong
 * @create: 2021-10-19 22:02
 * @Version 1.0
 **/
public class DefaultWatch implements Watcher {
    CountDownLatch cc;
    public void setCc(CountDownLatch cc) {
        this.cc = cc;
    }
    @Override
    public void process(WatchedEvent event) {
        System.out.println(event.toString());
        switch (event.getState()) {
            case Unknown:
                break;
            case Disconnected:
                break;
            case NoSyncConnected:
                break;
            case SyncConnected:
                System.out.println("连接成功。。。。。");
                //连接成功后,执行countDown,此时便可以拿zk对象使用了
                cc.countDown();
                break;
            case AuthFailed:
                break;
            case ConnectedReadOnly:
                break;
            case SaslAuthenticated:
                break;
            case Expired:
                break;
            case Closed:
                break;
        }
    }
}

由于是异步进行操作的,我们创建一个ZooKeeper对象之后,如果不进行阻塞操作的话,有可能还没有连接完成就执行后续的操作,所以这里我们用 CountDownLatch进行阻塞操作,当监测连接成功后,进行 countDown放行,执行后续的ZK的动作。


当我们连接成功 ZooKeeper 之后,我们需要通过 exists判断是否存在节点,存在就进行 getData操作。这里我们创建一个 WatchCallBack因为exists和getData都需要一个callback,所以除了实现Watcher以外还需要实现节点状态:AsyncCallback.StatCallback 数据监听:AsyncCallback.DataCallback

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
 * @program: mxnzookeeper
 * @ClassName WatchCallBack
 * @description:
 * @author: muxiaonong
 * @create: 2021-10-19 22:13
 * @Version 1.0
 **/
public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
    ZooKeeper zk ;
    MyConfig conf ;
    CountDownLatch cc = new CountDownLatch(1);
    public MyConfig getConf() {
        return conf;
    }
    public void setConf(MyConfig conf) {
        this.conf = conf;
    }
    public ZooKeeper getZk() {
        return zk;
    }
    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }
    public void aWait(){
        //exists的异步实现版本
        zk.exists(ZKConstants.ZK_NODE,this,this ,"exists watch");
        try {
            cc.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    /** @Author mxn
     * @Description //TODO 此回调用于检索节点的stat
     * @Date 21:24 2021/10/20
     * @param rc 调用返回的code或结果
     * @param path 传递给异步调用的路径
     * @param ctx 传递给异步调用的上下文对象
     * @param stat 指定路径上节点的Stat对象
     * @return 
     **/
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        if(stat != null){
            //getData的异步实现版本
            zk.getData(ZKConstants.ZK_NODE,this,this,"status");
        }
    }
    /** @Author mxn
     * @Description //TODO  此回调用于检索节点的数据和stat
     * @Date 21:23 2021/10/20
     * @param rc 调用返回的code或结果
     * @param path 传递给异步调用的路径
     * @param ctx 传递给异步调用的上下文对象
     * @param data 节点的数据
     * @param stat 指定节点的Stat对象
     * @return
     **/
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        if(data != null ){
            String s = new String(data);
            conf.setConf(s);
            cc.countDown();
        }
    }
    /** @Author mxn
     * @Description //TODO Watcher接口的实现。
     *                      Watcher接口指定事件处理程序类必须实现的公共接口。
     *                      ZooKeeper客户机将从它连接到的ZooKeeper服务器获取各种事件。
     *                      使用这种客户机的应用程序通过向客户机注册回调对象来处理这些事件。
     *                      回调对象应该是实现监视器接口的类的实例。
     * @Date 21:24 2021/10/20
     * @Param  watchedEvent WatchedEvent表示监视者能够响应的ZooKeeper上的更改。
     *          WatchedEvent包含发生了什么,
     *          ZooKeeper的当前状态,以及事件中涉及的znode的路径。
     * @return 
     **/
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                //当一个node被创建后,获取node
                //getData中又会触发StatCallback的回调processResult
                zk.getData(ZKConstants.ZK_NODE,this,this,"sdfs");
                break;
            case NodeDeleted:
                //节点删除
                conf.setConf("");
                //重新开启CountDownLatch
                cc = new CountDownLatch(1);
                break;
            case NodeDataChanged:
                //节点数据被改变了
                //触发DataCallback的回调
                zk.getData(ZKConstants.ZK_NODE,this,this,"sdfs");
                break;
                //子节点发生变化的时候
            case NodeChildrenChanged:
                break;
        }
    }
}

当前面准备好了之后,我们可以编写测试用例了:

ZKUtils 工具类

import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
/**
 * @program: mxnzookeeper
 * @ClassName ZKUtils
 * @description:
 * @author: muxiaonong
 * @create: 2021-10-19 21:59
 * @Version 1.0
 **/
public class ZKUtils {
    private static ZooKeeper zk;
    //192.168.5.130:2181/mxn 这个后面/mxn,表示客户端如果成功建立了到zk集群的连接,
    // 那么默认该客户端工作的根path就是/mxn,如果不带/mxn,默认根path是/
    //当然我们要保证/mxn这个节点在ZK上是存在的
    private static String address ="192.18.5.129:2181,192.168.5.130:2181,192.168.5.130:2181/mxn";
    private static DefaultWatch watch = new DefaultWatch();
    private static CountDownLatch init = new CountDownLatch(1);
    public static ZooKeeper getZK(){
        try {
            //因为是异步的,所以要await,等到连接上zk集群之后再进行后续操作
            zk = new ZooKeeper(address,1000,watch);
            watch.setCc(init);
            init.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return zk;
    }
}

测试类:

import org.apache.zookeeper.ZooKeeper;
import org.junit.Before;
import org.junit.Test;
/**
 * @program: mxnzookeeper
 * @ClassName TestConfig
 * @description:
 * @author: muxiaonong
 * @create: 2021-10-19 22:04
 * @Version 1.0
 **/
public class TestConfig {
    ZooKeeper zk;
    @Before
    public void conn(){
        zk = ZKUtils.getZK();
    }
    /** @Author mxn
     * @Description //TODO 关闭ZK
     * @Date 21:16 2021/10/20
     * @Param
     * @return
     **/
    public void close(){
        try {
            zk.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    @Test
    public void getConf(){
        WatchCallBack watchCallBack = new WatchCallBack();
        watchCallBack.setZk(zk);
        MyConfig myConfig = new MyConfig();
        watchCallBack.setConf(myConfig);
        //阻塞等待
        watchCallBack.aWait();
        while(true){
            if(myConfig.getConf().equals("")){
                System.out.println("zk node 节点丢失了 ......");
                watchCallBack.aWait();
            }else{
                System.out.println(myConfig.getConf());
            }
//
            try {
                //每隔500毫秒打印一次
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
3月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
3月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
24天前
|
存储 运维 NoSQL
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
|
27天前
|
数据库
如何在Seata框架中配置分布式事务的隔离级别?
总的来说,配置分布式事务的隔离级别是实现分布式事务管理的重要环节之一,需要认真对待和仔细调整,以满足业务的需求和性能要求。你还可以进一步深入研究和实践 Seata 框架的配置和使用,以更好地应对各种分布式事务场景的挑战。
28 6
|
2月前
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
96 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
2月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
55 2
|
2月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
53 1
|
3月前
|
Java 网络安全
zookeeper的环境搭建和配置
本文介绍了如何在多台节点上搭建和配置Zookeeper环境。内容包括Zookeeper的下载、解压、环境变量配置、配置文件修改、zkdata目录创建、myid文件设置,以及将Zookeeper及其配置文件复制到其他节点。还提供了运行测试的命令,包括启动、状态检查和停止Zookeeper服务。
zookeeper的环境搭建和配置
|
2月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
54 0
|
3月前
|
Java
分布式-Zookeeper-分布式锁
分布式-Zookeeper-分布式锁