分布式调用与高并发处理 Zookeeper分布式协调服务(三)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 分布式调用与高并发处理 Zookeeper分布式协调服务(三)

3.5 Watcher监听机制

ZooKeeper 提供了分布式数据的发布/订阅功能。一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它们能够做出相应的处理。

注意:

在ZooKeeper中,引入了Watcher机制来实现这种分布式的通知功能。ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当服务端的一些指定事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。

监听节点变化

#纯净客户端监听Java节点的变化
ls -w /java
#在zk-1客户端创建/java/spring节点
[zk: localhost:2181(CONNECTED) 1] create /java/spring   spirngmvc
Created /java/spring
#纯净客户端监听到了Java节点的变化
[zk: localhost:2181(CONNECTED) 11] 
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/java

注意:

命令如果使用 -w ,那么监听的是节点的变化,而不是值的变化。

监听节点的值的变化

#纯净客户端监听节点/java/spring的值
get -w /java/spring
#zk-1客户端修改/java/spring节点的值
[zk: localhost:2181(CONNECTED) 2] set /java/spring  mybatis
#纯净客户端监听到节点值的变化
[zk: localhost:2181(CONNECTED) 2] 
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/java/spring

注意:

watch监听机制只能够使用一次,如果下次想要使用,必须重新监听,就比如ls path watch命令,只能监听节点路径的改变一次,如果还想监听,那么需要再执行一次ls path watch命令。

3.6 权限控制ACL

在ZooKeeper的实际使用中,我们的做法往往是搭建一个共用的ZooKeeper集群,统一为若干个应用提供服务。在这种情况下,不同的应用之间往往是不会存在共享数据的使用场景的,因此需要解决不同应用之间的权限问题。

注意:

  1. ZooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限
  2. 每个znode支持设置多种权限控制方案和多个权限
  3. 子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点

权限模式schema

ZooKeeper内置了一些权限控制方案,可以用以下方案为每个节点设置权限:

方案 描述
world 只有一个用户:anyone,代表所有人(默认)
ip 使用IP地址认证
auth 使用已添加认证的用户认证
digest 使用“用户名:密码”方式认证

授权对象ID

授权对象ID是指,权限赋予的用户或者一个实体,例如:IP 地址或者机器。授权模式 schema 与 授权对象 ID 之间关系:

权限模式 授权对象
IP 通常是一个IP地址或是IP段,例如“192.168.66.101”
Digest 自定义,通常是“username:BASE64(SHA-1(username:password))”
World 只有一个ID:"anyone"
Super 与Digest模式一致

权限permission

权限 ACL简写 描述
CREATE c 可以创建子节点
DELETE d 可以删除子节点(仅下一级节点)
READ r 可以读取节点数据及显示子节点列表
WRITE w 可以设置节点数据
ADMIN a 可以设置节点访问控制列表权限

权限相关命令

命令 使用方式 描述
getAcl getAcl 读取ACL权限
setAcl setAcl 设置ACL权限
addauth addauth 添加认证用户

示例:World方案

[zk: localhost:2181(CONNECTED) 0] create /node1 1
Created /node1
[zk: localhost:2181(CONNECTED) 1] getAcl /node1
'world,'anyone  #默认为world方案
: cdrwa #任何人都拥有所有权限

示例:IP方案

#创建节点
[zk: localhost:2181(CONNECTED) 0] create /node2 1
Created /node2
#设置权限(只有ip地址为192.168.66.100的客户端才能操作该节点)
[zk: localhost:2181(CONNECTED) 1] setAcl /node2 ip:192.168.66.100:cdrwa 
#使用IP非 192.168.66.100 的机器
[zk: localhost:2181(CONNECTED) 0] get /node2
Authentication is not valid : /node2 #没有权限

示例:Auth方案

#创建节点
[zk: localhost:2181(CONNECTED) 0] create /node3 1
Created /node3
#添加认证用户zj,密码123456
[zk: localhost:2181(CONNECTED) 1] addauth digest zj:123456
#为创建的权限用户zj设置权限
[zk: localhost:2181(CONNECTED) 2] setAcl /node3 auth:zj:cdrwa
#获取权限
[zk: localhost:2181(CONNECTED) 3] getAcl /node3
'digest,'zj:UvJWhBril5yzpEiA2eV7bwwhfLs=
: cdrwa

