Zookeeper客户端ZkClient、Curator的使用,史上最详细的教程来啦~

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: Zookeeper客户端ZkClient、Curator的使用,史上最详细的教程来啦~

1 前言

本文主要介绍了操作Zookeeper的几种客户端的基础使用,希望对老铁们会有所帮助。

可以去操作zookeeper创建、删除、查询、修改znode节点


2 Zookeeper服务器客户端分类

目前,Zookeeper服务器有三种Java客户端: Zookeeper、Zkclient和Curator


Zookeeper: Zookeeper是官方提供的原生java客户端

Zkclient: 是在原生zookeeper客户端基础上进行扩展的开源第三方Java客户端

Curator: Netflix公司在原生zookeeper客户端基础上开源的第三方Java客户端

3 Java客户端Zookeeper的使用(官方)

使用官方客户端工具Zookeeper,操作Zookeeper节点。

官网文档: https://zookeeper.apache.org/doc/r3.6.1/apidocs/zookeeper-server/index.html


该客户端是zookeeper官方自带的,编程不是那么方便:

会话超时异常时,不支持自动重连,需要手动重新连接,编程繁琐

watcher 是一次性的,注册一次后会失效

节点数据是二进制,对象数据都需要转换为二进制保存

不支持递归创建节点,需要先创建父节点再创建子节点

不支持递归删除节点,需要先删除子节点再删除父节点

原生zookeeper客户端和服务器端会话的建立是一个异步的过程,也就是说在程序中,我们程序方法在处理完客户端初始化后,立即返回(程序往下执行代码,这样,大多数情况下我们并没有真正构建好一个可用会话,在会话的声明周期处于”CONNECTED”时才算真正建立完毕,所以我们需要使用多线程中的一个工具类CountDownLatch来控制,真正的连接上zk客户端后,才可以继续操作zNode节点)

3.1 Zookeeper(官方)主要API

Zookeeper(官方)主要api如下:

3.2 添加jar包依赖

使用Java客户端Zookeeper,首先需要添加zookeeper的官方客户端jar包依赖:

<!--zookeeper的官方客户端jar包依赖-->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.5</version>
</dependency>

3.3 客户端Zookeeper的使用

3.3.1 创建zookeeper连接

代码示例如下:

/** * <p> * 初始化zookeeper,创建zookeeper客户端对象 * <p/> * * @param connectAddress * @param sessionTimeout * @return void * @Date 2020/6/20 13:22 */
    private static void initConnect(String connectAddress, int sessionTimeout) { 
        try { 
            //创建zookeeper客户端对象
            //zookeeper = new ZooKeeper(connectAddress, sessionTimeout, null);
            //以上这种方式,由于zookeeper连接是异步的,如果new ZooKeeper(connectStr, sessionTimeout, null)完之后马上使用,有可能会报错。
            //解决办法:增加watcher监听事件,如果为SyncConnected,那么才做其他的操作。(这里利用CountDownLatch倒数器来控制)
            zookeeper = new ZooKeeper(connectAddress, sessionTimeout, watchedEvent -> { 
                //获取监听事件的状态
                Watcher.Event.KeeperState state = watchedEvent.getState();
                //获取监听事件类型
                Watcher.Event.EventType type = watchedEvent.getType();
                //如果已经建立上了连接
                if (Watcher.Event.KeeperState.SyncConnected == state) { 
                    if (Watcher.Event.EventType.None == type) { 
                        System.out.println("zookeeper连接成功......");
                        countDownLatch.countDown();
                    }
                }
                if (Watcher.Event.EventType.NodeCreated == type) { 
                    System.out.println("zookeeper有新节点【" + watchedEvent.getPath() + "】创建!");
                }
                if (Watcher.Event.EventType.NodeDataChanged == type) { 
                    System.out.println("zookeeper有节点【" + watchedEvent.getPath() + "】数据变化!");
                }
                if (Watcher.Event.EventType.NodeDeleted == type) { 
                    System.out.println("zookeeper有节点【" + watchedEvent.getPath() + "】被删除!");
                }
                if (Watcher.Event.EventType.NodeChildrenChanged == type) { 
                    System.out.println("zookeeper有子节点【" + watchedEvent.getPath() + "】变化!");
                }
            });
            //倒计数器没有倒数完成,不能执行下面的代码,因为需要等zookeeper连上了,才可以进行node的操作,否则可能会报错
            countDownLatch.await();
            System.out.println("init connect success:" + zookeeper);
        } catch (IOException e) { 
            e.printStackTrace();
        } catch (InterruptedException e) { 
            e.printStackTrace();
        }
    }

3.3.2 创建zNode节点

代码示例如下:

