尚硅谷Zookeeper学习笔记(三)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 尚硅谷Zookeeper学习笔记

3.3.4 获取 子节点 并 监听 节点

@Test
    public void getChildRen() throws KeeperException, InterruptedException {
        //true:使用init中创建的监听器. 每次出现变化,将重新调用监听器中的 方法.  也可以自定一个监听器.
        //监听某个路径的节点变化情况
        List <String> children = zkClient.getChildren("/", true);
        for (String child : children) {
            System.out.println(child);
        }
        //延时 让其一直监听
        Thread.sleep(Long.MAX_VALUE);
    }

1)在 IDEA 控制台上看到如下节点:

zookeeper
sanguo
atguigu


(2)在 hadoop102 的客户端上创建再创建一个节点/atguigu1,观察 IDEA 控制台

[zk: localhost:2181(CONNECTED) 3] create /atguigu1 "atguigu1"


(3)在 hadoop102 的客户端上删除节点/atguigu1,观察 IDEA 控制台

[zk: localhost:2181(CONNECTED) 4] delete /atguigu1


3.3.5 判断 Znode 是否存在

@Test
    public void exist() throws KeeperException, InterruptedException {
        Stat stat = zkClient.exists("/atguigu", false);
        System.out.println(stat==null ? "not exist ":"exist");
    }

3.4 客户端向服务端写数据流程

045d1e633aad4de5a2bd39c661cf74ef.png


e52f63c9999245f2ade92d53136f1463.png

第 4 章 服务器动态上下线监听案例

4.1 需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

4.2 需求分析

d16ac7cd75d64d56b0382e04266759f2.png

注:服务器上线的过程就是Zookeeper集群创建节点的过程.

服务器和客户端相对于Zookeeper都是 “客户端”,只不过服务器是创建节点的操作,客户端是监听节点的操作(一旦那个节点不存在了,下次就不去访问这个节点了)

4.3 具体 实现

(1)先在集群上创建/servers 节点

[zk: localhost:2181(CONNECTED) 10] create /servers "servers"
Created /servers

(2)在 Idea 中创建包名:com.rg.case1

(3)服务器端向 Zookeeper 注册代码

public class DistributeServer {
    private String connectString  = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private int sessionTimeout = 2000;
    private ZooKeeper zk;
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        DistributeServer server = new DistributeServer();
        //1.获取zk连接 ==>连接zk客户端.
        server.getConnect();
        //2.注册服务器到zk集群  在/servers下创建节点
        server.regist(args[0]);
        //3.启动业务逻辑(睡觉)
        server.business();
    }
    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }
    private void regist(String hostname) throws KeeperException, InterruptedException {
        // 节点类型应该是 临时的(上线创建节点,下线节点消失),有序的(可以得知服务器上线的顺序)
        String create = zk.create("/servers/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname+"is online");
    }
    private void getConnect() throws IOException {
        //ctrl+alt+f
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
            }
        });
    }
}

(4)客户端代码

public class DistributeClient {
    private String connectString  = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private int sessionTimeout = 2000;
    private ZooKeeper zk;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributeClient client = new DistributeClient();
        //1.获取zk连接
        client.getConnect();
        //2.监听 /servers/下面子节点的增加和删除==>监听服务器的上线下线情况
        client.getServerList();
        //3.业务逻辑(睡觉)
        client.business();
    }
    private void getServerList() throws KeeperException, InterruptedException {
        //进行注册监听器..
        List <String> children = zk.getChildren("/servers", true);
        //存放 /servers下的节点
        List <String> servers = new ArrayList <>();
        for (String child : children) {
            //获取该节点上的内容 不适用监听器
            byte[] data = zk.getData("/servers/" + child, false, null);
            servers.add(new String(data));
        }
        System.out.println(servers);
    }
    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }
    private void getConnect() throws IOException {
        //ctrl+alt+f
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    //注册一次监听一次..
                    getServerList();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

4.4 测试

1)在 Linux 命令行上操作增加减少服务器

(1)启动 DistributeClient 客户端

(2)在 hadoop102 上 zk 的客户端/servers 目录上创建临时带序号节点

[zk: localhost:2181(CONNECTED) 1] create -e -s
/servers/hadoop102 "hadoop102"
[zk: localhost:2181(CONNECTED) 2] create -e -s
/servers/hadoop103 "hadoop103"

(3)观察 Idea 控制台变化

[hadoop102, hadoop103]

(4)执行删除操作

