分布式系列教程(09) -分布式协调工具Zookeeper(Java基本操作)

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,182元/月
简介: 分布式系列教程(09) -分布式协调工具Zookeeper(Java基本操作)

上一篇博客《分布式协调工具Zookeeper(介绍&安装&配置详解)》讲到了分布式协调工具Zookeeper,主要讲解Zookeeper的概念、应用场景以及安装配置。基于上一篇博客安装的环境,本文讲解如何使用Java操作Zookeeper?

代码已提交至Gtihub,有兴趣的同学可以下载看看(git版本号:e9d27b6df05095bb50c3666a1e8965102c85bb01):https://github.com/ylw-github/Zookeeper-Demo.git

本文目录结构:

l____1. 基本概念

l________1.1 创建节点(znode)方法

l________1.2 Watcher

l____2. Java操作Zookeeper

l____总结

1. 基本概念

1.1 创建节点(znode)方法

Zookeeper提供了两套创建节点的方法,同步异步创建节点方式。

其中同步的方式,有几个节点需要注意:

  1. 节点路径(名称) InodeName: (不允许递归创建节点,也就是说在父节点不存在
    的情况下,不允许创建子节点)
  2. 节点内容: 要求类型是字节数组(也就是说,不支持序列化方式,如果需要实现序
    列化,可使用java相关序列化框架,如Hessian、Kryo框架)
  3. 节点权限: 使用Ids.OPEN_ACL_UNSAFE开放权限即可。(这个参数一般在权展
    没有太高要求的场景下,没必要关注)
  4. 节点类型: 创建节点的类型: CreateMode,提供四种首点象型,如下:
名称 解析
PERSISTENT 持久化节点
PERSISTENT_SEQUENTIAL 顺序自动编号持久化节点,这种节点会根据当前已存在的节点数自动加 1
EPHEMERAL 临时节点, 客户端session超时这类节点就会被自动删除
EPHEMERAL_SEQUENTIAL 临时自动编号节点

1.2 Watcher

在ZooKeeper中,接口类Watcher用于表示一个标准的事件处理器,其定义了事件通知相关的逻辑,包含KeeperState和EventType两个枚举类,分别代表了通知状态事件类型,同时定义了事件的回调方法:process(WatchedEvent event)

什么是watcher接口?

同一个事件类型在不同的通知状态中代表的含义有所不同,下表例举了常见的通知状态和事件类型:

KeeperState EventType 触发条件 说明
None(-1) 客户端与服务端成功建立连接
SyncConnected(0) NodeCreated(1) Watcher监听的对应数据节点被创建
NodeDeleted(2) Watcher监听的对应数据节点被删除 此时客户端和服务器处于连接状态
NodeDataChanged(3) Watcher监听的对应数据节点的数据内容发生变更
NodeChildChanged(4) Wather监听的对应数据节点的子节点列表发生变更
Disconnected(0) None(-1) 客户端与ZooKeeper服务器断开连接 此时客户端和服务器处于断开连接状态
Expired(-112) Node(-1) 会话超时 此时客户端会话失效,通常同时也会受到SessionExpiredException异常
AuthFailed(4) None(-1) 通常有两种情况,1:使用错误的schema进行权限检查 2:SASL权限检查失败 通常同时也会收到AuthFailedException异常

回调方法process():

  • process方法是Watcher接口中的一个回调方法,当ZooKeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理,process方法的定义如下:
abstract public void process(WatchedEvent event);

这个回调方法的定义非常简单,我们重点看下方法的参数定义WatchedEvent

  • WatchedEvent包含了每一个事件的三个基本属性:通知状态(keeperState)事件类型(EventType)节点路径(path)。ZooKeeper使用WatchedEvent对象来封装服务端事件并传递给Watcher,从而方便回调方法process对服务端事件进行处理。
  • 提到WatchedEvent,两者表示的是同一个事物,都是对一个服务端事件的封装。不同的是,WatchedEvent是一个逻辑事件,用于服务端和客户端程序执行过程中所需的逻辑对象,而WatcherEvent因为实现了序列化接口,因此可以用于网络传输。
  • 服务端在生成WatchedEvent事件之后,会调用getWrapper方法将自己包装成一个可序列化的WatcherEvent事件,以便通过网络传输到客户端。客户端在接收到服务端的这个事件对象后,首先会将WatcherEvent还原成一个WatchedEvent事件,并传递给process方法处理,回调方法process根据入参就能够解析出完整的服务端事件了。
  • 需要注意的一点是,无论是WatchedEvent还是WatcherEvent,其对ZooKeeper服务端事件的封装都是机及其简单的。

2.Java操作Zookeeper

1.创建项目

2.添加maven依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

3.Zookeeper客户端连接

