分布式锁看了又看,最佳方案我来告诉你

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 分布式锁看了又看,最佳方案我来告诉你

分布式锁的场景


秒杀场景案例


对于商品秒杀的场景,我们需要防止库存超卖或者重复扣款等并发问题,我们通常需要使用分布式锁,来解决共享资源竞争导致数据不一致的问题。


以手机秒杀的场景为例子,在抢购的过程中通常我们有三个步骤:


  1. 扣掉对应商品的库存;2. 创建商品的订单;3. 用户支付。


对于这样的场景我们就可以采用分布式锁的来解决,比如我们在用户进入秒杀 “下单“ 链接的过程中,我们可以对商品库存进行加锁,然后完成扣库存和其他操作,操作完成后。释放锁,让下一个用户继续进入保证库存的安全性;也可以减少因为秒杀失败,导致 DB 回滚的次数。整个流程如下图所示:

Zookeeper 分布式锁秒杀场景.png

炒荣坊屋

用户1

用户1

用户n

秒杀场景,OnePulsX

秒杀场景,OnePulsX

秒杀场景,OnePulsx

lock

库存Lock

1.扣库存

2创建订单

库存

订单

付款

3.付款

unlock

成单结束

发货后续流程


注:对于锁的粒度要根据具体的场景和需求来权衡。


三种分布式锁


对于 Zookeeper 的分布式锁实现,主要是利用 Zookeeper 的两个特征来实现:


  1. Zookeeper 的一个节点不能被重复创建
  2. Zookeeper 的 Watcher 监听机制


非公平锁


对于非公平锁,我们在加锁的过程如下图所示。


Zookeeper 分布式锁非公平锁.png

获取锁

是否有其他线程获取到锁或者

正在执行事务

加锁

监听等待

create

get-w/lock/lock_XXX

/lock/lockidXxx

是否加锁成功

释放锁

占用锁

delete/lock/lockxxx

一执行完毕/事务中断

优点和缺点


其实上面的实现有优点也有缺点:


  • 优点:
    实现比较简单,有通知机制,能提供较快的响应,有点类似 ReentrantLock 的思想,对于节点删除失败的场景由 Session 超时保证节点能够删除掉。
  • 缺点:
    重量级,同时在大量锁的情况下会有 “惊群” 的问题。


“惊群” 就是在一个节点删除的时候,大量对这个节点的删除动作有订阅 Watcher 的线程会进行回调,这对Zk集群是十分不利的。所以需要避免这种现象的发生。


解决“惊群”:


为了解决“惊群“问题,我们需要放弃订阅一个节点的策略,那么怎么做呢?


  1. 我们将锁抽象成目录,多个线程在此目录下创建瞬时的顺序节点,因为 Zookeeper 会为我们保证节点的顺序性,所以可以利用节点的顺序进行锁的判断。
  2. 首先创建顺序节点,然后获取当前目录下最小的节点,判断最小节点是不是当前节点,如果是那么获取锁成功,如果不是那么获取锁失败。
  3. 获取锁失败的节点获取当前节点上一个顺序节点,对此节点注册监听,当节点删除的时候通知当前节点。
  4. 当unlock的时候删除节点之后会通知下一个节点。


公平锁


基于非公平锁的缺点,我们可以通过一下的方案来规避。


Zookeeper 分布式锁公平锁.png

加锁

create-e/lock

create-e-s/lock/lock_100

/lock//ockid_1000000000000

watch

/lock/lockid_1000000000001

watch

/lock/lockid_1000000000002

watch

/lock/lockid_1000000000003

Zookeeper 分布式锁公平锁.png

加锁

create-e/lock

create-e-s/lock/lock_100

/lock/lockid_1000000000000

watch

/lock/lockid_1000000000001

watch

/lock/lockid_1000000000002

watch

/lock/lockid_1000000000003

优点和缺点


优点: 如上借助于临时顺序节点,可以避免同时多个节点的并发竞争锁,缓解了服务端压力。


