【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁-升级版

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介:

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁中,我们通过利用ZooKeeper的临时节点特性,实现了一个分布式锁。
但是是通过轮询的方式去判断不断尝试获取锁,空转对于CPU还是有一定消耗的,同时,对于多个线程竞争锁激烈的时候,很容易出现羊群效应。

为了解决上面两个问题。本文来看一下如何实现一个升级版的分布式锁。

设计

我们依然实现java.util.concurrent.locks.Lock接口。
和上一文中实现方式不同的是,我们使用ZooKeeper的EPHEMERAL_SEQUENTIAL临时顺序节点。
当首次获取锁时,会创建一个临时节点,如果这个临时节点末尾数字是当前父节点下同名节点中最小的,则获取锁成功。
否则,则监听上一个数字较大的节点,直到上一个节点被释放,则再次尝试获取锁成功。这样可以避免多个线程同时获取一把锁造成的竞争。
同时使用了ZooKeeper提供的watch功能,避免了轮询带来的CPU空转。
获取锁后使用一个volatile int类型的state进行计数,来实现锁的可重入机制。

DistributedFairLock

public class DistributedFairLock implements Lock {
    private static Logger logger = LoggerFactory.getLogger(DistributedFairLock.class);

    //ZooKeeper客户端,进行ZooKeeper操作
    private ZooKeeper zooKeeper;

    //根节点名称
    private String dir;

    //加锁节点
    private String node;

    //ZooKeeper鉴权信息
    private List<ACL> acls;

    //要加锁节点
    private String fullPath;

    //加锁标识,为0时表示未获取到锁,每获取一次锁则加一,释放锁时减一。减到0时断开连接,删除临时节点。
    private volatile int state;

    //当前锁创建的节点id
    private String id;

    //通过CountDownLatch阻塞,直到监听上一节点被取消,再进行后续操作
    private CountDownLatch countDownLatch;

    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
    public DistributedFairLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        this.fullPath = dir.concat("/").concat(this.node);
        init();
    }

    private void init() {
        try {
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            logger.error("[DistributedFairLock#init] error : " + e.toString(), e);
        }
    }
}

lock

public void lock() {
    try {
        //加锁
        synchronized (this) {
            //如果当前未持有锁
            if (state <= 0) {
                //创建节点
                if (id == null) {
                    id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
                }

                //获取当前路径下所有的节点
                List<String> nodes = zooKeeper.getChildren(dir, false);
                SortedSet<String> sortedSet = new TreeSet<>();
                for (String node : nodes) {
                    sortedSet.add(dir.concat("/").concat(node));
                }

                //获取所有id小于当前节点顺序的节点
                SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);

                if (!lessSet.isEmpty()) {
                    //监听上一个节点,就是通过这里避免多锁竞争和CPU空转,实现公平锁的  
                    Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
                    if (stat != null) {
                        countDownLatch = new CountDownLatch(1);
                        countDownLatch.await();
                    }

                }
            }

            state++;
        }
    } catch (InterruptedException e) {
        logger.error("[DistributedFairLock#lock] error : " + e.toString(), e);
        Thread.currentThread().interrupt();
    } catch (KeeperException ke) {
        logger.error("[DistributedFairLock#lock] error : " + ke.toString(), ke);
        if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
            Thread.currentThread().interrupt();
        }
    }
}

tryLock

public boolean tryLock() {
    try {
        synchronized (this) {
            if (state <= 0) {
                if (id == null) {
                    id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
                }

                List<String> nodes = zooKeeper.getChildren(dir, false);
                SortedSet<String> sortedSet = new TreeSet<>();
                for (String node : nodes) {
                    sortedSet.add(dir.concat("/").concat(node));
                }


                SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);

                if (!lessSet.isEmpty()) {
                    return false;
                }
            }
            state++;
        }
    } catch (InterruptedException e) {
        logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
        return false;
    } catch (KeeperException ke) {
        logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
        if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
            return false;
        }
    }
    return true;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    try {
        synchronized (this) {
            if (state <= 0) {
                if (id == null) {
                    id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
                }

                List<String> nodes = zooKeeper.getChildren(dir, false);
                SortedSet<String> sortedSet = new TreeSet<>();
                for (String node : nodes) {
                    sortedSet.add(dir.concat("/").concat(node));
                }


                SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);

                if (!lessSet.isEmpty()) {
                    Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
                    if (stat != null) {
                        countDownLatch = new CountDownLatch(1);
                        countDownLatch.await(time, unit);
                    }

                }
            }

            state++;
        }
    } catch (InterruptedException e) {
        logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
        return false;
    } catch (KeeperException ke) {
        logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
        if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
            return false;
        }
    }
    return true;
}

unlock

public void unlock() {
    synchronized (this) {
        if (state > 0) {
            state--;
        }
        //当不再持有锁时,删除创建的临时节点
        if (state == 0 && zooKeeper != null) {
            try {
                zooKeeper.delete(id, -1);
                id = null;
            } catch (Exception e) {
                logger.error("[DistributedFairLock#unlock] error : " + e.toString(), e);
            }
        }
    }
}

LockWatcher

private class LockWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        synchronized (this) {
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
        }
    }
}

总结

上面就是我们改良后,通过临时顺序节点和watch机制实现的公平可重入分布式锁。
源代码可见:aloofJr
通过watch机制避免轮询带来的CPU空转。
通过顺序临时节点避免了羊群效应。

如果对以上方式有更好的优化方案,欢迎一起讨论。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
3月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
3月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
2月前
|
消息中间件 关系型数据库 Java
‘分布式事务‘ 圣经:从入门到精通,架构师尼恩最新、最全详解 (50+图文4万字全面总结 )
本文 是 基于尼恩之前写的一篇 分布式事务的文章 升级而来 , 尼恩之前写的 分布式事务的文章, 在全网阅读量 100万次以上 , 被很多培训机构 作为 顶级教程。 此文修改了 老版本的 一个大bug , 大家不要再看老版本啦。
|
2月前
|
NoSQL Java Redis
开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!
本文详细介绍了 Redisson 延迟队列(DelayedQueue)的实现原理,包括基本使用、内部数据结构、基本流程、发送和获取延时消息以及初始化延时队列等内容。文章通过代码示例和流程图,逐步解析了延迟消息的发送、接收及处理机制,帮助读者深入了解 Redisson 延迟队列的工作原理。
|
2月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
45 2
|
2月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
48 1
|
3月前
|
Dubbo Java 应用服务中间件
分布式-dubbo的入门
分布式-dubbo的入门
|
2月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
49 0
|
3月前
|
Java
分布式-Zookeeper-分布式锁
分布式-Zookeeper-分布式锁
|
2月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
下一篇
无影云桌面