分布式调用与高并发处理 Zookeeper分布式协调服务(三)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 分布式调用与高并发处理 Zookeeper分布式协调服务(三)

3.5 Watcher监听机制

ZooKeeper 提供了分布式数据的发布/订阅功能。一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它们能够做出相应的处理。

注意:

在ZooKeeper中,引入了Watcher机制来实现这种分布式的通知功能。ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当服务端的一些指定事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。

监听节点变化

#纯净客户端监听Java节点的变化
ls -w /java
#在zk-1客户端创建/java/spring节点
[zk: localhost:2181(CONNECTED) 1] create /java/spring   spirngmvc
Created /java/spring
#纯净客户端监听到了Java节点的变化
[zk: localhost:2181(CONNECTED) 11] 
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/java

注意:

命令如果使用 -w ,那么监听的是节点的变化,而不是值的变化。

监听节点的值的变化

#纯净客户端监听节点/java/spring的值
get -w /java/spring
#zk-1客户端修改/java/spring节点的值
[zk: localhost:2181(CONNECTED) 2] set /java/spring  mybatis
#纯净客户端监听到节点值的变化
[zk: localhost:2181(CONNECTED) 2] 
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/java/spring

注意:

watch监听机制只能够使用一次,如果下次想要使用,必须重新监听,就比如ls path watch命令,只能监听节点路径的改变一次,如果还想监听,那么需要再执行一次ls path watch命令。

3.6 权限控制ACL

在ZooKeeper的实际使用中,我们的做法往往是搭建一个共用的ZooKeeper集群,统一为若干个应用提供服务。在这种情况下,不同的应用之间往往是不会存在共享数据的使用场景的,因此需要解决不同应用之间的权限问题。

注意:

  1. ZooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限
  2. 每个znode支持设置多种权限控制方案和多个权限
  3. 子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点

权限模式schema

ZooKeeper内置了一些权限控制方案,可以用以下方案为每个节点设置权限:

方案 描述
world 只有一个用户:anyone,代表所有人(默认)
ip 使用IP地址认证
auth 使用已添加认证的用户认证
digest 使用“用户名:密码”方式认证

授权对象ID

授权对象ID是指,权限赋予的用户或者一个实体,例如:IP 地址或者机器。授权模式 schema 与 授权对象 ID 之间关系:

权限模式 授权对象
IP 通常是一个IP地址或是IP段,例如“192.168.66.101”
Digest 自定义,通常是“username:BASE64(SHA-1(username:password))”
World 只有一个ID:"anyone"
Super 与Digest模式一致

权限permission

权限 ACL简写 描述
CREATE c 可以创建子节点
DELETE d 可以删除子节点(仅下一级节点)
READ r 可以读取节点数据及显示子节点列表
WRITE w 可以设置节点数据
ADMIN a 可以设置节点访问控制列表权限

权限相关命令

命令 使用方式 描述
getAcl getAcl 读取ACL权限
setAcl setAcl 设置ACL权限
addauth addauth 添加认证用户

示例:World方案

[zk: localhost:2181(CONNECTED) 0] create /node1 1
Created /node1
[zk: localhost:2181(CONNECTED) 1] getAcl /node1
'world,'anyone  #默认为world方案
: cdrwa #任何人都拥有所有权限

示例:IP方案

#创建节点
[zk: localhost:2181(CONNECTED) 0] create /node2 1
Created /node2
#设置权限(只有ip地址为192.168.66.100的客户端才能操作该节点)
[zk: localhost:2181(CONNECTED) 1] setAcl /node2 ip:192.168.66.100:cdrwa 
#使用IP非 192.168.66.100 的机器
[zk: localhost:2181(CONNECTED) 0] get /node2
Authentication is not valid : /node2 #没有权限

示例:Auth方案

#创建节点
[zk: localhost:2181(CONNECTED) 0] create /node3 1
Created /node3
#添加认证用户zj,密码123456
[zk: localhost:2181(CONNECTED) 1] addauth digest zj:123456
#为创建的权限用户zj设置权限
[zk: localhost:2181(CONNECTED) 2] setAcl /node3 auth:zj:cdrwa
#获取权限
[zk: localhost:2181(CONNECTED) 3] getAcl /node3
'digest,'zj:UvJWhBril5yzpEiA2eV7bwwhfLs=
: cdrwa

示例:Digest方案

#密码加密
echo -n zj:123456 | openssl dgst -binary -sha1 | openssl base64
UvJWhBril5yzpEiA2eV7bwwhfLs=
#创建节点
[zk: localhost:2181(CONNECTED) 0] create /node4 1
Created /node4
#使用是算好的密文密码添加权限:
[zk: localhost:2181(CONNECTED) 1] setAcl /node4 digest:zj:UvJWhBril5yzpEiA2eV7bwwhfLs=:cdrwa
 #获取节点数据没有权限
