【ZooKeeper】③ Java 代码使用 ZooKeeper

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: ZooKeeper 中的 get 命令可查看节点中存储的数据,并绑定【节点数据改变事件】(是一次性事件)ZooKeeper 中的 list 命令可查看子节点列表,并绑定【节点改变事件】(是一次性事件)

零、创建 SpringBoot 项目

在这里插入图片描述
https://start.aliyun.com
在这里插入图片描述


Lombok
SpringWeb

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.2.2.RELEASE</version>
            </plugin>
        </plugins>
    </build>


一、Java 客户端(MAVEN 依赖)

(1) ZooKeeper 自带的客户端

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.0</version>
</dependency>

(2) Apache 的开源客户端 Curator

在这里插入图片描述

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.3.0</version>
</dependency>

(3) Apache 开源的 zkclient(旧)

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.11</version>
</dependency>


二、Java 客户端 API

(1) 创建 Java 与 ZooKeeper 的连接会话

创建 ZooKeeper 对象,放入 IoC 容器即可。

① Java 代码

@Configuration
public class ZooKeeperClient {
    // 要连接的 ZooKeeper 所在的服务器地址和端口号, 多个服务器地址用【逗号】分隔
    private static final String CONNECT_STRING = "192.168.80.128:2888:3888,192.168.80.128:2888:3888,192.168.80.128:2889:3889";
    // 会话超时时间 (单位: 毫秒)
    private static final int SESSION_TIMEOUT = 30000; // 30s

    @Bean
    public ZooKeeper zooKeeper() throws Exception {
        return new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {
            public void process(WatchedEvent event) {
                System.out.println("event = " + event);
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("Java 连接 ZooKeeper 成功!");
                }
            }
        });
    }

}

② 若无法连接成功

在这里插入图片描述
若无法连接成功,尝试关闭防火墙。

③ Linux 防火墙相关命令

a. 启动防火墙

systemctl start firewalld  

b. 关闭防火墙

systemctl stop firewalld

c. 关闭防火墙

systemctl stop firewalld

d. 重启防火墙

firewall-cmd --reload

e. 查看防火墙状态

systemctl status firewalld

(2) 创建节点

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper;

    /**
     * 创建节点
     *
     * @param path     路径
     * @param data     数据
     * @param nodeType 节点类型
     *                 (常用取值: PERSISTENT、PERSISTENT_SEQUENTIAL、EPHEMERAL、EPHEMERAL_SEQUENTIAL)
     * @return 新创建的节点的路径
     */
    @PostMapping("/create")
    public String create(String path, String data, String nodeType) throws Exception {
        return zooKeeper.create(path,
                data.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.valueOf(nodeType));
    }

}

(3) 获取节点中的数据

① 同步获取

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper;

    /**
     * 同步获取指定节点下的数据
     */
    @GetMapping("/get")
    public String get(String path) throws Exception {
        return path + " 节点下的数据: " +
                new String(zooKeeper.getData(path, false,
                        getStatVersionInfo(path)));
    }

    /**
     * 查询指定路径的节点的状态(版本)信息 (版本不存在返回 null)
     */
    public Stat getStatVersionInfo(String path) throws Exception {
        return zooKeeper.exists(path, false);
    }

}

② 异步回调

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper;

    /**
     * 异步获取指定节点下的数据
     */
    @GetMapping("/getAsync")
    public String getAsync(String path) throws Exception {
        String statString = getStatVersionInfo(path).toString();

        zooKeeper.getData(path, false, new AsyncCallback.DataCallback() {
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                System.out.println("数据:" + new String(data));
                System.out.println("context:" + ctx);
            }
        }, path + "_" + statString);

        return "获取 " + path + " 路径(节点)下的数据";
    }

    /**
     * 查询指定路径的节点的状态(版本)信息 (版本不存在返回 null)
     */
    public Stat getStatVersionInfo(String path) throws Exception {
        return zooKeeper.exists(path, false);
    }

}

③ 获取子节点列表

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper;

    /**
     * 获取指定路径下的子节点列表
     */
    @GetMapping("/listChildNodes")
    public List<String> listChildNodes(String path) throws Exception {
        return zooKeeper.getChildren(path, false);
    }
    
}

(4) 删除节点

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper;
     
    /**
     * 删除节点
     */
    @PostMapping("/delete")
    public String delete(String path) throws Exception {
        Stat statVersionInfo = getStatVersionInfo(path);
        if (statVersionInfo == null) return "节点不存在";

        zooKeeper.delete(path, statVersionInfo.getVersion());
        return getStatVersionInfo(path) == null ? "删除成功" : "删除失败";
    } 

    /**
     * 查询指定路径的节点的状态(版本)信息
     */
    public Stat getStatVersionInfo(String path) throws Exception {
        return zooKeeper.exists(path, false);
    }

}

