开发者社区> 牧小农> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

ZK(ZooKeeper)分布式锁实现(2)

简介: ZK(ZooKeeper)分布式锁实现
+关注继续查看

代码实现


使用ZooKeeper 创建临时顺序节点来实现分布式锁,大体的流程就是 先创建一个持久父节点,在当前节点下,创建临时顺序节点,找出最小的序列号,获取分布式锁,程序业务完成之后释放锁,通知下一个节点进行操作,使用的是watch来监控节点的变化,然后依次下一个最小序列节点进行操作。


首先我们需要创建一个持久父类节点:我这里是 /mxn


image.png

WatchCallBack

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;


/**
 * @program: mxnzookeeper
 * @ClassName WatchCallBack
 * @description:
 * @author: 微信搜索:牧小农
 * @create: 2021-10-23 10:48
 * @Version 1.0
 **/
public class WatchCallBack  implements Watcher, AsyncCallback.StringCallback ,AsyncCallback.Children2Callback ,AsyncCallback.StatCallback {

    ZooKeeper zk ;
    String threadName;
    CountDownLatch cc = new CountDownLatch(1);
    String pathName;

    public String getPathName() {
        return pathName;
    }

    public void setPathName(String pathName) {
        this.pathName = pathName;
    }

    public String getThreadName() {
        return threadName;
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public ZooKeeper getZk() {
        return zk;
    }

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }

    /** @Author 牧小农
     * @Description //TODO 尝试加锁方法
     * @Date 16:14 2021/10/24
     * @Param 
     * @return 
     **/
    public void tryLock(){
        try {

            System.out.println(threadName + " 开始创建。。。。");
            //创建一个顺序临时节点
            zk.create("/lock",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"abc");
            //阻塞当前,监听前一个节点是否释放锁
            cc.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** @Author 牧小农
     * @Description //TODO 解锁方法
     * @Date 16:14 2021/10/24
     * @Param 
     * @return 
     **/
    public void unLock(){
        try {
            //释放锁,删除临时节点
            zk.delete(pathName,-1);
            //结束工作
            System.out.println(threadName + "         结束工作了....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }


    @Override
    public void process(WatchedEvent event) {

        //如果第一个节点释放了锁,那么第二个就会收到回调
        //告诉它前一个节点释放了,你可以开始尝试获取锁
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                //当前节点重新获取锁
                zk.getChildren("/",false,this ,"sdf");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
        }

    }

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if(name != null ){
            System.out.println(threadName  +" 线程创建了一个节点为 : " +  name );
            pathName =  name ;
            //监听前一个节点
            zk.getChildren("/",false,this ,"sdf");
        }

    }

    //getChildren  call back
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {

        //节点按照编号,升序排列
        Collections.sort(children);
        //对节点进行截取例如  /lock0000000022 截取后就是  lock0000000022
        int i = children.indexOf(pathName.substring(1));


        //是不是第一个,也就是说是不是最小的
        if(i == 0){
            //是第一个
            System.out.println(threadName +" 现在我是最小的....");
            try {
                zk.setData("/",threadName.getBytes(),-1);
                cc.countDown();

            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{
            //不是第一个
            //监听前一个节点 看它是不是完成了工作进行释放锁了
            zk.exists("/"+children.get(i-1),this,this,"sdf");
        }

    }

    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        //判断是否失败exists
    }
}


TestLock

import com.mxn.zookeeper.config.ZKUtils;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;


/**
 * @program: mxnzookeeper
 * @ClassName TestLock
 * @description:
 * @author: 微信搜索:牧小农
 * @create: 2021-10-23 10:45
 * @Version 1.0
 **/
public class TestLock {


    ZooKeeper zk ;

    @Before
    public void conn (){
        zk  = ZKUtils.getZK();
    }

    @After
    public void close (){
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void lock(){

        //创建十个线程
        for (int i = 0; i < 10; i++) {
            new Thread(){
                @Override
                public void run() {
                    WatchCallBack watchCallBack = new WatchCallBack();
                    watchCallBack.setZk(zk);
                    String threadName = Thread.currentThread().getName();
                    watchCallBack.setThreadName(threadName);
                    //线程进行抢锁操作
                    watchCallBack.tryLock();
                    try {
                        //进行业务逻辑处理
                        System.out.println(threadName+"         开始处理业务逻辑了...");
                        Thread.sleep(200);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                    //释放锁
                    watchCallBack.unLock();


                }
            }.start();
        }


        while(true){

        }

    }

}

运行结果:

Thread-1 线程创建了一个节点为 : /lock0000000112
Thread-5 线程创建了一个节点为 : /lock0000000113
Thread-2 线程创建了一个节点为 : /lock0000000114
Thread-6 线程创建了一个节点为 : /lock0000000115
Thread-9 线程创建了一个节点为 : /lock0000000116
Thread-4 线程创建了一个节点为 : /lock0000000117
Thread-7 线程创建了一个节点为 : /lock0000000118
Thread-3 线程创建了一个节点为 : /lock0000000119
Thread-8 线程创建了一个节点为 : /lock0000000120
Thread-0 线程创建了一个节点为 : /lock0000000121
Thread-1 现在我是最小的....
Thread-1         开始处理业务逻辑了...
Thread-1         结束工作了....
Thread-5 现在我是最小的....
Thread-5         开始处理业务逻辑了...
Thread-5         结束工作了....
Thread-2 现在我是最小的....
Thread-2         开始处理业务逻辑了...
Thread-2         结束工作了....
Thread-6 现在我是最小的....
Thread-6         开始处理业务逻辑了...
Thread-6         结束工作了....
Thread-9 现在我是最小的....
Thread-9         开始处理业务逻辑了...
Thread-9         结束工作了....
Thread-4 现在我是最小的....
Thread-4         开始处理业务逻辑了...
Thread-4         结束工作了....
Thread-7 现在我是最小的....
Thread-7         开始处理业务逻辑了...
Thread-7         结束工作了....
Thread-3 现在我是最小的....
Thread-3         开始处理业务逻辑了...
Thread-3         结束工作了....
Thread-8 现在我是最小的....
Thread-8         开始处理业务逻辑了...
Thread-8         结束工作了....
Thread-0 现在我是最小的....
Thread-0         开始处理业务逻辑了...
Thread-0         结束工作了....


总结


ZK分布式锁,能够有效的解决分布式、不可重入的问题,在上面的案例中我, 没有实现可重入锁,但是实现起来也不麻烦,只需要带上线程信息等唯一标识,判断一下就可以了


ZK实现分布式锁具有天然的优势,临时顺序节点,可以有效的避免死锁问题,让客户端断开,那么就会删除当前临时节点,让下一个节点进行工作。


如果文中有错误或者不了解的地方,欢迎留言,小农看见了会第一时间回复大家,大家加油


我是牧小农,一个卑微的打工人,如果觉得文中的内容对你有帮助,记得一键三连啊,你们的三连是小农最大的动力。


我是牧小农,怕什么真理无穷,进一步 有进一步的欢喜,大家加油~


版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
zookeeper实现分布式锁实战
zookeeper实现分布式锁实战
515 0
zookeeper分布式锁原理及实现(2)
zookeeper分布式锁原理及实现(2)
30 0
zookeeper分布式锁原理及实现(1)
zookeeper分布式锁原理及实现(1)
28 0
Zookeeper怎么实现分布式锁?
最近在学习 Zookeeper,在刚开始接触 Zookeeper 的时候,完全不知道 Zookeeper 有什么用。且很多资料都是将 Zookeeper 描述成一个“类 Unix/Linux 文件系统”的中间件,导致我很难将类 Unix/Linux 文件系统的 Zookeeper 和分布式应用联系在一起。 后来在粗读了《ZooKeeper 分布式过程协同技术详解》和《从Paxos到Zookeeper 分布式一致性原理与实践》两本书,并动手写了一些 CURD demo 后,初步对 Zookeeper 有了一定的了解。
44 0
zookeeper 实现分布式锁安全用法
背景 ConnectionLoss 链接丢失 SessionExpired 会话过期 绕开 zookeeper broker 进行状态通知 leader 选举与zkNode 断开 做好幂等 静态扩容、动态扩容 背景 分布式锁现在用的越来越多,通常用来协调多个并发任务。
1633 0
zookeeper分布式锁
摘要:分享牛原创,zookeeper使用,zookeeper锁在实际项目开发中还是很常用的,在这里我们介绍一下zookeeper分布式锁的使用,以及我们如何zookeeper分布式锁的原理。
616 0
+关注
牧小农
业精于勤荒于嬉,行成于思毁于随。
134
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载