【ZooKeeper】③ Java 代码使用 ZooKeeper

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: ZooKeeper 中的 get 命令可查看节点中存储的数据,并绑定【节点数据改变事件】(是一次性事件)ZooKeeper 中的 list 命令可查看子节点列表,并绑定【节点改变事件】(是一次性事件)

零、创建 SpringBoot 项目

在这里插入图片描述
https://start.aliyun.com
在这里插入图片描述


Lombok
SpringWeb

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.2.2.RELEASE</version>
            </plugin>
        </plugins>
    </build>


一、Java 客户端(MAVEN 依赖)

(1) ZooKeeper 自带的客户端

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.0</version>
</dependency>

(2) Apache 的开源客户端 Curator

在这里插入图片描述

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.3.0</version>
</dependency>

(3) Apache 开源的 zkclient(旧)

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.11</version>
</dependency>


二、Java 客户端 API

(1) 创建 Java 与 ZooKeeper 的连接会话

创建 ZooKeeper 对象,放入 IoC 容器即可。

① Java 代码

@Configuration
public class ZooKeeperClient {
    // 要连接的 ZooKeeper 所在的服务器地址和端口号, 多个服务器地址用【逗号】分隔
    private static final String CONNECT_STRING = "192.168.80.128:2888:3888,192.168.80.128:2888:3888,192.168.80.128:2889:3889";
    // 会话超时时间 (单位: 毫秒)
    private static final int SESSION_TIMEOUT = 30000; // 30s

    @Bean
    public ZooKeeper zooKeeper() throws Exception {
        return new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {
            public void process(WatchedEvent event) {
                System.out.println("event = " + event);
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("Java 连接 ZooKeeper 成功!");
                }
            }
        });
    }

}

② 若无法连接成功

在这里插入图片描述
若无法连接成功,尝试关闭防火墙。

③ Linux 防火墙相关命令

a. 启动防火墙

systemctl start firewalld  

b. 关闭防火墙

systemctl stop firewalld

c. 关闭防火墙

systemctl stop firewalld

d. 重启防火墙

firewall-cmd --reload

e. 查看防火墙状态

systemctl status firewalld

(2) 创建节点

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper;

    /**
     * 创建节点
     *
     * @param path     路径
     * @param data     数据
     * @param nodeType 节点类型
     *                 (常用取值: PERSISTENT、PERSISTENT_SEQUENTIAL、EPHEMERAL、EPHEMERAL_SEQUENTIAL)
     * @return 新创建的节点的路径
     */
    @PostMapping("/create")
    public String create(String path, String data, String nodeType) throws Exception {
        return zooKeeper.create(path,
                data.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.valueOf(nodeType));
    }

}

(3) 获取节点中的数据

① 同步获取

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper;

    /**
     * 同步获取指定节点下的数据
     */
    @GetMapping("/get")
    public String get(String path) throws Exception {
        return path + " 节点下的数据: " +
                new String(zooKeeper.getData(path, false,
                        getStatVersionInfo(path)));
    }

    /**
     * 查询指定路径的节点的状态(版本)信息 (版本不存在返回 null)
     */
    public Stat getStatVersionInfo(String path) throws Exception {
        return zooKeeper.exists(path, false);
    }

}

② 异步回调

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper;

    /**
     * 异步获取指定节点下的数据
     */
    @GetMapping("/getAsync")
    public String getAsync(String path) throws Exception {
        String statString = getStatVersionInfo(path).toString();

        zooKeeper.getData(path, false, new AsyncCallback.DataCallback() {
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                System.out.println("数据:" + new String(data));
                System.out.println("context:" + ctx);
            }
        }, path + "_" + statString);

        return "获取 " + path + " 路径(节点)下的数据";
    }

    /**
     * 查询指定路径的节点的状态(版本)信息 (版本不存在返回 null)
     */
    public Stat getStatVersionInfo(String path) throws Exception {
        return zooKeeper.exists(path, false);
    }

}

③ 获取子节点列表

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper;

    /**
     * 获取指定路径下的子节点列表
     */
    @GetMapping("/listChildNodes")
    public List<String> listChildNodes(String path) throws Exception {
        return zooKeeper.getChildren(path, false);
    }
    
}