package com.ylw.zookeeper.Test;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class Test {
    //连接地址
    private static final String ADDRES = "192.168.162.131:2181";
    //session 会话
    private static final int SESSION_OUTTIME = 2000;
    //信号量,阻塞程序执行,用户等待zookeeper连接成功,发送成功信号,
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper zk = new ZooKeeper(ADDRES, SESSION_OUTTIME, new Watcher() {
            public void process(WatchedEvent event) {
                // 获取事件状态
                Event.KeeperState keeperState = event.getState();
                // 获取事件类型
                Event.EventType eventType = event.getType();
                if (Event.KeeperState.SyncConnected == keeperState) {
                    if (Event.EventType.None == eventType) {
                        countDownLatch.countDown();
                        System.out.println("zk 启动连接...");
                    }
                }
            }
        });
        // 进行阻塞
        countDownLatch.await();
        String result = zk.create("/ylw_Lasting", "Lasting".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("result - >" + result);
        zk.close();
    }
}

注意创建节点的两种方式:

1.创建持久节点,并且允许任何服务器可以操作
String result = zk.create("/ylw_Lasting", "Lasting".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
2.创建临时节点
String result = zk.create("/ylw_temp", "temp".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

4.Zookeeper客户端连接

package com.ylw.zookeeper.Test;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
public class ZkClientWatcher implements Watcher {
    // 集群连接地址
    private static final String CONNECT_ADDRES = "192.168.162.131:2181,192.168.162.131:2182,192.168.162.131:2183";
    // 会话超时时间
    private static final int SESSIONTIME = 2000;
    // 信号量,让zk在连接之前等待,连接成功后才能往下走.
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static String LOG_MAIN = "【main】 ";
    private ZooKeeper zk;
    public void createConnection(String connectAddres, int sessionTimeOut) {
        try {
            zk = new ZooKeeper(connectAddres, sessionTimeOut, this);
            System.out.println(LOG_MAIN + "zk 开始启动连接服务器....");
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public boolean createPath(String path, String data) {
        try {
            this.exists(path, true);
            this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println(LOG_MAIN + "节点创建成功, Path:" + path + ",data:" + data);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
    /**
     * 判断指定节点是否存在
     *
     * @param path 节点路径
     */
    public Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
    public boolean updateNode(String path, String data) throws KeeperException, InterruptedException {
        exists(path, true);
        this.zk.setData(path, data.getBytes(), -1);
        return false;
    }
    public void process(WatchedEvent watchedEvent) {
        // 获取事件状态
        Event.KeeperState keeperState = watchedEvent.getState();
        // 获取事件类型
        Event.EventType eventType = watchedEvent.getType();
        // zk 路径
        String path = watchedEvent.getPath();
        System.out.println("进入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path);
        // 判断是否建立连接
        if (Event.KeeperState.SyncConnected == keeperState) {
            if (Event.EventType.None == eventType) {
                // 如果建立建立成功,让后程序往下走
                System.out.println(LOG_MAIN + "zk 建立连接成功!");
                countDownLatch.countDown();
            } else if (Event.EventType.NodeCreated == eventType) {
                System.out.println(LOG_MAIN + "事件通知,新增node节点" + path);
            } else if (Event.EventType.NodeDataChanged == eventType) {
                System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被修改....");
            } else if (Event.EventType.NodeDeleted == eventType) {
                System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被删除....");
            }
        }
        System.out.println("--------------------------------------------------------");
    }
    public static void main(String[] args) throws KeeperException, InterruptedException {
        ZkClientWatcher zkClientWatcher = new ZkClientWatcher();
        zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME);
    boolean createResult = zkClientWatcher.createPath("/del.do", "http://www.xxx001.delete");
        //zkClientWatcher.updateNode("/del.do", "http://www.xxx002.delete");
    }
}

运行结果:

更新节点:

//boolean createResult = zkClientWatcher.createPath("/del.do", "http://www.xxx001.delete");
zkClientWatcher.updateNode("/del.do", "http://www.xxx002.delete");

运行后:

3.总结

目录
相关文章
|
2月前
|
Java 关系型数据库 数据库
Java 项目实战教程从基础到进阶实战案例分析详解
本文介绍了多个Java项目实战案例,涵盖企业级管理系统、电商平台、在线书店及新手小项目,结合Spring Boot、Spring Cloud、MyBatis等主流技术,通过实际应用场景帮助开发者掌握Java项目开发的核心技能,适合从基础到进阶的学习与实践。
227 3
|
8天前
|
安全 Java
Java之泛型使用教程
Java之泛型使用教程
110 10
|
3月前
|
缓存 安全 Java
Java 并发新特性实战教程之核心特性详解与项目实战
本教程深入解析Java 8至Java 19并发编程新特性,涵盖CompletableFuture异步编程、StampedLock读写锁、Flow API响应式流、VarHandle内存访问及结构化并发等核心技术。结合电商订单处理、缓存系统、实时数据流、高性能计数器与用户资料聚合等实战案例,帮助开发者高效构建高并发、低延迟、易维护的Java应用。适合中高级Java开发者提升并发编程能力。
75 0
|
8月前
|
JavaScript NoSQL Java
接替此文【下篇-服务端+后台管理】优雅草蜻蜓z系统JAVA版暗影版为例-【蜻蜓z系列通用】-2025年全新项目整合搭建方式-这是独立吃透代码以后首次改变-独立PC版本vue版搭建教程-优雅草卓伊凡
接替此文【下篇-服务端+后台管理】优雅草蜻蜓z系统JAVA版暗影版为例-【蜻蜓z系列通用】-2025年全新项目整合搭建方式-这是独立吃透代码以后首次改变-独立PC版本vue版搭建教程-优雅草卓伊凡
391 96
接替此文【下篇-服务端+后台管理】优雅草蜻蜓z系统JAVA版暗影版为例-【蜻蜓z系列通用】-2025年全新项目整合搭建方式-这是独立吃透代码以后首次改变-独立PC版本vue版搭建教程-优雅草卓伊凡
|
4月前
|
Oracle Java 关系型数据库
java 编程基础入门级超级完整版教程详解
这份文档是针对Java编程入门学习者的超级完整版教程,涵盖了从环境搭建到实际项目应用的全方位内容。首先介绍了Java的基本概念与开发环境配置方法,随后深入讲解了基础语法、控制流程、面向对象编程的核心思想,并配以具体代码示例。接着探讨了常用类库与API的应用,如字符串操作、集合框架及文件处理等。最后通过一个学生成绩管理系统的实例,帮助读者将理论知识应用于实践。此外,还提供了进阶学习建议,引导学员逐步掌握更复杂的Java技术。适合初学者系统性学习Java编程。资源地址:[点击访问](https://pan.quark.cn/s/14fcf913bae6)。
323 2
|
9月前
|
消息中间件 Java 数据库
自研Java框架 Sunrays-Framework使用教程「博客之星」
### Sunrays-Framework:助力高效开发的Java微服务框架 **Sunrays-Framework** 是一款基于 Spring Boot 构建的高效微服务开发框架,深度融合了 Spring Cloud 生态中的核心技术组件。它旨在简化数据访问、缓存管理、消息队列、文件存储等常见开发任务,帮助开发者快速构建高质量的企业级应用。 #### 核心功能 - **MyBatis-Plus**:简化数据访问层开发,提供强大的 CRUD 操作和分页功能。 - **Redis**:实现高性能缓存和分布式锁,提升系统响应速度。 - **RabbitMQ**:可靠的消息队列支持,适用于异步
自研Java框架 Sunrays-Framework使用教程「博客之星」
|
9月前
|
存储 人工智能 算法
解锁分布式文件分享的 Java 一致性哈希算法密码
在数字化时代,文件分享成为信息传播与协同办公的关键环节。本文深入探讨基于Java的一致性哈希算法,该算法通过引入虚拟节点和环形哈希空间,解决了传统哈希算法在分布式存储中的“哈希雪崩”问题,确保文件分配稳定高效。文章还展示了Java实现代码,并展望了其在未来文件分享技术中的应用前景,如结合AI优化节点布局和区块链增强数据安全。
|
10月前
|
移动开发 前端开发 Java
Java最新图形化界面开发技术——JavaFx教程(含UI控件用法介绍、属性绑定、事件监听、FXML)
JavaFX是Java的下一代图形用户界面工具包。JavaFX是一组图形和媒体API,我们可以用它们来创建和部署富客户端应用程序。 JavaFX允许开发人员快速构建丰富的跨平台应用程序,允许开发人员在单个编程接口中组合图形,动画和UI控件。本文详细介绍了JavaFx的常见用法,相信读完本教程你一定有所收获!
8973 5
Java最新图形化界面开发技术——JavaFx教程(含UI控件用法介绍、属性绑定、事件监听、FXML)
|
9月前
|
Java 数据库连接 数据处理
探究Java异常处理【保姆级教程】
Java 异常处理是确保程序稳健运行的关键机制。它通过捕获和处理运行时错误,避免程序崩溃。Java 的异常体系以 `Throwable` 为基础,分为 `Error` 和 `Exception`。前者表示严重错误,后者可细分为受检和非受检异常。常见的异常处理方式包括 `try-catch-finally`、`throws` 和 `throw` 关键字。此外,还可以自定义异常类以满足特定需求。最佳实践包括捕获具体异常、合理使用 `finally` 块和谨慎抛出异常。掌握这些技巧能显著提升程序的健壮性和可靠性。
154 4
|
9月前
|
存储 移动开发 算法
【潜意识Java】Java基础教程:从零开始的学习之旅
本文介绍了 Java 编程语言的基础知识,涵盖从简介、程序结构到面向对象编程的核心概念。首先,Java 是一种高级、跨平台的面向对象语言,支持“一次编写,到处运行”。接着,文章详细讲解了 Java 程序的基本结构,包括包声明、导入语句、类声明和 main 方法。随后,深入探讨了基础语法,如数据类型、变量、控制结构、方法和数组。此外,还介绍了面向对象编程的关键概念,例如类与对象、继承和多态。最后,针对常见的编程错误提供了调试技巧,并总结了学习 Java 的重要性和方法。适合初学者逐步掌握 Java 编程。
165 1

热门文章

最新文章