[zk: localhost:2181(CONNECTED) 8] delete
/servers/hadoop1020000000000

(5)观察 Idea 控制台变化

[hadoop103]


2)在 Idea 上操作增加减少服务器

(1)启动 DistributeClient 客户端(如果已经启动过,不需要重启)

(2)启动 DistributeServer 服务


①点击 Edit Configurations…

8514214507db4994b937efe5d7392d8b.png

②在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102

4845c8694df94b04b6fa8181bef7530c.png

③ 回 到 DistributeServer 的 main 方 法 , 右 键 , 在 弹 出 的 窗 口 中 点 击 Run“DistributeServer.main()”

c844c753ce7049e5911efab41802fb9c.png

④观察 DistributeServer 控制台,提示 hadoop102 is working

⑤观察 DistributeClient 控制台,提示 hadoop102 已经上线


第 5 章 ZooKeeper 分布式锁案例

什么叫做分布式锁呢?


比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。


33c4c880fdd641c39b614dae2f3eddff.png

5.1 原生 Zookeeper 实现

1)分布式锁实现

/**
 * CountDownLatch典型用法:1、某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),
 * 每当一个任务线程执行完毕,就将计数器减1 .countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。
 * 一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行
 *
 */
public class DistributedLock {
    private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private final int sessionTimeout = 2000;
    private final ZooKeeper zk;
    //CountDownLatch 用来等待 连接成功
    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);
    private String currentMode;
    private String waitPath;
    //分布式锁的初始化
    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        //获取连接
         zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                //connection 如果连接上zk 可以释放
                // 如果监听的状态 是 连接上的状态,则释放connectLatch,继续往下执行.
                if(event.getState()==Event.KeeperState.SyncConnected){
                    connectLatch.countDown();
                }
                //waitLatch 需要释放
                //如果监听到了 监听路径的节点删除操作. 并且该操作的路径是当前节点的上一个节点,则释放waitLatch
                if(event.getType()==Event.EventType.NodeDeleted && event.getPath().equals(waitPath)){
                    waitLatch.countDown();
                }
            }
        });
         //等待zk正常连接后,往下走程序
        connectLatch.await();
        //判断根节点 /locks 是否存在
        Stat stat = zk.exists("/locks", false);
        if(stat==null){
            //创建下一个根节点
            zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }
    //对zk加锁
    public void zklock() throws KeeperException, InterruptedException {
        //创建对应的临时带序号节点(目的是对资源进行操作.)
        currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        //判断创建的节点是否是最小的序号节点,如果是获取到锁; 如果不是,监听序号前一个节点.
        List <String> children = zk.getChildren("/locks", false);
        //如果children只有一个值, 那就直接获取锁;  如果有多个节点,则需要判断谁最小.
        if(children.size()==1){
            return;
        }else{
            Collections.sort(children);
            //获取节点名称 seq-000000
            /**
             * substring(int biginIndex)
             * substring(int biginIndex,int endIndex)
             */
            String thisNode = currentMode.substring("/locks/".length());
            //通过seq-000000获取该节点在children集合的位置
            int index = children.indexOf(thisNode);
            //判断
            if(index==-1){
                System.out.println("数据异常...");
            }else if(index == 0){
                //如果当前的是序号最小的节点,则直接获取锁
                return;
            }else{//当前的节点并不是序号最小的
                //需要监听它前一个结点的变化
                waitPath = "/locks/" + children.get(index - 1);
                zk.getData(waitPath,true,null);
                //等待前一个节点操作完成,监听结束,本节点再获取锁.
                waitLatch.await();
                return;
            }
        }
    }
    //解锁
    public void unZkLock() throws KeeperException, InterruptedException {
        // 操作处理完毕要解锁----删除当前节点.
        zk.delete(this.currentMode,-1);
    }
}

2)分布式锁 测试

public class DistributedLockTest {
    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
        final DistributedLock lock1 = new DistributedLock();
        final DistributedLock lock2 = new DistributedLock();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.zklock();
                    System.out.println("线程1启动, 获取到锁");
                    System.out.println("线程1使用资源中...");
                    Thread.sleep(5*1000);
                    lock1.unZkLock();
                    System.out.println("线程1使用资源完毕,释放锁");
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.zklock();
                    System.out.println("线程2启动, 获取到锁");
                    System.out.println("线程2使用资源中...");
                    Thread.sleep(5*1000);
                    lock2.unZkLock();
                    System.out.println("线程2使用资源完毕,释放锁");
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

(1)创建两个线程

(2)观察控制台变化:

8021bae853fc446e9e6e2b5b6c2efa48.png

5.2 Curator 框架实现分布式锁案例

1) 原生的 Java API 开发存在的问题