/** * <p> * 根据指定路径,创建zNode节点,并赋值数据 * <p/> * * @param nodePath * @param data * @return void * @Date 2020/6/20 13:58 */
    private static void createNode(String nodePath, String data) throws KeeperException, InterruptedException { 
        if (StringUtils.isEmpty(nodePath)) { 
            System.out.println("节点【" + nodePath + "】不能为空");
            return;
        }
        //对节点是否存在进行判断,否则会报错:【NodeExistsException: KeeperErrorCode = NodeExists for /root】
        Stat exists = zookeeper.exists(nodePath, true);
        if (null != exists) { 
            System.out.println("节点【" + nodePath + "】已存在,不能新增");
            return;
        }
        String result = zookeeper.create(nodePath, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("create:" + "【" + nodePath + "-->" + data + "】,result:" + result);
    }
    /** * <p> * 根据指定路径,递归创建zNode节点,并赋值数据 * <p/> * * @param nodePath * @param data * @return void * @Date 2020/6/20 14:28 */
    private static void createNodeRecursion(String nodePath, String data) throws KeeperException, InterruptedException { 
        if (StringUtils.isEmpty(nodePath)) { 
            System.out.println("节点【" + nodePath + "】不能为空");
            return;
        }
        String paths[] = nodePath.substring(1).split("/");
        for (int i = 0; i < paths.length; i++) { 
            String childPath = "";
            for (int j = 0; j <= i; j++) { 
                childPath += "/" + paths[j];
            }
            createNode(childPath, data);
        }
    }

3.3.3 查询zNode节点

代码示例如下:

/** * <p> * 查询节点 * <p/> * * @param nodePath * @return void * @Date 2020/6/20 15:12 */
    private static void queryNode(String nodePath) throws KeeperException, InterruptedException { 
        System.out.println("--------------------华丽的分割线-------------------------");
        byte[] bytes = zookeeper.getData(nodePath, false, null);
        System.out.println(new String(bytes));
        Stat stat = new Stat();
        byte[] data = zookeeper.getData(nodePath, true, stat);
        System.out.println("queryNode:" + "【" + nodePath + "】,result:" + new String(data) + ",stat:" + stat);
    }

3.3.4 更新zNode节点

代码示例如下:

/** * <p> * 更新指定节点的数据 * <p/> * * @param nodePath * @param data * @return void * @Date 2020/6/20 16:01 */
    private static void updateNodeData(String nodePath, String data) throws KeeperException, InterruptedException { 
        //version = -1代表不指定版本
        Stat stat = zookeeper.setData(nodePath, data.getBytes(), -1);
        System.out.println("setData:" + "【" + nodePath + "】,stat:" + stat);
    }

3.3.5 删除zNode

代码示例如下:

/** * <p> * 根据某个节点,删除节点 * <p/> * * @param nodePath * @return void * @Date 2020/6/20 15:28 */
    private static void deleteNode(String nodePath) throws KeeperException, InterruptedException { 
        System.out.println("--------------------华丽的分割线-------------------------");
        Stat exists = zookeeper.exists(nodePath, true);
        if (null == exists) { 
            System.out.println(nodePath + "不存在,请核实后在进行相关操作!");
            return;
        }
        zookeeper.delete(nodePath, -1);//version:-1表示删除节点时,不指定版本
        System.out.println("delete node:" + "【" + nodePath + "】");
    }
    /** * <p> * 根据某个路径,递归删除节点(该方式会删除父节点) * <p/> * * @param nodePath * @return void * @Date 2020/6/20 15:29 */
    private static void deleteRecursion(String nodePath) throws KeeperException, InterruptedException { 
        System.out.println("--------------------华丽的分割线-------------------------");
        Stat exists = zookeeper.exists(nodePath, true);
        if (null == exists) { 
            System.out.println(nodePath + "不存在,请核实后在进行相关操作!");
            return;
        }
        //获取当前nodePath下,子节点的数据
        List<String> list = zookeeper.getChildren(nodePath, true);
        if (CollectionUtils.isEmpty(list)) { 
            deleteNode(nodePath);
            String parentPath = nodePath.substring(0, nodePath.lastIndexOf("/"));
            System.out.println("parentPath=" + parentPath);
            //如果当前节点存在父节点,连带的删除父节点,以及父节点下所有的子节点
            if (StringUtils.isNotBlank(parentPath)) { 
                deleteRecursion(parentPath);
            }
        } else { 
            for (String child : list) { 
                deleteRecursion(nodePath + "/" + child);
            }
        }
    }

3.3.6 zookeeper基础api使用小结

zk节点的基础增删改查,完整代码示例如下:

/** * <p> * 原生的zookeeper客户端(官方) * 1.连接是异步的,使用时需要注意,增加watcher,监听事件如果为SyncConnected,那么才做其他的操作。(可以使用CountDownLatch或者其他栅栏控制--并发编程) * 2.监听事件是一次性的,如果操作多次需要注册多次 * <p> * api:https://zookeeper.apache.org/doc/r3.6.1/apidocs/zookeeper-server/index.html * <p/> * * @author smilehappiness * @Date 2020/6/20 16:09 */
public class ZookeeperClient01 { 
    /** * 客户端连接地址 */
    private static final String ZK_ADDRESS = "ip:2181";
    /** * 客户端根节点 */
    private static final String ROOT_NODE = "/root";
    /** * 客户端子节点 */
    private static final String ROOT_NODE_CHILDREN = "/root/user";
    /** * 倒计数器,倒数1个 */
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    /** * ZooKeeper对象 */
    private static ZooKeeper zookeeper = null;
    /** * <p> * zookeeper客户端使用 * <p> * 注意:不要使用debug进行断点测试,否则可能会报错(如:org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /root) * <p/> * * @param args * @return void * @Date 2020/6/20 15:42 */
    public static void main(String[] args) throws Exception { 
        //1、初始化zookeeper,创建zookeeper客户端对象
        initConnect(ZK_ADDRESS, 5000);
        //2、创建节点
        createNode(ROOT_NODE, "root data1");
        createNode(ROOT_NODE + "/home", "home data1");
        //递归创建节点(递归每个节点,并赋相同的值,这种场景用的不是很多)
        createNodeRecursion(ROOT_NODE_CHILDREN, "recursion data1");
        //3、查询节点
        queryNode(ROOT_NODE);
        //4、修改节点
        updateNodeData(ROOT_NODE, "nice");
        //5、单个节点删除(注意:如果节点下有子节点,不能删除--NotEmptyException: KeeperErrorCode = Directory not empty for /root)
        //zookeeper客户端的api里,暂时没找到可以直接删除当前节点以及子节点的方法
        //deleteNode(ROOT_NODE);
        //递归删除节点
        deleteRecursion(ROOT_NODE_CHILDREN);
    }
    /** * <p> * 初始化zookeeper,创建zookeeper客户端对象 * <p/> * * @param connectAddress * @param sessionTimeout * @return void * @Date 2020/6/20 13:22 */
    private static void initConnect(String connectAddress, int sessionTimeout) { 
        try { 
            //创建zookeeper客户端对象
            //zookeeper = new ZooKeeper(connectAddress, sessionTimeout, null);
            //以上这种方式,由于zookeeper连接是异步的,如果new ZooKeeper(connectStr, sessionTimeout, null)完之后马上使用,有可能会报错。
            //解决办法:增加watcher监听事件,如果为SyncConnected,那么才做其他的操作。(这里利用CountDownLatch倒数器来控制)
            zookeeper = new ZooKeeper(connectAddress, sessionTimeout, watchedEvent -> { 
                //获取监听事件的状态
                Watcher.Event.KeeperState state = watchedEvent.getState();
                //获取监听事件类型
                Watcher.Event.EventType type = watchedEvent.getType();
                //如果已经建立上了连接
                if (Watcher.Event.KeeperState.SyncConnected == state) { 
                    if (Watcher.Event.EventType.None == type) { 
                        System.out.println("zookeeper连接成功......");
                        countDownLatch.countDown();
                    }
                }
                if (Watcher.Event.EventType.NodeCreated == type) { 
                    System.out.println("zookeeper有新节点【" + watchedEvent.getPath() + "】创建!");
                }
                if (Watcher.Event.EventType.NodeDataChanged == type) { 
                    System.out.println("zookeeper有节点【" + watchedEvent.getPath() + "】数据变化!");
                }
                if (Watcher.Event.EventType.NodeDeleted == type) { 
                    System.out.println("zookeeper有节点【" + watchedEvent.getPath() + "】被删除!");
                }
                if (Watcher.Event.EventType.NodeChildrenChanged == type) { 
                    System.out.println("zookeeper有子节点【" + watchedEvent.getPath() + "】变化!");
                }
            });
            //倒计数器没有倒数完成,不能执行下面的代码,因为需要等zookeeper连上了,才可以进行node的操作,否则可能会报错
            countDownLatch.await();
            System.out.println("init connect success:" + zookeeper);
        } catch (IOException e) { 
            e.printStackTrace();
        } catch (InterruptedException e) { 
            e.printStackTrace();
        }
    }
    /** * <p> * 根据指定路径,创建zNode节点,并赋值数据 * <p/> * * @param nodePath * @param data * @return void * @Date 2020/6/20 13:58 */
    private static void createNode(String nodePath, String data) throws KeeperException, InterruptedException { 
        if (StringUtils.isEmpty(nodePath)) { 
            System.out.println("节点【" + nodePath + "】不能为空");
            return;
        }
        //对节点是否存在进行判断,否则会报错:【NodeExistsException: KeeperErrorCode = NodeExists for /root】
        Stat exists = zookeeper.exists(nodePath, true);
        if (null != exists) { 
            System.out.println("节点【" + nodePath + "】已存在,不能新增");
            return;
        }
        String result = zookeeper.create(nodePath, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("create:" + "【" + nodePath + "-->" + data + "】,result:" + result);
    }
    /** * <p> * 根据指定路径,递归创建zNode节点,并赋值数据 * <p/> * * @param nodePath * @param data * @return void * @Date 2020/6/20 14:28 */
    private static void createNodeRecursion(String nodePath, String data) throws KeeperException, InterruptedException { 
        if (StringUtils.isEmpty(nodePath)) { 
            System.out.println("节点【" + nodePath + "】不能为空");
            return;
        }
        String paths[] = nodePath.substring(1).split("/");
        for (int i = 0; i < paths.length; i++) { 
            String childPath = "";
            for (int j = 0; j <= i; j++) { 
                childPath += "/" + paths[j];
            }
            createNode(childPath, data);
        }
    }
    /** * <p> * 查询节点 * <p/> * * @param nodePath * @return void * @Date 2020/6/20 15:12 */
    private static void queryNode(String nodePath) throws KeeperException, InterruptedException { 
        System.out.println("--------------------华丽的分割线-------------------------");
        byte[] bytes = zookeeper.getData(nodePath, false, null);
        System.out.println(new String(bytes));
        Stat stat = new Stat();
        byte[] data = zookeeper.getData(nodePath, true, stat);
        System.out.println("queryNode:" + "【" + nodePath + "】,result:" + new String(data) + ",stat:" + stat);
    }
    /** * <p> * 更新指定节点的数据 * <p/> * * @param nodePath * @param data * @return void * @Date 2020/6/20 16:01 */
    private static void updateNodeData(String nodePath, String data) throws KeeperException, InterruptedException { 
        //version = -1代表不指定版本
        Stat stat = zookeeper.setData(nodePath, data.getBytes(), -1);
        System.out.println("setData:" + "【" + nodePath + "】,stat:" + stat);
    }
    /** * <p> * 根据某个节点,删除节点 * <p/> * * @param nodePath * @return void * @Date 2020/6/20 15:28 */
    private static void deleteNode(String nodePath) throws KeeperException, InterruptedException { 
        System.out.println("--------------------华丽的分割线-------------------------");
        Stat exists = zookeeper.exists(nodePath, true);
        if (null == exists) { 
            System.out.println(nodePath + "不存在,请核实后在进行相关操作!");
            return;
        }
        zookeeper.delete(nodePath, -1);//version:-1表示删除节点时,不指定版本
        System.out.println("delete node:" + "【" + nodePath + "】");
    }
    /** * <p> * 根据某个路径,递归删除节点(该方式会删除父节点) * <p/> * * @param nodePath * @return void * @Date 2020/6/20 15:29 */
    private static void deleteRecursion(String nodePath) throws KeeperException, InterruptedException { 
        System.out.println("--------------------华丽的分割线-------------------------");
        Stat exists = zookeeper.exists(nodePath, true);
        if (null == exists) { 
            System.out.println(nodePath + "不存在,请核实后在进行相关操作!");
            return;
        }
        //获取当前nodePath下,子节点的数据
        List<String> list = zookeeper.getChildren(nodePath, true);
        if (CollectionUtils.isEmpty(list)) { 
            deleteNode(nodePath);
            String parentPath = nodePath.substring(0, nodePath.lastIndexOf("/"));
            System.out.println("parentPath=" + parentPath);
            //如果当前节点存在父节点,连带的删除父节点,以及父节点下所有的子节点
            if (StringUtils.isNotBlank(parentPath)) { 
                deleteRecursion(parentPath);
            }
        } else { 
            for (String child : list) { 
                deleteRecursion(nodePath + "/" + child);
            }
        }
    }
}

4 Java客户端Zkclient的使用

Zkclient是在原生zookeeper基础上进行扩展的开源第三方Java客户端,使用起来相比较原生客户端,方便太多了。

Github源码:https://github.com/sgroschupf/zkclient


4.1 官方自带的zookeeper客户端存在的问题

session会话超时异常时,不支持自动重连,需要手动重新连接,编程繁琐(生产环境中如果网络出现不稳定情况,那么这种情况出现的更加明显)
ZooKeeper的watcher监听是一次性的,注册一次后会失效
节点数据是二进制,对象数据都需要转换为二进制保存
不支持递归创建节点,需要先创建父节点再创建子节点
不支持递归删除节点,需要先删除子节点再删除父节点
没有领导选举机制,集群情况下可能需要实现stand by,一个服务挂了,另一个需要接替的效果
客户端只提供了存储byte数组的接口,而项目中一般都会使用对象
客户端接口需要处理的异常太多,并且通常,我们也不知道如何处理这些异常
原生zookeeper客户端和服务器端会话的建立是一个异步的过程,也就是说在程序中,我们程序方法在处理完客户端初始化后,立即返回(程序往下执行代码,这样,大多数情况下我们并没有真正构建好一个可用会话,在会话的声明周期处于”CONNECTED”时才算真正建立完毕,所以我们需要使用多线程中的一个工具类CountDownLatch来控制,真正的连接上zk客户端后,才可以继续操作zNode节点)

4.2 为什么使用ZkClient客户端

ZkClient在原生的Zookeeper API接口上进行了包装,更加简单易用。ZkClient内部还实现了Session超时重连、Watcher反复注册等功能,使得这些操作对开发透明,提高了开发的效率。


ZkClient主要解决了原生api的两个大问题:


ZkClient解决了watcher的一次性注册问题,将znode的事件重新定义为子节点的变化、数据的变化、连接状态的变化三类,有ZkClient统一将watcher的WatchedEvent转换到以上三种情况中去处理,watcher执行后重新读取数据的同时,在注册新的相同的watcher。


ZkClient将Zookeeper的watcher机制转化为一种更加容易理解的订阅形式,并且这种关系是可以保持的,而非一次性的。也就是说子节点的变化、数据的变化、状态的变化是可以订阅的。当watcher使用完后,zkClient会自动增加一个相同的watcher。


在session loss和session expire时自动创建新的ZooKeeper实例进行重连


101tec这个zookeeper客户端主要有以下特性:


提供了zookeeper重连的特性——能够在断链的时候,重新建立连接,无论session失效与否.

持久的event监听器机制—— ZKClient框架将事件重新定义分为了stateChanged、znodeChanged、dataChanged三种情况,用户可以注册这三种情况下的监听器(znodeChanged和dataChanged和路径有关),而不是注册Watcher。

zookeeper异常处理——-zookeeper中繁多的Exception,以及每个Exception所需要关注的事情各有不同,I0Itec简单的做了封装。

data序列化——简单的data序列化.(Serialzer/Deserialzer)

有默认的领导选举机制

注意:使用101tec-zkClient,有一个方法还需要完善,create()方法,使用该方法创建节点时,如果节点已经存在,仍然抛出NodeExistException,需要自己手动去判断是否存在某个节点,这一步其实可以优化的。


4.3 添加ZkClient客户端依赖

使用Zkclient需要依赖jar包,使用以下方式,添加maven依赖

添加以下依赖:


<!--zkclient客户端的jar包依赖-->
<dependency>
   <groupId>com.101tec</groupId>
   <artifactId>zkclient</artifactId>
   <version>0.11</version>
</dependency>


4.4 三方客户端ZkClient常用APi的使用

4.4.1 创建ZkClient连接对象

有以下几种方式:


public ZkClient(String serverstring)
public ZkClient(String zkServers, int connectionTimeout)
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout)
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer)
public ZkClient(IZkConnection connection)
public ZkClient(IZkConnection connection, int connectionTimeout)
public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer)


