【分布式系统】Curator 实现 Zookeeper 分布式锁

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 【分布式系统】Curator 实现 Zookeeper 分布式锁

1.Curator简介

官网的说法:curator是一个Java/JVM客户端库,用于zookeeper,一个分布式协调服务。它包括一个高级API框架和实用程序,使ApacheZooKeeper的使用更加简单和可靠。它还包括常见用例和扩展的方法,如服务发现和Java8异步DSL。

官方使用文档:Apache Curator –

个人使用手册:Curator使用手册 - 腾讯云开发者社区-
腾讯云

配置zookeeper集群参考;https://blog.csdn.net/m0_63748493/article/details/125776295

2.共享锁和排他锁

排它锁,又称独占锁,独享锁 synchronized就是一个排它锁
共享锁,又称为读锁,获得共享锁后,可以查看,但无法删除和修改数 据, 其他线程此时业获取到共享锁,也可以查看但是 无法修改和 删除数据
共享锁和排它锁典型是ReentranReadWriteLock 其中,读锁是共享锁,写锁是 排它锁

要么多读,要么一写,二者不可共存

3.实现共享锁

1.IDEA创建maven项目,pom.xml添加如下依赖

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.8.0</version>
        </dependency>

        <dependency>
            <groupId>curator</groupId>
            <artifactId>curator</artifactId>
            <version>0.0.7</version>
        </dependency>

        <dependency>
            <groupId>cn.itlym.shoulder</groupId>
            <artifactId>lombok</artifactId>
            <version>0.1</version>
        </dependency>

2.鼠标右击,重新构建项目

3.创建CuratorCRUD类,和ZKShareLock类

public class CuratorCRUD {

    public   CuratorFramework curatorFramework;
    public   String ip="192.168.159.151:2181,192.168.159.151:2182,192.168.159.151:2183";

    public CuratorCRUD() {
        createZkCuratorConnection();
    }

    /**
     * 创建客户端连接
     */
    public  void  createZkCuratorConnection(){
        curatorFramework=CuratorFrameworkFactory
                .builder()
                .connectString(ip)
                .retryPolicy(new ExponentialBackoffRetry(1000,3))
                .build();
        curatorFramework.start();
    }

    /**
     * 关闭客户端连接
     */
    public  void deleteZkCuratorConnection(){
        curatorFramework.close();
    }
}


public class ZKShareLock extends Thread{

    private Object o=new Object();
    private CuratorFramework curatorFramework;
    private String basePath="/ShareLocks";
    private String userName=basePath+"/User-";
    private String cname;//客户端名字

    public ZKShareLock(CuratorFramework curatorFramework,String cname) {
        this.curatorFramework = curatorFramework;
        this.cname=cname;
    }

    @Override
    public void run() {
        try {
            //创建节点并获取节点名字
            //得到完整目录/ShareLocks/User-0000000092
            String nodeName = curatorFramework
                    .create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(userName, cname.getBytes());
            System.out.println("创捷节点成功");
            System.out.println(nodeName);
            System.out.println();
            //获取目录下子节点
            List<String> tempNodeNames = curatorFramework.getChildren().forPath(basePath);
            List<String> nodeNames =new ArrayList();

            for (int i = 0; i < tempNodeNames.size(); i++) {
                String name =tempNodeNames.get(i);
                name=basePath+"/"+name;
                nodeNames.add(name);
            }
            Collections.sort(nodeNames);
            int index=nodeNames.indexOf(nodeName);
            System.out.printf("index =%d  \n",index);
            if(index==0){
                doSomthings(nodeName);
                unlock(nodeName);
            }else {
                addWatcherWithTreeCache(nodeNames.get(index-1));

                synchronized (o){
                    o.wait();
                }

                doSomthings(nodeName);
                unlock(nodeName);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        curatorFramework.close();

    }
    //添加监听器
    public   void addWatcherWithTreeCache(String path) throws Exception {
        TreeCache treeCache=new TreeCache(curatorFramework,path);
        TreeCacheListener treeCacheListene=new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                if(treeCacheEvent.getType()==TreeCacheEvent.Type.NODE_REMOVED){
                    System.out.printf("%s delete\n",path);
                    synchronized (o){
                        o.notify();
                    }
                    System.out.printf("%s notify\n",path);
                }
            }
        };
        treeCache.getListenable().addListener(treeCacheListene);
        treeCache.start();
    }

