【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁-升级版

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁中,我们通过利用ZooKeeper的临时节点特性,实现了一个分布式锁。
但是是通过轮询的方式去判断不断尝试获取锁,空转对于CPU还是有一定消耗的,同时,对于多个线程竞争锁激烈的时候,很容易出现羊群效应。

为了解决上面两个问题。本文来看一下如何实现一个升级版的分布式锁。

设计

我们依然实现java.util.concurrent.locks.Lock接口。
和上一文中实现方式不同的是,我们使用ZooKeeper的EPHEMERAL_SEQUENTIAL临时顺序节点。
当首次获取锁时,会创建一个临时节点,如果这个临时节点末尾数字是当前父节点下同名节点中最小的,则获取锁成功。
否则,则监听上一个数字较大的节点,直到上一个节点被释放,则再次尝试获取锁成功。这样可以避免多个线程同时获取一把锁造成的竞争。
同时使用了ZooKeeper提供的watch功能,避免了轮询带来的CPU空转。
获取锁后使用一个volatile int类型的state进行计数,来实现锁的可重入机制。

DistributedFairLock

public class DistributedFairLock implements Lock {
    private static Logger logger = LoggerFactory.getLogger(DistributedFairLock.class);

    //ZooKeeper客户端,进行ZooKeeper操作
    private ZooKeeper zooKeeper;

    //根节点名称
    private String dir;

    //加锁节点
    private String node;

    //ZooKeeper鉴权信息
    private List<ACL> acls;

    //要加锁节点
    private String fullPath;

    //加锁标识,为0时表示未获取到锁,每获取一次锁则加一,释放锁时减一。减到0时断开连接,删除临时节点。
    private volatile int state;

    //当前锁创建的节点id
    private String id;

    //通过CountDownLatch阻塞,直到监听上一节点被取消,再进行后续操作
    private CountDownLatch countDownLatch;

    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
    public DistributedFairLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        this.fullPath = dir.concat("/").concat(this.node);
        init();
    }

    private void init() {
        try {
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            logger.error("[DistributedFairLock#init] error : " + e.toString(), e);
        }
    }
}

lock

public void lock() {
    try {
        //加锁
        synchronized (this) {
            //如果当前未持有锁
            if (state <= 0) {
                //创建节点
                if (id == null) {
                    id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
                }

                //获取当前路径下所有的节点
                List<String> nodes = zooKeeper.getChildren(dir, false);
                SortedSet<String> sortedSet = new TreeSet<>();
                for (String node : nodes) {
                    sortedSet.add(dir.concat("/").concat(node));
                }

                //获取所有id小于当前节点顺序的节点
                SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);

                if (!lessSet.isEmpty()) {
                    //监听上一个节点,就是通过这里避免多锁竞争和CPU空转,实现公平锁的  
                    Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
                    if (stat != null) {
                        countDownLatch = new CountDownLatch(1);
                        countDownLatch.await();
                    }

                }
            }

            state++;
        }
    } catch (InterruptedException e) {
        logger.error("[DistributedFairLock#lock] error : " + e.toString(), e);
        Thread.currentThread().interrupt();
    } catch (KeeperException ke) {
        logger.error("[DistributedFairLock#lock] error : " + ke.toString(), ke);
        if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
            Thread.currentThread().interrupt();
        }
    }
}

tryLock

public boolean tryLock() {
    try {
        synchronized (this) {
            if (state <= 0) {
                if (id == null) {
                    id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
                }

                List<String> nodes = zooKeeper.getChildren(dir, false);
                SortedSet<String> sortedSet = new TreeSet<>();
                for (String node : nodes) {
                    sortedSet.add(dir.concat("/").concat(node));
                }


                SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);

                if (!lessSet.isEmpty()) {
                    return false;
                }
            }
            state++;
        }
    } catch (InterruptedException e) {
        logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
        return false;
    } catch (KeeperException ke) {
        logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
        if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
            return false;
        }
    }
    return true;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    try {
        synchronized (this) {
            if (state <= 0) {
                if (id == null) {
                    id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
                }

                List<String> nodes = zooKeeper.getChildren(dir, false);
                SortedSet<String> sortedSet = new TreeSet<>();
                for (String node : nodes) {
                    sortedSet.add(dir.concat("/").concat(node));
                }


                SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);

                if (!lessSet.isEmpty()) {
                    Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
                    if (stat != null) {
                        countDownLatch = new CountDownLatch(1);
                        countDownLatch.await(time, unit);
                    }

                }
            }

            state++;
        }
    } catch (InterruptedException e) {
        logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
        return false;
    } catch (KeeperException ke) {
        logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
        if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
            return false;
        }
    }
    return true;
}

