【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 前言上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列中,我们一起写了下如何通过ZooKeeper的持久性顺序节点实现一个分布式队列。本文我们来一起写一个ZooKeeper的实现的分布式锁。

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列中,我们一起写了下如何通过ZooKeeper的持久性顺序节点实现一个分布式队列。
本文我们来一起写一个ZooKeeper的实现的分布式锁。

设计

参考之前学习的【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock,实现java.util.concurrent.locks.Lock接口。
我们通过重写接口中的方法实现一个可重入锁。

  • lock:请求锁,如果成功则直接返回,不成功则阻塞 直到获取锁。
  • lockInterruptibly:请求锁,如果失败则一直阻塞等待 直到获取锁或线程中断
  • tryLock:1、尝试获取锁,获取失败的话 直接返回false,不会再等待。2、尝试获取锁,获取成功返回true,否则一直请求,直到超时返回false
  • unlock:释放锁

我们使用ZooKeeper的EPHEMERAL临时节点机制,如果能创建成功的话,则获取锁成功,释放锁或客户端断开连接后,临时节点自动删除,这样可以避免误删除或漏删除的情况。

获取锁失败后,这里我们使用轮询的方式来不断尝试创建。其实应该使用Watcher机制来实现,这样能避免大量的无用请求。在下一节更优雅的分布式锁实现机制中我们会用到。

DistributedLock

public class DistributedLock implements Lock {
    private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class);

    //ZooKeeper客户端,进行ZooKeeper操作
    private ZooKeeper zooKeeper;

    //根节点名称
    private String dir;

    //加锁节点
    private String node;

    //ZooKeeper鉴权信息
    private List<ACL> acls;

    //要加锁节点
    private String fullPath;

    //加锁标识,为0时表示未获取到锁,每获取一次锁则加一,释放锁时减一。减到0时断开连接,删除临时节点。
    private volatile int state;

    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
    public DistributedLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        this.fullPath = dir.concat("/").concat(node);
        init();
    }

    private void init() {
        try {
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            logger.error("[DistributedLock#init] error : " + e.toString(), e);
        }
    }
}

lock

public void lock() {
    //通过state实现重入机制,如果已经获取锁,则将state++即可。
    if (addLockCount()) {
        return;
    }
    //一直尝试获取锁,知道获取成功
    for (;;) {
        try {
            //创建临时节点
            zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL);
            //第一次获取锁,state++,这里不需要使用加锁机制保证原子性,因为同一时间,最多只有一个线程能create节点成功。
            state++;
            break;
        } catch (InterruptedException ie) {
            //如果捕获中断异常,则设置当前线程为中断状态
            logger.error("[DistributedLock#lock] error : " + ie.toString(), ie);
            Thread.currentThread().interrupt();
        } catch (KeeperException ke) {
            //如果捕获到的异常是 节点已存在 外的其他异常,则设置当前线程为中断状态
            logger.error("[DistributedLock#lock] error : " + ke.toString(), ke);
            if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

lockInterruptibly

public void lockInterruptibly() throws InterruptedException {
    //通过state实现重入机制,如果已经获取锁,则将state++即可。
    if (addLockCount()) {
        return;
    }
    for (;;) {
        //如果当前线程为中断状态,则抛出中断异常
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        try {
            zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL);
            state++;
            break;
        } catch (InterruptedException ie) {
            //如果捕获中断异常,则设置当前线程为中断状态
            logger.error("[DistributedLock#lockInterruptibly] error : " + ie.toString(), ie);
            Thread.currentThread().interrupt();
        } catch (KeeperException ke) {
            //如果捕获到的异常是 节点已存在 外的其他异常,则设置当前线程为中断状态
            logger.error("[DistributedLock#lockInterruptibly] error : " + ke.toString(), ke);
            if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

tryLock

public boolean tryLock() {
    //通过state实现重入机制,如果已经获取锁,则将state++即可。
    if (addLockCount()) {
        return true;
    }
    //如果获取成功则返回true,失败则返回false
    try {
        zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL);
        state++;
        return true;
    } catch (Exception e) {
        logger.error("[DistributedLock#tryLock] error : " + e.toString(), e);
    }

    return false;
}


public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    //通过state实现重入机制,如果已经获取锁,则将state++即可。
    if (addLockCount()) {
        return true;
    }

    //如果尝试获取超时,则返回false
    long nanosTimeout = unit.toNanos(time);
    if (nanosTimeout <= 0L) {
        return false;
    }

    final long deadline = System.nanoTime() + nanosTimeout;
    for (;;) {
        //如果当前线程为中断状态,则抛出中断异常
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }

        //如果尝试获取超时,则返回false
        nanosTimeout = deadline - System.nanoTime();
        if (nanosTimeout <= 0L) {
            return false;
        }
        try {
            zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL);
            state++;
            return true;
        } catch (InterruptedException ie) {
            //如果捕获中断异常,则返回false
            logger.error("[DistributedLock#tryLock] error : " + ie.toString(), ie);
            return false;
        } catch (KeeperException ke) {
            //如果捕获到的异常是 节点已存在 外的其他异常,则返回false
            logger.error("[DistributedLock#tryLock] error : " + ke.toString(), ke);
            if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
                return false;
            }
        }
    }
}

