1.1 Zookeeper简介
Zookeeper是Apache开源的致力于开发和维护实现高可用、高可靠的分布式协调服务器。Zookeeper提供了一种集中式服务,用于维护配置信息、服务命名、提供分布式同步以及提供组服务。这一类的服务如果没有统一的管理,那么就在分布式应用运行过程中,会导致不可避免的错误和竞争条件。
1.2 Zookeeper安装
Zookeeper的安装有单机、集群和伪集群三种类型。在开发中只需要搭建单机就可以,但是在生产环境一般为了避免单点故障都会部署集群或者伪集群模式。
由于Zookeeper采用过半投票机制策略,所以集群至少需要保证有3个以上的服务器。那么集群和伪集群有什么区别呢。
集群:在多个独立的物理服务器上各自部署Zookeeper,这样的好处是单台物理服务器故障不会导致整体受影响。但是也增加了网络通信的问题。
伪集群:伪集群和集群其实差不多,只是在同一台物理服务器上部署多个Zookeeper,这样的确定就是如果物理服务器故障,将导致整体不可用。
以下演示的安装是以 Linux环境、CentOS 7系统为基础,那么在使用之前我们需要做一下配置,如果有环境并且已经处理了可以忽略
JDK安装问题,Zookeeper是Java语言编写的,所以安装之前需要确保Linux已经安装了JDK并且可以正常使用
- 使用java -version检查是否有默认安装的JDK,如果有需要卸载
- 将linux的JDK安装包tar.gz上传到服务器
- tar -zxvf jdk-11.0.7_linux-x64_bin.tar.gz解压
- 配置JDK环境变量
- vi /etc/profile打开环境配置文件
#java environment export JAVA_HOME=/usr/local/java/jdk1.8.0_201 export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin
- source /etc/profile让配置文件生效
- java -version如果显示jdk版本信息就是安装成功
防火墙问题,CentOS 7防火墙默认是开启的,那么如果不关闭端口就不可以访问,在代码中调用的时候就无法使用默认的2181和其他端口
# CentOS 6处理方法 //临时关闭 service iptables stop //禁止开机启动 chkconfig iptables off
# CentOS 7处理方法,由于CentOS 7以后的防火墙默认使用firewalld,因此处理方法和CentOS 6有差异 //临时关闭 systemctl stop firewalld //禁止开机启动 systemctl disable firewalld Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service. Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.sersvice.
# 额外提供的命令 # 防火墙相关命令 1、查看防火墙状态 : systemctl status firewalld.service 注:active是绿的running表示防火墙开启 2、关闭防火墙 :systemctl stop firewalld.service 3、开机禁用防火墙自启命令 :systemctl disable firewalld.service 4、启动防火墙 :systemctl start firewalld.service 5、防火墙随系统开启启动 : systemctl enable firewalld.service 6、重启防火墙 : firewall-cmd --reload # 端口开放命令 1、查询已经开放的端口 :firewall-cmd --list-port 2、查询某个端口是否开放 :firewall-cmd --query-port=80/tcp 3、开启端口 :firewall-cmd --zone=public --add-port=80/tcp --permanent 注:可以是一个端口范围,如1000-2000/tcp 4、移除端口 :firewall-cmd --zone=public --remove-port=80/tcp --permanent 5、命令含义: --zone #作用域 --add-port=80/tcp #添加端口,格式为:端口/通讯协议 --remove-port=80/tcp #移除端口,格式为:端口/通讯协议 --permanent #永久生效,没有此参数重启后失效
1.2.1 单机版
- 下载Zookeeper http://zookeeper.apache.org/releases.html 打开这个网址就可以看到不同的版本直接下载即可
- 在/usr/local/目录下创建zookeeper文件夹,用于存放zookeeper。
mkdir zookeeper
- 将下载的zookeeper.tar.gz压缩包上传到Linux的/usr/local/zookeeper目录下
# 解压 tar -zxvf zookeeper-3.4.14.tar.gz
# 进入conf目录,zookeeper启动是读取zoo.cfg配置文件,所以重命名或者拷贝一份都可以 cp zoo_sample.cfg zoo.cfg
# 在zookeeper目录下创建data文件夹,用于存储文件,不使用默认的路径文件夹 mkdir data # 修改conf/zoo.cfg配置文件的dataDir路径,这个是数据存放的文件路径 dataDir=/usr/local/zookeeper/data
# 命令 ./bin/zkServer.sh start # 启动 ./bin/zkServer.sh stop # 暂停 ./bin/zkServer.sh status # 查看状态
# 启动命令执行的效果 [root@localhost zookeeper-3.4.14-2181]# ./bin/zkServer.sh start ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/zookeeper-3.4.14-2181/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
# 查看状态命令执行的效果 [root@localhost zookeeper-3.4.14-2181]# ./bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/zookeeper-3.4.14-2181/bin/../conf/zoo.cfg Mode: standalone # standalone表示单机版本
# 暂停命令执行的效果 [root@localhost zookeeper-3.4.14-2181]# ./bin/zkServer.sh stop ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/zookeeper-3.4.14-2181/bin/../conf/zoo.cfg Stopping zookeeper ... STOPPED
1.2.2 集群版
- 下载http://zookeeper.apache.org/releases.html 打开这个网址就可以看到不同的版本直接下载
- 上传zookeeper.tar.gz压缩包到/usr/local/zookeeper目录下
# 在/usr/local/目录下创建文件夹 mkdir zkcluster
# 解压zookeeper到zkcluster文件夹,-C的作用是指定解压到那个文件夹下 tar -zxvf zookeeper-3.4.14.tar.gz -C /zkcluster
# 修改文件名称,并复制 mv zookeeper-3.4.14 zookeeper01 # 复制2份,集群数量为3 cp -r zookeeper01/ zookeeper02 cp -r zookeeper01/ zookeeper03
# 在每个zookeeper目录下创建data文件夹,并在data目录下创建log文件夹 mkdir data cd data mkdir log
# 修改/conf/zoo_sample.cfg配置文件名 zookeeper01 zookeeper02 zookeeper03都要执行 cp zoo_sample.cfg zoo.cfg
# 修改zoo.cfg配置文件的端口的数据存储就以及日志路径,端口分别是:2181 2182 2183 # 端口 clientPort=2181 # 存储路径 dataDir=/usr/local/zookeeper/zkcluster/zookeeper01/data # 日志路径 dataLogDir=/usr/local/zookeeper/zkcluster/zookeeper01/data/log clientPort=2182 dataDir=/usr/local/zookeeper/zkcluster/zookeeper02/data dataLogDir=/usr/local/zookeeper/zkcluster/zookeeper02/data/log clientPort=2183 dataDir=/usr/local/zookeeper/zkcluster/zookeeper03/data dataLogDir=/usr/local/zookeeper/zkcluster/zookeeper03/data/log
# 配置集群,分别在data目录下创建myid,内容分别是1 2 3用于记录每个服务器的ID,也是集群中zookeeper的唯一标记,不可以重复 touch ./zookeeper01/data/myid touch ./zookeeper02/data/myid touch ./zookeeper03/data/myid
#在每个zookeeper的zoo.cfg文件中配置客户端访问端口(clientPort)和集群服务器IP列表 #server.服务器ID=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口 server.1=192.168.247.100:2881:3881 server.2=192.168.247.100:2882:3882 server.3=192.168.247.100:2883:3883
# 依次启动3个zookeeper,相关命令如下 ./bin/zkServer.sh start ./bin/zkServer.sh stop ./bin/zkServer.sh status
1.3 基本使用命令
启动后打开客户端
./bin/zkCli.sh
如果要连接的是远程的zookeeper,那么使用./bin/zkCli.sh -server ip:port
连接指定的服务器
1.3.1 创建节点
create命令用于创建一个zookeeper节点
create [-s][-e] path data 其中,-s或-e分别指定节点特性,顺序或临时节点,若不指定,则创建持久节点;
在执行创建节点之前,可以使用ls /
命令,那么就会显示[zookeeper]
这个是自动生成的持久节点。
节点需要一级一级的创建,不可以一下子创建多级
创建持久节点
[zk: localhost:2181(CONNECTED) 5] create /zk-seq 123 Created /zk-seq [zk: localhost:2181(CONNECTED) 6] ls / # zk-seq已经创建好 [zk-seq, zookeeper]
创建持久顺序节点
注意:
- 顺序节点创建后zookeeper默认会在节点名称后面拼接序列,这个数字序列用于标记节点的顺序。
- 顺序节点的特点就是节点后面会拼接一串数字表示和非顺序节点的区别。
[zk: localhost:2181(CONNECTED) 3] create -s /zk-test 123 Created /zk-test0000000000 [zk: localhost:2181(CONNECTED) 4] ls / # zk-test已经创建好 [zk-test0000000000, zookeeper]
创建临时节点
注意:
- 临时节点的特点是如果会话断开连接,那么节点就会自动被zookeeper删除。
[zk: localhost:2181(CONNECTED) 7] create -e /zk-temp 123 Created /zk-temp [zk: localhost:2181(CONNECTED) 8] ls / # zk-temp已经创建好 [zookeeper, zk-temp] # 退出关闭会话,然后重新登录看临时节点是否还存在 [zk: localhost:2181(CONNECTED) 9] quit Quitting... 2020-08-08 15:29:51,404 [myid:] - INFO [main:ZooKeeper@693] - Session: 0x1000009d3a80000 closed 2020-08-08 15:29:51,407 [myid:] - INFO [main-EventThread:ClientCnxn$EventThread@522] - EventThread shut down for session: 0x1000009d3a80000 # 重新登录后执行ls命令,可以看到临时节点/zk-temp已经被删除 [zk: localhost:2181(CONNECTED) 0] ls /
创建临时顺序节点
[zk: localhost:2181(CONNECTED) 1] create -s -e /zk-temp 123456 Created /zk-temp0000000003 [zk: localhost:2181(CONNECTED) 2] ls / [zk-temp0000000003, zookeeper]
1.3.2 读取节点
读取节点命令主要有ls path
和get path
。ls 命令可以列出zookeeper指定节点的所有子节点,但是只可以查看子一级目录。get命令可以获取zookeeper指定节点的数据和属性信息。
[zk: localhost:2181(CONNECTED) 4] get /zk-temp0000000003 # 获取临时顺序节点的信息 123456 # 节点数据内容 cZxid = 0x7 # 创建时候的事务ID ctime = Sat Aug 08 15:32:00 CST 2020 # 创建时间 mZxid = 0x7 # 修改时候的事务ID mtime = Sat Aug 08 15:32:00 CST 2020 # 修改时间 pZxid = 0x7 # 最新修改的zxid cversion = 0 # 子节点被修改版本 dataVersion = 0 # 数据版本 aclVersion = 0 # acl权限控制版本 ephemeralOwner = 0x1000009d3a80001 dataLength = 6 # 数据长度 numChildren = 0 # 子节点数
1.3.3 更新节点
更新节点使用set path data [version]
命令。一般不需要指定version版本。
# 以下命令非核心打印的信息省略了 [zk: localhost:2181(CONNECTED) 6] get /zk-seq # 获取节点数据 123 # 数据为123 [zk: localhost:2181(CONNECTED) 7] set /zk-seq 456 # 更新节点数据为456 [zk: localhost:2181(CONNECTED) 8] get /zk-seq # 获取节点数据 456 # 数据已经被更新为456
1.3.4 删除节点
删除节点使用delete path [version]
命令。
[zk: localhost:2181(CONNECTED) 0] ls / # 查看节点 [zk-seq, zk-test0000000000, zookeeper] [zk: localhost:2181(CONNECTED) 1] delete /zk-seq # 删除/zk-seq节点 [zk: localhost:2181(CONNECTED) 2] ls / # 查看节点/zk-seq节点已经不存在 [zk-test0000000000, zookeeper]
1.4 基本API操作Zookeeper
Zookeeper作为一个分布式框架,主要用于解决分布式一致性问题。所以提供了基于各种版本的原生API,那么下面我们就学习一下Zookeeper的Java API如何操作Zookeeper。
引入Jar
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.14</version> </dependesncy>
1.4.1 创建会话
在进行Zookeeper的节点操作之前,我们需要和Zookeeper创建一个会话连接的过程。但是Zookeeper的原生API进行会话连接是异步操作的,如果你和Zookeeper创建连接,那么Zookeeper会立刻返回给你。但是最终连接成功后会异步的通知。
// 类实现Watcher接口,Zookeeper基于这个异步通知 public class CreateSession implements Watcher { // 使用同步操作阻塞等待Zookeeper异步的通知 private static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException { // 参数一:Zookeeper的ip:port // 参数二:连接超时时间 // 参数三:异步通知 ZooKeeper zooKeeper = new ZooKeeper("192.168.247.100:2181", 5000, new CreateSession()); System.out.println(zooKeeper.getState()); countDownLatch.await(); System.out.println("zookeeper会话创建成功......"); } // 实现异步通知回调的方法 @Override public void process(WatchedEvent watchedEvent) { // 异步通知的事件类型有很多,只有是创建会话成功的才放行,其他的不处理 if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { countDownLatch.countDown(); } } }
1.4.2 创建节点
public class CreateNode implements Watcher { private static CountDownLatch countDownLatch = new CountDownLatch(1); private static ZooKeeper zooKeeper; public static void main(String[] args) throws Exception { zooKeeper = new ZooKeeper("192.168.247.100:2181", 5000, new CreateNode()); System.out.println(zooKeeper.getState()); countDownLatch.await(); System.out.println("zookeeper会话创建成功......"); // 创建节点 createNodeSync(); Thread.sleep(Integer.MAX_VALUE); } /** * 创建节点 * 节点的类型: * CreateMode.PERSISTENT 持久节点 * CreateMode.PERSISTENT_SEQUENTIAL 持久顺序节点 * CreateMode.EPHEMERAL 临时节点 * CreateMode.EPHEMERAL_SEQUENTIAL 临时顺序节点 */ private static void createNodeSync() throws Exception { // 参数一:要创建的节点,一次只可以创建一个子节点,不可以创建多级 // 参数二:节点的内容,是一个字节数组 // 参数三:节点的权限类型 // 参数四:节点的类型,这是持久节点 String s = zooKeeper.create("/lg_persistent", "持久节点内容".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); String s1 = zooKeeper.create("/lg_persistent_sequential", "持久顺序节点内容".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); String s2 = zooKeeper.create("/lg_ephemeral", "临时节点内容".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); String s3 = zooKeeper.create("/lg_ephemeral_sequential", "临时顺序节点内容".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("创建的持久节点是:" + s); System.out.println("创建的持久顺序节点是:" + s1); System.out.println("创建的临时节点是:" + s2); System.out.println("创建的临时顺序节点是:" + s3); } @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { countDownLatch.countDown(); } } }
1.4.3 获取节点数据
public class GetNodeData implements Watcher { private static CountDownLatch countDownLatch = new CountDownLatch(1); private static ZooKeeper zooKeeper; public static void main(String[] args) throws Exception { zooKeeper = new ZooKeeper("192.168.247.100:2181", 5000, new GetNodeData()); System.out.println(zooKeeper.getState()); countDownLatch.await(); System.out.println("zookeeper会话创建成功......"); // 获取节点内容 getNodeData(); // 获取所有的节点 getChildrens(); Thread.sleep(Integer.MAX_VALUE); } private static void getChildrens() throws Exception { /* 参数一:path节点路径 参数二:是否要监听,如果子节点变化会触发监听 */ List<String> children = zooKeeper.getChildren("/lg_persistent", true); System.out.println("/lg_persistent子节点为:" + children); } private static void getNodeData() throws Exception { /** * 参数一:要获取内容的节点path * 参数二:是否监听 * 参数三:版本,不填默认最新版本 */ byte[] data = zooKeeper.getData("/lg_persistent", true, null); System.out.println("获取到的节点内容为:" + new String(data)); } @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { countDownLatch.countDown(); } // 当字节点变化的时候,触发 if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) { List<String> children = null; try { // true参数继续注册监听 children = zooKeeper.getChildren(watchedEvent.getPath(), true); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(children); } } }
1.4.4 修改节点数据
public class UpdateNodeData implements Watcher { private static CountDownLatch countDownLatch = new CountDownLatch(1); static ZooKeeper zooKeeper; public static void main(String[] args) throws Exception { zooKeeper = new ZooKeeper("192.168.247.100:2181", 5000, new UpdateNodeData()); System.out.println(zooKeeper.getState()); countDownLatch.await(); System.out.println("zookeeper会话创建成功......"); // 修改节点内容 updateNodeData(); Thread.sleep(Integer.MAX_VALUE); } private static void updateNodeData() throws Exception { // 修改前的节点数据 byte[] data = zooKeeper.getData("/lg_persistent/c1", true, null); System.out.println("修改前的数据为:" + new String(data)); // 节点数据修改, 版本为-1表示更新最新版本数据 zooKeeper.setData("/lg_persistent/c1", "节点修改后的数据".getBytes("UTF-8"), -1); // 修改后的节点数据 byte[] data1 = zooKeeper.getData("/lg_persistent/c1", true, null); System.out.println("修改后的数据为:" + new String(data1)); } @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { countDownLatch.countDown(); } } }
1.4.5 删除节点
public class DeleteNode implements Watcher { static ZooKeeper zooKeeper; public static void main(String[] args) throws IOException, InterruptedException { zooKeeper = new ZooKeeper("192.168.247.100:2181", 5000, new DeleteNode()); System.out.println(zooKeeper.getState()); System.out.println("zookeeper会话创建成功......"); Thread.sleep(Integer.MAX_VALUE); } @Override public void process(WatchedEvent watchedEvent) { // 连接成功后删除节点 try { Stat exists = zooKeeper.exists("/lg_persistent/c1", false); System.out.println("节点是否存在:" + exists); // 删除节点 zooKeeper.delete("/lg_persistent/c1", -1); Stat exists1 = zooKeeper.exists("/lg_persistent/c1", false); System.out.println("节点是否存在:" + exists1); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } }
1.5 开源客户端操作Zookeeper
Zookeeper的开源客户端有很多,这里只列举ZkClient进行演示。开源客户端对Zookeeper的原生API进行了大量的封装,使得我们使用起来非常的方便。
引入Jar
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
1.5.1 创建会话
public class CreateSession { public static void main(String[] args) { /** * 这个创建会话的过程已经同步化 * 参数就是zookeeper的ip:port */ ZkClient zkClient = new ZkClient("192.168.247.100:2181"); System.out.println("客户端会话创建成功"); } }
1.5.2 创建节点
public class CreateNode { public static void main(String[] args) { ZkClient zkClient = new ZkClient("192.168.247.100:2181"); System.out.println("客户端会话创建成功"); /** * 方法名称: * createPersistent 创建持久节点 * createEphemeral 创建临时节点 * createPersistentSequential 创建持久顺序节点 * createEphemeralSequential 创建临时顺序节点 * 参数一:创建的节点path * 参数二:是否创建父节点,如果/lg_zkclient不存在,那么就创建。 */ zkClient.createPersistent("/lg_zkclient/c1", true); System.out.println("节点创建成功"); } }
1.5.3 获取/更新节点数据
public class GetNodeData { public static void main(String[] args) throws InterruptedException { ZkClient zkClient = new ZkClient("192.168.247.100:2181"); System.out.println("客户端会话创建成功"); // 判断节点是否存在 boolean exists = zkClient.exists("/lg_zkclient"); System.out.println("节点是否存在:" + exists); // 获取节点数据 Object readData = zkClient.readData("/lg_zkclient"); System.out.println(readData); // 创建监听事件,监听节点数据变更 zkClient.subscribeDataChanges("/lg_zkclient", new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { System.out.println(s + "节点的数据变更了, " + o); } @Override public void handleDataDeleted(String s) throws Exception { System.out.println(s + "节点被删除了"); } }); // 更新节点数据 zkClient.writeData("/lg_zkclient", "更新后的节点数据"); Thread.sleep(Integer.MAX_VALUE); } }
1.5.4 删除节点
public class DeleteNode { public static void main(String[] args) { ZkClient zkClient = new ZkClient("192.168.247.100:2181"); System.out.println("客户端会话创建成功"); // 删除节点 /* 方法说明: delete 删除单个节点 deleteRecursive 循环删除节点 */ zkClient.deleteRecursive("/lg_zkclient/c1"); System.out.println("节点循环删除子成功"); } }