文章目录:
1.前言
什么叫做分布式锁呢?
比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
2.原生Zookeeper实现分布式锁
代码中的注释我已经写的很详细了。
这其中用到了JUC中的CountDownLatch,可以参考:https://blog.csdn.net/weixin_43823808/article/details/120799251
package com.szh.case2; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.CountDownLatch; /** * */ public class DistributedZkLock { private final String connectString = "192.168.40.130:2181"; private final int sessionTimeout = 30000; private final ZooKeeper zk; private CountDownLatch connectLatch = new CountDownLatch(1); private CountDownLatch waitLatch = new CountDownLatch(1); private String currentNode; private String waitPath; public DistributedZkLock() throws IOException, InterruptedException, KeeperException { //获取zk连接 zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //如果连接上zk,则connectLatch可以释放 if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { connectLatch.countDown(); } //如果监听的节点已被删除,同时当前监听的节点路径与即将被监听节点的前一个节点路径相同,则waitLatch可以释放 if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) { waitLatch.countDown(); } } }); //等待zk连接成功之后,程序则继续往下走,其他线程进入等待连接的状态 connectLatch.await(); //判断根节点 /locks 是否存在 Stat stat = zk.exists("/locks", false); //如果根节点 /locks 不存在,则立马创建 if (Objects.isNull(stat)) { //此 /locks 节点默认所有人均可访问,而且是永久性节点 zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } //加锁 public void zkLock() { try { //所谓加锁,就是在根节点/locks下创建对应的临时、带序号的节点 currentNode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //睡一会,让结果更清晰 Thread.sleep(100); //判断创建的节点是否是序号最小的节点,如果是,则获取到锁;如果不是,则监听它序号的前一个节点 List<String> children = zk.getChildren("/locks", false); if (children.size() == 1) { //只有一个节点,则直接获取锁 return; } else { //如果有多个节点,则需要判断谁的序号最小 //先对获取的节点的list集合排序,确保从小到大的顺序 Collections.sort(children); //获取节点名称 seq-00000000 String thisNode = currentNode.substring("/locks/".length()); //通过节点名称获取到它在list集合中的位置 int index = children.indexOf(thisNode); if (index == -1) { //没数据,无意义 System.out.println("数据异常...."); } else if (index == 0) { //说明此节点处于第一个位置,可以获取锁 return; } else { //非第一个位置,需要监听前一个节点的变化 //获取该节点序号的前一个节点 waitPath = "/locks/" + children.get(index - 1); //监听,回调Watch的process方法 zk.getData(waitPath, true, new Stat()); //其他线程进入等待锁的状态 waitLatch.await(); return; } } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } //解锁 public void zkUnLock() { //删除节点即解锁 try { zk.delete(this.currentNode, -1); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } }
下面是针对上面的一些方法的测试。
package com.szh.case2; import org.apache.zookeeper.KeeperException; import java.io.IOException; import java.util.concurrent.TimeUnit; /** * */ public class DistributedZkLockTest { public static void main(String[] args) throws InterruptedException, IOException, KeeperException { //创建分布式锁1 final DistributedZkLock lock1 = new DistributedZkLock(); //创建分布式锁2 final DistributedZkLock lock2 = new DistributedZkLock(); //如下创建两个线程,模拟获取分布式锁的过程 new Thread(new Runnable() { @Override public void run() { try { lock1.zkLock(); System.out.println(Thread.currentThread().getName() + " 已启动,获取到锁...."); TimeUnit.MILLISECONDS.sleep(3000); lock1.zkUnLock(); System.out.println(Thread.currentThread().getName() + "已释放锁...."); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { lock2.zkLock(); System.out.println(Thread.currentThread().getName() + " 已启动,获取到锁...."); TimeUnit.MILLISECONDS.sleep(3000); lock2.zkUnLock(); System.out.println(Thread.currentThread().getName() + "已释放锁...."); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
那么上面是我们自己手写的加锁、解锁的一些方法,其中也存在着很多问题。
(1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
(2)Watch 需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高的
(4)不支持多节点删除和创建。需要自己去递归
所以就引出了下面的案例:👇👇👇
3.Curator框架实现分布式锁案例
Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。
详情请查看官方文档:https://curator.apache.org/index.html
要使用它,就需要在pom文件中添加相关依赖。
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.7</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> </dependency>
package com.szh.case3; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.concurrent.TimeUnit; /** * */ public class CuratorLockTest { public static void main(String[] args) { //创建分布式锁1 InterProcessMutex lock1 = new InterProcessMutex(getZkClient(), "/locks"); //创建分布式锁2 InterProcessMutex lock2= new InterProcessMutex(getZkClient(), "/locks"); //下面创建两个线程 new Thread(() -> { try { lock1.acquire(); //获取锁 System.out.println(Thread.currentThread().getName() + " 首次获取到锁...."); lock1.acquire(); System.out.println(Thread.currentThread().getName() + " 再次获取到锁...."); TimeUnit.MILLISECONDS.sleep(5000); lock1.release(); //释放锁 System.out.println(Thread.currentThread().getName() + " 首次释放锁...."); lock1.release(); System.out.println(Thread.currentThread().getName() + " 再次释放锁...."); } catch (Exception e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { lock2.acquire(); System.out.println(Thread.currentThread().getName() + " 首次获取到锁...."); lock2.acquire(); System.out.println(Thread.currentThread().getName() + " 再次获取到锁...."); TimeUnit.MILLISECONDS.sleep(5000); lock2.release(); System.out.println(Thread.currentThread().getName() + " 首次释放锁...."); lock2.release(); System.out.println(Thread.currentThread().getName() + " 再次释放锁...."); } catch (Exception e) { e.printStackTrace(); } }).start(); } private static CuratorFramework getZkClient() { //客户端和服务器连接失败之后,多少秒之后再进行重试,以及重试的次数,5000ms之后重试,重试3次 ExponentialBackoffRetry policy = new ExponentialBackoffRetry(5000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("192.168.40.130:2181") .connectionTimeoutMs(20000) .sessionTimeoutMs(30000) .retryPolicy(policy).build(); //启动zk客户端 client.start(); System.out.println("zookeeper 启动成功...."); return client; } }
4.Zookeeper常见面试题
· 选举机制
半数机制,超过半数的投票通过,即通过。
(1)第一次启动选举规则:投票过半数时,服务器 id 大的胜出
(2)第二次启动选举规则:①EPOCH 大的直接胜出
②EPOCH 相同,事务 id 大的胜出
③事务 id 相同,服务器 id 大的胜出
· 生产集群安装多少 zk 合适?
安装奇数台。
生产经验:10 台服务器:3 台 zk
20台服务器:5 台 zk
100台服务器:11 台 zk
200台服务器:11 台 zk
服务器台数多:好处,提高可靠性;坏处:提高通信延时
· 常用命令:ls、get、create、delete