    public void unlock(String nodeName){
        System.out.printf("%s unlock",nodeName);
    }

    public void doSomthings(String nodeName){
        System.out.printf("%s doSomthings",nodeName);
    }

    public static void main(String[] args) {
        Date date=new Date();
        for (int i = 0; i < 100; i++) {
            new ZKShareLock(new CuratorCRUD().curatorFramework,"用户 "+i).start();
        }
        Date date1=new Date();
        System.out.println(date1.getTime()-date.getTime());
    }
}

运行之前,需要在zookeeper客户端创建/ShareLocks节点,且上面的ip修改为自己的集群ip

部分运行结果:

4.实现排他锁

Curator中封装了一种分布式可重入排他锁:InterProcessMutex

创建CuratorMutex类,并在zookeeper中创建/Mutex节点

public class CuratorMutex implements Runnable{

    public  CuratorFramework curatorFramework;
    public  String basePath;
    public InterProcessLock processLock;
    public int idx;

    public CuratorMutex(CuratorFramework curatorFramework, String basePath, int idx) {
        this.curatorFramework = curatorFramework;
        this.basePath = basePath;
        this.processLock=new InterProcessMutex(curatorFramework,basePath);
        this.idx=idx;
    }

    @Override
    public void run() {
        Logger logger= Logger.getLogger("");
        try {
            //线程加锁
            processLock.acquire(1000, TimeUnit.SECONDS);
            logger.info(String.format("线程%d获取锁",idx));
            Thread.sleep(10);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //线程解锁
            try {
                processLock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
            logger.info(String.format("线程%d释放锁",idx));
        }
    }

    public static void main(String[] args) {
        CuratorFramework framework = new CuratorCRUD().curatorFramework;
        for (int i = 0; i < 20; i++) {
            new Thread(new CuratorMutex(framework,"/Mutex",i)).start();
        }
        while (true){

        }
    }
}

部分运行结果:

下面将从源码里带大家讲解如何实现可重入排他锁

acquire方法内部实际实际上调用了internalLock方法

private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */
        //获取当前线程,并获取LockData锁信息
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            //lockCount自增,锁重入
            lockData.lockCount.incrementAndGet();
            return true;
        }
        //获取锁
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            //创建锁,并将锁信息存放到threadData这个Map中
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }

public void release() throws Exception
    {
        /*
            Note on concurrency: a given lockData instance
            can be only acted on by a single thread so locking isn't necessary
         */

        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null )
        {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
        //减少重入次数
        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {
            return;
        }
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {    //释放锁
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {    //从Map中移除该线程
            threadData.remove(currentThread);
        }
    }
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
1月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
59 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
1月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
43 2
|
1月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
47 1
|
1月前
|
SQL NoSQL 安全
分布式环境的分布式锁 - Redlock方案
【10月更文挑战第2天】Redlock方案是一种分布式锁实现,通过在多个独立的Redis实例上加锁来提高容错性和可靠性。客户端需从大多数节点成功加锁且总耗时小于锁的过期时间,才能视为加锁成功。然而,该方案受到分布式专家Martin的质疑,指出其在特定异常情况下(如网络延迟、进程暂停、时钟偏移)可能导致锁失效,影响系统的正确性。Martin建议采用fencing token方案,以确保分布式锁的正确性和安全性。
43 0
|
1月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
47 0
|
1月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
3月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
110 2
基于Redis的高可用分布式锁——RedLock
|
3月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
|
7天前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
40 16
|
1月前
|
NoSQL Redis 数据库
计数器 分布式锁 redis实现
【10月更文挑战第5天】
47 1