【分布式系统】Curator 实现 Zookeeper 分布式锁

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【分布式系统】Curator 实现 Zookeeper 分布式锁

1.Curator简介

官网的说法:curator是一个Java/JVM客户端库,用于zookeeper,一个分布式协调服务。它包括一个高级API框架和实用程序,使ApacheZooKeeper的使用更加简单和可靠。它还包括常见用例和扩展的方法,如服务发现和Java8异步DSL。

官方使用文档:Apache Curator –

个人使用手册:Curator使用手册 - 腾讯云开发者社区-
腾讯云

配置zookeeper集群参考;https://blog.csdn.net/m0_63748493/article/details/125776295

2.共享锁和排他锁

排它锁,又称独占锁,独享锁 synchronized就是一个排它锁
共享锁,又称为读锁,获得共享锁后,可以查看,但无法删除和修改数 据, 其他线程此时业获取到共享锁,也可以查看但是 无法修改和 删除数据
共享锁和排它锁典型是ReentranReadWriteLock 其中,读锁是共享锁,写锁是 排它锁

要么多读,要么一写,二者不可共存

3.实现共享锁

1.IDEA创建maven项目,pom.xml添加如下依赖

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.8.0</version>
        </dependency>

        <dependency>
            <groupId>curator</groupId>
            <artifactId>curator</artifactId>
            <version>0.0.7</version>
        </dependency>

        <dependency>
            <groupId>cn.itlym.shoulder</groupId>
            <artifactId>lombok</artifactId>
            <version>0.1</version>
        </dependency>

2.鼠标右击,重新构建项目

3.创建CuratorCRUD类,和ZKShareLock类

public class CuratorCRUD {

    public   CuratorFramework curatorFramework;
    public   String ip="192.168.159.151:2181,192.168.159.151:2182,192.168.159.151:2183";

    public CuratorCRUD() {
        createZkCuratorConnection();
    }

    /**
     * 创建客户端连接
     */
    public  void  createZkCuratorConnection(){
        curatorFramework=CuratorFrameworkFactory
                .builder()
                .connectString(ip)
                .retryPolicy(new ExponentialBackoffRetry(1000,3))
                .build();
        curatorFramework.start();
    }

    /**
     * 关闭客户端连接
     */
    public  void deleteZkCuratorConnection(){
        curatorFramework.close();
    }
}


public class ZKShareLock extends Thread{

    private Object o=new Object();
    private CuratorFramework curatorFramework;
    private String basePath="/ShareLocks";
    private String userName=basePath+"/User-";
    private String cname;//客户端名字

    public ZKShareLock(CuratorFramework curatorFramework,String cname) {
        this.curatorFramework = curatorFramework;
        this.cname=cname;
    }

    @Override
    public void run() {
        try {
            //创建节点并获取节点名字
            //得到完整目录/ShareLocks/User-0000000092
            String nodeName = curatorFramework
                    .create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(userName, cname.getBytes());
            System.out.println("创捷节点成功");
            System.out.println(nodeName);
            System.out.println();
            //获取目录下子节点
            List<String> tempNodeNames = curatorFramework.getChildren().forPath(basePath);
            List<String> nodeNames =new ArrayList();

            for (int i = 0; i < tempNodeNames.size(); i++) {
                String name =tempNodeNames.get(i);
                name=basePath+"/"+name;
                nodeNames.add(name);
            }
            Collections.sort(nodeNames);
            int index=nodeNames.indexOf(nodeName);
            System.out.printf("index =%d  \n",index);
            if(index==0){
                doSomthings(nodeName);
                unlock(nodeName);
            }else {
                addWatcherWithTreeCache(nodeNames.get(index-1));

                synchronized (o){
                    o.wait();
                }

                doSomthings(nodeName);
                unlock(nodeName);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        curatorFramework.close();

    }
    //添加监听器
    public   void addWatcherWithTreeCache(String path) throws Exception {
        TreeCache treeCache=new TreeCache(curatorFramework,path);
        TreeCacheListener treeCacheListene=new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                if(treeCacheEvent.getType()==TreeCacheEvent.Type.NODE_REMOVED){
                    System.out.printf("%s delete\n",path);
                    synchronized (o){
                        o.notify();
                    }
                    System.out.printf("%s notify\n",path);
                }
            }
        };
        treeCache.getListenable().addListener(treeCacheListene);
        treeCache.start();
    }

    public void unlock(String nodeName){
        System.out.printf("%s unlock",nodeName);
    }

    public void doSomthings(String nodeName){
        System.out.printf("%s doSomthings",nodeName);
    }

    public static void main(String[] args) {
        Date date=new Date();
        for (int i = 0; i < 100; i++) {
            new ZKShareLock(new CuratorCRUD().curatorFramework,"用户 "+i).start();
        }
        Date date1=new Date();
        System.out.println(date1.getTime()-date.getTime());
    }
}

