【分布式技术专题】「Zookeeper中间件」给大家学习一下Zookeeper的”开发伴侣”—Curator-Framework(组件篇)

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
云原生网关 MSE Higress,422元/月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
简介: 【分布式技术专题】「Zookeeper中间件」给大家学习一下Zookeeper的”开发伴侣”—Curator-Framework(组件篇)

CuratorFramework


Curator-Framework是ZooKeeper Client更高的抽象API,最佳核心的功能就是自动连接管理:


  1. 当ZooKeeper客户端内部出现异常, 将自动进行重连或重试, 该过程对外几乎完全透明


  1. 监控节点数据变化事件NodeDataChanged,需要时调用updateServerList()方法


  1. Curator recipes自动移除监控



CuratorFramework版本


目前Curator有2.x.x和3.x.x两个系列的版本,支持不同版本的Zookeeper。其中Curator 2.x.x兼容Zookeeper的3.4.x和3.5.x。而Curator 3.x.x只兼容Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch删除等新特性。



更加清晰的API


简化了ZooKeeper原生的方法, 事件等, 提供流式fluent的接口,提供Recipes实现 : 选举,共享锁, 路径cache, 分布式队列,分布式优先队列等。



maven配置依赖

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
复制代码


事务管理

/** 事务管理:碰到异常,事务会回滚
 * 使用transaction()来控制事务
 * @throws Exception
 */
public void testTransaction() throws Exception{
    //定义几个基本操作
    CuratorOp createOp = client.transactionOp().create()
            .forPath("/curator/one_path","some data".getBytes());
    CuratorOp setDataOp = client.transactionOp().setData()
            .forPath("/curator","other data".getBytes());
    CuratorOp deleteOp = client.transactionOp().delete()
            .forPath("/curator");
    //事务执行结果
    List<CuratorTransactionResult> results = client.transaction()
            .forOperations(createOp,setDataOp,deleteOp);
    //遍历输出结果
    for(CuratorTransactionResult result : results){
        System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType());
    }
}
//因为节点“/curator”存在子节点,所以在删除的时候将会报错,事务回滚
复制代码




监听器


Curator提供了三种Watcher(Cache)来监听结点的变化:


Path Cache:监视一个路径下


  • 1)孩子结点的创建
  • 2)删除
  • 3)以及结点数据的更新。


产生的事件会传递给注册的PathChildrenCacheListener。


  • Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。
  • Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
/**
 * 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理
 */
ExecutorService pool = Executors.newFixedThreadPool(2);
/**
 * 监听数据节点的变化情况
 */
final NodeCache nodeCache = new NodeCache(client, "/zk-huey/cnode", false);
    nodeCache.start(true);
            nodeCache.getListenable().addListener(
            new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
        System.out.println("Node data is changed, new data: " +
        new String(nodeCache.getCurrentData().getData()));
    }}, pool);
/**
 * 监听子节点的变化情况
 */
final PathChildrenCache childrenCache = new PathChildrenCache(client, "/zk-huey", true);
        childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
        childrenCache.getListenable().addListener(
        new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED: " + event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED: " + event.getData().getPath());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED: " + event.getData().getPath());
                    break;
                default:
                    break;
                }
            }
            },
        pool
        );
        client.setData().forPath("/zk-huey/cnode", "world".getBytes());
        Thread.sleep(10 * 1000);
        pool.shutdown();
        client.close();
复制代码




分布式锁思路


最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。例如,现在一台应用正在rebuild缓存内容,要临时锁住某个区域暂时不让访问;又比如调度程序每次只想一个任务被一台应用执行等等。


下面的程序会启动两个线程t1和t2去争夺锁,拿到锁的线程会占用5秒。运行多次可以观察到,有时是t1先拿到锁而t2等待,有时又会反过来。Curator会用我们提供的lock路径的结点作为全局锁,每次获得锁时会生成这种串,释放锁时清空数据。


import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import java.util.concurrent.TimeUnit;
/**
 * Curator framework's distributed lock test.
 */
