zookeeper实现分布式锁

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 接口:import java.util.concurrent.TimeUnit;public interface DistributedLock { public void getLock() throws Exception; public b...

接口:

import java.util.concurrent.TimeUnit;

public interface DistributedLock {

    public void getLock() throws Exception;

    public boolean getLock(long time,TimeUnit timeUnit) throws Exception;

    public void releaseLock();

}

核心公用代码:

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class BaseDistributedLock {

    private final static Logger logger = LoggerFactory.getLogger(BaseDistributedLock.class);

    private final ZkClient client;
    private final String lockPath;
    private final String baseLockName;
    private final String lockName;
    private ThreadPoolTaskExecutor taskExecutor;
    private static final Integer MAX_RETRY_COUNT = 10;


    public BaseDistributedLock(ZkClient zkClient,String baseLockName,String lockName,ThreadPoolTaskExecutor taskExecutor){
        this.client = zkClient;
        this.baseLockName = baseLockName;
        this.lockName = lockName;
        this.lockPath = baseLockName.concat("/").concat(lockName);
        this.taskExecutor = taskExecutor;
    }

    /**
     * 获取锁资源
     * @param time
     * @param timeUnit
     * @return
     * @throws Exception 
     */
    public String doGetLock(long time, TimeUnit timeUnit) throws Exception{
        String path = null;
        if(timeUnit != null){
            //存在超时时间
            Future<String> future = taskExecutor.submit(new GetLock());
            try {
                path = future.get(time, timeUnit);
            } catch (TimeoutException e) {
                //超时则中断获取锁的方法
                future.cancel(true);
            }
        }else {
            //没有超时时间
            path = waitLock();
        }
        return path;
    }

    /**
     * 创建path,调用锁等待方法
     * @return
     * @throws Exception
     */
    public String waitLock()throws Exception{
        int retrycount = 1;
        String path = null;
        //创建零时节点,重试次数为10次
        try {
            path = createLockNode();
        } catch (Exception e) {
            if(retrycount++ < MAX_RETRY_COUNT){
                createLockNode();
            }else {
                throw e;
            }
        }
        return doWaitLock(path);
    }

    /**
     * 等待锁资源(核心逻辑)
     * @return 创建的路径
     * @throws Exception 
     */
    public String doWaitLock(String path) throws Exception{

        boolean getTheLock = false;
        boolean doDelete = false;

        //判断是否获取到锁资源
        try {
            while (!getTheLock) {
                //获取该baseLockName路径下的所有子节点,并排序
                List<String> children = getSortedChildren();
                String sequenceNodeName = path.substring(baseLockName.length()+1);

                int pathIndex = children.indexOf(sequenceNodeName);
                if(pathIndex < 0){
                    throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
                }

                if(pathIndex == 0){
                    //获取到锁资源
                    getTheLock = true;
                    return path;
                }else {
                    //获取前一个节点的path
                    String previousPath = baseLockName.concat("/").concat(children.get(pathIndex-1));
                    //监听该节点
                    final CountDownLatch latch = new CountDownLatch(1);
                    final IZkDataListener previousListener = new IZkDataListener() {
                        // 次小节点删除事件发生时,让countDownLatch结束等待
                        // 此时还需要重新让程序回到while,重新判断一次!
                        public void handleDataDeleted(String dataPath)
                                throws Exception {
                            latch.countDown();
                        }

                        public void handleDataChange(String dataPath,
                                Object data) throws Exception {
                            return;
                        }
                    };

                    try {
                        client.subscribeDataChanges(previousPath,
                                previousListener);
                        latch.await();
                    } catch (ZkNoNodeException e) {
                        logger.error("注册监听事件出现异常:"+e.getMessage());
                    } finally {
                        client.unsubscribeDataChanges(previousPath,
                                previousListener);
                    }
                }
            }
        } catch (Exception e) {
            doDelete = true;
            throw e;
        }finally {
            if(doDelete){
                deletePath(path);
            }

        }
        return null;
    }

    /**
     * 创建临时节点
     * @return
     * @throws Exception
     */
    private String createLockNode()throws Exception {
        return client.createEphemeralSequential(lockPath, null);
    }

    /**
     * 如果该路径存在,那么就删除
     * @param path
     */
    public void deletePath(String path){
        client.delete(path);
    }

    /**
     * 另起一个线程去获取锁资源,主线程监听超时时间
     * @author 13376
     *
     */
    class GetLock implements Callable<String>{

        @Override
        public String call() throws Exception {
            return waitLock();
        }

    }

    /**
     * 顺序获取所有的子节点
     * @return
     * @throws Exception
     */
    private List<String> getSortedChildren() throws Exception {
        try {
            List<String> children = client.getChildren(baseLockName);
            Collections.sort(children, new Comparator<String>() {
                public int compare(String lhs, String rhs) {
                    return getLockNodeNumber(lhs, lockName).compareTo(
                            getLockNodeNumber(rhs, lockName));
                }
            });
            return children;

        } catch (ZkNoNodeException e) {
            client.createPersistent(baseLockName, true);
            return getSortedChildren();
        }
    }

    /**
     * 获取零时节点名后面的自增长的数字
     * @param str
     * @param lockName
     * @return
     */
    private String getLockNodeNumber(String str, String lockName) {
        int index = str.lastIndexOf(lockName);
        if (index >= 0) {
            index += lockName.length();
            return index <= str.length() ? str.substring(index) : "";
        }
        return str;
    }
}

对外暴露的接口方法:

import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.ZkClient;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class ZooDistributedLock extends BaseDistributedLock implements
        DistributedLock {

    private static final String LOCK_NAME = "lock-";
    private String path;

    public ZooDistributedLock(ZkClient zkClient,String baseLockName,ThreadPoolTaskExecutor taskExecutor){
        super(zkClient,baseLockName,LOCK_NAME,taskExecutor);
    }

    @Override
    public void getLock() throws Exception {
        if(!getLock(-1,null)){
            throw new RuntimeException("连接异常,"+path+"路径下获取所失败");
        }
    }

    @Override
    public boolean getLock(long time, TimeUnit timeUnit) throws Exception {
        path = this.doGetLock(time,timeUnit);
        return path != null;
    }

    @Override
    public void releaseLock() {
        this.deletePath(path);
    }

}
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
2月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
2月前
|
算法 前端开发
|
2月前
|
NoSQL 前端开发 算法
Redis问题之Redis分布式锁与Zookeeper分布式锁有何不同
Redis问题之Redis分布式锁与Zookeeper分布式锁有何不同
|
3月前
|
Shell 虚拟化
分布式系统详解--框架(Zookeeper-基本shell命令)
分布式系统详解--框架(Zookeeper-基本shell命令)
38 1
|
2月前
|
安全 Java
使用Zookeeper实现分布式锁的最佳实践
使用Zookeeper实现分布式锁的最佳实践
|
3月前
|
缓存 NoSQL 数据库
分布式系统面试全集通第一篇(dubbo+redis+zookeeper----分布式+CAP+BASE+分布式事务+分布式锁)
分布式系统面试全集通第一篇(dubbo+redis+zookeeper----分布式+CAP+BASE+分布式事务+分布式锁)
81 0
|
3月前
|
设计模式 监控 安全
一文搞懂:zookeeper实现分布式锁安全用法
一文搞懂:zookeeper实现分布式锁安全用法
34 0
|
3月前
|
Java 网络安全
分布式系统详解--框架(Zookeeper-简介和集群搭建)
分布式系统详解--框架(Zookeeper-简介和集群搭建)
119 0
|
4月前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
484 2
|
4月前
|
监控 Dubbo 前端开发
快速入门分布式系统与Dubbo+zookeeper Demo
快速入门分布式系统与Dubbo+zookeeper Demo
468 0
下一篇
DDNS