方法参数说明:


zkServers: Zookeeper的服务器列表(如:ip:2181),是一个host:port字符串,如果有多个,由,分割开

sessionTimeout: 会话超时时间,单位ms

connectionTimeout:连接超时时间,单位ms。如果在该时间内还未成功与Zookeeper建立连接,则放弃连接并抛出异常

connection:IZkConnection接口的实现类,IZkConnection是对Zookeeper原生接口最直接的包装。IZkConnection有两种主要实现ZkConnection和InMemoryConnection,最常用的就是ZkConnection

zkSerializer:自定义序列化器。Zookeeper支持byte[]类型的数据,因此需要开发人员自己定义序列化器,允许用户传入一个自定义的序列化实现,如果不传的话,默认使用Java自带的序列化方式进行序列化和反序列化,默认的序列化的数据格式不是很友好,建议使用自定义的序列化器。

4.4.2 创建zNode节点

4.4.2.1 创建zNode节点

有以下几种方法:

public String create(final String path, Object data, final CreateMode mode) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
public void createEphemeral(final String path) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
public void createEphemeral(final String path, final Object data) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
public String createEphemeralSequential(final String path, final Object data) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
public void createPersistent(String path) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
public void createPersistent(String path, boolean createParents) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
public void createPersistent(String path, Object data) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
public String createPersistentSequential(String path, Object data) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException

以上方法中,第一个方法create()和原生方法一样,可以创建一个节点,createPersistent()是对原生api的封装,使得创建节点更加的简单。createPersistentSequential()也是对原生api的封装,允许创建一个持久化,且有序列号的节点。


方法参数说明:


path:创建的节点路径

data:创建的节点的同时,赋值的内容

createParents:如果父节点不存在,是否创建父节点


4.4.2.2 创建节点状态监听

public void subscribeStateChanges(final IZkStateListener listener)
public interface IZkStateListener { 
    // 会话状态发生变化时回调
    public void handleStateChanged(KeeperState state) throws Exception;
    // 当一个会话过期,重新创建新的会话的时候回调
    public void handleNewSession() throws Exception;
}

注册一个Watcher,用来监听当前客户端会话的状态变化,如果客户端会话状态发生变化,将会回调IZkStateListener中的方法。

4.4.3 获取节点列表

4.4.3.1 获取节点内容

主要的api方法如下:

// 判断节点是否存在,通常情况下,不存在节点,采取创建节点,否则会报错
public boolean exists(final String path)
public <T extends Object> T readData(String path)
public <T extends Object> T readData(String path, boolean returnNullIfPathNotExists)
public <T extends Object> T readData(String path, Stat stat)
// 不在同一个包下无法调用
protected <T extends Object> T readData(final String path, final Stat stat, final boolean watch)
public void subscribeDataChanges(String path, IZkDataListener listener)

参数说明:


path:指定要读取的节点

returnNullIfPathNotExists:字面意思就可以理解,如果节点不存在返回null,而不是抛出异常

stat:指定数据节点的节点状态信息,在交互过程中,会被服务端响应的新stat替换


监听方法subscribeDataChanges()说明:

public interface IZkDataListener { 
    public void handleDataChange(String dataPath, Object data) throws Exception;
    public void handleDataDeleted(String dataPath) throws Exception;
}

参数说明:


dataPath:发生变化的节点的全路径名称

data:节点的新内容


最后一个方法subscribeDataChanges()相当于注册一个Watcher监听,IZkDataListener主要在以下事件发生时通知客户端:


节点内容发生变化时,将会回调handleDataChange()方法

节点被删除时,将会回调handleDataDeleted()方法


4.4.3.2 获取子节点列表

常用的api方法如下:

// 获取当前节点的子节点
public List<String> getChildren(String path)
// 获取当前节点的子节点的数量
public int countChildren(String path)
// 不在同一个包下无法调用,watch:是否使用默认的Watcher
protected List<String> getChildren(final String path, final boolean watch)
// 订阅当前节点的子节点的变化
public List<String> subscribeChildChanges(String path, IZkChildListener listener)

监听方法subscribeChildChanges()说明:

最后一个方法subscribeChildChanges()相当于注册了一个Watcher监听,该方法与注册Watcher的区别是IZkChildListener在注册的时候,节点可以不存在也可以监听,而且一次注册永久有效。

public interface IZkChildListener { 
    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception;
}

IZkChildListener主要在以下事件发生时通知客户端:


path节点被新增时,parentPath为节点名称,currentChilds为空的集合

path节点被删除时,parentPath为节点名称,currentChilds为null

path节点有子节点被创建时,parentPath为节点名称,currentChilds为当前子节点的名称(相对路径)列表

