从构建分布式秒杀系统聊聊分布式锁

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 这篇文章我们重点介绍一下 Zookeeper 如何实现分布式锁。

案例介绍

在尝试了解分布式锁之前,大家可以想象一下,什么场景下会使用分布式锁?
_1
单机应用架构中,秒杀案例使用ReentrantLcok或者synchronized来达到秒杀商品互斥的目的。然而在分布式系统中,会存在多台机器并行去实现同一个功能。也就是说,在多进程中,如果还使用以上JDK提供的进程锁,来并发访问数据库资源就可能会出现商品超卖的情况。因此,需要我们来实现自己的分布式锁。

实现一个分布式锁应该具备的特性:

  • 高可用、高性能的获取锁与释放锁
  • 在分布式系统环境下,一个方法或者变量同一时间只能被一个线程操作
  • 具备锁失效机制,网络中断或宕机无法释放锁时,锁必须被删除,防止死锁
  • 具备阻塞锁特性,即没有获取到锁,则继续等待获取锁
  • 具备非阻塞锁特性,即没有获取到锁,则直接返回获取锁失败
  • 具备可重入特性,一个线程中可以多次获取同一把锁,比如一个线程在执行一个带锁的方法,该方法中又调用了另一个需要相同锁的方法,则该线程可以直接执行调用的方法,而无需重新获得锁

在之前的秒杀案例中,我们曾介绍过关于分布式锁几种实现方式:

  • 基于数据库实现分布式锁
  • 基于 Redis 实现分布式锁
  • 基于 Zookeeper 实现分布式锁

前两种对于分布式生产环境来说并不是特别推荐,高并发下数据库锁性能太差,Redis在锁时间限制和缓存一致性存在一定问题。这里我们重点介绍一下 Zookeeper 如何实现分布式锁。

实现原理

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能存在唯一文件名。
_2

数据模型
  • PERSISTENT 持久化节点,节点创建后,不会因为会话失效而消失
  • EPHEMERAL 临时节点, 客户端session超时此类节点就会被自动删除
  • EPHEMERAL_SEQUENTIAL 临时自动编号节点
  • PERSISTENT_SEQUENTIAL 顺序自动编号持久化节点,这种节点会根据当前已存在的节点数自动加 1
监视器(watcher)

当创建一个节点时,可以注册一个该节点的监视器,当节点状态发生改变时,watch被触发时,ZooKeeper将会向客户端发送且仅发送一条通知,因为watch只能被触发一次。

根据zookeeper的这些特性,我们来看看如何利用这些特性来实现分布式锁:

  • 创建一个锁目录lock
  • 线程A获取锁会在lock目录下,创建临时顺序节点
  • 获取锁目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁
  • 线程B创建临时节点并获取所有兄弟节点,判断自己不是最小节点,设置监听(watcher)比自己次小的节点(只关注比自己次小的节点是为了防止发生“羊群效应”)
  • 线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是最小的节点,获得锁
代码分析

尽管ZooKeeper已经封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。但是如果让一个普通开发者去手撸一个分布式锁还是比较困难的,在秒杀案例中我们直接使用 Apache 开源的curator 开实现 Zookeeper 分布式锁。

这里我们使用以下版本,截止目前最新版4.0.1:

<!-- zookeeper 分布式锁、注意zookeeper版本  这里对应的是3.4.6-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.10.0</version>
</dependency>

首先,我们看下InterProcessLock接口中的几个方法:

/**
* 获取锁、阻塞等待、可重入
*/
public void acquire() throws Exception;

/**
* 获取锁、阻塞等待、可重入、超时则获取失败
*/
public boolean acquire(long time, TimeUnit unit) throws Exception;

/**
* 释放锁
*/
public void release() throws Exception;

/**
* Returns true if the mutex is acquired by a thread in this JVM
*/
boolean isAcquiredInThisProcess();

获取锁:

//获取锁
public void acquire() throws Exception
    {
        if ( !internalLock(-1, null) )
        {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }
private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
         实现同一个线程可重入性,如果当前线程已经获得锁,
         则增加锁数据中lockCount的数量(重入次数),直接返回成功
        */
        //获取当前线程
        Thread currentThread = Thread.currentThread();
        //获取当前线程重入锁相关数据
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            //原子递增一个当前值,记录重入次数,后面锁释放会用到
            lockData.lockCount.incrementAndGet();
            return true;
        }
        //尝试连接zookeeper获取锁
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            //创建可重入锁数据,用于记录当前线程重入次数
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }
        //获取锁超时或者zk通信异常返回失败
        return false;
    }

