zookeeper实现分布式锁

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 接口:import java.util.concurrent.TimeUnit;public interface DistributedLock { public void getLock() throws Exception; public b...

接口:

import java.util.concurrent.TimeUnit;

public interface DistributedLock {

    public void getLock() throws Exception;

    public boolean getLock(long time,TimeUnit timeUnit) throws Exception;

    public void releaseLock();

}

核心公用代码:

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class BaseDistributedLock {

    private final static Logger logger = LoggerFactory.getLogger(BaseDistributedLock.class);

    private final ZkClient client;
    private final String lockPath;
    private final String baseLockName;
    private final String lockName;
    private ThreadPoolTaskExecutor taskExecutor;
    private static final Integer MAX_RETRY_COUNT = 10;


    public BaseDistributedLock(ZkClient zkClient,String baseLockName,String lockName,ThreadPoolTaskExecutor taskExecutor){
        this.client = zkClient;
        this.baseLockName = baseLockName;
        this.lockName = lockName;
        this.lockPath = baseLockName.concat("/").concat(lockName);
        this.taskExecutor = taskExecutor;
    }

    /**
     * 获取锁资源
     * @param time
     * @param timeUnit
     * @return
     * @throws Exception 
     */
    public String doGetLock(long time, TimeUnit timeUnit) throws Exception{
        String path = null;
        if(timeUnit != null){
            //存在超时时间
            Future<String> future = taskExecutor.submit(new GetLock());
            try {
                path = future.get(time, timeUnit);
            } catch (TimeoutException e) {
                //超时则中断获取锁的方法
                future.cancel(true);
            }
        }else {
            //没有超时时间
            path = waitLock();
        }
        return path;
    }

    /**
     * 创建path,调用锁等待方法
     * @return
     * @throws Exception
     */
    public String waitLock()throws Exception{
        int retrycount = 1;
        String path = null;
        //创建零时节点,重试次数为10次
        try {
            path = createLockNode();
        } catch (Exception e) {
            if(retrycount++ < MAX_RETRY_COUNT){
                createLockNode();
            }else {
                throw e;
            }
        }
        return doWaitLock(path);
    }

    /**
     * 等待锁资源(核心逻辑)
     * @return 创建的路径
     * @throws Exception 
     */
    public String doWaitLock(String path) throws Exception{

        boolean getTheLock = false;
        boolean doDelete = false;

        //判断是否获取到锁资源
        try {
            while (!getTheLock) {
                //获取该baseLockName路径下的所有子节点,并排序
                List<String> children = getSortedChildren();
                String sequenceNodeName = path.substring(baseLockName.length()+1);

                int pathIndex = children.indexOf(sequenceNodeName);
                if(pathIndex < 0){
                    throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
                }

                if(pathIndex == 0){
                    //获取到锁资源
                    getTheLock = true;
                    return path;
                }else {
                    //获取前一个节点的path
                    String previousPath = baseLockName.concat("/").concat(children.get(pathIndex-1));
                    //监听该节点
                    final CountDownLatch latch = new CountDownLatch(1);
                    final IZkDataListener previousListener = new IZkDataListener() {
                        // 次小节点删除事件发生时,让countDownLatch结束等待
                        // 此时还需要重新让程序回到while,重新判断一次!
                        public void handleDataDeleted(String dataPath)
                                throws Exception {
                            latch.countDown();
                        }

                        public void handleDataChange(String dataPath,
                                Object data) throws Exception {
                            return;
                        }
                    };

                    try {
                        client.subscribeDataChanges(previousPath,
                                previousListener);
                        latch.await();
                    } catch (ZkNoNodeException e) {
                        logger.error("注册监听事件出现异常:"+e.getMessage());
                    } finally {
                        client.unsubscribeDataChanges(previousPath,
                                previousListener);
                    }
                }
            }
        } catch (Exception e) {
            doDelete = true;
            throw e;
        }finally {
            if(doDelete){
                deletePath(path);
            }

        }
        return null;
    }

    /**
     * 创建临时节点
     * @return
     * @throws Exception
     */
    private String createLockNode()throws Exception {
        return client.createEphemeralSequential(lockPath, null);
    }

    /**
     * 如果该路径存在,那么就删除
     * @param path
     */
    public void deletePath(String path){
        client.delete(path);
    }

    /**
     * 另起一个线程去获取锁资源,主线程监听超时时间
     * @author 13376
     *
     */
    class GetLock implements Callable<String>{

        @Override
        public String call() throws Exception {
            return waitLock();
        }

    }

    /**
     * 顺序获取所有的子节点
     * @return
     * @throws Exception
     */
    private List<String> getSortedChildren() throws Exception {
        try {
            List<String> children = client.getChildren(baseLockName);
            Collections.sort(children, new Comparator<String>() {
                public int compare(String lhs, String rhs) {
                    return getLockNodeNumber(lhs, lockName).compareTo(
                            getLockNodeNumber(rhs, lockName));
                }
            });
            return children;

        } catch (ZkNoNodeException e) {
            client.createPersistent(baseLockName, true);
            return getSortedChildren();
        }
    }

    /**
     * 获取零时节点名后面的自增长的数字
     * @param str
     * @param lockName
     * @return
     */
    private String getLockNodeNumber(String str, String lockName) {
        int index = str.lastIndexOf(lockName);
        if (index >= 0) {
            index += lockName.length();
            return index <= str.length() ? str.substring(index) : "";
        }
        return str;
    }
}

对外暴露的接口方法:

import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.ZkClient;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class ZooDistributedLock extends BaseDistributedLock implements
        DistributedLock {

    private static final String LOCK_NAME = "lock-";
    private String path;

    public ZooDistributedLock(ZkClient zkClient,String baseLockName,ThreadPoolTaskExecutor taskExecutor){
        super(zkClient,baseLockName,LOCK_NAME,taskExecutor);
    }

    @Override
    public void getLock() throws Exception {
        if(!getLock(-1,null)){
            throw new RuntimeException("连接异常,"+path+"路径下获取所失败");
        }
    }

    @Override
    public boolean getLock(long time, TimeUnit timeUnit) throws Exception {
        path = this.doGetLock(time,timeUnit);
        return path != null;
    }

    @Override
    public void releaseLock() {
        this.deletePath(path);
    }

}
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
2月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
2月前
|
算法 前端开发
|
2月前
|
NoSQL 前端开发 算法
Redis问题之Redis分布式锁与Zookeeper分布式锁有何不同
Redis问题之Redis分布式锁与Zookeeper分布式锁有何不同
|
3月前
|
Shell 虚拟化
分布式系统详解--框架(Zookeeper-基本shell命令)
分布式系统详解--框架(Zookeeper-基本shell命令)
40 1
|
2月前
|
安全 Java
使用Zookeeper实现分布式锁的最佳实践
使用Zookeeper实现分布式锁的最佳实践
|
3月前
|
缓存 NoSQL 数据库
分布式系统面试全集通第一篇(dubbo+redis+zookeeper----分布式+CAP+BASE+分布式事务+分布式锁)
分布式系统面试全集通第一篇(dubbo+redis+zookeeper----分布式+CAP+BASE+分布式事务+分布式锁)
83 0
|
3月前
|
设计模式 监控 安全
一文搞懂:zookeeper实现分布式锁安全用法
一文搞懂:zookeeper实现分布式锁安全用法
37 0
|
3月前
|
Java 网络安全
分布式系统详解--框架(Zookeeper-简介和集群搭建)
分布式系统详解--框架(Zookeeper-简介和集群搭建)
120 0
|
19天前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
53 2
基于Redis的高可用分布式锁——RedLock
|
27天前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】

热门文章

最新文章