【zookeeper 第四篇章】监控 Watcher

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: ZooKeeper通过Watcher机制实现了数据的发布/订阅功能。多个订阅者可以监听同一主题对象,一旦该对象状态变化,如节点内容或子节点列表变动,ZooKeeper会实时通知所有订阅者。Watcher架构包括ZooKeeper服务端、客户端及其Watcher管理器。客户端向服务端注册Watcher并保存至本地管理器中;当状态变化时,服务端通知客户端,触发相关Watcher回调处理逻辑。

一、Watcher 概述

zookeeper 提供了数据的 发布/订阅功能,多个订阅者可同时监听某一特定的主题对象,当主题对象的自身状态发生了变化时(例如节点内容发生了改变、节点下的子节点列表发生改变等),会实时、主动的通知所有订阅者。

二、Watcher 架构

Watcher 由三部分组成 zookeeper服务端zookeeper客户端客户端的watchManager对象
watcher.png

客户端首先将 Watcher 注册到服务器,同时将 Watcher 对象保存到客户端的 Watcher管理器 中。当 zookeeper 服务器端监听数据状态发生变化时,服务端会主动通知客户端,接着 客户端的Watcher 管理器会触发相关的 Watcher 来回调相应的处理逻辑,从而完成整体的数据的发布/订阅流程。

三、Watcher 特性

特性 说明
一次性 watcher是一次性的,一旦被触发就会被移除,再次使用时需要重新注册。
客户端顺序回调 watcher 回调的顺序是串行化执行的,只有回调后客户端才能看到最新的数据状态。
轻量级 watcherEvent是最小的通信单元,结构上只包含通知状态、事件类型和节点路径,并不会告诉数据节点变化前后的具体内容。
时效性 watcher只有在当前的session彻底失效时才会无效,若在session有效内快速重连成功,则watcher依然存在,依然可以接收到通知。

四、Watcher 通知状态(KeeperState)

枚举属性 说明
SyncConnected 客户端和服务端正常连接
Disconnected 客户端和服务端断开连接
Expired 会话session失效
AuthFailed 身份认证失败

五、Watcher 事件类型(EventType)

枚举属性 说明
None
NodeCreated 数据的节点创建
NodeDeleted 数据的节点删除
NodeDataChanged 数据节点内容发生变更时
NodeChildrenChanged 节点的子节点列表发生变更时

六、案例

1、状态获取

package com.snails.zookeeper;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class ZkConnectionWatcher implements Watcher {
   
   

    //计数器对象
    static CountDownLatch countDownLatch = new CountDownLatch(1);
    //链接对象
    static ZooKeeper zookeeper;

    @Override
    public void process(WatchedEvent event) {
   
   

        try {
   
   
            //事件类型
            if(event.getType() == Event.EventType.None) {
   
   
                if(event.getState() == Event.KeeperState.SyncConnected ) {
   
   
                    System.out.println("创建连接成功"); //当连接到zookeeper服务器时,会执行这一块的代码块。
                    countDownLatch.countDown();
                } else if (event.getState() == Event.KeeperState.Disconnected) {
   
   
                    System.out.println("断开连接");    //  当和zookeeper服务器断开连接时,会执行这一块代码。
                                                      // 当网络重新连接成功连接成功之后,并且在sessionTimeOut设置的范围之内,会自动重新连接到服务器上。
                } else if (event.getState() == Event.KeeperState.Expired) {
   
   
                    System.out.println("会话超时");  // 当网络时间超过sessionTimeOut设置的时长时,会执行这一块代码。
                    zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new ZkConnectionWatcher()); //网络超时一般设置重新连接
                } else if (event.getState() == Event.KeeperState.AuthFailed) {
   
   
                    System.out.println("认证失败");
                }
            }
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
   
   
        try {
   
   
            zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new ZkConnectionWatcher());

            //通过授权模式获取节点数据
            zookeeper.addAuthInfo("digest", "woniu:123456".getBytes());
            byte[] data = zookeeper.getData("/woniu", false, null);
            System.out.println(new String(data));

            countDownLatch.await();

            System.out.println(zookeeper.getSessionId());

            Thread.sleep(50000);

            zookeeper.close();

            System.out.println("结束");
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
    }

}

2、Watcher三种API之Exists

package com.snails.zookeeper;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZkWatcherExists {
   
   

    String Ip = "127.0.0.1:2181";
    ZooKeeper zooKeeper = null;

    @Before
    public void before() throws IOException, InterruptedException {
   
   
        CountDownLatch countDownLatch = new CountDownLatch(1);

        zooKeeper = new ZooKeeper(Ip, 60000, new Watcher() {
   
   
            @Override
            public void process(WatchedEvent event) {
   
   
                if(event.getState() == Event.KeeperState.SyncConnected) {
   
   
                    countDownLatch.countDown();
                }
                try {
   
   
                    //解决一次性的问题,就是每次节点的修改,不用重启客户端。
                    zooKeeper.exists("/woniu1",this);
                } catch (KeeperException e) {
   
   
                    e.printStackTrace();
                } catch (InterruptedException e) {
   
   
                    e.printStackTrace();
                }
                System.out.println("path= " + event.getPath());
                System.out.println("path= " + event.getType());
            }
        });
        countDownLatch.await();
    }

    @After
    public void after() throws InterruptedException {
   
   
        zooKeeper.close();
    }


    @Test
    public void watchderFunction01() throws KeeperException, InterruptedException {
   
   
        zooKeeper.exists("/woniu1", true);

        Thread.sleep(500000);

        System.out.println("end-----");
    }

    //注册多个监听器
    @Test
    public void watchderFunction02() throws KeeperException, InterruptedException {
   
   
        zooKeeper.exists("/woniu1", new Watcher() {
   
   
            @Override
            public void process(WatchedEvent event) {
   
   
                System.out.println("监听woniu1 ===================");
                System.out.println("path= " + event.getPath());
                System.out.println("path= " + event.getType());
            }
        });

        zooKeeper.exists("/woniu2", new Watcher() {
   
   
            @Override
            public void process(WatchedEvent event) {
   
   
                System.out.println("监听woniu2 ===================");
                System.out.println("path= " + event.getPath());
                System.out.println("path= " + event.getType());
            }
        });

        Thread.sleep(500000);
        System.out.println("end-----");
    }
}