path节点有子节点被删除时,parentPath为节点名称,currentChilds为当前子节点的名称(相对路径)列表,有可能为空的集合

(子节点都被删除的情况)


4.4.4 设置和修改节点内容

修改节点内容主要有以下方法:

// 为节点设置内容
public void writeData(String path, Object data)
// 为节点的某个版本,设置内容
public void writeData(final String path, Object data, final int expectedVersion)
//该方式设置内容更加的灵活,可以使用DataUpdater<T>设置内容
public <T> void updateDataSerialized(String path, DataUpdater<T> updater)

4.4.5 删除zNode节点

有以下两种方法:

public boolean delete(final String path)
//根据节点路径,版本号,删除节点
public boolean delete(final String path, final int version)
//递归删除当前节点,以及当前节点的子节点
public boolean deleteRecursive(String path)
deleteRecursive()方法支持删除当前节点的子节点,也即是递归删除

4.4.6 ZkClient使用小结

以上介绍了ZkClient基础api,下面是测试的完整代码。

完整代码示例如下:

/** * <p> * Zookeeper 客户端ZkClient基础使用 * <p/> * * @author smilehappiness * @Date 2020/6/20 16:56 */
public class ZookeeperClient02 { 
    /** * 客户端连接地址 */
    private static final String ZK_ADDRESS = "ip:2181";
    /** * 客户端根节点 */
    private static final String ROOT_NODE = "/root";
    /** * 客户端子节点 */
    private static final String ROOT_NODE_CHILDREN = "/root/children/node";
    public static void main(String[] args) { 
        //1、建立zookeeper连接
        //ZkClient zkClient = new ZkClient(ZK_ADDRESS, 5000, 15000);
        //zkClient支持定义自己的序列化格式(zkClient保存的数据格式不好看,可以自定义格式)
        ZkClient zkClient = new ZkClient(ZK_ADDRESS, 5000, 15000, new MyZkSerializer());
        //2、创建一个持久化节点
        //zkClient.createPersistent(ROOT_NODE);
        //判断节点是否存在
        if (!zkClient.exists(ROOT_NODE)) { 
            //创建持久化节点,并赋值
            zkClient.createPersistent(ROOT_NODE, "hello");
            //创建持久化,且有序的节点(如:sequential0000000000,可用于分布式环境下的主键生成器),并赋值
            zkClient.createPersistentSequential(ROOT_NODE + "/sequential", "sequential data");
            //zkClient也支持调用原生方法,创建节点
            String node = zkClient.create(ROOT_NODE + "/home", "zkclient data", CreateMode.PERSISTENT);
            System.out.println(node);
        }
        //创建一个持久化节点,如果父节点不存在,创建父节点
        zkClient.createPersistent(ROOT_NODE_CHILDREN, true);
        //3、查询节点
        //读取节点数据
        Object object = zkClient.readData(ROOT_NODE);
        System.out.println(object);
        //获取当前节点的子节点
        List<String> children = zkClient.getChildren(ROOT_NODE);
        children.forEach(item -> { 
            System.out.println(item);
        });
        //获取当前节点的子节点的数量
        Integer number = zkClient.countChildren(ROOT_NODE);
        System.out.println(number);
        //4、修改节点
        zkClient.writeData(ROOT_NODE, "update hello2");
        //DataUpdater只有一个接口,属于函数式接口,所以可以使用lambda表达式简写
        DataUpdater<String> dataUpdater = new DataUpdater<String>() { 
            @Override
            public String update(String s) { 
                return "update hello3";
            }
        };
        //上面几行代码,可以使用lambda表达式简写
        //DataUpdater<String> dataUpdater2 = s -> "update hello3";
        zkClient.updateDataSerialized(ROOT_NODE, dataUpdater);
        //5、删除节点
        //删除节点,这个方法不能删除含有子节点的节点(如果有子节点,会报错:NotEmptyException: KeeperErrorCode = Directory not empty for /root)
        //zkClient.delete(ROOT_NODE);
        //根据节点路径,版本号,删除节点
        zkClient.delete(ROOT_NODE + "/home", -1);
        //递归删除当前节点,以及当前节点的子节点
        zkClient.deleteRecursive(ROOT_NODE);
    }
}

4.5 ZkClient的事件监听机制总结

在原生Zk API中,提供了watcher的机制监听节点,而zkClient将之转换成Listener的概念,就是订阅服务端的事件,从而我们只要实现IZkChildListener 接口相应的方法就能够对事件进行处理。


订阅/取消订阅节点子变化事件:

public List<String> subscribeChildChanges(String path, IZkChildListener listener)
public void unsubscribeChildChanges(String path, IZkChildListener childListener)

订阅/取消订阅数据变化事件:


public void subscribeDataChanges(String path, IZkDataListener listener)
public void unsubscribeDataChanges(String path, IZkDataListener dataListener)

订阅/取消订阅连接状态变化事件:


public void subscribeStateChanges(final IZkStateListener listener)
public void unsubscribeStateChanges(IZkStateListener stateListener)

取消所有订阅事件:


public void unsubscribeAll()

以上监听内容,摘自:zkClient的事件监听机制


这个ZkClient客户端也不错,就是更新的慢,感觉不怎么维护,现在主流的是Curator客户端,下面,介绍下Curator客户端的使用。


5 Java客户端Curator的使用

Netflix Curator(已经捐献给Apache维护):https://github.com/Netflix/curator

Apache curator官网:http://curator.apache.org


Curator是Netflix公司在原生zookeeper基础上开源的一个Zookeeper Java客户端,目前Curator捐献给了Apache,现在是Apache下的一个开源项目,与Zookeeper提供的原生客户端相比,Curator的进行了高度的抽象和封装,简化了Zookeeper客户端的开发操作。


Curator与ZkClient相比,功能更加的强大,不仅除了解决原生的zk Api的遗留问题,还提供了很多常用的工具类,也提供了很多解决方案,比如分布式锁。Curator API的使用更简洁方便,提供了流式的操作,可以点.点.点.的方式进行方法的调用,所以Curator是目前比较主流的zk客户端。


通过查看官方文档,可以发现Curator主要解决了三类问题:


封装ZooKeeper client与ZooKeeper server之间的连接处理

提供了一套Fluent风格的操作API(点.点.点.)

提供ZooKeeper各种应用场景(recipe)的抽象封装,比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等

5.1 添加Curator客户端依赖

使用Curator需要依赖jar包,使用以下方式,添加maven依赖

添加以下依赖:

<!-- curator-framework -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
</dependency>
<!-- curator-recipes -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>

5.2 三方客户端Curator常用APi的使用

