【分布式】Zookeeper使用--开源客户端(二)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
简介: 上一篇博客已经介绍了如何使用Zookeeper提供的原生态Java API进行操作,本篇博文主要讲解如何通过开源客户端来进行操作。

 3.8 节点监听  


package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class NodeCache_Sample {
    static String path = "/zk-book/nodecache";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    public static void main(String[] args) throws Exception {
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        final NodeCache cache = new NodeCache(client, path, false);
        cache.start(true);
        cache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData()));
            }
        });
        client.setData().forPath(path, "u".getBytes());
        Thread.sleep(1000);
        client.delete().deletingChildrenIfNeeded().forPath(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

 运行结果:  


Node data update, new data: u

  当节点数据变更后收到了通知。NodeCache不仅可以监听数据节点的内容变更,也能监听指定节点是否存在。

  

3.9 子节点监听 

package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class PathChildrenCache_Sample {
    static String path = "/zk-book";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).sessionTimeoutMs(5000).build();
    public static void main(String[] args) throws Exception {
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, path, true);
        cache.start(StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED," + event.getData().getPath());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED," + event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED," + event.getData().getPath());
                    break;
                default:
                    break;
                }
            }
        });
        client.create().withMode(CreateMode.PERSISTENT).forPath(path);
        Thread.sleep(1000);
        client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
        Thread.sleep(1000);
        client.delete().forPath(path + "/c1");
        Thread.sleep(1000);
        client.delete().forPath(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

 运行结果:

CHILD_ADDED,/zk-book/c1
CHILD_REMOVED,/zk-book/c1

  监听节点的子节点,包括新增、数据变化、删除三类事件。


3.10 Master选举

  借助Zookeeper,开发者可以很方便地实现Master选举功能,其大体思路如下:选择一个根节点,如/master_select,多台机器同时向该节点创建一个子节点/master_select/lock,利用Zookeeper特性,最终只有一台机器能够成功创建,成功的那台机器就是Master。

package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Recipes_MasterSelect {
    static String master_path = "/curator_recipes_master_path";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    public static void main(String[] args) throws Exception {
        client.start();
        LeaderSelector selector = new LeaderSelector(client, master_path, new LeaderSelectorListenerAdapter() {
            public void takeLeadership(CuratorFramework client) throws Exception {
                System.out.println("成为Master角色");
                Thread.sleep(3000);
                System.out.println("完成Master操作,释放Master权利");
            }
        });
        selector.autoRequeue();
        selector.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}

运行结果:

成为Master角色
完成Master操作,释放Master权利
成为Master角色

  以上结果会反复循环,并且当一个应用程序完成Master逻辑后,另外一个应用程序的相应方法才会被调用,即当一个应用实例成为Master后,其他应用实例会进入等待,直到当前Master挂了或者推出后才会开始选举Master。


3.11 分布式锁

  为了保证数据的一致性,经常在程序的某个运行点需要进行同步控制。以流水号生成场景为例,普通的后台应用通常采用时间戳方式来生成流水号,但是在用户量非常大的情况下,可能会出现并发问题。 

package com.hust.grid.leesf.curator.examples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
public class Recipes_NoLock {
    public static void main(String[] args) throws Exception {
        final CountDownLatch down = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        down.await();
                    } catch (Exception e) {
                    }
                    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                    String orderNo = sdf.format(new Date());
                    System.err.println("生成的订单号是 : " + orderNo);
                }
            }).start();
        }
        down.countDown();
    }
}

 运行结果: 

生成的订单号是 : 16:29:10|590
生成的订单号是 : 16:29:10|590
生成的订单号是 : 16:29:10|591
生成的订单号是 : 16:29:10|591
生成的订单号是 : 16:29:10|590
生成的订单号是 : 16:29:10|590
生成的订单号是 : 16:29:10|591
生成的订单号是 : 16:29:10|590
生成的订单号是 : 16:29:10|592
生成的订单号是 : 16:29:10|591

 结果表示订单号出现了重复,即普通的方法无法满足业务需要,因为其未进行正确的同步。可以使用Curator来实现分布式锁功能。

