肝一下ZooKeeper实现分布式锁的方案,附带实例!

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 肝一下ZooKeeper实现分布式锁的方案,附带实例!

zookeeper客户端选型


  • 原生zookeeper客户端,有watcher一次性、无超时重连机制等一系列问题


  • ZkClient,解决了原生客户端一些问题,一些存量老系统中还在使用


  • curator,提供了各种应用场景(封装了分布式锁,计数器等),新项目首选


分布式锁使用场景


在单体项目中jvm中的锁即可完成需要,但是微服务、分布式环境下,同一个服务可能部署在多台服务器上,多个jvm之间无法通过常用的jvm锁来完成同步操作,需要借用分布式锁来完成上锁、释放锁。例如在订单服务中,我们需要根据日期来生成订单号流水,就有可能产生相同的时间日期,从而出现重复订单号。


zookeeper分布式锁实现原理


  • zookeeper中规定,在同一时刻,不能有多个客户端创建同一个节点,我们可以利用这个特性实现分布式锁。zookeeper临时节点只在session生命周期存在,session一结束会自动销毁。


  • watcher机制,在代表锁资源的节点被删除,即可以触发watcher解除阻塞重新去获取锁,这也是zookeeper分布式锁较其他分布式锁方案的一大优势。


基于临时节点方案


第一种方案实现较为简单,逻辑就是谁创建成功该节点,谁就持有锁,创建失败的自己进行阻塞,A线程先持有锁,B线程获取失败就会阻塞,同时对/lockPath设置监听,A线程执行完操作后删除节点,触发监听器,B线程此时解除阻塞,重新去获取锁。


image.png


我们模仿原生jdk的lock接口设计,采用模板方法设计模式来编写分布式锁,这样的好处是扩展性强,我们可以快速切换到redis分布式锁、数据库分布式锁等实现方式。


创建Lock接口


public interface Lock {
    /**
     * 获取锁
     */
    void getLock() throws Exception;
    /**
     * 释放锁
     */
    void unlock() throws Exception;
}


AbstractTemplateLock抽象类


public abstract class AbstractTemplateLock implements Lock {
    @Override
    public void getLock() {
        if (tryLock()) {
            System.out.println(Thread.currentThread().getName() + "获取锁成功");
        } else {
            //等待
            waitLock();//事件监听 如果节点被删除则可以重新获取
            //重新获取
            getLock();
        }
    }
    protected abstract void waitLock();
    protected abstract boolean tryLock();
    protected abstract void releaseLock();
    @Override
    public void unlock() {
        releaseLock();
    }
}


zookeeper分布式锁逻辑


@Slf4j
public class ZkTemplateLock extends AbstractTemplateLock {
    private static final String zkServers = "127.0.0.1:2181";
    private static final int sessionTimeout = 8000;
    private static final int connectionTimeout = 5000;
    private static final String lockPath = "/lockPath";
    private ZkClient client;
    public ZkTemplateLock() {
        client = new ZkClient(zkServers, sessionTimeout, connectionTimeout);
        log.info("zk client 连接成功:{}",zkServers);
    }
    @Override
    protected void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("监听到节点被删除");
                latch.countDown();
            }
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {}
        };
        //完成 watcher 注册
        client.subscribeDataChanges(lockPath, listener);
        //阻塞自己
        if (client.exists(lockPath)) {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //取消watcher注册
        client.unsubscribeDataChanges(lockPath, listener);
    }
    @Override
    protected boolean tryLock() {
        try {
            client.createEphemeral(lockPath);
            System.out.println(Thread.currentThread().getName()+"获取到锁");
        } catch (Exception e) {
            log.error("创建失败");
            return false;
        }
        return true;
    }
    @Override
    public void releaseLock() {
       client.delete(this.lockPath);
    }
}


缺点


每次去竞争锁,都只会有一个线程拿到锁,当线程数庞大时会发生“惊群”现象,zookeeper节点可能会运行缓慢甚至宕机。这是因为其他线程没获取到锁时都会监听/lockPath节点,当A线程释放完毕,海量的线程都同时停止阻塞,去争抢锁,这种操作十分耗费资源,且性能大打折扣。


基于临时顺序节点方案


临时顺序节点与临时节点不同的是产生的节点是有序的,我们可以利用这一特点,只让当前线程监听上一序号的线程,每次获取锁的时候判断自己的序号是否为最小,最小即获取到锁,执行完毕就删除当前节点继续判断谁为最小序号的节点。


image.png


image.png


临时顺序节点操作源码


