3.3.4 获取 子节点 并 监听 节点
@Test public void getChildRen() throws KeeperException, InterruptedException { //true:使用init中创建的监听器. 每次出现变化,将重新调用监听器中的 方法. 也可以自定一个监听器. //监听某个路径的节点变化情况 List <String> children = zkClient.getChildren("/", true); for (String child : children) { System.out.println(child); } //延时 让其一直监听 Thread.sleep(Long.MAX_VALUE); }
1)在 IDEA 控制台上看到如下节点:
zookeeper sanguo atguigu
(2)在 hadoop102 的客户端上创建再创建一个节点/atguigu1,观察 IDEA 控制台
[zk: localhost:2181(CONNECTED) 3] create /atguigu1 "atguigu1"
(3)在 hadoop102 的客户端上删除节点/atguigu1,观察 IDEA 控制台
[zk: localhost:2181(CONNECTED) 4] delete /atguigu1
3.3.5 判断 Znode 是否存在
@Test public void exist() throws KeeperException, InterruptedException { Stat stat = zkClient.exists("/atguigu", false); System.out.println(stat==null ? "not exist ":"exist"); }
3.4 客户端向服务端写数据流程
第 4 章 服务器动态上下线监听案例
4.1 需求
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
4.2 需求分析
注:服务器上线的过程就是Zookeeper集群创建节点的过程.
服务器和客户端相对于Zookeeper都是 “客户端”,只不过服务器是创建节点的操作,客户端是监听节点的操作(一旦那个节点不存在了,下次就不去访问这个节点了)
4.3 具体 实现
(1)先在集群上创建/servers 节点
[zk: localhost:2181(CONNECTED) 10] create /servers "servers" Created /servers
(2)在 Idea 中创建包名:com.rg.case1
(3)服务器端向 Zookeeper 注册代码
public class DistributeServer { private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; private int sessionTimeout = 2000; private ZooKeeper zk; public static void main(String[] args) throws IOException, KeeperException, InterruptedException { DistributeServer server = new DistributeServer(); //1.获取zk连接 ==>连接zk客户端. server.getConnect(); //2.注册服务器到zk集群 在/servers下创建节点 server.regist(args[0]); //3.启动业务逻辑(睡觉) server.business(); } private void business() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } private void regist(String hostname) throws KeeperException, InterruptedException { // 节点类型应该是 临时的(上线创建节点,下线节点消失),有序的(可以得知服务器上线的顺序) String create = zk.create("/servers/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname+"is online"); } private void getConnect() throws IOException { //ctrl+alt+f zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { } }); } }
(4)客户端代码
public class DistributeClient { private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; private int sessionTimeout = 2000; private ZooKeeper zk; public static void main(String[] args) throws IOException, InterruptedException, KeeperException { DistributeClient client = new DistributeClient(); //1.获取zk连接 client.getConnect(); //2.监听 /servers/下面子节点的增加和删除==>监听服务器的上线下线情况 client.getServerList(); //3.业务逻辑(睡觉) client.business(); } private void getServerList() throws KeeperException, InterruptedException { //进行注册监听器.. List <String> children = zk.getChildren("/servers", true); //存放 /servers下的节点 List <String> servers = new ArrayList <>(); for (String child : children) { //获取该节点上的内容 不适用监听器 byte[] data = zk.getData("/servers/" + child, false, null); servers.add(new String(data)); } System.out.println(servers); } private void business() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } private void getConnect() throws IOException { //ctrl+alt+f zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { try { //注册一次监听一次.. getServerList(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }
4.4 测试
1)在 Linux 命令行上操作增加减少服务器
(1)启动 DistributeClient 客户端
(2)在 hadoop102 上 zk 的客户端/servers 目录上创建临时带序号节点
[zk: localhost:2181(CONNECTED) 1] create -e -s /servers/hadoop102 "hadoop102" [zk: localhost:2181(CONNECTED) 2] create -e -s /servers/hadoop103 "hadoop103"
(3)观察 Idea 控制台变化
[hadoop102, hadoop103]
(4)执行删除操作
[zk: localhost:2181(CONNECTED) 8] delete /servers/hadoop1020000000000
(5)观察 Idea 控制台变化
[hadoop103]
2)在 Idea 上操作增加减少服务器
(1)启动 DistributeClient 客户端(如果已经启动过,不需要重启)
(2)启动 DistributeServer 服务
①点击 Edit Configurations…
②在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102
③ 回 到 DistributeServer 的 main 方 法 , 右 键 , 在 弹 出 的 窗 口 中 点 击 Run“DistributeServer.main()”
④观察 DistributeServer 控制台,提示 hadoop102 is working
⑤观察 DistributeClient 控制台,提示 hadoop102 已经上线
第 5 章 ZooKeeper 分布式锁案例
什么叫做分布式锁呢?
比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
5.1 原生 Zookeeper 实现
1)分布式锁实现
/** * CountDownLatch典型用法:1、某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n), * 每当一个任务线程执行完毕,就将计数器减1 .countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。 * 一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行 * */ public class DistributedLock { private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; private final int sessionTimeout = 2000; private final ZooKeeper zk; //CountDownLatch 用来等待 连接成功 private CountDownLatch connectLatch = new CountDownLatch(1); private CountDownLatch waitLatch = new CountDownLatch(1); private String currentMode; private String waitPath; //分布式锁的初始化 public DistributedLock() throws IOException, InterruptedException, KeeperException { //获取连接 zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { //connection 如果连接上zk 可以释放 // 如果监听的状态 是 连接上的状态,则释放connectLatch,继续往下执行. if(event.getState()==Event.KeeperState.SyncConnected){ connectLatch.countDown(); } //waitLatch 需要释放 //如果监听到了 监听路径的节点删除操作. 并且该操作的路径是当前节点的上一个节点,则释放waitLatch if(event.getType()==Event.EventType.NodeDeleted && event.getPath().equals(waitPath)){ waitLatch.countDown(); } } }); //等待zk正常连接后,往下走程序 connectLatch.await(); //判断根节点 /locks 是否存在 Stat stat = zk.exists("/locks", false); if(stat==null){ //创建下一个根节点 zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } //对zk加锁 public void zklock() throws KeeperException, InterruptedException { //创建对应的临时带序号节点(目的是对资源进行操作.) currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //判断创建的节点是否是最小的序号节点,如果是获取到锁; 如果不是,监听序号前一个节点. List <String> children = zk.getChildren("/locks", false); //如果children只有一个值, 那就直接获取锁; 如果有多个节点,则需要判断谁最小. if(children.size()==1){ return; }else{ Collections.sort(children); //获取节点名称 seq-000000 /** * substring(int biginIndex) * substring(int biginIndex,int endIndex) */ String thisNode = currentMode.substring("/locks/".length()); //通过seq-000000获取该节点在children集合的位置 int index = children.indexOf(thisNode); //判断 if(index==-1){ System.out.println("数据异常..."); }else if(index == 0){ //如果当前的是序号最小的节点,则直接获取锁 return; }else{//当前的节点并不是序号最小的 //需要监听它前一个结点的变化 waitPath = "/locks/" + children.get(index - 1); zk.getData(waitPath,true,null); //等待前一个节点操作完成,监听结束,本节点再获取锁. waitLatch.await(); return; } } } //解锁 public void unZkLock() throws KeeperException, InterruptedException { // 操作处理完毕要解锁----删除当前节点. zk.delete(this.currentMode,-1); } }
2)分布式锁 测试
public class DistributedLockTest { public static void main(String[] args) throws InterruptedException, IOException, KeeperException { final DistributedLock lock1 = new DistributedLock(); final DistributedLock lock2 = new DistributedLock(); new Thread(new Runnable() { @Override public void run() { try { lock1.zklock(); System.out.println("线程1启动, 获取到锁"); System.out.println("线程1使用资源中..."); Thread.sleep(5*1000); lock1.unZkLock(); System.out.println("线程1使用资源完毕,释放锁"); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { lock2.zklock(); System.out.println("线程2启动, 获取到锁"); System.out.println("线程2使用资源中..."); Thread.sleep(5*1000); lock2.unZkLock(); System.out.println("线程2使用资源完毕,释放锁"); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
(1)创建两个线程
(2)观察控制台变化:
5.2 Curator 框架实现分布式锁案例
1) 原生的 Java API 开发存在的问题
(1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
(2)Watch 需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高的
(4)不支持多节点删除和创建。需要自己去递归
2) Curator 是一个专门解决分布式锁的框架,解决了原生Java API 开发分布式遇到的问题。
详情请查看官方文档:https://curator.apache.org/index.html
3 )Curator 案例实操
(1)添加依赖
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> </dependency>
2)代码实现
public class CuratorLockTest { public static void main(String[] args) { //创建分布式锁1 InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(),"/locks"); InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(),"/locks"); new Thread(new Runnable() { @Override public void run() { try { lock1.acquire();//获取锁 System.out.println("线程1获取到锁"); lock1.acquire(); System.out.println("线程1再次获取到锁"); Thread.sleep(5*1000); lock1.release();//释放锁 System.out.println("线程1释放锁.."); lock1.release(); System.out.println("线程1再次释放锁..."); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { lock2.acquire();//获取锁 System.out.println("线程2获取到锁"); lock2.acquire(); System.out.println("线程2再次获取到锁"); Thread.sleep(5*1000); lock2.release();//释放锁 System.out.println("线程2释放锁.."); lock2.release(); System.out.println("线程2再次释放锁..."); } catch (Exception e) { e.printStackTrace(); } } }).start(); } private static CuratorFramework getCuratorFramework() { ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3); CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181") .connectionTimeoutMs(2000) //设置连接超时时间 .sessionTimeoutMs(2000) .retryPolicy(policy).build(); //尝试策略 //启动客户端 client.start(); System.out.println("Zookeeper启动成功.."); return client; } }
(2)观察控制台变化:
第 6 章 企业面试真题(面试 重点) )
6.1 选举 机制
半数机制,超过半数的投票通过,即通过。
(1)第一次启动选举规则:
投票过半数时,服务器 id 大的胜出
(2)第二次启动选举规则:
①EPOCH 大的直接胜出
②EPOCH 相同,事务 id 大的胜出
③事务 id 相同,服务器 id 大的胜出
6.2 生产集群至少安装多少 zk 合适?
安装奇数台。
生产经验:
⚫ 10 台服务器:3 台 zk;
⚫ 20 台服务器:5 台 zk;
⚫ 100 台服务器:11 台 zk;
⚫ 200 台服务器:11 台 zk
服务器台数多:好处,提高可靠性;坏处:提高通信延时
6.3 常用命令
ls、get、create、delete