5.2.1 创建Curator连接对象

主要有以下几种方式:


public static CuratorFramework newClient(String connectString,

RetryPolicy retryPolicy) public static CuratorFramework

newClient(String connectString, int sessionTimeoutMs, int

connectionTimeoutMs, RetryPolicy retryPolicy) 方法参数说明:

connectString:指的是需要连接的zookeeperAddress ip retryPolicy:指的是连接zk时使用哪种重试策略

sessionTimeoutMs:指的是会话超时时间 connectionTimeoutMs:指的是连接超时时间


代码示例:


/** * <p> * 创建Curator连接对象 * <p/> * * @param * @return * @Date 2020/6/21 12:29 */
    public static void connectCuratorClient() { 
        //老版本的方式,创建zookeeper连接客户端
        client = CuratorFrameworkFactory.builder().
                connectString(ZK_ADDRESS).
                sessionTimeoutMs(5000).
                connectionTimeoutMs(10000).
                retryPolicy(retry).
                build();
        //创建zookeeper连接,新版本
        //client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, retry);
        //启动客户端(Start the client. Most mutator methods will not work until the client is started)
        client.start();
        System.out.println("zookeeper初始化连接成功:" + client);
    }

在Curator中,首先需要通过CuratorFrameworkFactory创建zookeeperClient连接实例,然后才能继续进行各种基本操作。需要说明的是,关于连接重试策略RetryPolicy,Curator客户端默认提供了以下几种:


RetryNTimes:重试N次

ExponentialBackoff Retry:重试一定次数,每次重试时间依次递增

RetryOneTime:重试一次

RetryUntilElapsed:重试一定时间


5.2.2 创建zNode节点

主要有以下几种方法:


public T forPath(String path) 
     //创建节点,并赋值内容
     public T forPath(String path, byte[] data)
     //判断节点是否存在,节点存在了,创建时仍然会报错
     public ExistsBuilder checkExists()

代码示例:


/** * <p> * 创建节点,并支持赋值数据内容 * <p/> * * @param nodePath * @param data * @return void * @Date 2020/6/21 12:39 */
    private static void createNode(String nodePath, String data) throws Exception { 
        if (StringUtils.isEmpty(nodePath)) { 
            System.out.println("节点【" + nodePath + "】不能为空");
            return;
        }
        //1、对节点是否存在进行判断,否则会报错:【NodeExistsException: KeeperErrorCode = NodeExists for /root】
        Stat exists = client.checkExists().forPath(nodePath);
        if (null != exists) { 
            System.out.println("节点【" + nodePath + "】已存在,不能新增");
            return;
        } else { 
            System.out.println(StringUtils.join("节点【", nodePath, "】不存在,可以新增节点!"));
        }
        //2、创建节点, curator客户端开发提供了Fluent风格的API,是一种流式编码方式,可以不断地点.点.调用api方法
        //创建永久节点(默认就是持久化的)
        client.create().forPath(nodePath);
        //3、手动指定节点的类型
        client.create()
                .withMode(CreateMode.PERSISTENT)
                .forPath(nodePath);
        //4、如果父节点不存在,创建当前节点的父节点
        String node = client.create()
                .creatingParentsIfNeeded()
                .forPath(nodePath);
        System.out.println(node);
        //创建节点,并为当前节点赋值内容
        if (StringUtils.isNotBlank(data)) { 
            //5、创建永久节点,并为当前节点赋值内容
            client.create()
                    .forPath(nodePath, data.getBytes());
            //6、创建永久有序节点
            client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(nodePath, data.getBytes());
            //7、创建临时节点
            client.create()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(nodePath, data.getBytes());
        }
        //8、创建临时有序节点
        client.create()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(nodePath, data.getBytes());
    }

5.2.3 获取节点列表

5.2.3.1 获取节点内容

主要的api方法如下:


//获取某个节点数据
    client.getData().forPath(nodePath)
    //读取zookeeper的数据,并放到Stat中
    client.getData().storingStatIn(stat1).forPath(nodePath)

参数说明:


path:指定要读取的节点

stat:指定数据节点的节点状态信息


代码示例如下:


/** * <p> * 获取节点数据 * <p/> * * @param nodePath * @return void * @Date 2020/6/21 13:13 */
    private static void getNode(String nodePath) throws Exception { 
        //获取某个节点数据
        byte[] bytes = client.getData().forPath(nodePath);
        System.out.println(StringUtils.join("节点:【", nodePath, "】,数据:", new String(bytes)));
        //读取zookeeper的数据,并放到Stat中
        Stat stat1 = new Stat();
        byte[] bytes2 = client.getData().storingStatIn(stat1).forPath(nodePath);
        System.out.println(StringUtils.join("节点:【", nodePath, "】,数据:", new String(bytes2)));
        System.out.println(stat1);
    }

5.2.3.2 获取子节点列表

常用的api方法如下:


//获取某个节点的所有子节点
    client.getChildren().forPath(nodePath)

代码示例如下:


/** * <p> * 获取节点数据 * <p/> * * @param nodePath * @return void * @Date 2020/6/21 13:20 */
    private static void getNode(String nodePath) throws Exception { 
        //获取某个节点的所有子节点
        List<String> stringList = client.getChildren().forPath(nodePath);
        if (CollectionUtils.isEmpty(stringList)) { 
            return;
        }
        //遍历节点
        stringList.forEach(System.out::println);
    }

5.2.4 设置和修改节点内容

修改节点内容主要有以下方法:


//更新节点
    client.setData().forPath(nodePath, data.getBytes())
    //指定版本号,更新节点
    client.setData().withVersion(-1).forPath(nodePath, data.getBytes())
    //异步设置某个节点数据
    client.setData().inBackground().forPath(nodePath, data.getBytes())
代码示例如下:


/** * <p> * 设置(修改)节点数据 * <p/> * * @param nodePath * @param data * @return void * @Date 2020/6/21 13:46 */
    private static void updateNode(String nodePath, String data) throws Exception { 
        //更新节点
        Stat stat = client.setData().forPath(nodePath, data.getBytes());
        //指定版本号,更新节点,更新的时候如果指定数据版本的话,那么需要和zookeeper中当前数据的版本要一致,-1表示匹配任何版本
        //Stat stat = client.setData().withVersion(-1).forPath(nodePath, data.getBytes());
        System.out.println(stat);
        //异步设置某个节点数据
        Stat stat1 = client.setData().inBackground().forPath(nodePath, data.getBytes());
        System.out.println(stat1.toString());
    }

5.2.5 删除zNode节点

有以下几方法:


//删除节点
    client.delete().forPath(nodePath);
    //删除节点,即使出现网络故障,zookeeper也可以保证删除该节点
    client.delete().guaranteed().forPath(nodePath);
    //级联删除节点(如果当前节点有子节点,子节点也可以一同删除)
    client.delete().deletingChildrenIfNeeded().forPath(nodePath);

5.2.6 Curator使用小结

Curator客户端使用起来非常方便,而且有好多builder对象,比如CreateBuilder、DeleteBuilder、ExistsBuilder、GetDataBuilder、SetDataBuilder等等。


以上介绍了Curator基础Api的使用,如果想了解更多的Api使用,可以参考官网进行学习使用,下面是以上Curator客户端测试的完整代码。


完整代码示例如下:


/** * <p> * Curator客户端基础使用 * <p/> * * @author smilehappiness * @Date 2020/6/21 9:41 */
public class ZookeeperCuratorClient { 
    /** * 客户端连接地址 */
    private static final String ZK_ADDRESS = "ip:2181";
    /** * 客户端根节点 */
    private static final String ROOT_NODE = "/root";
    /** * 客户端子节点 */
    private static final String ROOT_NODE_CHILDREN = "/root/children";
    /** * 会话超时时间 */
    private final int SESSION_TIMEOUT = 20 * 1000;
    /** * 连接超时时间 */
    private final int CONNECTION_TIMEOUT = 5 * 1000;
    /** * 创建zookeeper连接实例 */
    private static CuratorFramework client = null;
    /** * 重试策略 * baseSleepTimeMs:初始的重试等待时间,单位毫秒 * maxRetries:最多重试次数 */
    private static final RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    /** * 重试策略 * n:最多重试次数 * sleepMsBetweenRetries:重试时间间隔,单位毫秒 */
    private static final RetryPolicy retry = new RetryNTimes(3, 2000);
    static { 
        // 创建Curator连接对象
        connectCuratorClient();
    }
    /** * <p> * 创建Curator连接对象 * <p/> * * @param * @return * @Date 2020/6/21 12:29 */
    public static void connectCuratorClient() { 
        //老版本的方式,创建zookeeper连接客户端
        /*client = CuratorFrameworkFactory.builder(). connectString(ZK_ADDRESS). sessionTimeoutMs(5000). connectionTimeoutMs(10000). retryPolicy(retry). build();*/
        //创建zookeeper连接,新版本
        client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, retry);
        //启动客户端(Start the client. Most mutator methods will not work until the client is started)
        client.start();
        System.out.println("zookeeper初始化连接成功:" + client);
    }
    public static void main(String[] args) throws Exception { 
        //创建节点
        //createNode(ROOT_NODE, null);
        //createNode(ROOT_NODE_CHILDREN, "child data");
        //获取节点数据
        getNode(ROOT_NODE);
        //设置(修改)节点数据
        updateNode(ROOT_NODE, "update curator data");
        //异步设置某个节点数据
        updateNode(ROOT_NODE, "update curator data with Async");
        //删除指定节点(这个在原生zk里面,是不能直接删除有子节点的数据的)
        deleteNode(ROOT_NODE);
    }
    /** * <p> * 创建节点,并支持赋值数据内容 * <p/> * * @param nodePath * @param data * @return void * @Date 2020/6/21 12:39 */
    private static void createNode(String nodePath, String data) throws Exception { 
        if (StringUtils.isEmpty(nodePath)) { 
            System.out.println("节点【" + nodePath + "】不能为空");
            return;
        }
        //1、对节点是否存在进行判断,否则会报错:【NodeExistsException: KeeperErrorCode = NodeExists for /root】
        Stat exists = client.checkExists().forPath(nodePath);
        if (null != exists) { 
            System.out.println("节点【" + nodePath + "】已存在,不能新增");
            return;
        } else { 
            System.out.println(StringUtils.join("节点【", nodePath, "】不存在,可以新增节点!"));
        }
        //2、创建节点, curator客户端开发提供了Fluent风格的API,是一种流式编码方式,可以不断地点.点.调用api方法
        //创建永久节点(默认就是持久化的)
        //client.create().forPath(nodePath);
        //3、手动指定节点的类型
        /*client.create() .withMode(CreateMode.PERSISTENT) .forPath(nodePath);*/
        //4、如果父节点不存在,创建当前节点的父节点
        /*String node = client.create() .creatingParentsIfNeeded() .forPath(nodePath); System.out.println(node);*/
        //创建节点,并为当前节点赋值内容
        if (StringUtils.isNotBlank(data)) { 
            //5、创建永久节点,并为当前节点赋值内容
            /*client.create() .forPath(nodePath, data.getBytes());*/
            String node = client.create()
                    .creatingParentsIfNeeded()
                    .forPath(nodePath, data.getBytes());
            System.out.println(node);
            //6、创建永久有序节点
            client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(nodePath + "-sequential", data.getBytes());
            //7、创建临时节点
            client.create()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(nodePath + "/temp", data.getBytes());
            //8、创建临时有序节点
            client.create()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(nodePath + "/temp-sequential", data.getBytes());
        }
    }
    /** * <p> * 获取节点数据 * <p/> * * @param nodePath * @return void * @Date 2020/6/21 13:13 */
    private static void getNode(String nodePath) throws Exception { 
        //获取某个节点数据
        byte[] bytes = client.getData().forPath(nodePath);
        System.out.println(StringUtils.join("节点:【", nodePath, "】,数据:", new String(bytes)));
        //读取zookeeper的数据,并放到Stat中
        Stat stat1 = new Stat();
        byte[] bytes2 = client.getData().storingStatIn(stat1).forPath(nodePath);
        System.out.println(StringUtils.join("节点:【", nodePath, "】,数据:", new String(bytes2)));
        System.out.println(stat1);
        //获取某个节点的所有子节点
        List<String> stringList = client.getChildren().forPath(nodePath);
        if (CollectionUtils.isEmpty(stringList)) { 
            return;
        }
        //遍历节点
        stringList.forEach(System.out::println);
    }
    /** * <p> * 设置(修改)节点数据 * <p/> * * @param nodePath * @param data * @return void * @Date 2020/6/21 13:46 */
    private static void updateNode(String nodePath, String data) throws Exception { 
        //更新节点
        Stat stat = client.setData().forPath(nodePath, data.getBytes());
        //指定版本号,更新节点,更新的时候如果指定数据版本的话,那么需要和zookeeper中当前数据的版本要一致,-1表示匹配任何版本
        //Stat stat = client.setData().withVersion(-1).forPath(nodePath, data.getBytes());
        System.out.println(stat);
        //异步设置某个节点数据
        Stat stat1 = client.setData().inBackground().forPath(nodePath, data.getBytes());
        if (null != stat1) { 
            System.out.println(stat1);
        }
    }
    /** * <p> * 删除指定节点 * <p/> * * @param nodePath * @return void * @Date 2020/6/21 13:50 */
    private static void deleteNode(String nodePath) throws Exception { 
        //删除节点
        //client.delete().forPath(nodePath);
        //删除节点,即使出现网络故障,zookeeper也可以保证删除该节点
        //client.delete().guaranteed().forPath(nodePath);
        //级联删除节点(如果当前节点有子节点,子节点也可以一同删除)
        client.delete().deletingChildrenIfNeeded().forPath(nodePath);
    }
}