[zk: localhost:2181(CONNECTED) 3] get /node4
Authentication is not valid : /node4
#添加认证用户
[zk: localhost:2181(CONNECTED) 4] addauth digest zj:123456
#成功读取数据
[zk: localhost:2181(CONNECTED) 5] get /node4 1

四、原生api操作Zookeeper

利用Zookeeper官方的原生java api进行连接,然后演示一些创建、删除、修改、查询节点的操作。

1.创建maven项目,引入依赖。

<dependencies>
  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.8</version>
  </dependency>
</dependencies>

2.创建连接和节点

注意:在使用Java连接zookeeper服务的时候先要在虚拟机开启zookeeper的服务。(zkSserver.sh start)

public class ZKMain {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //1.创建与服务端的对话
        /**
         * 第一个参数:zookeeper的服务地址
         * 第二个参数:会话超时时间
         * 第三个参数:监听机制
         */
        ZooKeeper zooKeeper = new ZooKeeper("192.168.66.100:2181," +
                                                         "192.168.66.101:2181," +
                                                          "192.168.66.102:2181", 4000, null);
        //2.查看连接状态
        System.out.println(zooKeeper.getState());
        //3.创建节点
        /*
         * 第一个参数:节点的名字
         * 第二个参数:节点的数据
         * 第三个参数:节点的权限策略
         *            ZooDefs.Ids.OPEN_ACL_UNSAFE:完全开放任何人都能操作
         *            ZooDefs.Ids.CREATOR_ALL_ACL:创建者才能操作
         *            ZooDefs.Ids.READ_ACL_UNSAFE:只读权限
         * 第四个参数:节点类型(持久、临时、……)
         *           CreateMode.PERSISTENT:持久型节点
         *           CreateMode.EPHEMERAL:临时节点
         *           CreateMode.PERSISTENT_SEQUENTIAL:持久顺序节点
         *           CreateMode.EPHEMERAL_SEQUENTIAL:临时顺序节点
         */
            zooKeeper.create("/zkTest","1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
}

znode 类型有四种

  • PERSISTENT - 持久化目录节点,客户端与zookeeper断开连接后,该节点依旧存在
  • PERSISTENT_SEQUENTIAL - 持久化,并带有序列号
  • EPHEMERAL - 临时目录节点,客户端与zookeeper断开连接后,该节点被删除
  • EPHEMERAL_SEQUENTIAL - 临时,并带有序列号

3.修改节点数据

//5.修改节点
        /**
         * 参数一:节点名称
         * 参数二:修改的数据
         * 参数三:版本,全部修改设为-1
         */
        zooKeeper.setData("/zkTest","2".getBytes(),-1);

4.获取节点数据和节点信息

//6.获取节点数据
        /**
         * 参数一:节点名称
         * 参数二:监听机制
         * 参数三:状态信息
         */
        byte[] data = zooKeeper.getData("/zkTest", null, null);
        System.out.println(new String(data));
        //7.获取节点的子节点信息
        /**
         * 参数一:节点名称
         * 参数二:监听机制
         */
        List<String> children = zooKeeper.getChildren("/zkTest", null);
        for (String child : children) {
            System.out.println(child);
        }

5.监听节点

public class ZKWatcher {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //1.创建与服务端的对话
        /*
         * 第一个参数:zookeeper的服务地址
         * 第二个参数:会话超时时间
         * 第三个参数:监听机制,接口类型,使用匿名内部类实现该接口
         */
        ZooKeeper zooKeeper = new ZooKeeper("192.168.66.100:2181," +
                "192.168.66.101:2181," +
                "192.168.66.102:2181", 4000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("正在监听……");
            }
        });
        //2.注册监听机制监听节点
        zooKeeper.getChildren("/zkTest", new Watcher() {
            /*对 /zkTest 的子节点进行操作的时候会回调该监听方法*/
            @Override
            public void process(WatchedEvent watchedEvent) {
                //查看监听路径
                System.out.println("监听路径:"+watchedEvent.getPath());
                //查看监听的事件
                System.out.println("监听的事件:"+watchedEvent.getType());
            }
        });
        //线程休眠,保持持续监听
        Thread.sleep(Long.MAX_VALUE);
    }
}

删除 /zkTest 的子节点  /zkTest1 节点的数据时触发监听:

6.监听节点数据