3、Watcher三种API之getData

package com.snails.zookeeper;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZkWatcherGetData {
   
   

    String Ip = "127.0.0.1:2181";
    ZooKeeper zooKeeper = null;

    @Before
    public void before() throws IOException, InterruptedException {
   
   
        CountDownLatch countDownLatch = new CountDownLatch(1);

        zooKeeper = new ZooKeeper(Ip, 60000, new Watcher() {
   
   
            @Override
            public void process(WatchedEvent event) {
   
   
                if(event.getState() == Event.KeeperState.SyncConnected) {
   
   
                    countDownLatch.countDown();
                }
                try {
   
   
                    //解决一次性的问题,就是每次节点的修改,不用重启客户端。
                    zooKeeper.exists("/woniu1",this);
                } catch (KeeperException e) {
   
   
                    e.printStackTrace();
                } catch (InterruptedException e) {
   
   
                    e.printStackTrace();
                }
                System.out.println("path= " + event.getPath());
                System.out.println("path= " + event.getType());
            }
        });
        countDownLatch.await();
    }

    @After
    public void after() throws InterruptedException {
   
   
        zooKeeper.close();
    }


    @Test
    public void watchderFunction01() throws KeeperException, InterruptedException {
   
   
        zooKeeper.getData("/woniu1", true, null);
        Thread.sleep(500000);
        System.out.println("end-----");

    }

    @Test
    public void watchderFunction02() throws KeeperException, InterruptedException {
   
   
        zooKeeper.getData("/woniu1", new Watcher() {
   
   
            @Override
            public void process(WatchedEvent event) {
   
   
                System.out.println("自定义watcher");
                System.out.println("path= " + event.getPath());
                System.out.println("path= " + event.getType());
            }
        }, null);

        Thread.sleep(500000);
        System.out.println("end-----");

    }
}

4、Watcher三种API之getChild:

package com.snails.zookeeper;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZkWatcherGetChild {
   
   

    String Ip = "127.0.0.1:2181";
    ZooKeeper zooKeeper = null;

    @Before
    public void before() throws IOException, InterruptedException {
   
   
        CountDownLatch countDownLatch = new CountDownLatch(1);

        zooKeeper = new ZooKeeper(Ip, 60000, new Watcher() {
   
   
            @Override
            public void process(WatchedEvent event) {
   
   
                if(event.getState() == Event.KeeperState.SyncConnected) {
   
   
                    countDownLatch.countDown();
                }
                System.out.println("path= " + event.getPath());
                System.out.println("path= " + event.getType());
            }
        });
        countDownLatch.await();
    }

    @After
    public void after() throws InterruptedException {
   
   
        zooKeeper.close();
    }


    @Test
    public void watchderFunction01() throws KeeperException, InterruptedException {
   
   
        zooKeeper.getChildren("/woniu1", true);
        Thread.sleep(500000);
        System.out.println("end-----");
    }

}
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
4月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
19天前
|
监控 Dubbo Java
dubbo学习三:springboot整合dubbo+zookeeper,并使用dubbo管理界面监控服务是否注册到zookeeper上。
这篇文章详细介绍了如何将Spring Boot与Dubbo和Zookeeper整合,并通过Dubbo管理界面监控服务注册情况。
31 0
dubbo学习三:springboot整合dubbo+zookeeper,并使用dubbo管理界面监控服务是否注册到zookeeper上。
|
22天前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
34 1
|
22天前
|
分布式计算 Hadoop Unix
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
39 1
|
6月前
|
存储 API
深入理解Zookeeper系列-4.Watcher原理
深入理解Zookeeper系列-4.Watcher原理
55 1
|
6月前
|
存储 监控 网络协议
Zookeeper监控之四字监控
Zookeeper监控之四字监控
213 0
|
算法
Zookeeper 的读写机制、保证机制、Watcher(数据变更的通知)
Zookeeper 的读写机制、保证机制、Watcher(数据变更的通知)
141 0
|
监控 Java Apache
Apache ZooKeeper - JMX监控 ZooKeeper 的运行状态
Apache ZooKeeper - JMX监控 ZooKeeper 的运行状态
160 0
|
Go 数据安全/隐私保护 微服务
48-微服务技术栈(高级):分布式协调服务zookeeper源码篇(Watcher机制-3[Zookeeper])
  前面已经分析了Watcher机制中的大多数类,本篇对于ZKWatchManager的外部类Zookeeper进行分析。
152 0
|
微服务
46-微服务技术栈(高级):分布式协调服务zookeeper源码篇(Watcher机制-1)
  前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。
91 0