Apache ZooKeeper - 使用原生的API操作ZK

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Apache ZooKeeper - 使用原生的API操作ZK

20201124234058164.png

概述


前面几篇系列博文我们熟悉了如何通过命令来操作ZK节点数据,下面我们来看下如何使用API来操作

主要两种方式

  1. 原生API
  2. Curator

今天我们来看下如何使用原生的API操作ZK


maven依赖

和 服务端的版本保持一致

  <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
       <version>3.5.8</version>
   </dependency>


验证

接下来我们使用单元测试来验证下原生API的对ZK 数据的增删改查

测试基类

我们来写下测试基类

package com.artisan.zk.originalClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class StandAloneBaseTest {
    private static final String ZK_ADDRESS = "192.168.126.131:2181";
    private static final int SESSION_TIMEOUT = 30_000;
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    public static ZooKeeper getZooKeeper() {
        return zooKeeper;
    }
    private static ZooKeeper zooKeeper ;
    private static Watcher watcher = event -> {
        if (event.getState() == Watcher.Event.KeeperState.SyncConnected && event.getType() == Watcher.Event.EventType.None){
            log.info("ZK Connected");
            countDownLatch.countDown();
        }
    };
    @Before
    public void init() throws IOException, InterruptedException {
        log.info("start to connect zk server: {}" , ZK_ADDRESS);
        zooKeeper = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, watcher);
        log.info("connecting to....{}", ZK_ADDRESS);
        countDownLatch.await();
    }
    @After
    public void  test(){
        try {
            TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


为了方便测试,直接在init初始化方法中创建zookeeper实例 ,不要关闭~


ZK构造函数参数


20201130152208499.png


connectString:ZooKeeper服务器列表

由英文逗号分开的host:port字符串组成,每一个都代表一台ZooKeeper机器,如 host1:port1,host2:port2,host3:port3

另外,也可以在connectString中设置客户端连接上ZooKeeper后的根目录,方法是在host:port字符串之后添加上这个根目录。例如,host1:port1,host2:port2,host3:port3/app/a,这样就指定了该客户端连接上ZooKeeper服务器之后,所有对ZooKeeper的操作,都会基于这个根目录。例如,客户端对/foo/bar 的操作,最终创建/app/a/foo/bar, 这个目录也叫Chroot,即客户端隔离命名空间。


sessionTimeout:会话的超时时间, “毫秒”为单位


在ZooKeeper中有会话的概念,在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳检测机制来维持会话的有效性.

一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。


watcher:事件通知处理器


ZooKeeper允许客户端在构造方法中传入一个接口 watcher (org.apache. zookeeper.Watcher)的实现类对象来作为默认的 Watcher事件通知处理器。

当然,该参数可以设置为null 以表明不需要设置默认的 Watcher处理器。


canBeReadOnly: 用于标识当前会话是否支持“read-only(只读)”模式。

boolean类型的参数

默认情况下,在ZooKeeper集群中,一个机器如果和集群中过半及以上机器失去了网络连接,那么这个机器将不再处理客户端请求(包括读写请

求)。


但是在某些使用场景下,当ZooKeeper服务器发生此类故障的时候,我们还是希望ZooKeeper服务器能够提供读服务(当然写服务肯定无法提供),这就是 ZooKeeper的“read-only”模式。


sessionId和 sessionPasswd:会话ID和会话秘钥


这两个参数能够唯一确定一个会话,同时客户端使用这两个参数可以实现客户端会话复用,从而达到恢复会话的效果。

具体使用方法是,第一次连接上ZooKeeper服务器时,通过调用ZooKeeper对象实例的以下两个接口,即可获得当前会话的ID和秘钥:


long getSessionId();
byte[]getSessionPasswd( );

荻取到这两个参数值之后,就可以在下次创建ZooKeeper对象实例的时候传入构造方法了


CRUD

同步创建节点

 package com.artisan.zk.originalClient;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
@Slf4j
public class BaseOperationStandAloneModeTest extends  StandAloneBaseTest{
    private  static final  String  NODE_NAME = "/artisan-node";
    @Test
    public void testCreate(){
      try{
          ZooKeeper zooKeeper = getZooKeeper();
          String s = zooKeeper.create(NODE_NAME,"artisan-node-value".getBytes(),
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
          log.info("create persistent node {} , result {}" , NODE_NAME, s );
      }catch (Exception e){
          log.error("create Exception {}", e.getMessage());
      }
    } 
}

20201130154935832.png

修改数据

    @SneakyThrows
    @Test
    public void testSetData() {
        // 修改前数据
        Stat stat = new Stat();
        byte[] data = getZooKeeper().getData(NODE_NAME, null, stat);
        log.info("data before change: " + new String(data));
        int version = stat.getVersion();
        log.info("data version {} " , version);
        // 修改数据
        Stat newStat = getZooKeeper().setData(NODE_NAME, "ARTISAN - NEW-SET-DATA".getBytes(), version);
        log.info("new stat version info {} " , newStat.getVersion());
        log.info("data after change: {} " , new String(getZooKeeper().getData(NODE_NAME, null, newStat)));
    }


20201130155715762.png

20201130155811509.png


查询数据(不带watcher)

    @SneakyThrows
    @Test
    public void testGetWithOutWatch(){
        byte[] data = getZooKeeper().getData(NODE_NAME, null, null);
        log.info("data {}" , new String(data));
    }

20201130160139162.png


查询数据(带watcher)

    @SneakyThrows
    @Test
    public void testGetWithWatch(){
        Watcher  watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 监听NodeDataChanged事件
                if (event.getPath() != null && event.getPath().equals(NODE_NAME)
                        && event.getType() == Watcher.Event.EventType.NodeDataChanged){
                    log.info("path {} changed watched " , NODE_NAME);
                    // 监听一旦触发就会失效,因此需要重新监听
                    try {
                        byte[] data = getZooKeeper().getData(NODE_NAME, this, null);
                        log.info("监听触发后的操作-- data: {}",new String(data));
                    } catch (Exception e) {
                        log.info("getData Error {} " , e.getMessage());
                    }
                }
            }
        };
        // 获取节点数据
        byte[] data = getZooKeeper().getData(NODE_NAME, watcher, null);
        log.info("data {}" , new String(data));
    } 


20201130160634722.png

因为监听的是NodeDataChanged事件,因此我们再去调用修改数据的方法,或者在客户端手动修改数据


2020113016083380.png


观察testGetWithWatch的日志


20201130160947318.png


zk里查看数据

20201130161020207.png


删除数据

    @SneakyThrows
    @Test
    public void testDelete(){
        // if the given version is -1, it matches any node's versions
        // -1 代表匹配所有版本,直接删除
        // 任意大于 -1 的代表可以指定数据版本删除
        getZooKeeper().delete(NODE_NAME,-1);
    }


查看客户端,已经删除

20201130161352434.png


异步创建节点

    @SneakyThrows
    @Test
    public void testCreateAsyn(){
        getZooKeeper().create(NODE_NAME, "DATA_VALUE".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT,
                (rc, path, ctx, name) -> {
                    String currentThreadName = Thread.currentThread().getName();
                    log.info("currentThreadName {} , rc {} , path {} , ctx {} , name {} " , currentThreadName, rc , path ,ctx ,name );
                }, "ARTISAN");
        byte[] data = getZooKeeper().getData(NODE_NAME, null, null);
        log.info("data {}" , new String(data));
    }


20201130163420666.png


EventThread创建的节点 ,而非当前线程

20201130163806791.png


行了 基本操作就这些,下篇继续



20201130163637953.png


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
2月前
|
Cloud Native API
微服务引擎 MSE 及云原生 API 网关 2024 年 9 月产品动态
微服务引擎 MSE 及云原生 API 网关 2024 年 9 月产品动态。
|
18天前
|
Cloud Native API 微服务
微服务引擎 MSE 及云原生 API 网关 2024 年 11 月产品动态
微服务引擎 MSE 及云原生 API 网关 2024 年 11 月产品动态。
|
19天前
|
运维 Cloud Native 应用服务中间件
阿里云微服务引擎 MSE 及 云原生 API 网关 2024 年 11 月产品动态
阿里云微服务引擎 MSE 面向业界主流开源微服务项目, 提供注册配置中心和分布式协调(原生支持 Nacos/ZooKeeper/Eureka )、云原生网关(原生支持Higress/Nginx/Envoy,遵循Ingress标准)、微服务治理(原生支持 Spring Cloud/Dubbo/Sentinel,遵循 OpenSergo 服务治理规范)能力。API 网关 (API Gateway),提供 APl 托管服务,覆盖设计、开发、测试、发布、售卖、运维监测、安全管控、下线等 API 生命周期阶段。帮助您快速构建以 API 为核心的系统架构.满足新技术引入、系统集成、业务中台等诸多场景需要
|
1月前
|
监控 负载均衡 API
Apache Apisix轻松打造亿级流量Api网关
Apache APISIX 是一个动态、实时、高性能的 API 网关,提供负载均衡、动态上行、灰度发布、熔断、鉴权、可观测等丰富的流量管理功能。适用于处理传统南北向流量、服务间东西向流量及 k8s 入口控制。Airflow 是一个可编程、调度和监控的工作流平台,基于有向无环图 (DAG) 定义和执行任务,提供丰富的命令行工具和 Web 管理界面,方便系统运维和管理。
Apache Apisix轻松打造亿级流量Api网关
|
4月前
|
存储 API Apache
【zookeeper 第三篇章】客户端 API
本文介绍了Apache ZooKeeper客户端的一些常用命令及其用法。首先,`create`命令用于创建不同类型的节点并为其赋值,如持久化节点、有序节点及临时节点等。通过示例展示了如何创建这些节点,并演示了创建过程中的输出结果。其次,`ls`命令用于列出指定路径下的所有子节点。接着,`set`命令用于更新节点中的数据,可以指定版本号实现乐观锁机制。
38 0
|
1月前
|
运维 Cloud Native 应用服务中间件
阿里云微服务引擎 MSE 及 云原生 API 网关 2024 年 10 月产品动态
阿里云微服务引擎 MSE 面向业界主流开源微服务项目, 提供注册配置中心和分布式协调(原生支持 Nacos/ZooKeeper/Eureka )、云原生网关(原生支持Higress/Nginx/Envoy,遵循Ingress标准)、微服务治理(原生支持 Spring Cloud/Dubbo/Sentinel,遵循 OpenSergo 服务治理规范)能力。API 网关 (API Gateway),提供 APl 托管服务,覆盖设计、开发、测试、发布、售卖、运维监测、安全管控、下线等 API 生命周期阶段。帮助您快速构建以 API 为核心的系统架构.满足新技术引入、系统集成、业务中台等诸多场景需要
|
2月前
|
运维 Cloud Native 应用服务中间件
阿里云微服务引擎 MSE 及 云原生 API 网关 2024 年 09 月产品动态
阿里云微服务引擎 MSE 面向业界主流开源微服务项目, 提供注册配置中心和分布式协调(原生支持 Nacos/ZooKeeper/Eureka )、云原生网关(原生支持Higress/Nginx/Envoy,遵循Ingress标准)、微服务治理(原生支持 Spring Cloud/Dubbo/Sentinel,遵循 OpenSergo 服务治理规范)能力。API 网关 (API Gateway),提供 APl 托管服务,覆盖设计、开发、测试、发布、售卖、运维监测、安全管控、下线等 API 生命周期阶段。帮助您快速构建以 API 为核心的系统架构.满足新技术引入、系统集成、业务中台等诸多场景需要
|
2月前
|
分布式计算 Java 大数据
大数据-147 Apache Kudu 常用 Java API 增删改查
大数据-147 Apache Kudu 常用 Java API 增删改查
39 1
|
3月前
|
Cloud Native API
微服务引擎 MSE 及云原生 API 网关 2024 年 8 月产品动态
微服务引擎 MSE 及云原生 API 网关 2024 年 8 月产品动态。
|
3月前
|
运维 Cloud Native 应用服务中间件
阿里云微服务引擎 MSE 及 云原生 API 网关 2024 年 08 月产品动态
阿里云微服务引擎 MSE 面向业界主流开源微服务项目, 提供注册配置中心和分布式协调(原生支持 Nacos/ZooKeeper/Eureka )、云原生网关(原生支持Higress/Nginx/Envoy,遵循Ingress标准)、微服务治理(原生支持 Spring Cloud/Dubbo/Sentinel,遵循 OpenSergo 服务治理规范)能力。API 网关 (API Gateway),提供 APl 托管服务,覆盖设计、开发、测试、发布、售卖、运维监测、安全管控、下线等 API 生命周期阶段。帮助您快速构建以 API 为核心的系统架构.满足新技术引入、系统集成、业务中台等诸多场景需要

热门文章

最新文章

推荐镜像

更多