6 Zookeeper、ZkClient、Curator三种客户端的选择

6.1 原生客户端Zookeeper

官方自带的zookeeper客户端本身存在的问题:


session会话超时异常时,不支持自动重连,需要手动重新连接,编程繁琐(生产环境中如果网络出现不稳定情况,那么这种情况出现的更加明显)
ZooKeeper的watcher监听是一次性的,注册一次后会失效
节点数据是二进制,对象数据都需要转换为二进制保存
不支持递归创建节点,需要先创建父节点再创建子节点
不支持递归删除节点,需要先删除子节点再删除父节点
没有领导选举机制,集群情况下可能需要实现stand by,一个服务挂了,另一个需要接替的效果
客户端只提供了存储byte数组的接口,而项目中一般都会使用对象
客户端接口需要处理的异常太多,并且通常,我们也不知道如何处理这些异常
原生zookeeper客户端和服务器端会话的建立是一个异步的过程,也就是说在程序中,我们程序方法在处理完客户端初始化后,立即返回(程序往下执行代码,这样,大多数情况下我们并没有真正构建好一个可用会话,在会话的声明周期处于”CONNECTED”时才算真正建立完毕,所以我们需要使用多线程中的一个工具类CountDownLatch来控制,真正的连接上zk客户端后,才可以继续操作zNode节点)

6.2 ZkClient客户端

ZkClient在原生的Zookeeper API接口上进行了包装,更加简单易用。ZkClient内部还实现了Session超时重连、Watcher反复注册等功能,使得这些操作对开发透明,提高了开发的效率。


ZkClient主要解决了原生api的两个大问题:


1.ZkClient解决了watcher的一次性注册问题,将znode的事件重新定义为子节点的变化、数据的变化、连接状态的变化三类,有ZkClient统一将watcher的WatchedEvent转换到以上三种情况中去处理,watcher执行后重新读取数据的同时,在注册新的相同的watcher。

2.ZkClient将Zookeeper的watcher机制转化为一种更加容易理解的订阅形式,并且这种关系是可以保持的,而非一次性的。也就是说子节点的变化、数据的变化、状态的变化是可以订阅的。当watcher使用完后,zkClient会自动增加一个相同的watcher。


在session loss和session expire时自动创建新的ZooKeeper实例进行重连

101tec这个zookeeper客户端主要有以下特性:


1.提供了zookeeper重连的特性——能够在断链的时候,重新建立连接,无论session失效与否.

2.持久的event监听器机制—— ZKClient框架将事件重新定义分为了stateChanged、znodeChanged、dataChanged三种情况,用户可以注册这三种情况下的监听器(znodeChanged和dataChanged和路径有关),而不是注册Watcher。

3.zookeeper异常处理——-zookeeper中繁多的Exception,以及每个Exception所需要关注的事情各有不同,I0Itec简单的做了封装。

4.data序列化——简单的data序列化.(Serialzer/Deserialzer)

5.有默认的领导选举机制


这个ZkClient客户端还是不错的,现在公司用的应该是比较多的,但是,这个客户端现在基本不怎么更新了,现在主流的是Curator客户端。


6.3 Curator客户端(推荐)

Curator是Netflix公司在原生zookeeper基础上开源的一个Zookeeper Java客户端,目前Curator捐献给了Apache,现在是Apache下的一个开源项目,与Zookeeper提供的原生客户端相比,Curator的进行了高度的抽象和封装,简化了Zookeeper客户端的开发操作。


通过查看官方文档,可以发现Curator主要解决了三类问题:


封装ZooKeeper client与ZooKeeper server之间的连接处理

提供了一套Fluent风格的操作API(点.点.点.)

提供ZooKeeper各种应用场景(recipe)的抽象封装,比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等

Curator与ZkClient相比,功能更加的强大,不仅除了解决原生的zk Api的遗留问题,还提供了很多常用的工具类,也提供了很多解决方案,比如分布式锁。Curator API的使用更简洁方便,提供了流式的操作,可以点.点.点.的方式进行方法的调用,所以Curator是目前比较主流的zk客户端。


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
4月前
|
Java API Maven
【zookeeper 第五篇章】Curator 库
Curator 是 Netflix 开源的 ZooKeeper 客户端框架,简化了原生 API 的使用并提供了高级功能。可通过 Maven 添加依赖 `curator-framework` 和 `curator-recipes`。示例代码展示了如何创建 Curator 连接、配置重连策略、进行节点的 CRUD 操作以及事务处理等。例如,使用 `ExponentialBackoffRetry` 实现指数退避重试,通过 `create()` 方法创建持久节点,以及利用 `inTransaction()` 启动事务来保证多个操作的原子性。
106 0
|
4月前
|
存储 API Apache
【zookeeper 第三篇章】客户端 API
本文介绍了Apache ZooKeeper客户端的一些常用命令及其用法。首先,`create`命令用于创建不同类型的节点并为其赋值,如持久化节点、有序节点及临时节点等。通过示例展示了如何创建这些节点,并演示了创建过程中的输出结果。其次,`ls`命令用于列出指定路径下的所有子节点。接着,`set`命令用于更新节点中的数据,可以指定版本号实现乐观锁机制。
36 0
|
2月前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
68 1
|
3月前
|
负载均衡 API 数据安全/隐私保护
Zookeeper的客户端-原生的API
Zookeeper的客户端-原生的API
|
5月前
|
API
【想进大厂还不会阅读源码】ShenYu源码-替换ZooKeeper客户端
ShenYu源码阅读。相信大家碰到源码时经常无从下手,不知道从哪开始阅读😭。我认为有一种办法可以解决大家的困扰!至此,我们发现自己开始从大量堆砌的源码中脱离开来😀。ShenYu是一个异步的,高性能的,跨语言的,响应式的 API 网关。
|
6月前
|
负载均衡 Dubbo Java
zookeeper实战教程
zookeeper实战教程
63 0
zookeeper实战教程
|
7月前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
77 11
|
7月前
|
存储
ZooKeeper客户端常用命令
ZooKeeper客户端常用命令
69 1
|
3月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
3月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1