运行之前,需要在zookeeper客户端创建/ShareLocks节点,且上面的ip修改为自己的集群ip

部分运行结果:

4.实现排他锁

Curator中封装了一种分布式可重入排他锁:InterProcessMutex

创建CuratorMutex类,并在zookeeper中创建/Mutex节点

public class CuratorMutex implements Runnable{

    public  CuratorFramework curatorFramework;
    public  String basePath;
    public InterProcessLock processLock;
    public int idx;

    public CuratorMutex(CuratorFramework curatorFramework, String basePath, int idx) {
        this.curatorFramework = curatorFramework;
        this.basePath = basePath;
        this.processLock=new InterProcessMutex(curatorFramework,basePath);
        this.idx=idx;
    }

    @Override
    public void run() {
        Logger logger= Logger.getLogger("");
        try {
            //线程加锁
            processLock.acquire(1000, TimeUnit.SECONDS);
            logger.info(String.format("线程%d获取锁",idx));
            Thread.sleep(10);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //线程解锁
            try {
                processLock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
            logger.info(String.format("线程%d释放锁",idx));
        }
    }

    public static void main(String[] args) {
        CuratorFramework framework = new CuratorCRUD().curatorFramework;
        for (int i = 0; i < 20; i++) {
            new Thread(new CuratorMutex(framework,"/Mutex",i)).start();
        }
        while (true){

        }
    }
}

部分运行结果:

下面将从源码里带大家讲解如何实现可重入排他锁

acquire方法内部实际实际上调用了internalLock方法

private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */
        //获取当前线程,并获取LockData锁信息
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            //lockCount自增,锁重入
            lockData.lockCount.incrementAndGet();
            return true;
        }
        //获取锁
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            //创建锁,并将锁信息存放到threadData这个Map中
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }

public void release() throws Exception
    {
        /*
            Note on concurrency: a given lockData instance
            can be only acted on by a single thread so locking isn't necessary
         */

        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null )
        {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
        //减少重入次数
        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {
            return;
        }
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {    //释放锁
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {    //从Map中移除该线程
            threadData.remove(currentThread);
        }
    }
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
21天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
411 2
|
19天前
|
前端开发 JavaScript 算法
分布式系统的一致性级别划分及Zookeeper一致性级别分析
分布式系统的一致性级别划分及Zookeeper一致性级别分析
|
21天前
|
存储 大数据 Apache
深入理解ZooKeeper:分布式协调服务的核心与实践
【5月更文挑战第7天】ZooKeeper是Apache的分布式协调服务,确保大规模分布式系统中的数据一致性与高可用性。其特点包括强一致性、高可用性、可靠性、顺序性和实时性。使用ZooKeeper涉及安装配置、启动服务、客户端连接及执行操作。实际应用中,面临性能瓶颈、不可伸缩性和单点故障等问题,可通过水平扩展、集成其他服务和多集群备份来解决。理解ZooKeeper原理和实践,有助于构建高效分布式系统。
|
21天前
|
缓存 NoSQL Java
【亮剑】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护,如何使用注解来实现 Redis 分布式锁的功能?
【4月更文挑战第30天】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护。基于 Redis 的分布式锁利用 SETNX 或 SET 命令实现,并考虑自动过期、可重入及原子性以确保可靠性。在 Java Spring Boot 中,可通过 `@EnableCaching`、`@Cacheable` 和 `@CacheEvict` 注解轻松实现 Redis 分布式锁功能。
|
21天前
|
Dubbo Java 应用服务中间件
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
224 0
|
21天前
|
消息中间件 大数据 分布式数据库
分布式事务:构建可靠分布式系统的基石
【4月更文挑战第21天】分布式事务是确保现代分布式系统数据一致性和完整性的关键技术,涉及多服务协调,面临网络延迟、故障和数据一致性等问题。本文探讨了分布式事务的原理,包括两阶段提交、三阶段提交、分布式锁和补偿机制等解决方案,并阐述其在微服务、分布式数据库和消息队列等场景的应用。面对挑战,我们需要持续优化分布式事务处理机制。
|
21天前
|
监控 Dubbo 前端开发
快速入门分布式系统与Dubbo+zookeeper Demo
快速入门分布式系统与Dubbo+zookeeper Demo
412 0
|
21天前
|
消息中间件 Java 网络安全
JAVAEE分布式技术之Zookeeper的第一次课
JAVAEE分布式技术之Zookeeper的第一次课
518 0
|
21天前
|
监控 NoSQL Java
Zookeeper分布式锁
Zookeeper分布式锁
506 1
|
21天前
|
监控 Dubbo Java
深入理解Zookeeper系列-2.Zookeeper基本使用和分布式锁原理
深入理解Zookeeper系列-2.Zookeeper基本使用和分布式锁原理
69 0