zookeeper实现分布式锁

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 接口:import java.util.concurrent.TimeUnit;public interface DistributedLock { public void getLock() throws Exception; public b...

接口:

import java.util.concurrent.TimeUnit;

public interface DistributedLock {

    public void getLock() throws Exception;

    public boolean getLock(long time,TimeUnit timeUnit) throws Exception;

    public void releaseLock();

}

核心公用代码:

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class BaseDistributedLock {

    private final static Logger logger = LoggerFactory.getLogger(BaseDistributedLock.class);

    private final ZkClient client;
    private final String lockPath;
    private final String baseLockName;
    private final String lockName;
    private ThreadPoolTaskExecutor taskExecutor;
    private static final Integer MAX_RETRY_COUNT = 10;


    public BaseDistributedLock(ZkClient zkClient,String baseLockName,String lockName,ThreadPoolTaskExecutor taskExecutor){
        this.client = zkClient;
        this.baseLockName = baseLockName;
        this.lockName = lockName;
        this.lockPath = baseLockName.concat("/").concat(lockName);
        this.taskExecutor = taskExecutor;
    }

    /**
     * 获取锁资源
     * @param time
     * @param timeUnit
     * @return
     * @throws Exception 
     */
    public String doGetLock(long time, TimeUnit timeUnit) throws Exception{
        String path = null;
        if(timeUnit != null){
            //存在超时时间
            Future<String> future = taskExecutor.submit(new GetLock());
            try {
                path = future.get(time, timeUnit);
            } catch (TimeoutException e) {
                //超时则中断获取锁的方法
                future.cancel(true);
            }
        }else {
            //没有超时时间
            path = waitLock();
        }
        return path;
    }

    /**
     * 创建path,调用锁等待方法
     * @return
     * @throws Exception
     */
    public String waitLock()throws Exception{
        int retrycount = 1;
        String path = null;
        //创建零时节点,重试次数为10次
        try {
            path = createLockNode();
        } catch (Exception e) {
            if(retrycount++ < MAX_RETRY_COUNT){
                createLockNode();
            }else {
                throw e;
            }
        }
        return doWaitLock(path);
    }

    /**
     * 等待锁资源(核心逻辑)
     * @return 创建的路径
     * @throws Exception 
     */
    public String doWaitLock(String path) throws Exception{

        boolean getTheLock = false;
        boolean doDelete = false;

        //判断是否获取到锁资源
        try {
            while (!getTheLock) {
                //获取该baseLockName路径下的所有子节点,并排序
                List<String> children = getSortedChildren();
                String sequenceNodeName = path.substring(baseLockName.length()+1);

                int pathIndex = children.indexOf(sequenceNodeName);
                if(pathIndex < 0){
                    throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
                }

                if(pathIndex == 0){
                    //获取到锁资源
                    getTheLock = true;
                    return path;
                }else {
                    //获取前一个节点的path
                    String previousPath = baseLockName.concat("/").concat(children.get(pathIndex-1));
                    //监听该节点
                    final CountDownLatch latch = new CountDownLatch(1);
                    final IZkDataListener previousListener = new IZkDataListener() {
                        // 次小节点删除事件发生时,让countDownLatch结束等待
                        // 此时还需要重新让程序回到while,重新判断一次!
                        public void handleDataDeleted(String dataPath)
                                throws Exception {
                            latch.countDown();
                        }

                        public void handleDataChange(String dataPath,
                                Object data) throws Exception {
                            return;
                        }
                    };

                    try {
                        client.subscribeDataChanges(previousPath,
                                previousListener);
                        latch.await();
                    } catch (ZkNoNodeException e) {
                        logger.error("注册监听事件出现异常:"+e.getMessage());
                    } finally {
                        client.unsubscribeDataChanges(previousPath,
                                previousListener);
                    }
                }
            }
        } catch (Exception e) {
            doDelete = true;
            throw e;
        }finally {
            if(doDelete){
                deletePath(path);
            }

        }
        return null;
    }

    /**
     * 创建临时节点
     * @return
     * @throws Exception
     */
    private String createLockNode()throws Exception {
        return client.createEphemeralSequential(lockPath, null);
    }

    /**
     * 如果该路径存在,那么就删除
     * @param path
     */
    public void deletePath(String path){
        client.delete(path);
    }

    /**
     * 另起一个线程去获取锁资源,主线程监听超时时间
     * @author 13376
     *
     */
    class GetLock implements Callable<String>{

        @Override
        public String call() throws Exception {
            return waitLock();
        }

    }

    /**
     * 顺序获取所有的子节点
     * @return
     * @throws Exception
     */
    private List<String> getSortedChildren() throws Exception {
        try {
            List<String> children = client.getChildren(baseLockName);
            Collections.sort(children, new Comparator<String>() {
                public int compare(String lhs, String rhs) {
                    return getLockNodeNumber(lhs, lockName).compareTo(
                            getLockNodeNumber(rhs, lockName));
                }
            });
            return children;

        } catch (ZkNoNodeException e) {
            client.createPersistent(baseLockName, true);
            return getSortedChildren();
        }
    }

    /**
     * 获取零时节点名后面的自增长的数字
     * @param str
     * @param lockName
     * @return
     */
    private String getLockNodeNumber(String str, String lockName) {
        int index = str.lastIndexOf(lockName);
        if (index >= 0) {
            index += lockName.length();
            return index <= str.length() ? str.substring(index) : "";
        }
        return str;
    }
}

对外暴露的接口方法:

import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.ZkClient;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class ZooDistributedLock extends BaseDistributedLock implements
        DistributedLock {

    private static final String LOCK_NAME = "lock-";
    private String path;

    public ZooDistributedLock(ZkClient zkClient,String baseLockName,ThreadPoolTaskExecutor taskExecutor){
        super(zkClient,baseLockName,LOCK_NAME,taskExecutor);
    }

    @Override
    public void getLock() throws Exception {
        if(!getLock(-1,null)){
            throw new RuntimeException("连接异常,"+path+"路径下获取所失败");
        }
    }

    @Override
    public boolean getLock(long time, TimeUnit timeUnit) throws Exception {
        path = this.doGetLock(time,timeUnit);
        return path != null;
    }

    @Override
    public void releaseLock() {
        this.deletePath(path);
    }

}
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
2月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
2月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
1月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
43 2
|
1月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
45 1
|
2月前
分布式-Zookeeper-数据订阅
分布式-Zookeeper-数据订阅
|
2月前
|
监控
分布式-Zookeeper-Zab协议
分布式-Zookeeper-Zab协议
|
1月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
47 0
|
2月前
|
Java
分布式-Zookeeper-分布式锁
分布式-Zookeeper-分布式锁
|
2月前
|
存储 负载均衡 算法
分布式-Zookeeper-Master选举
分布式-Zookeeper-Master选举
|
2月前
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)