unlock

public void unlock() {
    synchronized (this) {
        if (state > 0) {
            state--;
        }
        //当不再持有锁时,删除创建的临时节点
        if (state == 0 && zooKeeper != null) {
            try {
                zooKeeper.delete(id, -1);
                id = null;
            } catch (Exception e) {
                logger.error("[DistributedFairLock#unlock] error : " + e.toString(), e);
            }
        }
    }
}

LockWatcher

private class LockWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        synchronized (this) {
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
        }
    }
}

总结

上面就是我们改良后,通过临时顺序节点和watch机制实现的公平可重入分布式锁。
源代码可见:aloofJr
通过watch机制避免轮询带来的CPU空转。
通过顺序临时节点避免了羊群效应。

如果对以上方式有更好的优化方案,欢迎一起讨论。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
5天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
135 2
|
5天前
|
存储 分布式计算 大数据
HBase分布式数据库关键技术与实战:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析了HBase的核心技术,包括数据模型、分布式架构、访问模式和一致性保证,并探讨了其实战应用,如大规模数据存储、实时数据分析及与Hadoop、Spark集成。同时,分享了面试经验,对比了HBase与其他数据库的差异,提出了应对挑战的解决方案,展望了HBase的未来趋势。通过Java API代码示例,帮助读者巩固理解。全面了解和掌握HBase,能为面试和实际工作中的大数据处理提供坚实基础。
55 3
|
5天前
|
监控 Dubbo 前端开发
快速入门分布式系统与Dubbo+zookeeper Demo
快速入门分布式系统与Dubbo+zookeeper Demo
44 0
|
5天前
|
监控 NoSQL Java
Zookeeper分布式锁
Zookeeper分布式锁
92 1
|
3天前
|
前端开发 JavaScript 算法
分布式系统的一致性级别划分及Zookeeper一致性级别分析
分布式系统的一致性级别划分及Zookeeper一致性级别分析
|
5天前
|
存储 大数据 Apache
深入理解ZooKeeper:分布式协调服务的核心与实践
【5月更文挑战第7天】ZooKeeper是Apache的分布式协调服务,确保大规模分布式系统中的数据一致性与高可用性。其特点包括强一致性、高可用性、可靠性、顺序性和实时性。使用ZooKeeper涉及安装配置、启动服务、客户端连接及执行操作。实际应用中,面临性能瓶颈、不可伸缩性和单点故障等问题,可通过水平扩展、集成其他服务和多集群备份来解决。理解ZooKeeper原理和实践,有助于构建高效分布式系统。
|
5天前
|
监控 NoSQL 算法
探秘Redis分布式锁:实战与注意事项
本文介绍了Redis分区容错中的分布式锁概念,包括利用Watch实现乐观锁和使用setnx防止库存超卖。乐观锁通过Watch命令监控键值变化,在事务中执行修改,若键值被改变则事务失败。Java代码示例展示了具体实现。setnx命令用于库存操作,确保无超卖,通过设置锁并检查库存来更新。文章还讨论了分布式锁存在的问题,如客户端阻塞、时钟漂移和单点故障,并提出了RedLock算法来提高可靠性。Redisson作为生产环境的分布式锁实现,提供了可重入锁、读写锁等高级功能。最后,文章对比了Redis、Zookeeper和etcd的分布式锁特性。
136 16
探秘Redis分布式锁:实战与注意事项
|
5天前
|
Java 网络安全 Apache
搭建Zookeeper集群:三台服务器,一场分布式之舞
搭建Zookeeper集群:三台服务器,一场分布式之舞
53 0
|
5天前
|
缓存 应用服务中间件 数据库
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
53 1
|
5天前
|
存储 缓存 监控
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(场景问题分析+性能影响因素)
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(场景问题分析+性能影响因素)
47 0