示例:Digest方案

#密码加密
echo -n zj:123456 | openssl dgst -binary -sha1 | openssl base64
UvJWhBril5yzpEiA2eV7bwwhfLs=
#创建节点
[zk: localhost:2181(CONNECTED) 0] create /node4 1
Created /node4
#使用是算好的密文密码添加权限:
[zk: localhost:2181(CONNECTED) 1] setAcl /node4 digest:zj:UvJWhBril5yzpEiA2eV7bwwhfLs=:cdrwa
 #获取节点数据没有权限
[zk: localhost:2181(CONNECTED) 3] get /node4
Authentication is not valid : /node4
#添加认证用户
[zk: localhost:2181(CONNECTED) 4] addauth digest zj:123456
#成功读取数据
[zk: localhost:2181(CONNECTED) 5] get /node4 1

四、原生api操作Zookeeper

利用Zookeeper官方的原生java api进行连接,然后演示一些创建、删除、修改、查询节点的操作。

1.创建maven项目,引入依赖。

<dependencies>
  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.8</version>
  </dependency>
</dependencies>

2.创建连接和节点

注意:在使用Java连接zookeeper服务的时候先要在虚拟机开启zookeeper的服务。(zkSserver.sh start)

public class ZKMain {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //1.创建与服务端的对话
        /**
         * 第一个参数:zookeeper的服务地址
         * 第二个参数:会话超时时间
         * 第三个参数:监听机制
         */
        ZooKeeper zooKeeper = new ZooKeeper("192.168.66.100:2181," +
                                                         "192.168.66.101:2181," +
                                                          "192.168.66.102:2181", 4000, null);
        //2.查看连接状态
        System.out.println(zooKeeper.getState());
        //3.创建节点
        /*
         * 第一个参数:节点的名字
         * 第二个参数:节点的数据
         * 第三个参数:节点的权限策略
         *            ZooDefs.Ids.OPEN_ACL_UNSAFE:完全开放任何人都能操作
         *            ZooDefs.Ids.CREATOR_ALL_ACL:创建者才能操作
         *            ZooDefs.Ids.READ_ACL_UNSAFE:只读权限
         * 第四个参数:节点类型(持久、临时、……)
         *           CreateMode.PERSISTENT:持久型节点
         *           CreateMode.EPHEMERAL:临时节点
         *           CreateMode.PERSISTENT_SEQUENTIAL:持久顺序节点
         *           CreateMode.EPHEMERAL_SEQUENTIAL:临时顺序节点
         */
            zooKeeper.create("/zkTest","1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
}

znode 类型有四种

  • PERSISTENT - 持久化目录节点,客户端与zookeeper断开连接后,该节点依旧存在
  • PERSISTENT_SEQUENTIAL - 持久化,并带有序列号
  • EPHEMERAL - 临时目录节点,客户端与zookeeper断开连接后,该节点被删除
  • EPHEMERAL_SEQUENTIAL - 临时,并带有序列号

3.修改节点数据

//5.修改节点
        /**
         * 参数一:节点名称
         * 参数二:修改的数据
         * 参数三:版本,全部修改设为-1
         */
        zooKeeper.setData("/zkTest","2".getBytes(),-1);

4.获取节点数据和节点信息

//6.获取节点数据
        /**
         * 参数一:节点名称
         * 参数二:监听机制
         * 参数三:状态信息
         */
        byte[] data = zooKeeper.getData("/zkTest", null, null);
        System.out.println(new String(data));
        //7.获取节点的子节点信息
        /**
         * 参数一:节点名称
         * 参数二:监听机制
         */
        List<String> children = zooKeeper.getChildren("/zkTest", null);
        for (String child : children) {
            System.out.println(child);
        }

5.监听节点

public class ZKWatcher {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //1.创建与服务端的对话
        /*
         * 第一个参数:zookeeper的服务地址
         * 第二个参数:会话超时时间
         * 第三个参数:监听机制,接口类型,使用匿名内部类实现该接口
         */
        ZooKeeper zooKeeper = new ZooKeeper("192.168.66.100:2181," +
                "192.168.66.101:2181," +
                "192.168.66.102:2181", 4000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("正在监听……");
            }
        });
        //2.注册监听机制监听节点
        zooKeeper.getChildren("/zkTest", new Watcher() {
            /*对 /zkTest 的子节点进行操作的时候会回调该监听方法*/
            @Override
            public void process(WatchedEvent watchedEvent) {
                //查看监听路径
                System.out.println("监听路径:"+watchedEvent.getPath());
                //查看监听的事件
                System.out.println("监听的事件:"+watchedEvent.getType());
            }
        });
        //线程休眠,保持持续监听
        Thread.sleep(Long.MAX_VALUE);
    }
}