package com.hust.grid.leesf.curator.examples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Recipes_Lock {
    static String lock_path = "/curator_recipes_lock_path";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    public static void main(String[] args) throws Exception {
        client.start();
        final InterProcessMutex lock = new InterProcessMutex(client, lock_path);
        final CountDownLatch down = new CountDownLatch(1);
        for (int i = 0; i < 30; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        down.await();
                        lock.acquire();
                    } catch (Exception e) {
                    }
                    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                    String orderNo = sdf.format(new Date());
                    System.out.println("生成的订单号是 : " + orderNo);
                    try {
                        lock.release();
                    } catch (Exception e) {
                    }
                }
            }).start();
        }
        down.countDown();
    }
}

 运行结果:


生成的订单号是 : 16:31:50|293
生成的订单号是 : 16:31:50|319
生成的订单号是 : 16:31:51|278
生成的订单号是 : 16:31:51|326
生成的订单号是 : 16:31:51|402
生成的订单号是 : 16:31:51|420
生成的订单号是 : 16:31:51|546
生成的订单号是 : 16:31:51|602
生成的订单号是 : 16:31:51|626
生成的订单号是 : 16:31:51|656
生成的订单号是 : 16:31:51|675
生成的订单号是 : 16:31:51|701
生成的订单号是 : 16:31:51|708
生成的订单号是 : 16:31:51|732
生成的订单号是 : 16:31:51|763
生成的订单号是 : 16:31:51|785
生成的订单号是 : 16:31:51|805
生成的订单号是 : 16:31:51|823
生成的订单号是 : 16:31:51|839
生成的订单号是 : 16:31:51|853
生成的订单号是 : 16:31:51|868
生成的订单号是 : 16:31:51|884
生成的订单号是 : 16:31:51|897
生成的订单号是 : 16:31:51|910
生成的订单号是 : 16:31:51|926
生成的订单号是 : 16:31:51|939
生成的订单号是 : 16:31:51|951
生成的订单号是 : 16:31:51|965
生成的订单号是 : 16:31:51|972
生成的订单号是 : 16:31:51|983

 结果表明此时已经不存在重复的流水号。


3.12 分布式计数器

  分布式计数器的典型应用是统计系统的在线人数,借助Zookeeper也可以很方便实现分布式计数器功能:指定一个Zookeeper数据节点作为计数器,多个应用实例在分布式锁的控制下,通过更新节点的内容来实现计数功能。 

package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
public class Recipes_DistAtomicInt {
    static String distatomicint_path = "/curator_recipes_distatomicint_path";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    public static void main(String[] args) throws Exception {
        client.start();
        DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client, distatomicint_path,
                new RetryNTimes(3, 1000));
        AtomicValue<Integer> rc = atomicInteger.add(8);
        System.out.println("Result: " + rc.succeeded());
    }
}

运行结果:

Result: true

  结果表明已经将数据成功写入数据节点中。


3.13 分布式Barrier

  如同JDK的CyclicBarrier,Curator提供了DistributedBarrier来实现分布式Barrier。

package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Recipes_Barrier {
    static String barrier_path = "/curator_recipes_barrier_path";
    static DistributedBarrier barrier;
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        CuratorFramework client = CuratorFrameworkFactory.builder()
                                .connectString("127.0.0.1:2181")
                                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
                        client.start();
                        barrier = new DistributedBarrier(client, barrier_path);
                        System.out.println(Thread.currentThread().getName() + "号barrier设置");
                        barrier.setBarrier();
                        barrier.waitOnBarrier();
                        System.err.println("启动...");
                    } catch (Exception e) {
                    }
                }
            }).start();
        }
        Thread.sleep(2000);
        barrier.removeBarrier();
    }
}

  运行结果:

