1、ZooKeeper概念
Zookeeper 是 Apache Hadoop 项目下的一个子项目,是一个树形目录服务。
Zookeeper 翻译过来就是 动物园管理员,他是用来管 Hadoop(大象)、Hive(蜜蜂)、Pig(小 猪)的管理员。简称zk。
Zookeeper 是一个分布式的、开源的分布式应用程序的协调服务。简单点说就是它是来管理分布式应用程序的,它自己不做事情,它是来管人的。
Zookeeper 提供的主要功能包括:
1、配置管理:ZooKeeper提供了一个配置中心,用于管理配置。2、分布式锁:提供了一个分布式锁中心,谁要访问就去中心取锁。
3、集群管理:注册中心
2、安装与配置
Linux版ZooKeeper安装-阿里云开发者社区 (aliyun.com)
3、Zookerper命令操作
3.1 Zookeeper 数据模型
- ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。
- 这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息。
- 节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。
- 节点可以分为四大类:
- PERSISTENT 持久化节点
- EPHEMERAL 临时节点 :-e
- PERSISTENT_SEQUENTIAL 持久化顺序节点 :-s
- EPHEMERAL_SEQUENTIAL 临时顺序节点 :-es
3.2 Zookeeper服务端常用命令
启动 ZooKeeper 服务
./zkServer.sh start
查看 ZooKeeper 服务状态
./zkServer.sh status
停止 ZooKeeper 服务
./zkServer.sh stop
重启 ZooKeeper 服务
./zkServer.sh restart
3.3 Zookeeper客户端常用命令
连接ZooKeeper服务端
#./zkCli.sh –server ip:port
./zkCli.sh -server localhost:2181
断开连接
quit
显示指定目录下节点
ls 目录
创建节点
create /节点path value
获取节点值
get /节点path
设置节点值
set /节点path value
删除单个节点
delete /节点path
删除带有子节点的节点
deleteall /节点path
查看命令帮助
help
创建临时节点
create -e /节点path value
创建顺序节点
create -s /节点path value
创建临时顺序节点
create -es /节点path value
ls2 /
查询节点详细信息
ls –s /节点path
4、ZooKeeper JavaAPI操作
4.1 Curator 介绍
Curator 是 Apache ZooKeeper 的Java客户端库。
其实Curator 是一个 外来者,其实ZooKeeper 提供了很多Java客户端。
常见的ZooKeeper Java API :
- 原生Java API
- ZkClient
- Curator
Curator 项目的目标是简化 ZooKeeper 客户端的使用。
Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目。
官网:http://curator.apache.org/
4.2 Curator API 常用操作
4.2.1 建立连接
/**
* @description:建立连接
* @author: jie
* @time: 2022/4/3 23:02
*/
@Test
void testConnect() {
/**
* 第一个参数 : 连接字符串
* 第二个参数 : 会话超时时间
* 第三个参数 : 连接超时时间
* 第四个参数 : 重试策略
*
*/
//重试策略 参数: 每次休眠的时间,最大的重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
//1、第一种连接方式
// CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.58.150:2181",
// 60 * 1000, 15 * 1000, retryPolicy);
// //2、开启连接
// client.start();
//第二种方式 CuratorFrameworkFactory.builder();
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.58.150:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).namespace("jie").build();
//2、开启连接
client.start();
}
4.2.2 添加节点
1、创建节点
/**
* @description:创建节点 :节点类型 持久 临时 顺序 数据
* @author: jie
* @time: 2022/4/4 11:35
*/
@Test
void testCreate() {
try {
//1.基本创建 注:如果创建节点,没有指定数据。则默认将当前客户端的IP作为数据存储
client.create().forPath("/app1");
} catch (Exception e) {
e.printStackTrace();
}
}
2、创建节点 带数据
@Test
void testCreate2() {
try {
//1.创建节点 带有数据 注:如果创建节点,没有指定数据。则默认将当前客户端的IP作为数据存储
client.create().forPath("/app2","求个关注".getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
3、设置节点类型
@Test
void testCreate3() {
try {
//设置节点的类型 默认类型:持久化 CreateMode.EPHEMERAL 临时节点,方法结束消失,因为只存在于一次会话
client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3 ","求个关注".getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
4、创建多级节点
@Test
void testCreate4() {
try {
//创建多级节点 creatingParentsIfNeeded():如果父节点不存在,就先创建父节点
client.create().creatingParentsIfNeeded().forPath("/app4/p1 ","求个关注".getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
4.2.3 查询节点
1、查询数据
/**
* @description:查询节点
* @author: jie
* @time: 2022/4/4 12:04
*/
@Test
void testGet() {
try {
//查询数据
byte[] bytes = client.getData().forPath("/app2");
System.out.println(new String(bytes));
} catch (Exception e) {
e.printStackTrace();
}
}
2、查询子节点
@Test
void testGet2() {
try {
//查询子节点
List<String> list = client.getChildren().forPath("/app4");
System.out.println(list);
} catch (Exception e) {
e.printStackTrace();
}
}
3、查询节点状态信息
@Test
void testGet3() {
try {
Stat status = new Stat();
System.out.println(status);
//查询节点的状态信息
client.getData().storingStatIn(status).forPath("/app1");
System.out.println(status);
} catch (Exception e) {
e.printStackTrace();
}
}
4.2.4 修改节点
1、修改数据
/**
* @description:修改数据
* @author: jie
* @time: 2022/4/4 12:20
*/
@Test
void testset() {
try {
client.setData().forPath("/app1","求个点赞".getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
2、根据版本修改
@Test
void testsetForVersion() {
try {
Stat status = new Stat();
//查询节点的状态信息
client.getData().storingStatIn(status).forPath("/app1");
//version 是通过查询出来的,目的就是为了让其他客户端或者线程不干扰。
int version = status.getVersion();
System.out.println(version);
//根据版本修改
client.setData().withVersion(version).forPath("/app1","求个关注".getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
4.2.5 删除节点
1、删除单个节点
/**
* @description:删除节点
* @author: jie
* @time: 2022/4/4 12:30
*/
@Test
void testDelete() {
//删除单个节点
try {
client.delete().forPath("/app1");
} catch (Exception e) {
e.printStackTrace();
}
}
2、删除带有子节点的节点
@Test
void testDelete2() {
try {
//删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/app4");
} catch (Exception e) {
e.printStackTrace();
}
}
3、保证其删除成功
@Test
void testDelete3() {
//保证其删除成功
try {
client.delete().guaranteed().forPath("/app2");
} catch (Exception e) {
e.printStackTrace();
}
}
4、回调
@Test
void testDelete4() {
//回调
try {
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("被删除了");
System.out.println(curatorEvent);
}
}).forPath("/app1");
} catch (Exception e) {
e.printStackTrace();
}
}
4.2.6 Watch事件监听
- ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
- ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
- ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便需要开发人员自己反复注册Watcher,比较繁琐,我们不用。
- Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。
- ZooKeeper提供了三种Watcher:
5.1 NodeCache : 只是监听某一个特定的节点。
5.2 PathChildrenCache : 监控一个ZNode的子节点。
5.3 TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合。
代码演示:
1、NodeCathe:给指定一个节点注册监听器
/**
* @description:演示 NodeCathe:给指定一个节点注册监听器
* @author: jie
* @time: 2022/4/4 12:53
*/
@Test
void testNodeCathe() throws Exception {
//1、创建NodeCath对象
NodeCache nodeCache = new NodeCache(client,"/app1");
//2、注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了~");
//获取修改节点后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println("最新的数据:"+new String(data));
}
});
//3、开启监听 如果设置为True,则开启监听,加载缓冲数据
nodeCache.start(true);
while (true){
}
}
2、 PathChildrenCache:监听某个节点的所有子节点
/**
* @description:演示 PathChildrenCache:监听某个节点的所有子节点
* @author: jie
* @time: 2022/4/4 12:53
*/
@Test
void testPathChildrenCache() throws Exception {
//创建监听对象 连接 地址 是否缓存
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
//绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点变化了");
System.out.println(pathChildrenCacheEvent);
//监听子节点的 数据变更,并且拿到变更后的数据
//1、获取类型
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
//2、判断类型是否是update
if(type.equals((PathChildrenCacheEvent.Type.CHILD_UPDATED))){
System.out.println("数据变了!!!");
byte[] data = pathChildrenCacheEvent.getData().getData();
System.out.println(new String(data));
}
}
});
//开启
pathChildrenCache.start();
while (true){
}
}
3、ThreeCathe():监听某个节点和所有子节点们
/**
* @description:ThreeCathe():监听某个节点和所有子节点们
* @author: jie
* @time: 2022/4/4 13:33
*/
@Test
void testThreeCathe() throws Exception {
//创建监听器
TreeCache treeCache = new TreeCache(client,"/app2");
//注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("节点变化了");
System.out.println(treeCacheEvent);
}
});
//开启
treeCache.start();
while (true){
}
}
4.3 分布式锁
在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。
那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁。
4.3.1 Zookeeper分布式锁原理
核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。
这里有三个客户端,它们都和ZooKeeper Server连接起来的,ZooKeeper Server里面现在又三个节点,现在client1如果想获取锁,那他就可以在/lock节点下创建一个节点,就代表获取锁了,用完了之后,释放资源,再把这个节点删除掉。
1、客户端获取锁时,在lock节点下创建临时顺序节点。2、然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创 建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
3、如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
4、如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。
4.4 模拟12306 售票案例
在Curator中有五种实现分布式锁方案:
- InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
- InterProcessMutex:分布式可重入排它锁
- InterProcessReadWriteLock:分布式读写锁
- InterProcessMultiLock:将多个锁作为单个实体管理的容器
- InterProcessSemaphoreV2:共享信号量
接下来是代码演示,我这里为了方便,就不搞好几台机器了,就简单的模拟一下。
12306 实体类
package com.jie.curatorzk;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
public class Ticket12306 implements Runnable {
private int tickets = 10;//数据库的票数
private InterProcessMutex lock;
public Ticket12306() {
//重试策略 参数: 每次休眠的时间,最大的重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.58.150:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).build();
//2、开启连接
client.start();
lock = new InterProcessMutex(client,"/lock");
}
@Override
public void run() {
while (true) {
try {
//获取锁 参数 时间 时间单位
lock.acquire(3, TimeUnit.SECONDS);
if (tickets > 0) {
System.out.println(Thread.currentThread() + ":" + tickets);
Thread.sleep(1000);
tickets--;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放锁
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
测试类
package com.jie.curatorzk;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class LoockTest {
public static void main(String[] args) {
Ticket12306 ticket12306 = new Ticket12306();
//创建客户端
Thread t1 = new Thread(ticket12306,"携程");
Thread t2 = new Thread(ticket12306,"飞猪");
t1.start();
t2.start();
}
}
代码实现还是比较简单的,主要是要知道原理。
5、ZooKeeper集群搭建
这里有这么多台机器,那么多台机器,那么到底谁说得算呢?那么ZooKeeper搭建集群第一步,就是要从这么多台机器中选举出一台领导者。
Leader选举:
- Serverid:服务器ID, 比如有三台服务器,编号分别是1,2,3;编号越大在选择算法中的权重越大。
- Zxid:数据ID;服务器中存放的最大数据ID.值越大说明数据 越新,在选举算法中数据越新权重越大。
在Leader选举的过程中,如果某台ZooKeeper获得了超过半数的选票, 则此ZooKeeper就可以成为Leader了。
搭建教程:
ZooKeeper集群角色介绍
在ZooKeeper集群服中务中有三个角色:
Leader 领导者 :
1.处理事务请求
2 .集群内部各服务器的调度者
Follower 跟随者 :
1.处理客户端非事务请求,转发事务请求给Leader服务器
2 .参与Leader选举投票
Observer 观察者:
处理客户端非事务请求,转发事务请求给Leader服务器