Zookeeper获取锁实现:

 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {    
        //获取当前时间戳
        final long      startMillis = System.currentTimeMillis();
        //如果unit不为空(非阻塞锁),把当前传入time转为毫秒
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        //子节点标识
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        //尝试次数
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        //自旋锁,循环获取锁
        while ( !isDone )
        {
            isDone = true;

            try
            {
                //在锁节点下创建临时且有序的子节点,例如:_c_008c1b07-d577-4e5f-8699-8f0f98a013b4-lock-000000001
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                //如果当前子节点序号最小,获得锁则直接返回,否则阻塞等待前一个子节点删除通知(release释放锁)
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                //异常处理,如果找不到节点,这可能发生在session过期等时,因此,如果重试允许,只需重试一次即可
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }
        //如果获取锁则返回当前锁子节点路径
        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }
  private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
            //自旋获取锁
            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                //获取所有子节点集合
                List<String>        children = getSortedChildren();
                //判断当前子节点是否为最小子节点
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                //如果是最小节点则获取锁
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    //获取前一个节点,用于监听
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try 
                        {
                            //这里使用getData()接口而不是checkExists()是因为,如果前一个子节点已经被删除了那么会抛出异常而且不会设置事件监听器,而checkExists虽然也可以获取到节点是否存在的信息但是同时设置了监听器,这个监听器其实永远不会触发,对于Zookeeper来说属于资源泄露
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                //如果设置了获取锁等待时间
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // 超时则删除子节点
                                    break;
                                }
                                //等待超时时间
                                wait(millisToWait);
                            }
                            else
                            {
                                wait();//一直等待
                            }
                        }
                        catch ( KeeperException.NoNodeException e ) 
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                            //如果前一个子节点已经被删除则deException,只需要自旋获取一次即可
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);//获取锁超时则删除节点
            }
        }
        return haveTheLock;
    }