package com.zj;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class ZKWatcher {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //1.创建与服务端的对话
        /*
         * 第一个参数:zookeeper的服务地址
         * 第二个参数:会话超时时间
         * 第三个参数:监听机制,接口类型,使用匿名内部类实现该接口
         */
        ZooKeeper zooKeeper = new ZooKeeper("192.168.66.100:2181," +
                "192.168.66.101:2181," +
                "192.168.66.102:2181", 4000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("正在监听……");
            }
        });
        //3.监听节点数据
        zooKeeper.getData("/zkTest", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //查看监听路径
                System.out.println("监听路径:"+watchedEvent.getPath());
                //查看监听的事件
                System.out.println("监听的事件:"+watchedEvent.getType());
            }
        },null);
        //线程休眠,保持持续监听
        Thread.sleep(Long.MAX_VALUE);
    }
}

修改 /zkTest 节点的数据:

注意:

通过zooKeeper.getchildren("/",new watch()){}来注册监听,监听的是整个根节点,但是这个监听只能监听一次。线程休眠是为了让监听等待事件发生,不然会随着程序直接运行完。


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
1月前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
124 2
|
6天前
|
存储 大数据 Apache
深入理解ZooKeeper:分布式协调服务的核心与实践
【5月更文挑战第7天】ZooKeeper是Apache的分布式协调服务,确保大规模分布式系统中的数据一致性与高可用性。其特点包括强一致性、高可用性、可靠性、顺序性和实时性。使用ZooKeeper涉及安装配置、启动服务、客户端连接及执行操作。实际应用中,面临性能瓶颈、不可伸缩性和单点故障等问题,可通过水平扩展、集成其他服务和多集群备份来解决。理解ZooKeeper原理和实践,有助于构建高效分布式系统。
|
17天前
使用JWT的服务分布式部署之后报错:JWT Check Failure:
使用JWT的服务分布式部署之后报错:JWT Check Failure:
24 1
|
26天前
|
存储 Linux 数据库
ZooKeeper【搭建 01】apache-zookeeper-3.6.2 单机版安装+配置+添加到service服务+开机启动配置+验证+chkconfig配置(一篇入门zookeeper)
【4月更文挑战第8天】ZooKeeper【搭建 01】apache-zookeeper-3.6.2 单机版安装+配置+添加到service服务+开机启动配置+验证+chkconfig配置(一篇入门zookeeper)
38 0
|
1月前
|
Java 网络安全 Apache
搭建Zookeeper集群:三台服务器,一场分布式之舞
搭建Zookeeper集群:三台服务器,一场分布式之舞
48 0
|
15天前
|
NoSQL Java 关系型数据库
【Redis系列笔记】分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。 分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路
113 2
|
1月前
|
NoSQL Java Redis
redis分布式锁
redis分布式锁
|
10天前
|
监控 NoSQL 算法
探秘Redis分布式锁:实战与注意事项
本文介绍了Redis分区容错中的分布式锁概念,包括利用Watch实现乐观锁和使用setnx防止库存超卖。乐观锁通过Watch命令监控键值变化,在事务中执行修改,若键值被改变则事务失败。Java代码示例展示了具体实现。setnx命令用于库存操作,确保无超卖,通过设置锁并检查库存来更新。文章还讨论了分布式锁存在的问题,如客户端阻塞、时钟漂移和单点故障,并提出了RedLock算法来提高可靠性。Redisson作为生产环境的分布式锁实现,提供了可重入锁、读写锁等高级功能。最后,文章对比了Redis、Zookeeper和etcd的分布式锁特性。
113 16
探秘Redis分布式锁:实战与注意事项
|
12天前
|
NoSQL Java 大数据
介绍redis分布式锁
分布式锁是解决多进程在分布式环境中争夺资源的问题,与本地锁相似但适用于不同进程。以Redis为例,通过`setIfAbsent`实现占锁,加锁同时设置过期时间避免死锁。然而,获取锁与设置过期时间非原子性可能导致并发问题,解决方案是使用`setIfAbsent`的超时参数。此外,释放锁前需验证归属,防止误删他人锁,可借助Lua脚本确保原子性。实际应用中还有锁续期、重试机制等复杂问题,现成解决方案如RedisLockRegistry和Redisson。
|
12天前
|
缓存 NoSQL Java
【亮剑】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护,如何使用注解来实现 Redis 分布式锁的功能?
【4月更文挑战第30天】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护。基于 Redis 的分布式锁利用 SETNX 或 SET 命令实现,并考虑自动过期、可重入及原子性以确保可靠性。在 Java Spring Boot 中,可通过 `@EnableCaching`、`@Cacheable` 和 `@CacheEvict` 注解轻松实现 Redis 分布式锁功能。