@Slf4j
public class ZkSequenTemplateLock extends AbstractTemplateLock {
    private static final String zkServers = "127.0.0.1:2181";
    private static final int sessionTimeout = 8000;
    private static final int connectionTimeout = 5000;
    private static final String lockPath = "/lockPath";
    private String beforePath;
    private String currentPath;
    private ZkClient client;
    public ZkSequenTemplateLock() {
        client = new ZkClient(zkServers);
        if (!client.exists(lockPath)) {
            client.createPersistent(lockPath);
        }
        log.info("zk client 连接成功:{}",zkServers);
    }
    @Override
    protected void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("监听到节点被删除");
                latch.countDown();
            }
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {}
        };
        //给排在前面的节点增加数据删除的watcher,本质是启动另一个线程去监听上一个节点
        client.subscribeDataChanges(beforePath, listener);
        //阻塞自己
        if (client.exists(beforePath)) {
            try {
                System.out.println("阻塞"+currentPath);
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //取消watcher注册
        client.unsubscribeDataChanges(beforePath, listener);
    }
    @Override
    protected boolean tryLock() {
        if (currentPath == null) {
            //创建一个临时顺序节点
            currentPath = client.createEphemeralSequential(lockPath + "/", "lock-data");
            System.out.println("current:" + currentPath);
        }
        //获得所有的子节点并排序。临时节点名称为自增长的字符串
        List<String> childrens = client.getChildren(lockPath);
        //排序list,按自然顺序排序
        Collections.sort(childrens);
        if (currentPath.equals(lockPath + "/" + childrens.get(0))) {
            return true;
        } else {
            //如果当前节点不是排第一,则获取前面一个节点信息,赋值给beforePath
            int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1));
            beforePath = lockPath + "/" + childrens.get(curIndex - 1);
        }
        System.out.println("beforePath"+beforePath);
        return false;
    }
    @Override
    public void releaseLock() {
        System.out.println("delete:" + currentPath);
        client.delete(currentPath);
    }
}


Curator分布式锁工具


curator提供了以下种类的锁:


  • 共享可重入锁(Shared Reentrant Lock):全局同步锁,同一时间不会有两个客户端持有一个锁
  • 共享锁:与共享可重入锁类似,但是不可重入(有时候会因为这个原因造成死锁)
  • 共享可重入读写锁
  • 共享信号量
  • Multi Shared Lock:管理多种锁的容器实体


我们采用第一种Shared Reentrant Lock中的InterProcessMutex来完成上锁、释放锁的的操作


public class ZkLockWithCuratorTemplate implements Lock {
    // zk host地址
    private String host = "localhost";
    // zk自增存储node
    private String lockPath = "/curatorLock";
    // 重试休眠时间
    private static final int SLEEP_TIME_MS = 1000;
    // 最大重试1000次
    private static final int MAX_RETRIES = 1000;
    //会话超时时间
    private static final int SESSION_TIMEOUT = 30 * 1000;
    //连接超时时间
    private static final int CONNECTION_TIMEOUT = 3 * 1000;
        //curator核心操作类
    private CuratorFramework curatorFramework;
    InterProcessMutex lock;
   public ZkLockWithCuratorTemplate() {
       curatorFramework = CuratorFrameworkFactory.builder()
               .connectString(host)
               .connectionTimeoutMs(CONNECTION_TIMEOUT)
               .sessionTimeoutMs(SESSION_TIMEOUT)
               .retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES))
               .build();
       curatorFramework.start();
       lock = new InterProcessMutex (curatorFramework, lockPath);
    }
    @Override
    public void getLock() throws Exception {
        //5s后超时释放锁
         lock.acquire(5, TimeUnit.SECONDS);
    }
    @Override
    public void unlock() throws Exception {
        lock.release();
    }
}


源码以及测试类地址


https://github.com/Motianshi/distribute-tool


END

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
3月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
3月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
3天前
|
NoSQL 安全 PHP
hyperf-wise-locksmith,一个高效的PHP分布式锁方案
`hyperf-wise-locksmith` 是 Hyperf 框架下的互斥锁库,支持文件锁、分布式锁、红锁及协程锁,有效防止分布式环境下的竞争条件。本文介绍了其安装、特性和应用场景,如在线支付系统的余额扣减,确保操作的原子性。
13 4
|
28天前
|
NoSQL 算法 关系型数据库
分布式 ID 详解 ( 5大分布式 ID 生成方案 )
本文详解分布式全局唯一ID及其5种实现方案,关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
分布式 ID 详解 ( 5大分布式 ID 生成方案 )
|
2月前
|
Dubbo 应用服务中间件 Apache
Dubbo 应用切换 ZooKeeper 注册中心实例,流量无损迁移
如果 Dubbo 应用使用 ZooKeeper 作为注册中心,现在需要切换到新的 ZooKeeper 实例,如何做到流量无损?
19 4
|
2月前
|
存储 缓存 NoSQL
分布式架构下 Session 共享的方案
【10月更文挑战第15天】在实际应用中,需要根据具体的业务需求、系统架构和性能要求等因素,选择合适的 Session 共享方案。同时,还需要不断地进行优化和调整,以确保系统的稳定性和可靠性。
|
2月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
45 2
|
2月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
48 1
|
2月前
|
SQL NoSQL 安全
分布式环境的分布式锁 - Redlock方案
【10月更文挑战第2天】Redlock方案是一种分布式锁实现,通过在多个独立的Redis实例上加锁来提高容错性和可靠性。客户端需从大多数节点成功加锁且总耗时小于锁的过期时间,才能视为加锁成功。然而,该方案受到分布式专家Martin的质疑,指出其在特定异常情况下(如网络延迟、进程暂停、时钟偏移)可能导致锁失效,影响系统的正确性。Martin建议采用fencing token方案,以确保分布式锁的正确性和安全性。
48 0
|
2月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
49 0
下一篇
无影云桌面