zookeeper客户端选型
- 原生zookeeper客户端,有watcher一次性、无超时重连机制等一系列问题
- ZkClient,解决了原生客户端一些问题,一些存量老系统中还在使用
- curator,提供了各种应用场景(封装了分布式锁,计数器等),新项目首选
分布式锁使用场景
在单体项目中jvm中的锁即可完成需要,但是微服务、分布式环境下,同一个服务可能部署在多台服务器上,多个jvm之间无法通过常用的jvm锁来完成同步操作,需要借用分布式锁来完成上锁、释放锁。例如在订单服务中,我们需要根据日期来生成订单号流水,就有可能产生相同的时间日期,从而出现重复订单号。
zookeeper分布式锁实现原理
- zookeeper中规定,在同一时刻,不能有多个客户端创建同一个节点,我们可以利用这个特性实现分布式锁。zookeeper临时节点只在session生命周期存在,session一结束会自动销毁。
- watcher机制,在代表锁资源的节点被删除,即可以触发watcher解除阻塞重新去获取锁,这也是zookeeper分布式锁较其他分布式锁方案的一大优势。
基于临时节点方案
第一种方案实现较为简单,逻辑就是谁创建成功该节点,谁就持有锁,创建失败的自己进行阻塞,A线程先持有锁,B线程获取失败就会阻塞,同时对/lockPath设置监听,A线程执行完操作后删除节点,触发监听器,B线程此时解除阻塞,重新去获取锁。
我们模仿原生jdk的lock接口设计,采用模板方法设计模式来编写分布式锁,这样的好处是扩展性强,我们可以快速切换到redis分布式锁、数据库分布式锁等实现方式。
创建Lock接口
public interface Lock { /** * 获取锁 */ void getLock() throws Exception; /** * 释放锁 */ void unlock() throws Exception; }
AbstractTemplateLock抽象类
public abstract class AbstractTemplateLock implements Lock { @Override public void getLock() { if (tryLock()) { System.out.println(Thread.currentThread().getName() + "获取锁成功"); } else { //等待 waitLock();//事件监听 如果节点被删除则可以重新获取 //重新获取 getLock(); } } protected abstract void waitLock(); protected abstract boolean tryLock(); protected abstract void releaseLock(); @Override public void unlock() { releaseLock(); } }
zookeeper分布式锁逻辑
@Slf4j public class ZkTemplateLock extends AbstractTemplateLock { private static final String zkServers = "127.0.0.1:2181"; private static final int sessionTimeout = 8000; private static final int connectionTimeout = 5000; private static final String lockPath = "/lockPath"; private ZkClient client; public ZkTemplateLock() { client = new ZkClient(zkServers, sessionTimeout, connectionTimeout); log.info("zk client 连接成功:{}",zkServers); } @Override protected void waitLock() { CountDownLatch latch = new CountDownLatch(1); IZkDataListener listener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println("监听到节点被删除"); latch.countDown(); } @Override public void handleDataChange(String dataPath, Object data) throws Exception {} }; //完成 watcher 注册 client.subscribeDataChanges(lockPath, listener); //阻塞自己 if (client.exists(lockPath)) { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //取消watcher注册 client.unsubscribeDataChanges(lockPath, listener); } @Override protected boolean tryLock() { try { client.createEphemeral(lockPath); System.out.println(Thread.currentThread().getName()+"获取到锁"); } catch (Exception e) { log.error("创建失败"); return false; } return true; } @Override public void releaseLock() { client.delete(this.lockPath); } }
缺点:
每次去竞争锁,都只会有一个线程拿到锁,当线程数庞大时会发生“惊群”现象,zookeeper节点可能会运行缓慢甚至宕机。这是因为其他线程没获取到锁时都会监听/lockPath节点,当A线程释放完毕,海量的线程都同时停止阻塞,去争抢锁,这种操作十分耗费资源,且性能大打折扣。
基于临时顺序节点方案
临时顺序节点与临时节点不同的是产生的节点是有序的,我们可以利用这一特点,只让当前线程监听上一序号的线程,每次获取锁的时候判断自己的序号是否为最小,最小即获取到锁,执行完毕就删除当前节点继续判断谁为最小序号的节点。
临时顺序节点操作源码
@Slf4j public class ZkSequenTemplateLock extends AbstractTemplateLock { private static final String zkServers = "127.0.0.1:2181"; private static final int sessionTimeout = 8000; private static final int connectionTimeout = 5000; private static final String lockPath = "/lockPath"; private String beforePath; private String currentPath; private ZkClient client; public ZkSequenTemplateLock() { client = new ZkClient(zkServers); if (!client.exists(lockPath)) { client.createPersistent(lockPath); } log.info("zk client 连接成功:{}",zkServers); } @Override protected void waitLock() { CountDownLatch latch = new CountDownLatch(1); IZkDataListener listener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println("监听到节点被删除"); latch.countDown(); } @Override public void handleDataChange(String dataPath, Object data) throws Exception {} }; //给排在前面的节点增加数据删除的watcher,本质是启动另一个线程去监听上一个节点 client.subscribeDataChanges(beforePath, listener); //阻塞自己 if (client.exists(beforePath)) { try { System.out.println("阻塞"+currentPath); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //取消watcher注册 client.unsubscribeDataChanges(beforePath, listener); } @Override protected boolean tryLock() { if (currentPath == null) { //创建一个临时顺序节点 currentPath = client.createEphemeralSequential(lockPath + "/", "lock-data"); System.out.println("current:" + currentPath); } //获得所有的子节点并排序。临时节点名称为自增长的字符串 List<String> childrens = client.getChildren(lockPath); //排序list,按自然顺序排序 Collections.sort(childrens); if (currentPath.equals(lockPath + "/" + childrens.get(0))) { return true; } else { //如果当前节点不是排第一,则获取前面一个节点信息,赋值给beforePath int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1)); beforePath = lockPath + "/" + childrens.get(curIndex - 1); } System.out.println("beforePath"+beforePath); return false; } @Override public void releaseLock() { System.out.println("delete:" + currentPath); client.delete(currentPath); } }
Curator分布式锁工具
curator提供了以下种类的锁:
- 共享可重入锁(Shared Reentrant Lock):全局同步锁,同一时间不会有两个客户端持有一个锁
- 共享锁:与共享可重入锁类似,但是不可重入(有时候会因为这个原因造成死锁)
- 共享可重入读写锁
- 共享信号量
- Multi Shared Lock:管理多种锁的容器实体
我们采用第一种Shared Reentrant Lock中的InterProcessMutex
来完成上锁、释放锁的的操作
public class ZkLockWithCuratorTemplate implements Lock { // zk host地址 private String host = "localhost"; // zk自增存储node private String lockPath = "/curatorLock"; // 重试休眠时间 private static final int SLEEP_TIME_MS = 1000; // 最大重试1000次 private static final int MAX_RETRIES = 1000; //会话超时时间 private static final int SESSION_TIMEOUT = 30 * 1000; //连接超时时间 private static final int CONNECTION_TIMEOUT = 3 * 1000; //curator核心操作类 private CuratorFramework curatorFramework; InterProcessMutex lock; public ZkLockWithCuratorTemplate() { curatorFramework = CuratorFrameworkFactory.builder() .connectString(host) .connectionTimeoutMs(CONNECTION_TIMEOUT) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES)) .build(); curatorFramework.start(); lock = new InterProcessMutex (curatorFramework, lockPath); } @Override public void getLock() throws Exception { //5s后超时释放锁 lock.acquire(5, TimeUnit.SECONDS); } @Override public void unlock() throws Exception { lock.release(); } }
源码以及测试类地址
END