public class CuratorDistrLockTest {
    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_LOCK_PATH = "/zktest";
    public static void main(String[] args) throws InterruptedException {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");
        Thread t1 = new Thread(() -> {
            doWithLock(client);
        }, "t1");
        Thread t2 = new Thread(() -> {
            doWithLock(client);
        }, "t2");
        t1.start();
        t2.start();
    }
    private static void doWithLock(CuratorFramework client) {
        InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
        try {
            if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
                System.out.println(Thread.currentThread().getName() + " hold lock");
                Thread.sleep(5000L);
                System.out.println(Thread.currentThread().getName() + " release lock");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
复制代码




Leader选举


当集群里的某个服务down机时,我们可能要从slave结点里选出一个作为新的master,这时就需要一套能在分布式环境中自动协调的Leader选举方法。Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.EnsurePath;
/**
 * Curator framework's leader election test.
 * Output:
 *  LeaderSelector-2 take leadership!
 *  LeaderSelector-2 relinquish leadership!
 *  LeaderSelector-1 take leadership!
 *  LeaderSelector-1 relinquish leadership!
 *  LeaderSelector-0 take leadership!
 *  LeaderSelector-0 relinquish leadership! 
 *      ...
 */
public class CuratorLeaderTest {
    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_PATH = "/zktest";
    public static void main(String[] args) throws InterruptedException {
        LeaderSelectorListener listener = new LeaderSelectorListener() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                System.out.println(Thread.currentThread().getName() + " take leadership!");
                // takeLeadership() method should only return when leadership is being relinquished.
                Thread.sleep(5000L);
                System.out.println(Thread.currentThread().getName() + " relinquish leadership!");
            }
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState state) {
            }
        };
        new Thread(() -> {
            registerListener(listener);
        }).start();
        new Thread(() -> {
            registerListener(listener);
        }).start();
        new Thread(() -> {
            registerListener(listener);
        }).start();
        Thread.sleep(Integer.MAX_VALUE);
    }
    private static void registerListener(LeaderSelectorListener listener) {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        // 2.Ensure path
        try {
            new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 3.Register listener
        LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener);
        selector.autoRequeue();
        selector.start();
    }
}
复制代码

注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。




参考资料


www.cnblogs.com/qingyunzong…




相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
2月前
|
消息中间件 存储 Kafka
分布式消息中间件设计与实现
本文深入探讨了消息中间件的核心功能实现与高并发、高可用设计。在生产者设计中,涵盖消息构造、序列化、路由策略及可靠性保障(如ACK机制)。消费者部分分析了拉取/推送模式、分区分配与消息确认机制。同时,Broker作为核心组件,负责消息路由、存储和投递,并通过索引技术实现快速检索。 高并发设计方面,重点讨论了文件存储(顺序写入、分段存储)、日志结构存储及负载均衡策略(如哈希分区、轮询分区)。为确保高可用性,文章详细解析了主从复制、故障转移机制以及同城/异地多活容灾方案。
|
4月前
|
Cloud Native 关系型数据库 分布式数据库
登顶TPC-C|云原生数据库PolarDB技术揭秘:Limitless集群和分布式扩展篇
阿里云PolarDB云原生数据库在TPC-C基准测试中以20.55亿tpmC的成绩刷新世界纪录,展现卓越性能与性价比。其轻量版满足国产化需求,兼具高性能与低成本,适用于多种场景,推动数据库技术革新与发展。
|
2月前
|
安全 JavaScript 前端开发
HarmonyOS NEXT~HarmonyOS 语言仓颉:下一代分布式开发语言的技术解析与应用实践
HarmonyOS语言仓颉是华为专为HarmonyOS生态系统设计的新型编程语言,旨在解决分布式环境下的开发挑战。它以“编码创造”为理念,具备分布式原生、高性能与高效率、安全可靠三大核心特性。仓颉语言通过内置分布式能力简化跨设备开发,提供统一的编程模型和开发体验。文章从语言基础、关键特性、开发实践及未来展望四个方面剖析其技术优势,助力开发者掌握这一新兴工具,构建全场景分布式应用。
267 35
|
2月前
|
消息中间件 存储 中间件
分布式消息中间件基础
消息中间件是一种基于异步消息传递的分布式系统通信工具,核心功能包括消息传输、存储、路由与投递,能够实现系统解耦、异步处理和流量削峰。其主要组件包括生产者、消费者、Broker、主题/队列等,支持点对点和发布-订阅两种消息模型。主流中间件如Kafka(高吞吐)、RabbitMQ(灵活路由)、RocketMQ(事务支持)各有特色,适用于不同场景。此外,中间件还涉及多种协议(AMQP、MQTT等)、可靠性传输机制(持久化、确认机制)、顺序性与重复性问题解决以及事务支持(两阶段提交、本地消息表等)。选择中间件需根据业务需求权衡性能、功能和运维成本。
|
3月前
|
Cloud Native 关系型数据库 分布式数据库
登顶TPC-C|云原生数据库PolarDB技术揭秘:Limitless集群和分布式扩展篇
云原生数据库PolarDB技术揭秘:Limitless集群和分布式扩展篇
|
5月前
|
机器学习/深度学习 存储
DeepSeek进阶开发与应用4:DeepSeek中的分布式训练技术
随着深度学习模型和数据集规模的扩大,单机训练已无法满足需求,分布式训练技术应运而生。DeepSeek框架支持数据并行和模型并行两种模式,通过将计算任务分配到多个节点上并行执行,显著提高训练效率。本文介绍DeepSeek中的分布式训练技术,包括配置与启动方法,帮助用户轻松实现大规模模型训练。数据并行通过`MirroredStrategy`同步梯度,适用于大多数模型;模型并行则通过`ParameterServerStrategy`异步处理大模型。DeepSeek简化了分布式环境配置,支持单机多卡和多机多卡等场景。
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
184 0
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
7481 0
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
357 3
|
9月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
321 2
【Docker项目实战】Docker部署RabbitMQ消息中间件

相关产品

  • 微服务引擎