【分布式系统】Curator 实现 Zookeeper 分布式锁

简介: 【分布式系统】Curator 实现 Zookeeper 分布式锁

1.Curator简介

官网的说法:curator是一个Java/JVM客户端库,用于zookeeper,一个分布式协调服务。它包括一个高级API框架和实用程序,使ApacheZooKeeper的使用更加简单和可靠。它还包括常见用例和扩展的方法,如服务发现和Java8异步DSL。

官方使用文档:Apache Curator –

个人使用手册:Curator使用手册 - 腾讯云开发者社区-
腾讯云

配置zookeeper集群参考;https://blog.csdn.net/m0_63748493/article/details/125776295

2.共享锁和排他锁

排它锁,又称独占锁,独享锁 synchronized就是一个排它锁
共享锁,又称为读锁,获得共享锁后,可以查看,但无法删除和修改数 据, 其他线程此时业获取到共享锁,也可以查看但是 无法修改和 删除数据
共享锁和排它锁典型是ReentranReadWriteLock 其中,读锁是共享锁,写锁是 排它锁

要么多读,要么一写,二者不可共存

3.实现共享锁

1.IDEA创建maven项目,pom.xml添加如下依赖

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.8.0</version>
        </dependency>

        <dependency>
            <groupId>curator</groupId>
            <artifactId>curator</artifactId>
            <version>0.0.7</version>
        </dependency>

        <dependency>
            <groupId>cn.itlym.shoulder</groupId>
            <artifactId>lombok</artifactId>
            <version>0.1</version>
        </dependency>

2.鼠标右击,重新构建项目

3.创建CuratorCRUD类,和ZKShareLock类

public class CuratorCRUD {

    public   CuratorFramework curatorFramework;
    public   String ip="192.168.159.151:2181,192.168.159.151:2182,192.168.159.151:2183";

    public CuratorCRUD() {
        createZkCuratorConnection();
    }

    /**
     * 创建客户端连接
     */
    public  void  createZkCuratorConnection(){
        curatorFramework=CuratorFrameworkFactory
                .builder()
                .connectString(ip)
                .retryPolicy(new ExponentialBackoffRetry(1000,3))
                .build();
        curatorFramework.start();
    }

    /**
     * 关闭客户端连接
     */
    public  void deleteZkCuratorConnection(){
        curatorFramework.close();
    }
}


public class ZKShareLock extends Thread{

    private Object o=new Object();
    private CuratorFramework curatorFramework;
    private String basePath="/ShareLocks";
    private String userName=basePath+"/User-";
    private String cname;//客户端名字

    public ZKShareLock(CuratorFramework curatorFramework,String cname) {
        this.curatorFramework = curatorFramework;
        this.cname=cname;
    }

    @Override
    public void run() {
        try {
            //创建节点并获取节点名字
            //得到完整目录/ShareLocks/User-0000000092
            String nodeName = curatorFramework
                    .create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(userName, cname.getBytes());
            System.out.println("创捷节点成功");
            System.out.println(nodeName);
            System.out.println();
            //获取目录下子节点
            List<String> tempNodeNames = curatorFramework.getChildren().forPath(basePath);
            List<String> nodeNames =new ArrayList();

            for (int i = 0; i < tempNodeNames.size(); i++) {
                String name =tempNodeNames.get(i);
                name=basePath+"/"+name;
                nodeNames.add(name);
            }
            Collections.sort(nodeNames);
            int index=nodeNames.indexOf(nodeName);
            System.out.printf("index =%d  \n",index);
            if(index==0){
                doSomthings(nodeName);
                unlock(nodeName);
            }else {
                addWatcherWithTreeCache(nodeNames.get(index-1));

                synchronized (o){
                    o.wait();
                }

                doSomthings(nodeName);
                unlock(nodeName);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        curatorFramework.close();

    }
    //添加监听器
    public   void addWatcherWithTreeCache(String path) throws Exception {
        TreeCache treeCache=new TreeCache(curatorFramework,path);
        TreeCacheListener treeCacheListene=new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                if(treeCacheEvent.getType()==TreeCacheEvent.Type.NODE_REMOVED){
                    System.out.printf("%s delete\n",path);
                    synchronized (o){
                        o.notify();
                    }
                    System.out.printf("%s notify\n",path);
                }
            }
        };
        treeCache.getListenable().addListener(treeCacheListene);
        treeCache.start();
    }

    public void unlock(String nodeName){
        System.out.printf("%s unlock",nodeName);
    }

    public void doSomthings(String nodeName){
        System.out.printf("%s doSomthings",nodeName);
    }

    public static void main(String[] args) {
        Date date=new Date();
        for (int i = 0; i < 100; i++) {
            new ZKShareLock(new CuratorCRUD().curatorFramework,"用户 "+i).start();
        }
        Date date1=new Date();
        System.out.println(date1.getTime()-date.getTime());
    }
}

运行之前,需要在zookeeper客户端创建/ShareLocks节点,且上面的ip修改为自己的集群ip

部分运行结果:

4.实现排他锁

Curator中封装了一种分布式可重入排他锁:InterProcessMutex

创建CuratorMutex类,并在zookeeper中创建/Mutex节点