删除 /zkTest 的子节点  /zkTest1 节点的数据时触发监听:

6.监听节点数据

package com.zj;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class ZKWatcher {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //1.创建与服务端的对话
        /*
         * 第一个参数:zookeeper的服务地址
         * 第二个参数:会话超时时间
         * 第三个参数:监听机制,接口类型,使用匿名内部类实现该接口
         */
        ZooKeeper zooKeeper = new ZooKeeper("192.168.66.100:2181," +
                "192.168.66.101:2181," +
                "192.168.66.102:2181", 4000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("正在监听……");
            }
        });
        //3.监听节点数据
        zooKeeper.getData("/zkTest", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //查看监听路径
                System.out.println("监听路径:"+watchedEvent.getPath());
                //查看监听的事件
                System.out.println("监听的事件:"+watchedEvent.getType());
            }
        },null);
        //线程休眠,保持持续监听
        Thread.sleep(Long.MAX_VALUE);
    }
}

修改 /zkTest 节点的数据:

注意:

通过zooKeeper.getchildren("/",new watch()){}来注册监听,监听的是整个根节点,但是这个监听只能监听一次。线程休眠是为了让监听等待事件发生,不然会随着程序直接运行完。


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
61 3
|
2月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
1月前
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
69 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
1月前
|
监控 Dubbo Java
dubbo学习三:springboot整合dubbo+zookeeper,并使用dubbo管理界面监控服务是否注册到zookeeper上。
这篇文章详细介绍了如何将Spring Boot与Dubbo和Zookeeper整合,并通过Dubbo管理界面监控服务注册情况。
74 0
dubbo学习三:springboot整合dubbo+zookeeper,并使用dubbo管理界面监控服务是否注册到zookeeper上。
|
1月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
41 2
|
1月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
45 1
|
2月前
|
数据采集 分布式计算 MaxCompute
MaxCompute 分布式计算框架 MaxFrame 服务正式商业化公告
MaxCompute 分布式计算框架 MaxFrame 服务于北京时间2024年09月27日正式商业化!
79 3
|
1月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
46 0
|
2月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
2月前
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)

热门文章

最新文章

  • 1
    高并发场景下,到底先更新缓存还是先更新数据库?
    64
  • 2
    Java面试题:解释Java NIO与BIO的区别,以及NIO的优势和应用场景。如何在高并发应用中实现NIO?
    70
  • 3
    Java面试题:设计一个线程安全的单例模式,并解释其内存占用和垃圾回收机制;使用生产者消费者模式实现一个并发安全的队列;设计一个支持高并发的分布式锁
    66
  • 4
    Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
    61
  • 5
    Java面试题:结合建造者模式与内存优化,设计一个可扩展的高性能对象创建框架?利用多线程工具类与并发框架,实现一个高并发的分布式任务调度系统?设计一个高性能的实时事件通知系统
    53
  • 6
    Java面试题:假设你正在开发一个Java后端服务,该服务需要处理高并发的用户请求,并且对内存使用效率有严格的要求,在多线程环境下,如何确保共享资源的线程安全?
    68
  • 7
    在Java中实现高并发的数据访问控制
    41
  • 8
    使用Java构建一个高并发的网络服务
    27
  • 9
    微服务06----Eureka注册中心,微服务的两大服务,订单服务和用户服务,订单服务需要远程调用我们的用,户服务,消费者,如果环境改变,硬编码问题就会随之产生,为了应对高并发,我们可能会部署成一个集
    37
  • 10
    如何设计一个秒杀系统,(高并发高可用分布式集群)
    127