(1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch

(2)Watch 需要重复注册,不然就不能生效

(3)开发的复杂性还是比较高的

(4)不支持多节点删除和创建。需要自己去递归

2) Curator 是一个专门解决分布式锁的框架,解决了原生Java API 开发分布式遇到的问题。

详情请查看官方文档:https://curator.apache.org/index.html

3 )Curator 案例实操

(1)添加依赖

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>4.3.0</version>
        </dependency>

2)代码实现

public class CuratorLockTest {
    public static void main(String[] args) {
        //创建分布式锁1
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(),"/locks");
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(),"/locks");
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();//获取锁
                    System.out.println("线程1获取到锁");
                    lock1.acquire();
                    System.out.println("线程1再次获取到锁");
                    Thread.sleep(5*1000);
                    lock1.release();//释放锁
                    System.out.println("线程1释放锁..");
                    lock1.release();
                    System.out.println("线程1再次释放锁...");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();//获取锁
                    System.out.println("线程2获取到锁");
                    lock2.acquire();
                    System.out.println("线程2再次获取到锁");
                    Thread.sleep(5*1000);
                    lock2.release();//释放锁
                    System.out.println("线程2释放锁..");
                    lock2.release();
                    System.out.println("线程2再次释放锁...");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
    private static CuratorFramework getCuratorFramework() {
        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181")
                .connectionTimeoutMs(2000) //设置连接超时时间
                .sessionTimeoutMs(2000)
                .retryPolicy(policy).build(); //尝试策略
        //启动客户端
        client.start();
        System.out.println("Zookeeper启动成功..");
        return client;
    }
}

(2)观察控制台变化:

09ae2133d7d948c9a8902f120e0daee0.png

第 6 章 企业面试真题(面试 重点) )

6.1 选举 机制


半数机制,超过半数的投票通过,即通过。

(1)第一次启动选举规则:

投票过半数时,服务器 id 大的胜出

(2)第二次启动选举规则:

①EPOCH 大的直接胜出

②EPOCH 相同,事务 id 大的胜出

③事务 id 相同,服务器 id 大的胜出

6.2 生产集群至少安装多少 zk 合适?


安装奇数台。

生产经验:

⚫ 10 台服务器:3 台 zk;

⚫ 20 台服务器:5 台 zk;

⚫ 100 台服务器:11 台 zk;

⚫ 200 台服务器:11 台 zk

服务器台数多:好处,提高可靠性;坏处:提高通信延时


6.3 常用命令

ls、get、create、delete


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
监控 Dubbo 网络协议
【SpringBoot学习笔记 十四】SpringBoot+Dubbo+Zookeeper集成开发(下)
【SpringBoot学习笔记 十四】SpringBoot+Dubbo+Zookeeper集成开发(下)
189 0
|
消息中间件 监控 Dubbo
【SpringBoot学习笔记 十四】SpringBoot+Dubbo+Zookeeper集成开发
【SpringBoot学习笔记 十四】SpringBoot+Dubbo+Zookeeper集成开发
224 0
|
存储 负载均衡 Java
|
缓存 监控 网络协议
MSE 风险分布管理功能发布(二)| 学习笔记
快速学习 MSE 风险分布管理功能发布。
MSE 风险分布管理功能发布(二)| 学习笔记
|
API 数据安全/隐私保护
|
数据可视化 Dubbo Java
MSE 微服务测试---自动化回归最佳实践|学习笔记
快速学习 MSE 微服务测试---自动化回归最佳实践
MSE 微服务测试---自动化回归最佳实践|学习笔记
|
敏捷开发 弹性计算 Kubernetes
MSE 开发环境隔离功能介绍|学习笔记(二)
快速学习 MSE 开发环境隔离功能介绍
MSE 开发环境隔离功能介绍|学习笔记(二)
|
弹性计算 Kubernetes 安全
MSE 开发环境隔离功能介绍|学习笔记(一)
快速学习 MSE 开发环境隔离功能介绍
MSE 开发环境隔离功能介绍|学习笔记(一)
|
缓存 弹性计算 安全
MSE Nacos 配置安全最佳实践|学习笔记(二)
快速学习 MSE Nacos 配置安全最佳实践
MSE Nacos 配置安全最佳实践|学习笔记(二)
|
3月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2