unlock

public void unlock() {
    //通过state实现重入机制,如果已经获取锁,释放锁时,需要将state--。
    delLockCount();
    
    //如果state为0时,说明不再持有锁,需要将连接关闭,自动删除临时节点
    if (state == 0 && zooKeeper != null) {
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            logger.error("[DistributedLock#unlock] error : " + e.toString(), e);
        }
    }
}

addLockCount

private boolean addLockCount() {
    //如果state大于0,即已持有锁,将state数量加一
    if (state > 0) {
        synchronized (this) {
            if (state > 0) {
                state++;
                return true;
            }
        }
    }
    return false;
}

delLockCount

private boolean delLockCount() {
    //如果state大于0,即还持有锁,将state数量减一
    if (state > 0) {
        synchronized (this) {
            if (state > 0) {
                state--;
                return true;
            }
        }
    }
    return false;
}

总结

上面就是一个通过ZooKeeper实现的分布式可重入锁,利用了临时节点的特性。源代码可见:aloofJr
其中有几个可以优化的点。

  • 轮询的方式换成Watcher机制
  • 可重入锁实现方式的优化
  • 所有线程竞争一个节点的创建,容易出现羊群效应,且是一种不公平的锁竞争模式

下节我们使用新的方式实现分布式锁来解决上面的几个问题,如果大家好的优化建议,欢迎一起讨论。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
13天前
|
数据管理 API 调度
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
HarmonyOS Next 是华为新一代操作系统,专注于分布式技术的深度应用与生态融合。本文通过技术特点、应用场景及实战案例,全面解析其核心技术架构与开发流程。重点介绍分布式软总线2.0、数据管理、任务调度等升级特性,并提供基于 ArkTS 的原生开发支持。通过开发跨设备协同音乐播放应用,展示分布式能力的实际应用,涵盖项目配置、主界面设计、分布式服务实现及部署调试步骤。此外,深入分析分布式数据同步原理、任务调度优化及常见问题解决方案,帮助开发者掌握 HarmonyOS Next 的核心技术和实战技巧。
141 76
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
|
13天前
|
物联网 调度 vr&ar
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
鸿蒙技术分享:HarmonyOS Next 深度解析 随着万物互联时代的到来,华为发布的 HarmonyOS Next 在技术架构和生态体验上实现了重大升级。本文从技术架构、生态优势和开发实践三方面深入探讨其特点,并通过跨设备笔记应用实战案例,展示其强大的分布式能力和多设备协作功能。核心亮点包括新一代微内核架构、统一开发语言 ArkTS 和多模态交互支持。开发者可借助 DevEco Studio 4.0 快速上手,体验高效、灵活的开发过程。 239个字符
180 13
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
|
10天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
20天前
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
50 10
|
2月前
|
存储 运维 NoSQL
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
|
3月前
|
消息中间件 关系型数据库 Java
‘分布式事务‘ 圣经:从入门到精通,架构师尼恩最新、最全详解 (50+图文4万字全面总结 )
本文 是 基于尼恩之前写的一篇 分布式事务的文章 升级而来 , 尼恩之前写的 分布式事务的文章, 在全网阅读量 100万次以上 , 被很多培训机构 作为 顶级教程。 此文修改了 老版本的 一个大bug , 大家不要再看老版本啦。
|
3月前
|
NoSQL Java Redis
开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!
本文详细介绍了 Redisson 延迟队列(DelayedQueue)的实现原理,包括基本使用、内部数据结构、基本流程、发送和获取延时消息以及初始化延时队列等内容。文章通过代码示例和流程图,逐步解析了延迟消息的发送、接收及处理机制,帮助读者深入了解 Redisson 延迟队列的工作原理。
|
3月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
61 2
|
3月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
62 1
|
3月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?