缺点: 对于读写场景来说,无法解决一致性的问题,如果读的时候也去获取锁的话,这样会导致性能下降,对于这样的问题,我们可以通过读写锁来实现如类似 jdk 中的 ReadWriteLock


读写锁实现


对于读写锁的特点:读写锁在如果多个线程都是在读的时候,是可以并发读的,就是一个无锁的状态,如果有写锁正在操作的时候,那么读锁需要等待写锁。在加写锁的时候,由于前面的读锁都是并发,所以需要监听最后一个读锁完成后执行写锁。步骤如下:


  1. read 请求, 如果前面是读锁,可以直接读取,不需要监听。如果前面是一个或者多个写锁那么只需要监听最后一个写锁。
  2. write 请求,只需要对前面的节点监听。Watcher 机制和互斥锁一样。

Zookeeper 分布式锁读写锁.png

加锁

create-e/lock

create-e-s/lock/lock_100

read/lock/lockid_1000000000000

read/lock/lockid_1000000000001

-watch

write/lock/lockid_1000000000002

-watch

write/lock/lockid_1000000000002

watch

watch

read/lock/lockid_1000000000003

read/lock/lockid_1000000000004

watch

write/lock/jockid_1000000000005


分布式锁实战


本文源码中使用环境:JDK 1.8Zookeeper 3.6.x


Curator 组件实现


