ZK(ZooKeeper)分布式锁实现(2)

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: ZK(ZooKeeper)分布式锁实现

代码实现


使用ZooKeeper 创建临时顺序节点来实现分布式锁,大体的流程就是 先创建一个持久父节点,在当前节点下,创建临时顺序节点,找出最小的序列号,获取分布式锁,程序业务完成之后释放锁,通知下一个节点进行操作,使用的是watch来监控节点的变化,然后依次下一个最小序列节点进行操作。


首先我们需要创建一个持久父类节点:我这里是 /mxn


image.png

WatchCallBack

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
 * @program: mxnzookeeper
 * @ClassName WatchCallBack
 * @description:
 * @author: 微信搜索:牧小农
 * @create: 2021-10-23 10:48
 * @Version 1.0
 **/
public class WatchCallBack  implements Watcher, AsyncCallback.StringCallback ,AsyncCallback.Children2Callback ,AsyncCallback.StatCallback {
    ZooKeeper zk ;
    String threadName;
    CountDownLatch cc = new CountDownLatch(1);
    String pathName;
    public String getPathName() {
        return pathName;
    }
    public void setPathName(String pathName) {
        this.pathName = pathName;
    }
    public String getThreadName() {
        return threadName;
    }
    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }
    public ZooKeeper getZk() {
        return zk;
    }
    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }
    /** @Author 牧小农
     * @Description //TODO 尝试加锁方法
     * @Date 16:14 2021/10/24
     * @Param 
     * @return 
     **/
    public void tryLock(){
        try {
            System.out.println(threadName + " 开始创建。。。。");
            //创建一个顺序临时节点
            zk.create("/lock",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"abc");
            //阻塞当前,监听前一个节点是否释放锁
            cc.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    /** @Author 牧小农
     * @Description //TODO 解锁方法
     * @Date 16:14 2021/10/24
     * @Param 
     * @return 
     **/
    public void unLock(){
        try {
            //释放锁,删除临时节点
            zk.delete(pathName,-1);
            //结束工作
            System.out.println(threadName + "         结束工作了....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void process(WatchedEvent event) {
        //如果第一个节点释放了锁,那么第二个就会收到回调
        //告诉它前一个节点释放了,你可以开始尝试获取锁
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                //当前节点重新获取锁
                zk.getChildren("/",false,this ,"sdf");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
        }
    }
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if(name != null ){
            System.out.println(threadName  +" 线程创建了一个节点为 : " +  name );
            pathName =  name ;
            //监听前一个节点
            zk.getChildren("/",false,this ,"sdf");
        }
    }
    //getChildren  call back
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        //节点按照编号,升序排列
        Collections.sort(children);
        //对节点进行截取例如  /lock0000000022 截取后就是  lock0000000022
        int i = children.indexOf(pathName.substring(1));
        //是不是第一个,也就是说是不是最小的
        if(i == 0){
            //是第一个
            System.out.println(threadName +" 现在我是最小的....");
            try {
                zk.setData("/",threadName.getBytes(),-1);
                cc.countDown();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{
            //不是第一个
            //监听前一个节点 看它是不是完成了工作进行释放锁了
            zk.exists("/"+children.get(i-1),this,this,"sdf");
        }
    }
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        //判断是否失败exists
    }
}

TestLock

import com.mxn.zookeeper.config.ZKUtils;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * @program: mxnzookeeper
 * @ClassName TestLock
 * @description:
 * @author: 微信搜索:牧小农
 * @create: 2021-10-23 10:45
 * @Version 1.0
 **/
public class TestLock {
    ZooKeeper zk ;
    @Before
    public void conn (){
        zk  = ZKUtils.getZK();
    }
    @After
    public void close (){
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    @Test
    public void lock(){
        //创建十个线程
        for (int i = 0; i < 10; i++) {
            new Thread(){
                @Override
                public void run() {
                    WatchCallBack watchCallBack = new WatchCallBack();
                    watchCallBack.setZk(zk);
                    String threadName = Thread.currentThread().getName();
                    watchCallBack.setThreadName(threadName);
                    //线程进行抢锁操作
                    watchCallBack.tryLock();
                    try {
                        //进行业务逻辑处理
                        System.out.println(threadName+"         开始处理业务逻辑了...");
                        Thread.sleep(200);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                    //释放锁
                    watchCallBack.unLock();
                }
            }.start();
        }
        while(true){
        }
    }
}

运行结果:

Thread-1 线程创建了一个节点为 : /lock0000000112
Thread-5 线程创建了一个节点为 : /lock0000000113
Thread-2 线程创建了一个节点为 : /lock0000000114
Thread-6 线程创建了一个节点为 : /lock0000000115
Thread-9 线程创建了一个节点为 : /lock0000000116
Thread-4 线程创建了一个节点为 : /lock0000000117
Thread-7 线程创建了一个节点为 : /lock0000000118
Thread-3 线程创建了一个节点为 : /lock0000000119
Thread-8 线程创建了一个节点为 : /lock0000000120
Thread-0 线程创建了一个节点为 : /lock0000000121
Thread-1 现在我是最小的....
Thread-1         开始处理业务逻辑了...
Thread-1         结束工作了....
Thread-5 现在我是最小的....
Thread-5         开始处理业务逻辑了...
Thread-5         结束工作了....
Thread-2 现在我是最小的....
Thread-2         开始处理业务逻辑了...
Thread-2         结束工作了....
Thread-6 现在我是最小的....
Thread-6         开始处理业务逻辑了...
Thread-6         结束工作了....
Thread-9 现在我是最小的....
Thread-9         开始处理业务逻辑了...
Thread-9         结束工作了....
Thread-4 现在我是最小的....
Thread-4         开始处理业务逻辑了...
Thread-4         结束工作了....
Thread-7 现在我是最小的....
Thread-7         开始处理业务逻辑了...
Thread-7         结束工作了....
Thread-3 现在我是最小的....
Thread-3         开始处理业务逻辑了...
Thread-3         结束工作了....
Thread-8 现在我是最小的....
Thread-8         开始处理业务逻辑了...
Thread-8         结束工作了....
Thread-0 现在我是最小的....
Thread-0         开始处理业务逻辑了...
Thread-0         结束工作了....

总结


ZK分布式锁,能够有效的解决分布式、不可重入的问题,在上面的案例中我, 没有实现可重入锁,但是实现起来也不麻烦,只需要带上线程信息等唯一标识,判断一下就可以了


ZK实现分布式锁具有天然的优势,临时顺序节点,可以有效的避免死锁问题,让客户端断开,那么就会删除当前临时节点,让下一个节点进行工作。


如果文中有错误或者不了解的地方,欢迎留言,小农看见了会第一时间回复大家,大家加油


我是牧小农,一个卑微的打工人,如果觉得文中的内容对你有帮助,记得一键三连啊,你们的三连是小农最大的动力。


我是牧小农,怕什么真理无穷,进一步 有进一步的欢喜,大家加油~


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
2月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
2月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
1月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
43 2
|
1月前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
62 1
|
1月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
39 1
|
1月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
47 1
|
1月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
47 0
|
2月前
|
Java
分布式-Zookeeper-分布式锁
分布式-Zookeeper-分布式锁
|
2月前
|
存储 负载均衡 算法
分布式-Zookeeper-Master选举
分布式-Zookeeper-Master选举
|
2月前
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)