(4) 删除节点

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper;
     
    /**
     * 删除节点
     */
    @PostMapping("/delete")
    public String delete(String path) throws Exception {
        Stat statVersionInfo = getStatVersionInfo(path);
        if (statVersionInfo == null) return "节点不存在";

        zooKeeper.delete(path, statVersionInfo.getVersion());
        return getStatVersionInfo(path) == null ? "删除成功" : "删除失败";
    } 

    /**
     * 查询指定路径的节点的状态(版本)信息
     */
    public Stat getStatVersionInfo(String path) throws Exception {
        return zooKeeper.exists(path, false);
    }

}

(5) 更新数据

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper; 

    /**
     * 同步获取指定节点下的数据
     */
    @GetMapping("/get")
    public String get(String path) throws Exception {
        return path + " 节点下的数据: " +
                new String(zooKeeper.getData(path, false,
                        getStatVersionInfo(path)));
    }  

    /**
     * 更新节点数据
     */
    @PostMapping("/updateNodeData")
    public Map<String, String> updateNodeData(String path, String data) throws Exception {
        Map<String, String> resMap = new HashMap<>();

        Stat oldStat = getStatVersionInfo(path);
        if (oldStat == null) {
            resMap.put("msg", "节点不存在");
            return resMap;
        }

        // 查询节点数据
        String curNodeData = get(path);

        Stat stat = zooKeeper.setData(path, data.getBytes(), oldStat.getVersion());

        if (oldStat.getVersion() != stat.getVersion()) {
            resMap.put("msg", "更新节点数据成功");
            resMap.put("oldData", curNodeData);
            resMap.put("newData", data);
        } else {
            resMap.put("msg", "更新节点数据失败");
        }
        return resMap;

    } 

    /**
     * 查询指定路径的节点的状态(版本)信息
     */
    public Stat getStatVersionInfo(String path) throws Exception {
        return zooKeeper.exists(path, false);
    }

}

(6) 事件处理

ZooKeeper 中的 get 命令可查看节点中存储的数据,并绑定【节点数据改变事件】(是一次性事件)
ZooKeeper 中的 list 命令可查看子节点列表,并绑定【节点改变事件】(是一次性事件)

① 绑定一次性事件

a. 获取节点数据绑定的节点改变事件

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper; 
    
    /**
     * 查询指定路径的节点的状态(版本)信息
     */
    public Stat getStatVersionInfo(String path) throws Exception {
        return zooKeeper.exists(path, false);
    }

    /**
     * 获取节点数据绑定的一次性节点数据改变事件【NodeDataChanged】
     */
    @GetMapping("/addWatchByGet")
    public String addWatchByGet(String path) throws Exception {
        Stat statVersion = getStatVersionInfo(path);
        if (statVersion == null) return "节点不存在";

        zooKeeper.getData(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("【addWatchByGet】触发节点类型:" + event.getType());
            }
        }, statVersion);
        return "获取节点数据绑定的一次性节点数据改变事件";
    }

}

b. 获取子节点列表绑定的子节点改变事件

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper; 
    
    /**
     * 查询指定路径的节点的状态(版本)信息
     */
    public Stat getStatVersionInfo(String path) throws Exception {
        return zooKeeper.exists(path, false);
    } 

    /**
     * 获取子节点列表绑定的一次性子节点改变事件【NodeChildrenChanged】
     */
    @GetMapping("/addWatchByGetChildren")
    public String addWatchByGetChildren(String path) throws Exception {
        Stat statVersion = getStatVersionInfo(path);
        if (statVersion == null) return "节点不存在";

        zooKeeper.getChildren(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("【addWatchByGetChildren】触发节点类型:" + event.getType());
            }
        });

        return "获取子节点列表绑定的一次性子节点改变事件";
    }

}

② 绑定永久事件

监听当前节点的数据改变和当前节点(路径)的子节点的创建和删除,子节点数据发生改变不会触发事件监听。

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper;
    
    /**
     * 查询指定路径的节点的状态(版本)信息
     */
    public Stat getStatVersionInfo(String path) throws Exception {
        return zooKeeper.exists(path, false);
    } 
    
    /**
     * 给指定路径(节点)绑定永久事件 (1. 子节点改变事件; 2. 节点数据改变事件)
     */
    @PostMapping("/addWatch")
    public String addWatch(String path) throws Exception {
        Stat statVersion = getStatVersionInfo(path);
        if (statVersion == null) return "节点不存在";

        zooKeeper.addWatch(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                Event.EventType eventType = event.getType();
                if (eventType == Event.EventType.NodeChildrenChanged) {
                    System.out.println("触发了子节点改变事件");
                    // 重新获取子节点列表的代码
                } else if (eventType == Event.EventType.NodeDataChanged) {
                    System.out.println("触发了 NodeDataChanged");
                    // 重新获取节点数据的代码
                }
            }
        }, AddWatchMode.PERSISTENT);

        return "绑定永久事件";
    }

}