(5) 更新数据

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper; 

    /**
     * 同步获取指定节点下的数据
     */
    @GetMapping("/get")
    public String get(String path) throws Exception {
        return path + " 节点下的数据: " +
                new String(zooKeeper.getData(path, false,
                        getStatVersionInfo(path)));
    }  

    /**
     * 更新节点数据
     */
    @PostMapping("/updateNodeData")
    public Map<String, String> updateNodeData(String path, String data) throws Exception {
        Map<String, String> resMap = new HashMap<>();

        Stat oldStat = getStatVersionInfo(path);
        if (oldStat == null) {
            resMap.put("msg", "节点不存在");
            return resMap;
        }

        // 查询节点数据
        String curNodeData = get(path);

        Stat stat = zooKeeper.setData(path, data.getBytes(), oldStat.getVersion());

        if (oldStat.getVersion() != stat.getVersion()) {
            resMap.put("msg", "更新节点数据成功");
            resMap.put("oldData", curNodeData);
            resMap.put("newData", data);
        } else {
            resMap.put("msg", "更新节点数据失败");
        }
        return resMap;

    } 

    /**
     * 查询指定路径的节点的状态(版本)信息
     */
    public Stat getStatVersionInfo(String path) throws Exception {
        return zooKeeper.exists(path, false);
    }

}

(6) 事件处理

ZooKeeper 中的 get 命令可查看节点中存储的数据,并绑定【节点数据改变事件】(是一次性事件)
ZooKeeper 中的 list 命令可查看子节点列表,并绑定【节点改变事件】(是一次性事件)

① 绑定一次性事件

a. 获取节点数据绑定的节点改变事件

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper; 
    
    /**
     * 查询指定路径的节点的状态(版本)信息
     */
    public Stat getStatVersionInfo(String path) throws Exception {
        return zooKeeper.exists(path, false);
    }

    /**
     * 获取节点数据绑定的一次性节点数据改变事件【NodeDataChanged】
     */
    @GetMapping("/addWatchByGet")
    public String addWatchByGet(String path) throws Exception {
        Stat statVersion = getStatVersionInfo(path);
        if (statVersion == null) return "节点不存在";

        zooKeeper.getData(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("【addWatchByGet】触发节点类型:" + event.getType());
            }
        }, statVersion);
        return "获取节点数据绑定的一次性节点数据改变事件";
    }

}

b. 获取子节点列表绑定的子节点改变事件

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper; 
    
    /**
     * 查询指定路径的节点的状态(版本)信息
     */
    public Stat getStatVersionInfo(String path) throws Exception {
        return zooKeeper.exists(path, false);
    } 

    /**
     * 获取子节点列表绑定的一次性子节点改变事件【NodeChildrenChanged】
     */
    @GetMapping("/addWatchByGetChildren")
    public String addWatchByGetChildren(String path) throws Exception {
        Stat statVersion = getStatVersionInfo(path);
        if (statVersion == null) return "节点不存在";

        zooKeeper.getChildren(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("【addWatchByGetChildren】触发节点类型:" + event.getType());
            }
        });

        return "获取子节点列表绑定的一次性子节点改变事件";
    }

}

② 绑定永久事件

监听当前节点的数据改变和当前节点(路径)的子节点的创建和删除,子节点数据发生改变不会触发事件监听。

@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {

    @Autowired
    private ZooKeeper zooKeeper;
    
    /**
     * 查询指定路径的节点的状态(版本)信息
     */
    public Stat getStatVersionInfo(String path) throws Exception {
        return zooKeeper.exists(path, false);
    } 
    
    /**
     * 给指定路径(节点)绑定永久事件 (1. 子节点改变事件; 2. 节点数据改变事件)
     */
    @PostMapping("/addWatch")
    public String addWatch(String path) throws Exception {
        Stat statVersion = getStatVersionInfo(path);
        if (statVersion == null) return "节点不存在";

        zooKeeper.addWatch(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                Event.EventType eventType = event.getType();
                if (eventType == Event.EventType.NodeChildrenChanged) {
                    System.out.println("触发了子节点改变事件");
                    // 重新获取子节点列表的代码
                } else if (eventType == Event.EventType.NodeDataChanged) {
                    System.out.println("触发了 NodeDataChanged");
                    // 重新获取节点数据的代码
                }
            }
        }, AddWatchMode.PERSISTENT);

        return "绑定永久事件";
    }

}