public class CuratorMutex implements Runnable{

    public  CuratorFramework curatorFramework;
    public  String basePath;
    public InterProcessLock processLock;
    public int idx;

    public CuratorMutex(CuratorFramework curatorFramework, String basePath, int idx) {
        this.curatorFramework = curatorFramework;
        this.basePath = basePath;
        this.processLock=new InterProcessMutex(curatorFramework,basePath);
        this.idx=idx;
    }

    @Override
    public void run() {
        Logger logger= Logger.getLogger("");
        try {
            //线程加锁
            processLock.acquire(1000, TimeUnit.SECONDS);
            logger.info(String.format("线程%d获取锁",idx));
            Thread.sleep(10);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //线程解锁
            try {
                processLock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
            logger.info(String.format("线程%d释放锁",idx));
        }
    }

    public static void main(String[] args) {
        CuratorFramework framework = new CuratorCRUD().curatorFramework;
        for (int i = 0; i < 20; i++) {
            new Thread(new CuratorMutex(framework,"/Mutex",i)).start();
        }
        while (true){

        }
    }
}

部分运行结果:

下面将从源码里带大家讲解如何实现可重入排他锁

acquire方法内部实际实际上调用了internalLock方法

private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */
        //获取当前线程,并获取LockData锁信息
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            //lockCount自增,锁重入
            lockData.lockCount.incrementAndGet();
            return true;
        }
        //获取锁
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            //创建锁,并将锁信息存放到threadData这个Map中
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }

public void release() throws Exception
    {
        /*
            Note on concurrency: a given lockData instance
            can be only acted on by a single thread so locking isn't necessary
         */

        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null )
        {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
        //减少重入次数
        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {
            return;
        }
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {    //释放锁
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {    //从Map中移除该线程
            threadData.remove(currentThread);
        }
    }
目录
相关文章
|
6月前
|
消息中间件 分布式计算 资源调度
《聊聊分布式》ZooKeeper与ZAB协议:分布式协调的核心引擎
ZooKeeper是一个开源的分布式协调服务,基于ZAB协议实现数据一致性,提供分布式锁、配置管理、领导者选举等核心功能,具有高可用、强一致和简单易用的特点,广泛应用于Kafka、Hadoop等大型分布式系统中。
|
7月前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
596 0
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
NoSQL Java 中间件
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
1547 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
|
11月前
|
Apache
分布式锁—7.Curator的分布式锁
本文详细解析了Apache Curator库中多种分布式锁的实现机制,包括可重入锁、非可重入锁、可重入读写锁、MultiLock和Semaphore。可重入锁通过InterProcessMutex实现,支持同一线程多次加锁,锁的获取和释放通过Zookeeper的临时顺序节点实现。非可重入锁InterProcessSemaphoreMutex基于Semaphore实现,确保同一时间只有一个线程获取锁。可重入读写锁InterProcessReadWriteLock通过组合读锁和写锁实现,支持读写分离。Multi
|
NoSQL Java 测试技术
【📕分布式锁通关指南 05】通过redisson实现分布式锁
本文介绍了如何使用Redisson框架在SpringBoot中实现分布式锁,简化了之前通过Redis手动实现分布式锁的复杂性和不完美之处。Redisson作为Redis的高性能客户端,封装了多种锁的实现,使得开发者只需关注业务逻辑。文中详细展示了引入依赖、配置Redisson客户端、实现扣减库存功能的代码示例,并通过JMeter压测验证了其正确性。后续篇章将深入解析Redisson锁实现的源码。
517 0
【📕分布式锁通关指南 05】通过redisson实现分布式锁
|
运维 NoSQL 算法
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
本文深入探讨了基于Redis实现分布式锁时遇到的细节问题及解决方案。首先,针对锁续期问题,提出了通过独立服务、获取锁进程自己续期和异步线程三种方式,并详细介绍了如何利用Lua脚本和守护线程实现自动续期。接着,解决了锁阻塞问题,引入了带超时时间的`tryLock`机制,确保在高并发场景下不会无限等待锁。最后,作为知识扩展,讲解了RedLock算法原理及其在实际业务中的局限性。文章强调,在并发量不高的场景中手写分布式锁可行,但推荐使用更成熟的Redisson框架来实现分布式锁,以保证系统的稳定性和可靠性。
927 0
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
NoSQL 关系型数据库 MySQL
分布式系统学习9:分布式锁
本文介绍了分布式系统中分布式锁的概念、实现方式及其应用场景。分布式锁用于在多个独立的JVM进程间确保资源的互斥访问,具备互斥、高可用、可重入和超时机制等特点。文章详细讲解了三种常见的分布式锁实现方式:基于Redis、Zookeeper和关系型数据库(如MySQL)。其中,Redis适合高性能场景,推荐使用Redisson库;Zookeeper适用于对一致性要求较高的场景,建议基于Curator框架实现;而基于数据库的方式性能较低,实际开发中较少使用。此外,还探讨了乐观锁和悲观锁的区别及适用场景,并介绍了如何通过Lua脚本和Redis的`SET`命令实现原子操作,以及Redisson的自动续期机
1265 7
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1