③ 递归绑定事件

  • 给当前节点及其子节点都绑定节点改变事件和节点数据改变事件
  • 与绑定永久事件的代码的不同点是 addWatch 的最后一个参数不一样,绑定永久事件的最后一个参数是 AddWatchMode.PERSISTENT,递归绑定永久事件的最后一个参数是 AddWatchMode.PERSISTENT_RECURSIVE
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
17小时前
|
Java 数据处理 开发者
Java中的Lambda表达式:简化你的代码之路
【8月更文挑战第66天】Lambda表达式在Java 8中首次引入,它为Java开发者提供了一种更简洁、更灵活的编程方式。本文将通过简单易懂的语言和实际代码示例,引导你理解Lambda表达式的基本概念、语法结构以及如何在Java项目中应用它来简化代码。无论你是Java新手还是有经验的开发者,这篇文章都将帮助你更好地掌握这一强大的工具。
28 11
|
17天前
|
设计模式 Java
Java设计模式:组合模式的介绍及代码演示
组合模式是一种结构型设计模式,用于将多个对象组织成树形结构,并统一处理所有对象。例如,统计公司总人数时,可先统计各部门人数再求和。该模式包括一个通用接口、表示节点的类及其实现类。通过树形结构和节点的通用方法,组合模式使程序更易扩展和维护。
Java设计模式:组合模式的介绍及代码演示
|
7天前
|
Java
java小工具util系列4:基础工具代码(Msg、PageResult、Response、常量、枚举)
java小工具util系列4:基础工具代码(Msg、PageResult、Response、常量、枚举)
20 5
|
9天前
|
Java API 开发者
探索Java中的Lambda表达式:简洁与强大的代码实践
本文深入探讨Java中Lambda表达式的定义、用法及优势,通过实例展示其如何简化代码、提升可读性,并强调在使用中需注意的兼容性和效率问题。Lambda作为Java 8的亮点功能,不仅优化了集合操作,还促进了函数式编程范式的应用,为开发者提供了更灵活的编码方式。
|
5天前
|
Java 开发者
探索Java中的Lambda表达式:简化你的代码之旅##
【8月更文挑战第62天】 Java 8的发布为开发者带来了诸多新特性,其中最引人注目的无疑是Lambda表达式。这一特性不仅让代码变得更加简洁,还极大地提升了开发的效率。本文将通过实际示例,展示如何利用Lambda表达式来优化我们的代码结构,同时探讨其背后的工作原理和性能考量。 ##
|
8天前
|
Java API 开发者
探索Java中的Lambda表达式:简化代码,提升效率
【9月更文挑战第27天】在Java 8中引入的Lambda表达式为编程带来了革命性的变化。通过简洁的语法和强大的功能,它不仅简化了代码编写过程,还显著提升了程序的执行效率。本文将深入探讨Lambda表达式的本质、用法和优势,并结合实例演示其在实际开发中的应用。无论你是Java新手还是资深开发者,都能从中获得启发,优化你的代码设计。
|
9天前
|
Java Linux Python
Linux环境下 代码java调用python出错
Linux环境下 代码java调用python出错
24 3
|
8天前
|
存储 Java 索引
使用java代码实现左右括号查找
使用java代码实现左右括号查找
|
9天前
|
算法 Java
java 概率抽奖代码实现
java 概率抽奖代码实现
|
17天前
|
Java 程序员 API
Java中的Lambda表达式:简化代码的秘密武器
在Java 8中引入的Lambda表达式是一种强大的编程工具,它可以显著简化代码,提高可读性。本文将介绍Lambda表达式的基本概念、优势以及在实际开发中的应用。通过具体示例,您将了解如何使用Lambda表达式来简化集合操作、线程编程和函数式编程。让我们一起探索这一革命性的特性,看看它是如何改变Java编程方式的。
25 4
下一篇
无影云桌面