Thread-1号barrier设置
Thread-2号barrier设置
Thread-4号barrier设置
Thread-3号barrier设置
Thread-0号barrier设置
启动...
启动...
启动...
启动...
启动...

结果表明通过DistributedBarrier可以实现类似于CyclicBarrier的分布式Barrier功能。


四、Curator工具类


  4.1 ZKPaths

  其提供了简单的API来构建znode路径、递归创建、删除节点等。

package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.utils.ZKPaths.PathAndNode;
import org.apache.zookeeper.ZooKeeper;
public class ZKPaths_Sample {
    static String path = "/curator_zkpath_sample";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    public static void main(String[] args) throws Exception {
        client.start();
        ZooKeeper zookeeper = client.getZookeeperClient().getZooKeeper();
        System.out.println(ZKPaths.fixForNamespace(path, "sub"));
        System.out.println(ZKPaths.makePath(path, "sub"));
        System.out.println(ZKPaths.getNodeFromPath("/curator_zkpath_sample/sub1"));
        PathAndNode pn = ZKPaths.getPathAndNode("/curator_zkpath_sample/sub1");
        System.out.println(pn.getPath());
        System.out.println(pn.getNode());
        String dir1 = path + "/child1";
        String dir2 = path + "/child2";
        ZKPaths.mkdirs(zookeeper, dir1);
        ZKPaths.mkdirs(zookeeper, dir2);
        System.out.println(ZKPaths.getSortedChildren(zookeeper, path));
        ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
    }
}

  运行结果: 

/curator_zkpath_sample/sub
/curator_zkpath_sample/sub
sub1
/curator_zkpath_sample
sub1
[child1, child2]

 借助ZKPaths可快速方便的完成节点的创建等操作。


4.2 EnsurePath

  其提供了一种能够确保数据节点存在的机制,当上层业务希望对一个数据节点进行操作时,操作前需要确保该节点存在。 

package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
public class EnsurePathDemo {
    static String path = "/zk-book/c1";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    public static void main(String[] args) throws Exception {
        client.start();
        client.usingNamespace("zk-book");
        EnsurePath ensurePath = new EnsurePath(path);
        ensurePath.ensure(client.getZookeeperClient());
        ensurePath.ensure(client.getZookeeperClient());
        EnsurePath ensurePath2 = client.newNamespaceAwareEnsurePath("/c1");
        ensurePath2.ensure(client.getZookeeperClient());
    }
}

EnsurePath采取了如下节点创建方式,试图创建指定节点,如果节点已经存在,那么就不进行任何操作,也不对外抛出异常,否则正常创建数据节点。


五、总结


  本篇介绍了使用Zookeeper的开源客户端如何操作Zookeeper的方法,相应的源码也已经上传至github,谢谢各位园友的观看~


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
4天前
|
人工智能 监控 开发者
阿里云PAI发布DeepRec Extension,打造稳定高效的分布式训练,并宣布开源!
阿里云PAI发布DeepRec Extension,打造稳定高效的分布式训练,并宣布开源!
|
6月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
3月前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
4月前
|
消息中间件 监控 数据可视化
Apache Airflow 开源最顶级的分布式工作流平台
Apache Airflow 是一个用于创作、调度和监控工作流的平台,通过将工作流定义为代码,实现更好的可维护性和协作性。Airflow 使用有向无环图(DAG)定义任务,支持动态生成、扩展和优雅的管道设计。其丰富的命令行工具和用户界面使得任务管理和监控更加便捷。适用于静态和缓慢变化的工作流,常用于数据处理。
Apache Airflow 开源最顶级的分布式工作流平台
|
4月前
|
存储 运维 NoSQL
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
121 11
|
5月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
88 2
|
5月前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
116 1
|
5月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
87 1
|
5月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
77 0
|
13天前
|
NoSQL Java 中间件
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
450 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