【分布式系统】Curator 实现 Zookeeper 分布式锁

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 【分布式系统】Curator 实现 Zookeeper 分布式锁

1.Curator简介

官网的说法:curator是一个Java/JVM客户端库,用于zookeeper,一个分布式协调服务。它包括一个高级API框架和实用程序,使ApacheZooKeeper的使用更加简单和可靠。它还包括常见用例和扩展的方法,如服务发现和Java8异步DSL。

官方使用文档:Apache Curator –

个人使用手册:Curator使用手册 - 腾讯云开发者社区-
腾讯云

配置zookeeper集群参考;https://blog.csdn.net/m0_63748493/article/details/125776295

2.共享锁和排他锁

排它锁,又称独占锁,独享锁 synchronized就是一个排它锁
共享锁,又称为读锁,获得共享锁后,可以查看,但无法删除和修改数 据, 其他线程此时业获取到共享锁,也可以查看但是 无法修改和 删除数据
共享锁和排它锁典型是ReentranReadWriteLock 其中,读锁是共享锁,写锁是 排它锁

要么多读,要么一写,二者不可共存

3.实现共享锁

1.IDEA创建maven项目,pom.xml添加如下依赖

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.8.0</version>
        </dependency>

        <dependency>
            <groupId>curator</groupId>
            <artifactId>curator</artifactId>
            <version>0.0.7</version>
        </dependency>

        <dependency>
            <groupId>cn.itlym.shoulder</groupId>
            <artifactId>lombok</artifactId>
            <version>0.1</version>
        </dependency>

2.鼠标右击,重新构建项目

3.创建CuratorCRUD类,和ZKShareLock类

public class CuratorCRUD {

    public   CuratorFramework curatorFramework;
    public   String ip="192.168.159.151:2181,192.168.159.151:2182,192.168.159.151:2183";

    public CuratorCRUD() {
        createZkCuratorConnection();
    }

    /**
     * 创建客户端连接
     */
    public  void  createZkCuratorConnection(){
        curatorFramework=CuratorFrameworkFactory
                .builder()
                .connectString(ip)
                .retryPolicy(new ExponentialBackoffRetry(1000,3))
                .build();
        curatorFramework.start();
    }

    /**
     * 关闭客户端连接
     */
    public  void deleteZkCuratorConnection(){
        curatorFramework.close();
    }
}


public class ZKShareLock extends Thread{

    private Object o=new Object();
    private CuratorFramework curatorFramework;
    private String basePath="/ShareLocks";
    private String userName=basePath+"/User-";
    private String cname;//客户端名字

    public ZKShareLock(CuratorFramework curatorFramework,String cname) {
        this.curatorFramework = curatorFramework;
        this.cname=cname;
    }

    @Override
    public void run() {
        try {
            //创建节点并获取节点名字
            //得到完整目录/ShareLocks/User-0000000092
            String nodeName = curatorFramework
                    .create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(userName, cname.getBytes());
            System.out.println("创捷节点成功");
            System.out.println(nodeName);
            System.out.println();
            //获取目录下子节点
            List<String> tempNodeNames = curatorFramework.getChildren().forPath(basePath);
            List<String> nodeNames =new ArrayList();

            for (int i = 0; i < tempNodeNames.size(); i++) {
                String name =tempNodeNames.get(i);
                name=basePath+"/"+name;
                nodeNames.add(name);
            }
            Collections.sort(nodeNames);
            int index=nodeNames.indexOf(nodeName);
            System.out.printf("index =%d  \n",index);
            if(index==0){
                doSomthings(nodeName);
                unlock(nodeName);
            }else {
                addWatcherWithTreeCache(nodeNames.get(index-1));

                synchronized (o){
                    o.wait();
                }

                doSomthings(nodeName);
                unlock(nodeName);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        curatorFramework.close();

    }
    //添加监听器
    public   void addWatcherWithTreeCache(String path) throws Exception {
        TreeCache treeCache=new TreeCache(curatorFramework,path);
        TreeCacheListener treeCacheListene=new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                if(treeCacheEvent.getType()==TreeCacheEvent.Type.NODE_REMOVED){
                    System.out.printf("%s delete\n",path);
                    synchronized (o){
                        o.notify();
                    }
                    System.out.printf("%s notify\n",path);
                }
            }
        };
        treeCache.getListenable().addListener(treeCacheListene);
        treeCache.start();
    }

    public void unlock(String nodeName){
        System.out.printf("%s unlock",nodeName);
    }

    public void doSomthings(String nodeName){
        System.out.printf("%s doSomthings",nodeName);
    }

    public static void main(String[] args) {
        Date date=new Date();
        for (int i = 0; i < 100; i++) {
            new ZKShareLock(new CuratorCRUD().curatorFramework,"用户 "+i).start();
        }
        Date date1=new Date();
        System.out.println(date1.getTime()-date.getTime());
    }
}

运行之前,需要在zookeeper客户端创建/ShareLocks节点,且上面的ip修改为自己的集群ip

部分运行结果:

4.实现排他锁

Curator中封装了一种分布式可重入排他锁:InterProcessMutex

创建CuratorMutex类,并在zookeeper中创建/Mutex节点

public class CuratorMutex implements Runnable{

    public  CuratorFramework curatorFramework;
    public  String basePath;
    public InterProcessLock processLock;
    public int idx;

    public CuratorMutex(CuratorFramework curatorFramework, String basePath, int idx) {
        this.curatorFramework = curatorFramework;
        this.basePath = basePath;
        this.processLock=new InterProcessMutex(curatorFramework,basePath);
        this.idx=idx;
    }

    @Override
    public void run() {
        Logger logger= Logger.getLogger("");
        try {
            //线程加锁
            processLock.acquire(1000, TimeUnit.SECONDS);
            logger.info(String.format("线程%d获取锁",idx));
            Thread.sleep(10);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //线程解锁
            try {
                processLock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
            logger.info(String.format("线程%d释放锁",idx));
        }
    }

    public static void main(String[] args) {
        CuratorFramework framework = new CuratorCRUD().curatorFramework;
        for (int i = 0; i < 20; i++) {
            new Thread(new CuratorMutex(framework,"/Mutex",i)).start();
        }
        while (true){

        }
    }
}

部分运行结果:

下面将从源码里带大家讲解如何实现可重入排他锁

acquire方法内部实际实际上调用了internalLock方法

private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */
        //获取当前线程,并获取LockData锁信息
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            //lockCount自增,锁重入
            lockData.lockCount.incrementAndGet();
            return true;
        }
        //获取锁
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            //创建锁,并将锁信息存放到threadData这个Map中
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }

public void release() throws Exception
    {
        /*
            Note on concurrency: a given lockData instance
            can be only acted on by a single thread so locking isn't necessary
         */

        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null )
        {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
        //减少重入次数
        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {
            return;
        }
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {    //释放锁
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {    //从Map中移除该线程
            threadData.remove(currentThread);
        }
    }
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
18天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
2月前
|
存储 运维 NoSQL
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
|
3月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
65 2
|
4月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
4月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
4月前
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)
|
6月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
3月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
66 1
|
4月前
分布式-Zookeeper-数据订阅
分布式-Zookeeper-数据订阅
|
4月前
|
监控
分布式-Zookeeper-Zab协议
分布式-Zookeeper-Zab协议