③ 递归绑定事件

  • 给当前节点及其子节点都绑定节点改变事件和节点数据改变事件
  • 与绑定永久事件的代码的不同点是 addWatch 的最后一个参数不一样,绑定永久事件的最后一个参数是 AddWatchMode.PERSISTENT,递归绑定永久事件的最后一个参数是 AddWatchMode.PERSISTENT_RECURSIVE
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
4天前
|
Java
在 Java 中捕获和处理自定义异常的代码示例
本文提供了一个 Java 代码示例,展示了如何捕获和处理自定义异常。通过创建自定义异常类并使用 try-catch 语句,可以更灵活地处理程序中的错误情况。
|
18天前
|
XML 安全 Java
Java反射机制:解锁代码的无限可能
Java 反射(Reflection)是Java 的特征之一,它允许程序在运行时动态地访问和操作类的信息,包括类的属性、方法和构造函数。 反射机制能够使程序具备更大的灵活性和扩展性
32 5
Java反射机制:解锁代码的无限可能
|
14天前
|
jenkins Java 测试技术
如何使用 Jenkins 自动发布 Java 代码,通过一个电商公司后端服务的实际案例详细说明
本文介绍了如何使用 Jenkins 自动发布 Java 代码,通过一个电商公司后端服务的实际案例,详细说明了从 Jenkins 安装配置到自动构建、测试和部署的全流程。文中还提供了一个 Jenkinsfile 示例,并分享了实践经验,强调了版本控制、自动化测试等关键点的重要性。
47 3
|
20天前
|
存储 安全 Java
系统安全架构的深度解析与实践:Java代码实现
【11月更文挑战第1天】系统安全架构是保护信息系统免受各种威胁和攻击的关键。作为系统架构师,设计一套完善的系统安全架构不仅需要对各种安全威胁有深入理解,还需要熟练掌握各种安全技术和工具。
57 10
|
15天前
|
分布式计算 Java MaxCompute
ODPS MR节点跑graph连通分量计算代码报错java heap space如何解决
任务启动命令:jar -resources odps-graph-connect-family-2.0-SNAPSHOT.jar -classpath ./odps-graph-connect-family-2.0-SNAPSHOT.jar ConnectFamily 若是设置参数该如何设置
|
14天前
|
Java
Java代码解释++i和i++的五个主要区别
本文介绍了前缀递增(++i)和后缀递增(i++)的区别。两者在独立语句中无差异,但在赋值表达式中,i++ 返回原值,++i 返回新值;在复杂表达式中计算顺序不同;在循环中虽结果相同但使用方式有别。最后通过 `Counter` 类模拟了两者的内部实现原理。
Java代码解释++i和i++的五个主要区别
|
22天前
|
搜索推荐 Java 数据库连接
Java|在 IDEA 里自动生成 MyBatis 模板代码
基于 MyBatis 开发的项目,新增数据库表以后,总是需要编写对应的 Entity、Mapper 和 Service 等等 Class 的代码,这些都是重复的工作,我们可以想一些办法来自动生成这些代码。
29 6
|
22天前
|
Java
通过Java代码解释成员变量(实例变量)和局部变量的区别
本文通过一个Java示例,详细解释了成员变量(实例变量)和局部变量的区别。成员变量属于类的一部分,每个对象有独立的副本;局部变量则在方法或代码块内部声明,作用范围仅限于此。示例代码展示了如何在类中声明和使用这两种变量。
|
23天前
|
存储 Java API
优雅地使用Java Map,通过掌握其高级特性和技巧,让代码更简洁。
【10月更文挑战第19天】本文介绍了如何优雅地使用Java Map,通过掌握其高级特性和技巧,让代码更简洁。内容包括Map的初始化、使用Stream API处理Map、利用merge方法、使用ComputeIfAbsent和ComputeIfPresent,以及Map的默认方法。这些技巧不仅提高了代码的可读性和维护性,还提升了开发效率。
45 3
|
23天前
|
存储 Java 开发者
Java中的Map接口提供了一种优雅的方式来管理数据结构,使代码更加清晰、高效
【10月更文挑战第19天】在软件开发中,随着项目复杂度的增加,数据结构的组织和管理变得至关重要。Java中的Map接口提供了一种优雅的方式来管理数据结构,使代码更加清晰、高效。本文通过在线购物平台的案例,展示了Map在商品管理、用户管理和订单管理中的具体应用,帮助开发者告别混乱,提升代码质量。
26 1