释放锁:

 public void release() throws Exception
    {

        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        //没有获取锁,你释放个球球,如果为空抛出异常
        if ( lockData == null )
        {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
        //获取重入数量
        int newLockCount = lockData.lockCount.decrementAndGet();
        //如果重入锁次数大于0,直接返回
        if ( newLockCount > 0 )
        {
            return;
        }
        //如果重入锁次数小于0,抛出异常
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {
            //释放锁
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {
            //移除当前线程锁数据
            threadData.remove(currentThread);
        }
    }

测试案例

为了更好的理解其原理和代码分析中获取锁的过程,这里我们实现一个简单的Demo:

/**
 * 基于curator的zookeeper分布式锁
 */
public class CuratorUtil {
    private static String address = "192.168.1.180:2181";
    
    public static void main(String[] args) {
        //1、重试策略:初试时间为1s 重试3次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); 
        //2、通过工厂创建连接
        CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy);
        //3、开启连接
        client.start();
        //4 分布式锁
        final InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock"); 
        //读写锁
        //InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/readwriter");
        
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        
        for (int i = 0; i < 5; i++) {
            fixedThreadPool.submit(new Runnable() {
                @Override
                public void run() {
                    boolean flag = false;
                    try {
                        //尝试获取锁,最多等待5秒
                        flag = mutex.acquire(5, TimeUnit.SECONDS);
                        Thread currentThread = Thread.currentThread();
                        if(flag){
                            System.out.println("线程"+currentThread.getId()+"获取锁成功");
                        }else{
                            System.out.println("线程"+currentThread.getId()+"获取锁失败");
                        }
                        //模拟业务逻辑,延时4秒
                        Thread.sleep(4000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally{
                        if(flag){
                            try {
                                mutex.release();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            });
        }
    }
}

这里我们开启5个线程,每个线程获取锁的最大等待时间为5秒,为了模拟具体业务场景,方法中设置4秒等待时间。开始执行main方法,通过ZooInspector监控/curator/lock下的节点如下图:

_3
对,没错,设置4秒的业务处理时长就是为了观察生成了几个顺序节点。果然如案例中所述,每个线程都会生成一个节点并且还是有序的。

观察控制台,我们会发现只有两个线程获取锁成功,另外三个线程超时获取锁失败会自动删除节点。线程执行完毕我们刷新一下/curator/lock节点,发现刚才创建的五个子节点已经不存在了。

小结

通过分析第三方开源工具实现的分布式锁方式,收获还是满满的。学习本身就是一个由浅入深的过程,从如何调用API,到理解其代码逻辑实现,想要更深入可以去挖掘Zookeeper的核心算法ZAB协议。

最后为了方便大家学习,总结了学习过程中遇到的几个关键词:重入锁、自旋锁、有序节点、阻塞、非阻塞、监听,希望对大家有所帮助。

文章来源:https://mp.weixin.qq.com/s/UQbSeGHAak1WDerPlQBq7w
相关推荐:https://www.roncoo.com/course/view/086c2c4027ac4a00aa1e9d63d7bac36d

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
27天前
|
NoSQL 关系型数据库 MySQL
分布式系统学习9:分布式锁
本文介绍了分布式系统中分布式锁的概念、实现方式及其应用场景。分布式锁用于在多个独立的JVM进程间确保资源的互斥访问,具备互斥、高可用、可重入和超时机制等特点。文章详细讲解了三种常见的分布式锁实现方式:基于Redis、Zookeeper和关系型数据库(如MySQL)。其中,Redis适合高性能场景,推荐使用Redisson库;Zookeeper适用于对一致性要求较高的场景,建议基于Curator框架实现;而基于数据库的方式性能较低,实际开发中较少使用。此外,还探讨了乐观锁和悲观锁的区别及适用场景,并介绍了如何通过Lua脚本和Redis的`SET`命令实现原子操作,以及Redisson的自动续期机
104 7
|
1月前
|
存储 运维 安全
盘古分布式存储系统的稳定性实践
本文介绍了阿里云飞天盘古分布式存储系统的稳定性实践。盘古作为阿里云的核心组件,支撑了阿里巴巴集团的众多业务,确保数据高可靠性、系统高可用性和安全生产运维是其关键目标。文章详细探讨了数据不丢不错、系统高可用性的实现方法,以及通过故障演练、自动化发布和健康检查等手段保障生产安全。总结指出,稳定性是一项系统工程,需要持续迭代演进,盘古经过十年以上的线上锤炼,积累了丰富的实践经验。
|
1月前
|
存储 分布式计算 Hadoop
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
50 7
|
2月前
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
121 4
|
3月前
|
存储 运维 负载均衡
构建高可用性GraphRAG系统:分布式部署与容错机制
【10月更文挑战第28天】作为一名数据科学家和系统架构师,我在构建和维护大规模分布式系统方面有着丰富的经验。最近,我负责了一个基于GraphRAG(Graph Retrieval-Augmented Generation)模型的项目,该模型用于构建一个高可用性的问答系统。在这个过程中,我深刻体会到分布式部署和容错机制的重要性。本文将详细介绍如何在生产环境中构建一个高可用性的GraphRAG系统,包括分布式部署方案、负载均衡、故障检测与恢复机制等方面的内容。
232 4
构建高可用性GraphRAG系统:分布式部署与容错机制
|
3月前
|
机器学习/深度学习 人工智能 分布式计算
【AI系统】分布式通信与 NVLink
进入大模型时代后,AI的核心转向大模型发展,训练这类模型需克服大量GPU资源及长时间的需求。面对单个GPU内存限制,跨多个GPU的分布式训练成为必要,这涉及到分布式通信和NVLink技术的应用。分布式通信允许多个节点协作完成任务,而NVLink则是一种高速、低延迟的通信技术,用于连接GPU或GPU与其它设备,以实现高性能计算。随着大模型的参数、数据规模扩大及算力需求增长,分布式并行策略,如数据并行和模型并行,变得至关重要。这些策略通过将模型或数据分割在多个GPU上处理,提高了训练效率。此外,NVLink和NVSwitch技术的持续演进,为GPU间的高效通信提供了更强的支持,推动了大模型训练的快
82 0
|
3月前
|
存储 监控 大数据
构建高可用性ClickHouse集群:从单节点到分布式
【10月更文挑战第26天】随着业务的不断增长,单一的数据存储解决方案可能无法满足日益增加的数据处理需求。在大数据时代,数据库的性能、可扩展性和稳定性成为企业关注的重点。ClickHouse 是一个用于联机分析处理(OLAP)的列式数据库管理系统(DBMS),以其卓越的查询性能和高吞吐量而闻名。本文将从我的个人角度出发,分享如何将单节点 ClickHouse 扩展为高可用性的分布式集群,以提升系统的稳定性和可靠性。
350 0
|
2天前
|
NoSQL Java Redis
Springboot使用Redis实现分布式锁
通过这些步骤和示例,您可以系统地了解如何在Spring Boot中使用Redis实现分布式锁,并在实际项目中应用。希望这些内容对您的学习和工作有所帮助。
112 83
|
4月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
11天前
|
缓存 NoSQL 中间件
Redis,分布式缓存演化之路
本文介绍了基于Redis的分布式缓存演化,探讨了分布式锁和缓存一致性问题及其解决方案。首先分析了本地缓存和分布式缓存的区别与优劣,接着深入讲解了分布式远程缓存带来的并发、缓存失效(穿透、雪崩、击穿)等问题及应对策略。文章还详细描述了如何使用Redis实现分布式锁,确保高并发场景下的数据一致性和系统稳定性。最后,通过双写模式和失效模式讨论了缓存一致性问题,并提出了多种解决方案,如引入Canal中间件等。希望这些内容能为读者在设计分布式缓存系统时提供有价值的参考。感谢您的阅读!
Redis,分布式缓存演化之路