POM 依赖


    <dependency>  <groupId>org.apache.curator</groupId>  <artifactId>curator-framework</artifactId>  <version>2.13.0</version></dependency><dependency>  <groupId>org.apache.curator</groupId>  <artifactId>curator-recipes</artifactId>  <version>2.13.0</version></dependency>


    互斥锁运用


    由于 Zookeeper 非公平锁的 “惊群” 效应,非公平锁在 Zookeeper 中其实并不是最好的选择。下面是一个模拟秒杀的例子来使用 Zookeeper 分布式锁。

      public class MutexTest {    static ExecutorService executor = Executors.newFixedThreadPool(8);    static AtomicInteger stock = new AtomicInteger(3);    public static void main(String[] args) throws InterruptedException {        CuratorFramework client = getZkClient();        String key = "/lock/lockId_111/111";        final InterProcessMutex mutex = new InterProcessMutex(client, key);        for (int i = 0; i < 99; i++) {            executor.submit(() -> {                if (stock.get() < 0) {                    System.err.println("库存不足, 直接返回");                    return;                }                try {                    boolean acquire = mutex.acquire(200, TimeUnit.MILLISECONDS);                    if (acquire) {                        int s = stock.decrementAndGet();                        if (s < 0) {                            System.err.println("进入秒杀,库存不足");                        } else {                            System.out.println("购买成功, 剩余库存: " + s);                        }                    }                } catch (Exception e) {                    e.printStackTrace();                } finally {                    try {                        if (mutex.isAcquiredInThisProcess())                            mutex.release();                    } catch (Exception e) {                        e.printStackTrace();                    }                }            });        }        while (true) {            if (executor.isTerminated()) {                executor.shutdown();                System.out.println("秒杀完毕剩余库存为:" + stock.get());            }            TimeUnit.MILLISECONDS.sleep(100);        }    }    private static CuratorFramework getZkClient() {        String zkServerAddress = "127.0.0.1:2181";        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);        CuratorFramework zkClient = CuratorFrameworkFactory.builder()                .connectString(zkServerAddress)                .sessionTimeoutMs(5000)                .connectionTimeoutMs(5000)                .retryPolicy(retryPolicy)                .build();        zkClient.start();        return zkClient;    }}


      读写锁运用


      读写锁可以用来保证缓存双写的强一致性的,因为读写锁在多线程读的时候是无锁的, 只有在前面有写锁的时候才会等待写锁完成后访问数据。

        public class ReadWriteLockTest {    static ExecutorService executor = Executors.newFixedThreadPool(8);    static AtomicInteger stock = new AtomicInteger(3);    static InterProcessMutex readLock;    static InterProcessMutex writeLock;    public static void main(String[] args) throws InterruptedException {        CuratorFramework client = getZkClient();        String key = "/lock/lockId_111/1111";        InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, key);        readLock = readWriteLock.readLock();        writeLock = readWriteLock.writeLock();        for (int i = 0; i < 16; i++) {            executor.submit(() -> {                try {                    boolean read = readLock.acquire(2000, TimeUnit.MILLISECONDS);                    if (read) {                        int num = stock.get();                        System.out.println("读取库存,当前库存为: " + num);                        if (num < 0) {                            System.err.println("库存不足, 直接返回");                            return;                        }                    }                } catch (Exception e) {                    e.printStackTrace();                }finally {                    if (readLock.isAcquiredInThisProcess()) {                        try {                            readLock.release();                        } catch (Exception e) {                            e.printStackTrace();                        }                    }                }                try {                    boolean acquire = writeLock.acquire(2000, TimeUnit.MILLISECONDS);                    if (acquire) {                        int s = stock.get();                        if (s <= 0) {                            System.err.println("进入秒杀,库存不足");                        } else {                            s = stock.decrementAndGet();                            System.out.println("购买成功, 剩余库存: " + s);                        }                    }                } catch (Exception e) {                    e.printStackTrace();                } finally {                    try {                        if (writeLock.isAcquiredInThisProcess())                            writeLock.release();                    } catch (Exception e) {                        e.printStackTrace();                    }                }            });        }        while (true) {            if (executor.isTerminated()) {                executor.shutdown();                System.out.println("秒杀完毕剩余库存为:" + stock.get());            }            TimeUnit.MILLISECONDS.sleep(100);        }    }    private static CuratorFramework getZkClient() {        String zkServerAddress = "127.0.0.1:2181";        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);        CuratorFramework zkClient = CuratorFrameworkFactory.builder()                .connectString(zkServerAddress)                .sessionTimeoutMs(5000)                .connectionTimeoutMs(5000)                .retryPolicy(retryPolicy)                .build();        zkClient.start();        return zkClient;    }}


        打印结果如下,一开始会有 8 个输出结果为 读取库存,当前库存为: 3 然后在写锁中回去顺序的扣减少库存。

          读取库存,当前库存为: 3读取库存,当前库存为: 3读取库存,当前库存为: 3读取库存,当前库存为: 3读取库存,当前库存为: 3读取库存,当前库存为: 3读取库存,当前库存为: 3读取库存,当前库存为: 3购买成功, 剩余库存: 2购买成功, 剩余库存: 1购买成功, 剩余库存: 0进入秒杀,库存不足进入秒杀,库存不足进入秒杀,库存不足进入秒杀,库存不足进入秒杀,库存不足读取库存,当前库存为: 0读取库存,当前库存为: 0读取库存,当前库存为: 0读取库存,当前库存为: 0读取库存,当前库存为: 0读取库存,当前库存为: 0读取库存,当前库存为: 0读取库存,当前库存为: 0进入秒杀,库存不足进入秒杀,库存不足进入秒杀,库存不足进入秒杀,库存不足进入秒杀,库存不足进入秒杀,库存不足进入秒杀,库存不足进入秒杀,库存不足


          分布式锁的选择


          咱们最常用的就是 Redis 的分布式锁和 Zookeeper 的分布式锁,在性能方面 Redis 的每秒钟 TPS 可以上轻松上万。在大规模的高并发场景我推荐使用 Redis 分布式锁来作为推荐的技术方案。如果对并发要求不是特别高的场景可以使用 Zookeeper 分布式来处理。

          相关实践学习
          基于MSE实现微服务的全链路灰度
          通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
          相关文章
          |
          1月前
          |
          消息中间件 架构师 数据库
          本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
          45岁资深架构师尼恩分享了一篇关于分布式事务的文章,详细解析了如何在10Wqps高并发场景下实现分布式事务。文章从传统单体架构到微服务架构下分布式事务的需求背景出发,介绍了Seata这一开源分布式事务解决方案及其AT和TCC两种模式。随后,文章深入探讨了经典ebay本地消息表方案,以及如何使用RocketMQ消息队列替代数据库表来提高性能和可靠性。尼恩还分享了如何结合延迟消息进行事务数据的定时对账,确保最终一致性。最后,尼恩强调了高端面试中需要准备“高大上”的答案,并提供了多个技术领域的深度学习资料,帮助读者提升技术水平,顺利通过面试。
          本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
          |
          8月前
          |
          存储 NoSQL Java
          分布式锁中的王者方案 - Redission
          分布式锁中的王者方案 - Redission
          97 1
          |
          1月前
          |
          消息中间件 SQL 中间件
          大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
          分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
          214 7
          |
          1月前
          |
          缓存 NoSQL Java
          Spring Boot中的分布式缓存方案
          Spring Boot提供了简便的方式来集成和使用分布式缓存。通过Redis和Memcached等缓存方案,可以显著提升应用的性能和扩展性。合理配置和优化缓存策略,可以有效避免常见的缓存问题,保证系统的稳定性和高效运行。
          51 3
          |
          2月前
          |
          NoSQL 安全 PHP
          hyperf-wise-locksmith,一个高效的PHP分布式锁方案
          `hyperf-wise-locksmith` 是 Hyperf 框架下的互斥锁库,支持文件锁、分布式锁、红锁及协程锁,有效防止分布式环境下的竞争条件。本文介绍了其安装、特性和应用场景,如在线支付系统的余额扣减,确保操作的原子性。
          33 4
          |
          2月前
          |
          NoSQL 算法 关系型数据库
          分布式 ID 详解 ( 5大分布式 ID 生成方案 )
          本文详解分布式全局唯一ID及其5种实现方案,关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
          分布式 ID 详解 ( 5大分布式 ID 生成方案 )
          |
          7月前
          |
          消息中间件 数据挖掘 程序员
          【建议收藏】高并发下的分布式事务:如何选择最优方案?
          本文介绍了分布式事务的三种常见解决方案。在分布式系统中,事务处理变得复杂,需确保ACID特性。TCC(Try-Confirm-Cancel)方案适用于严格资金要求的场景,如银行转账,通过预留、确认和取消步骤确保一致性。可靠消息最终一致性方案适合一致性要求较低的场景,如电商积分处理,通过消息中间件实现最终一致性。最大努力通知方案则用于允许不一致的场景,如数据分析,通过重复通知尽可能达成一致性。选择合适的方案取决于具体应用场景。
          198 5
          |
          3月前
          |
          存储 缓存 NoSQL
          分布式架构下 Session 共享的方案
          【10月更文挑战第15天】在实际应用中,需要根据具体的业务需求、系统架构和性能要求等因素,选择合适的 Session 共享方案。同时,还需要不断地进行优化和调整,以确保系统的稳定性和可靠性。
          |
          3月前
          |
          SQL NoSQL 安全
          分布式环境的分布式锁 - Redlock方案
          【10月更文挑战第2天】Redlock方案是一种分布式锁实现,通过在多个独立的Redis实例上加锁来提高容错性和可靠性。客户端需从大多数节点成功加锁且总耗时小于锁的过期时间,才能视为加锁成功。然而,该方案受到分布式专家Martin的质疑,指出其在特定异常情况下(如网络延迟、进程暂停、时钟偏移)可能导致锁失效,影响系统的正确性。Martin建议采用fencing token方案,以确保分布式锁的正确性和安全性。
          62 0
          |
          5月前
          |
          存储 NoSQL Java
          一天五道Java面试题----第十一天(分布式架构下,Session共享有什么方案--------->分布式事务解决方案)
          这篇文章是关于Java面试中的分布式架构问题的笔记,包括分布式架构下的Session共享方案、RPC和RMI的理解、分布式ID生成方案、分布式锁解决方案以及分布式事务解决方案。
          一天五道Java面试题----第十一天(分布式架构下,Session共享有什么方案--------->分布式事务解决方案)