一、概述
1、为什么使用zk
随着应用规模的迅速扩张,单台机器的部署已经难以支撑用户大规模、高并发的请求了, 因此服务化、集群化、分布式概念应运而生。 针对这种场景,人们通常使用的做法就是将软件按照模块进行拆分,形成独立的子系统,然后在局域网内部署到多台机器上面, 形成了一个集群。 这种方式即可以分滩请求压力,又可以起到灾备的效果。
然而, 集群的维护和多节点应用程序的协作运行远比单机模式复杂,需要顾及到的细节问题实在太多,比如说同一分配置在多台机器上的同步, 客户端程序实时感知服务机状态,应用与应用之间的公共资源的互斥访问等等一系列的问题。 如果这些问题都依靠开发人员或维护人员去解决的话, 非旦消耗人力,而且也达不到实时准确的效果。
所幸的是,zookeeper能够给我们非常完美的解决这些问题,zookeeper天生的就是为解决分布式协调服务这个问题而来, 应用zookeeper,能够非常好的解决如下问题:
- 配置信息同步
- 分布式锁控制
- 消息的发布与订阅(典型的生产者消费者模型)
- 集群内节点状态的快速感知
2、概念
大数据生态系统里的很多组件的命名都是某种动物或者昆虫, 比如hadoop就是大象,hive是蜜蜂。zookeeper即动物园管理者, 顾名思义就是管理大数据生态系统各组件的管理员
Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。
3、节点模型
每个目录称为一个znode,具有唯一的路径标识
每个znode可以存储1MB数据
每个znode是有版本的,每个znode存储的数据可以设置版本,也就是说一个访问路径可以有多份数据
znode可以被监控包括这个目录节点数据修改以及子节点变化,一旦变化可以通知设置监控的客户端
4、特点
5、应用场景
提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。
统一命名服务
统一配置管理
统一集群管理
服务动态上下线
负载均衡管理
二、安装
1、window安装
官网下载:Apache ZooKeeper
找到页面中的in the achive链接打开
打开选择要下载的版本,这里选择3.7.0,注意从3.5.7开始要下载带bin的tar.gz
即apache-zookeeper-3.7.0-bin.tar.gz
下载下来解压,解压工具解压不了使用window自带的powershell解压即可
进入安装目录下的bin直接执行zkServer.cmd命令启动发现有错误,原因是默认配置是liunx配置,所以需要改配置
打开conf目录,将zoo_sample.cfg文件复制一份,文件名zoo.cfg
并且对zoo.cfg文件中的dataDir修改 dataDir=../data
然后在bin同级目录下创建data文件夹
进入bin目录,cmd中执行zkServer.cmd命令启动zk,然后重新打开一个cmd执行zkCli.cmd命令
查看根节点下的目录ls /
2、linux安装
下载:https://archive.apache.org/dist/zookeeper/
解压到/opt/module
tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/
修改conf
#(1)将/opt/module/zookeeper-3.4.10/conf这个路径下的zoo_sample.cfg修改为zoo.cfg; [atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg #(2)打开zoo.cfg文件,修改dataDir路径: [atguigu@hadoop102 zookeeper-3.4.10]$ vim zoo.cfg 修改如下内容: dataDir=/opt/module/zookeeper-3.4.10/zkData #(3)在/opt/module/zookeeper-3.4.10/这个目录上创建zkData文件夹 [atguigu@hadoop102 zookeeper-3.4.10]$ mkdir zkData # 默认不会给你创建
启动
#(1)启动Zookeeper [atguigu@hadoop102 zookeeper-3.4.10]$ /opt/module/zookeeper-3.4.10/bin/zkServer.sh start #(2)查看进程是否启动 [atguigu@hadoop102 zookeeper-3.4.10]$ jps 4020 Jps 4001 QuorumPeerMain # 这个是 #(3)查看状态: [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg Mode: standalone #(4)启动客户端: [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkCli.sh #(5)退出客户端: [zk: localhost:2181(CONNECTED) 0] quit #(6)停止Zookeeper [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh stop
3、集群安装
1.集群规划 在hadoop102、hadoop103和hadoop104三个节点上部署Zookeeper。 2.解压安装 #(1)解压Zookeeper安装包到/opt/module/目录下 [atguigu@hadoop102 software]$ tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/ #(2)同步/opt/module/zookeeper-3.4.10目录内容到hadoop103、hadoop104 [atguigu@hadoop102 module]$ xsync zookeeper-3.4.10/ 3.配置服务器编号 #(1)在/opt/module/zookeeper-3.4.10/这个目录下创建zkData [atguigu@hadoop102 zookeeper-3.4.10]$ mkdir -p zkData #(2)在/opt/module/zookeeper-3.4.10/zkData目录下创建一个myid的文件 [atguigu@hadoop102 zkData]$ touch myid #添加myid文件,注意一定要在linux里面创建,在notepad++里面很可能乱码 #(3)编辑myid文件 [atguigu@hadoop102 zkData]$ vi myid #在文件中添加与server对应的编号: 2 #(4)拷贝配置好的zookeeper到其他机器上 [atguigu@hadoop102 zkData]$ xsync myid 并分别在hadoop102、hadoop103上修改myid文件中内容为3、4 4.配置zoo.cfg文件 #(1)重命名/opt/module/zookeeper-3.4.10/conf这个目录下的zoo_sample.cfg为zoo.cfg [atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg [atguigu@hadoop102 conf]$ vim zoo.cfg #修改数据存储路径配置 dataDir=/opt/module/zookeeper-3.4.10/zkData # client端口为2181,即如kafka用2181与zk交互 clientPort=2181 #增加如下配置 #######################cluster########################## server.2=hadoop102:2888:3888 server.3=hadoop103:2888:3888 server.4=hadoop104:2888:3888 #server.第几号服务器=服务器的ip:服务器与集群leader交互端口2888:选举leader的端口3888 # 这个第几号服务器是上面myid的内容 #(3)同步zoo.cfg配置文件 [atguigu@hadoop102 conf]$ xsync zoo.cfg #(4)配置参数解读
集群操作
#(1)分别启动Zookeeper [atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh start [atguigu@hadoop103 zookeeper-3.4.10]$ bin/zkServer.sh start [atguigu@hadoop104 zookeeper-3.4.10]$ bin/zkServer.sh start #(2)查看状态 [atguigu@hadoop102 zookeeper-3.4.10]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg Mode: follower [atguigu@hadoop103 zookeeper-3.4.10]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg Mode: leader [atguigu@hadoop104 zookeeper-3.4.5]# bin/zkServer.sh status JMX enabled by default Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg Mode: follower # 也可以通过以下方式加入到集群 ./zkCli.sh ‐server 192.168.60.130:2181 ./zkCli.sh ‐server 192.168.60.130:2182 ./zkCli.sh ‐server 192.168.60.130:2183
注:配置参数
Zookeeper中的配置文件zoo.cfg中参数含义解读如下:
1.tickTime =2000:通信心跳数,Zookeeper服务器与客户端心跳时间,单位毫秒
Zookeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳,时间单位为毫秒。
它用于心跳机制,并且设置最小的session超时时间为两倍心跳时间。(session的最小超时时间是2*tickTime)
2.initLimit =10:LF初始通信时限
集群中的Follower跟随者服务器与Leader领导者服务器之间初始连接时能容忍的最多心跳数(tickTime的数量),用它来限定集群中的Zookeeper服务器连接到Leader的时限。
3.syncLimit =5:LF同步通信时限
集群中Leader与Follower之间的最大响应时间单位,假如响应超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer。
4.dataDir:数据文件目录+数据持久化路径
主要用于保存Zookeeper中的数据。
5.clientPort =2181:客户端连接端口,即如kafka用2181与zk交互
监听客户端连接的端口。
6 、server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
server.A=B:C:D。
A是一个数字,表示这个是第几号服务器;
集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
B是这个服务器的ip地址;
C是这个服务器与集群中的Leader服务器交换信息的端口;
D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
4、zk客户端可视化工具
4.1、ZooInspector
下载地址
https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
解压后进入目录ZooInspector\build,运行zookeeper-dev-ZooInspector.jar
4.2、taokeeper
基于zookeeper的监控管理工具taokeeper,由淘宝团队开源的zk管理中间件,
安装前要求服务前先配置nc 和 sshd
1.下载数据库脚本
wget https://github.com/downloads/alibaba/taokeeper/taokeeper.sql
三、命令行操作
1、节点类型
zookeeper中的节点有两种, 分别为临时节点和永久节点。 节点的类型在创建时即被确定, 并且不能改变。
PERSISTENT–持久化目录节点:客户端与zookeeper断开连接后,该节点依旧存在;
PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点:客户端与zookeeper断开连接后,该节点依旧存在,允许重复创建相同key,只是Zookeeper给该节点名称进行顺序编号;
EPHEMERAL-临时目录节点:客户端与zookeeper断开连接后,该节点被删除。。 虽然每个临时的Znode都会绑定到一个客户端会话, 但他们对所有的客户端还是可见的。另外, ZooKeeper的临时节点不允许拥有子节点。;
EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点:客户端与zookeeper断开连接后(一旦会话(Session)结束),该节点被删除,允许重复创建相同key,只是Zookeeper给该节点名称进行顺序编号;
2、zookeeper常用shell命令
2.1、命令概述
2.2、查看当前znode信息
#查看当前znode信息 [zk: localhost:2181(CONNECTED) 3] ls / [dubbo, sanguo0000000003, services, zookeeper]
2.3、node详情
ls -s /命令或者stat / 命令都行
[zk: localhost:2181(CONNECTED) 2] ls -s / [dubbo, sanguo0000000003, services, zookeeper] cZxid = 0x0 ctime = Thu Jan 01 08:00:00 CST 1970 mZxid = 0x0 mtime = Thu Jan 01 08:00:00 CST 1970 pZxid = 0x66 cversion = 4 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 4
1)czxid-创建节点的事务zxid
每次修改ZooKeeper状态都会收到一个zxid形式的时间戳,也就是ZooKeeper事务ID。
事务ID是ZooKeeper中所有修改总的次序。每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。
2)ctime - znode被创建的毫秒数(从1970年开始)
3)mzxid - znode最后更新的事务zxid
4)mtime - znode最后修改的毫秒数(从1970年开始)
5)pZxid-znode最后更新的子节点zxid
6)cversion - znode子节点变化号,znode子节点修改次数
7)dataversion - znode数据变化号
8)aclVersion - znode访问控制列表的变化号
9)ephemeralOwner- 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。
10)dataLength- znode的数据长度
11)numChildren - znode子节点数量
2.4、创建无序永久节点
[zk: localhost:2181(CONNECTED) 3] create /sanguo/shuguo "liubei"
2.5、创建有序永久节点
[zk: localhost:2181(CONNECTED) 2] create -s /sanguo/weiguo/zhangliao "zhangliao" Created /sanguo/weiguo/zhangliao0000000000 [zk: localhost:2181(CONNECTED) 3] create -s /sanguo/weiguo/zhangliao "zhangliao" Created /sanguo/weiguo/zhangliao0000000001 [zk: localhost:2181(CONNECTED) 4] create -s /sanguo/weiguo/xuchu "xuchu" Created /sanguo/weiguo/xuchu0000000002
如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推。
2.6、创建 无序号临时节点
[zk: localhost:2181(CONNECTED) 7] create -e /sanguo/wuguo "zhouyu"
当前客户端可以查看值 ,退出客户端重新进入值删除了
2.7、创建有序号临时节点
[zk: localhost:2181(CONNECTED) 2] create -e -s /sanguo/wuguo "zhouyu"
2.8、修改节点值
[zk: localhost:2181(CONNECTED) 6] set /sanguo/weiguo "simayi"
2.9、删除节点
[zk: localhost:2181(CONNECTED) 4] delete /sanguo/jin
2.10、递归删除节点
[zk: localhost:2181(CONNECTED) 15] deleteall /sanguo/shuguo
2.11、节点监听
查看监听原理节
四、javaApi操作
1、原生api
创建maven引入依赖
<dependencies> <!-- https://mvnrepository.com/artifact/junit/junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core --> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.14.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.7.0</version> </dependency> </dependencies>
需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
1.1、创建zk客户端
public class TestZookeeper { private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183"; private int sessionTimeout = 2000; private ZooKeeper zooKeeper; @Test public void init() throws IOException { zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); } }
1.2、创建子节点
public class TestZookeeper { // private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183"; private String connectionString = "127.0.0.1:2181"; private int sessionTimeout = 2000; private ZooKeeper zkClient; @Before public void init() throws IOException { zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); } //创建节点 @Test public void create() throws InterruptedException, KeeperException { // 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型 String path = zkClient.create("/atguigu", "chenchen".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(path); } }
1.3、获取子节点并监听节点变化
public class TestZookeeper { // private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183"; private String connectionString = "127.0.0.1:2181"; private int sessionTimeout = 2000; private ZooKeeper zkClient; @Before public void init() throws IOException { zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) {//监听节点路径的变化 List<String> children = null; try { System.out.println("********************************"); children = zkClient.getChildren("/", true); for (int i = 0; i < children.size(); i++) { System.out.println(children.get(i)); } System.out.println("********************************"); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } //获取子节点并监控节点的变化 @Test public void getDataAndWatch() throws InterruptedException, KeeperException, IOException { List<String> children = zkClient.getChildren("/", true); for (int i = 0; i < children.size(); i++) { System.out.println(children.get(i)); } //让程序不要结束 System.in.read(); } }
1.4、判断znode是否存在
public class TestZookeeper { // private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183"; private String connectionString = "127.0.0.1:2181"; private int sessionTimeout = 2000; private ZooKeeper zkClient; @Before public void init() throws IOException { zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { List<String> children = null; try { System.out.println("********************************"); children = zkClient.getChildren("/", true); for (int i = 0; i < children.size(); i++) { System.out.println(children.get(i)); } System.out.println("********************************"); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } //判断节点是否存在 @Test public void exist() throws InterruptedException, KeeperException { Stat exists = zkClient.exists("/dubbo", true); if (exists == null) { System.out.println("没有该节点"); }else{ System.out.println("该节点存在"); } } }
1.5、删除znode
public class TestZookeeper { // private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183"; private String connectionString = "127.0.0.1:2181"; private int sessionTimeout = 2000; private ZooKeeper zkClient; @Before public void init() throws IOException { zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) {//监听节点路径的变化 List<String> children = null; try { System.out.println("********************************"); children = zkClient.getChildren("/", true); for (int i = 0; i < children.size(); i++) { System.out.println(children.get(i)); } System.out.println("********************************"); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } //删除节点 @Test public void delete() throws InterruptedException, KeeperException { zkClient.delete("/atguigu2",-1); } }
2、zookeeper 开源客户端curator
Curator 是 Apache ZooKeeper 的Java客户端库。Curator 项目的目标是简化 ZooKeeper 客户端的使用。
2.1、创建连接
创建maven项目引入依赖和日志文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.test</groupId> <artifactId>curator-zk</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> <!--curator--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> <!--日志--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.21</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.21</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
log4j.properties
log4j.rootLogger=off,stdout log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target = System.out log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern = [%d{yyyy-MM-dd HH/:mm/:ss}]%-5p %c(line/:%L) %x-%m%n
连接zk
@Before public void testConnect() { //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); //2.第二种方式 //CuratorFrameworkFactory.builder(); client = CuratorFrameworkFactory.builder() .connectString("192.168.200.130:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .namespace("test") .build(); //开启连接 client.start(); }
2.2、节点创建
/** * 创建节点:create 持久 临时 顺序 数据 * 1. 基本创建 :create().forPath("") * 2. 创建节点 带有数据:create().forPath("",data) * 3. 设置节点的类型:create().withMode().forPath("",data) * 4. 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data) */ @Test public void testCreate() throws Exception { //2. 创建节点 带有数据 //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储 String path = client.create().forPath("/app2", "hehe".getBytes()); System.out.println(path); } @Test public void testCreate2() throws Exception { //1. 基本创建 //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储 String path = client.create().forPath("/app1"); System.out.println(path); } @Test public void testCreate3() throws Exception { //3. 设置节点的类型 //默认类型:持久化 String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3"); System.out.println(path); } @Test public void testCreate4() throws Exception { //4. 创建多级节点 /app1/p1 //creatingParentsIfNeeded():如果父节点不存在,则创建父节点 String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1"); System.out.println(path); }
2.3、节点查看
/** * 查询节点: * 1. 查询数据:get: getData().forPath() * 2. 查询子节点: ls: getChildren().forPath() * 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath() */ @Test public void testGet1() throws Exception { //1. 查询数据:get byte[] data = client.getData().forPath("/app1"); System.out.println(new String(data)); } @Test public void testGet2() throws Exception { // 2. 查询子节点: ls List<String> path = client.getChildren().forPath("/"); System.out.println(path); } @Test public void testGet3() throws Exception { Stat status = new Stat(); System.out.println(status); //3. 查询节点状态信息:ls -s client.getData().storingStatIn(status).forPath("/app1"); System.out.println(status); }
2.4、节点删除
/** * 删除节点: delete deleteall * 1. 删除单个节点:delete().forPath("/app1"); * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1"); * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2"); * 4. 回调:inBackground * @throws Exception */ @Test public void testDelete() throws Exception { // 1. 删除单个节点 client.delete().forPath("/app1"); } @Test public void testDelete2() throws Exception { //2. 删除带有子节点的节点 client.delete().deletingChildrenIfNeeded().forPath("/app4"); } @Test public void testDelete3() throws Exception { //3. 必须成功的删除 client.delete().guaranteed().forPath("/app2"); } @Test public void testDelete4() throws Exception { //4. 回调 client.delete().guaranteed().inBackground(new BackgroundCallback(){ @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("我被删除了~"); System.out.println(event); } }).forPath("/app1"); }
2.5、节点修改
/** * 修改数据 * 1. 基本修改数据:setData().forPath() * 2. 根据版本修改: setData().withVersion().forPath() * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。 * * @throws Exception */ @Test public void testSet() throws Exception { client.setData().forPath("/app1", "itcast".getBytes()); } @Test public void testSetForVersion() throws Exception { Stat status = new Stat(); //3. 查询节点状态信息:ls -s client.getData().storingStatIn(status).forPath("/app1"); int version = status.getVersion();//查询出来的 3 System.out.println(version); client.setData().withVersion(version).forPath("/app1", "hehe".getBytes()); }
2.6、节点监听
ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便
需要开发人员自己反复注册Watcher,比较繁琐。
Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。
ZooKeeper提供了三种Watcher:
NodeCache : 只是监听某一个特定的节点
PathChildrenCache : 监控一个ZNode的子节点.
TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合
/** * 演示 NodeCache:给指定一个节点注册监听器 */ @Test public void testNodeCache() throws Exception { //1. 创建NodeCache对象 final NodeCache nodeCache = new NodeCache(client,"/app1"); //2. 注册监听 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("节点变化了~"); //获取修改节点后的数据 byte[] data = nodeCache.getCurrentData().getData(); System.out.println(new String(data)); } }); //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据 nodeCache.start(true); while (true){ } } /**演示PathChildrenCache */ @Test public void testPathChildrenCache() throws Exception { //1.创建监听对象 PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true); //2. 绑定监听器 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("子节点变化了~"); System.out.println(event); //监听子节点的数据变更,并且拿到变更后的数据 //1.获取类型 PathChildrenCacheEvent.Type type = event.getType(); //2.判断类型是否是update if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ System.out.println("数据变了!!!"); byte[] data = event.getData().getData(); System.out.println(new String(data)); } } }); //3. 开启 pathChildrenCache.start(); while (true){ } } /** * 演示 TreeCache:监听某个节点自己和所有子节点们 */ @Test public void testTreeCache() throws Exception { //1. 创建监听器 TreeCache treeCache = new TreeCache(client,"/app2"); //2. 注册监听 treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { System.out.println("节点变化了"); System.out.println(event); } }); //3. 开启 treeCache.start(); while (true){ } }
2.7、分布式锁
在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。
但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。
那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁
核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。
客户端获取锁时,在lock节点下创建临时顺序节点。
然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
如果发现比自己小的那个节点被删除,则客户端的
Watcher会收到相应通知,此时再次判断自己创建的节点
是否是lock子节点中序号最小的,如果是则获取到了锁,
如果不是则重复以上步骤继续获取到比自己小的一个节点
并注册监听。
分布式锁-模拟12306售票案例
Curator实现分布式锁API
在Curator中有五种锁方案:
- InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
- InterProcessMutex:分布式可重入排它锁
- InterProcessReadWriteLock:分布式读写锁
- InterProcessMultiLock:将多个锁作为单个实体管理的容器
- InterProcessSemaphoreV2:共享信号量
创建线程进行加锁设置
public class Ticket12306 implements Runnable{ private int tickets = 10;//数据库的票数 private InterProcessMutex lock ; @Override public void run() { while(true){ //获取锁 try { lock.acquire(3, TimeUnit.SECONDS); if(tickets > 0){ System.out.println(Thread.currentThread()+":"+tickets); Thread.sleep(100); tickets--; } } catch (Exception e) { e.printStackTrace(); }finally { //释放锁 try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } } }
创建连接,并且初始化锁
public Ticket12306(){ //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); //2.第二种方式 //CuratorFrameworkFactory.builder(); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("192.168.149.135:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .build(); //开启连接 client.start(); lock = new InterProcessMutex(client,"/lock"); }
运行多个线程进行测试
public class LockTest { public static void main(String[] args) { Ticket12306 ticket12306 = new Ticket12306(); //创建客户端 Thread t1 = new Thread(ticket12306,"携程"); Thread t2 = new Thread(ticket12306,"飞猪"); t1.start(); t2.start(); } }
五、内部原理(面试题)
1、选举机制
1)半数机制:集群中半数以上机器存活,集群可用。所以Zookeeper适合安装奇数台服务器。
2)Zookeeper虽然在配置文件中并没有指定Master和Slave。但是,Zookeeper工作时,是有一个节点为Leader,其他则为Follower,Leader是通过内部的选举机制临时产生的。
3)以一个简单的例子来说明整个选举的过程。
假设有五台服务器组成的Zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动,来看看会发生什么,
(1)服务器1启动,此时只有它一台服务器启动了,它发出去的报文没有任何响应,所以它的选举状态一直是LOOKING状态。
(2)服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1、2还是继续保持LOOKING状态。
(3)服务器3启动,根据前面的理论分析,服务器3成为服务器1、2、3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的Leader。
(4)服务器4启动,根据前面的分析,理论上服务器4应该是服务器1、2、3、4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了。
(5)服务器5启动,同4一样当小弟。
2、ZooKeeper的监听原理是什么?
2.1、节点值变化监听
在 hadoop104 主机上注册监听/sanguo 节点数据变化
[zk: localhost:2181(CONNECTED) 26] get -w /sanguo
可能因为版本问题上面的命令会报错,可改用:
[zk: localhost:2181(CONNECTED) 26] get /sanguo [watch]
在 hadoop103 主机上修改/sanguo 节点的数据
[zk: localhost:2181(CONNECTED) 1] set /sanguo "xisi"
观察 hadoop104 主机收到数据变化的监听
WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo
注意:在hadoop103再多次修改/sanguo的值,hadoop104上不会再收到监听。因为注册一次,只能监听一次。想再次监听,需要再次注册。
2.2、节点的子节点变化监听(路径变化)
在 hadoop104 主机上注册监听/sanguo 节点的子节点变化
[zk: localhost:2181(CONNECTED) 1] ls -w /sanguo [shuguo, weiguo]
在 hadoop103 主机/sanguo 节点上创建子节点
[zk: localhost:2181(CONNECTED) 2] create /sanguo/jin "simayi" Created /sanguo/jin
观察 hadoop104 主机收到子节点变化的监听
WATCHER:: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo
注意:节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。
- 监控数据变化用 set
- 监控子节点变化用 ls
3、写数据流程
4、ZooKeeper的部署方式有哪几种?
集群中的角色有哪些?集群最少需要几台机器?
(1)部署方式单机模式、集群模式
(2)角色:Leader和Follower
(3)集群最少需要机器数:3
六、实战案例
1、服务器动态上下线
需求
现在集群创建server节点
create /servers "servers"
服务器端向Zookeeper注册代码
public class DistributeServer { private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183"; private int sessionTimeout = 2000; private ZooKeeper zkClient; public static void main(String[] args) throws IOException, InterruptedException, KeeperException { DistributeServer server = new DistributeServer(); //1 连接zookeeper集群 server.getConnect(); //2 注册节点 server.regist("test2"); //3 业务逻辑处理 server.business(); } //业务逻辑处理 private void business() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } //注册节点 private void regist(String hostName) throws InterruptedException, KeeperException { String path = zkClient.create ("/servers/server", hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(path); } //连接zookeeper集群 private void getConnect() throws IOException { zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { List<String> children = null; try { System.out.println("********************************"); children = zkClient.getChildren("/", true); for (int i = 0; i < children.size(); i++) { System.out.println(children.get(i)); } System.out.println("********************************"); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }
客户端代码
public class DistributeClient { private String connectionString = "192.168.145.143:2181,192.168.145.143:2182,192.168.145.143:2183"; private int sessionTimeout = 2000; private ZooKeeper zkClient; public static void main(String[] args) throws IOException, InterruptedException, KeeperException { DistributeClient server = new DistributeClient(); //1 连接zookeeper集群 server.getConnect(); //2 注册节点 server.getChildren(); //3 业务逻辑处理 server.business(); } //注册监听 private void getChildren() throws InterruptedException, KeeperException { List<String> children = zkClient.getChildren("/servers", true); //储存服务器节点主机名称集合 ArrayList<String> hosts = new ArrayList<>(); for (String child : children) { byte[] data = zkClient.getData("/servers/" + child, false, null); hosts.add(String.valueOf(data)); } //将所有在线主机的名称打印出来 System.out.println(hosts); } //业务逻辑处理 private void business() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } //连接zookeeper集群 private void getConnect() throws IOException { zkClient = new ZooKeeper(connectionString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { try { getChildren(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } }); } }
2、分布式锁案例
2.1、原生zk实现
锁实现
package com.herobin.flink.debug_local.zk; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; public class DistributedLock { private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; // 超时时间 private int sessionTimeout = 2000; private ZooKeeper zk; private String rootNode = "locks"; private String subNode = "seq-"; // 当前 client 等待的子节点 private String waitPath; //ZooKeeper 连接 private CountDownLatch connectLatch = new CountDownLatch(1); //ZooKeeper 节点等待 private CountDownLatch waitLatch = new CountDownLatch(1); // 当前 client 创建的子节点 private String currentNode; // 和 zk 服务建立连接,并创建根节点 public DistributedLock() throws IOException, InterruptedException, KeeperException { zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程 if (event.getState() == Event.KeeperState.SyncConnected) { connectLatch.countDown(); } // 发生了 waitPath 的删除事件 if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) { waitLatch.countDown(); } } }); // 等待连接建立 connectLatch.await(); //获取根节点状态 Stat stat = zk.exists("/" + rootNode, false); //如果根节点不存在,则创建根节点,根节点类型为永久节点 if (stat == null) { System.out.println("根节点不存在"); zk.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } // 加锁方法 public void zkLock() { try { //在根节点下创建临时顺序节点,返回值为创建的节点路径 currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // wait 一小会, 让结果更清晰一些 Thread.sleep(10); // 注意, 没有必要监听"/locks"的子节点的变化情况 List<String> childrenNodes = zk.getChildren("/" + rootNode, false); // 列表中只有一个子节点, 那肯定就是 currentNode , 说明 client 获得锁 if (childrenNodes.size() == 1) { return; } else { //对根节点下的所有临时顺序节点进行从小到大排序 Collections.sort(childrenNodes); //当前节点名称 String thisNode = currentNode.substring(("/" + rootNode + "/").length()); //获取当前节点的位置 int index = childrenNodes.indexOf(thisNode); if (index == -1) { System.out.println("数据异常"); } else if (index == 0) { // index == 0, 说明 thisNode 在列表中最小, 当前 client 获得锁 return; } else { // 获得排名比 currentNode 前 1 位的节点 this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1); // 在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper 会回调监听器的 process 方法 zk.getData(waitPath, true, new Stat()); //进入等待锁状态 waitLatch.await(); return; } } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } // 解锁方法 public void zkUnlock() { try { zk.delete(this.currentNode, -1); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } } }
锁测试(创建两个线程)
package com.herobin.flink.debug_local.zk; import org.apache.zookeeper.KeeperException; import java.io.IOException; public class DistributedLockTest { public static void main(String[] args) throws InterruptedException, IOException, KeeperException { // 创建分布式锁 1 final DistributedLock lock1 = new DistributedLock(); // 创建分布式锁 2 final DistributedLock lock2 = new DistributedLock(); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { lock1.zkLock(); System.out.println("线程 1 获取锁"); Thread.sleep(5 * 1000); lock1.zkUnlock(); System.out.println("线程 1 释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { // 获取锁对象 try { lock2.zkLock(); System.out.println("线程 2 获取锁"); Thread.sleep(5 * 1000); lock2.zkUnlock(); System.out.println("线程 2 释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
观察运行
线程 1 获取锁 线程 1 释放锁 线程 2 获取锁 线程 2 释放锁
2.2、Curator 框架实现分布式锁案例
查看API Curator章节