使用ZooKeeper原生API实现分布式锁

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 分布式锁的引入一个很典型的秒杀场景,或者说并发量非常高的场景下,对商品库存的操作,我用一个SpringBoot小项目模拟一下。用到的技术知识:SpringBootRedisZooKeeper我提前将库存 stock 放在redis,初始值为288:

分布式锁的引入

一个很典型的秒杀场景,或者说并发量非常高的场景下,对商品库存的操作,我用一个SpringBoot小项目模拟一下。

用到的技术知识:

  • SpringBoot
  • Redis
  • ZooKeeper

我提前将库存 stock 放在redis,初始值为288:

127.0.0.1:6379> set stock 288
OK
127.0.0.1:6379> get stock
"288"
复制代码

扣减库存的api:

@RequestMapping("/v1/reduce")
public String reduceStock() {
    String stockStr = redisTemplate.opsForValue().get("stock");
    int stock = Integer.parseInt(stockStr);
    if (stock > 0) {
        int realStock = stock - 1;
        redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
        System.out.println(Thread.currentThread().getName() + " 减库存成功,剩余库存:" + realStock);
    } else {
        System.out.println("不能再减了,没有库存了!");
    }
    return port + ": reduce stock end";
}
复制代码

单机环境下的高并发:

网络异常,图片无法展示
|

我用 Apache JMeter 模拟在同一时刻,有 500 个请求打到 /stock/v1/reduce 上进行减库存的操作:

网络异常,图片无法展示
|

网络异常,图片无法展示
|

Apache JMeter是Apache组织开发的基于Java的压力测试工具。用它很容易模拟出高并发场景。

Apache JMeter官网:
jmeter.apache.org/download_jm…

启动SpringBoot项目,执行压测,运行结果:

网络异常,图片无法展示
|

500个并发,才扣减了5个!!!BOSS该找你事了!

不过这种情况我们程序员是不会让它出现的,加个 synchronized ,让每个线程拿到锁之后再去扣减库存:

网络异常,图片无法展示
|

代码实现:

@RequestMapping("/v2/reduce")
public String reduceStockV2() {
    //加线程同步
    synchronized (this) {
        String stockStr = redisTemplate.opsForValue().get("stock");
        int stock = Integer.parseInt(stockStr);
        if (stock > 0) {
            int realStock = stock - 1;
            redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
            System.out.println(Thread.currentThread().getName() + " 减库存成功,剩余库存:" + realStock);
        } else {
            System.out.println("不能再减了,没有库存了!");
        }
    }
    return port + ": reduce stock end";
}
复制代码

压测结果:

网络异常,图片无法展示
|

synchronized只能帮我们解决单进程内的线程安全问题,而事实上,在分布式环境下,我们一个服务,比如这个扣减库存的服务,可能存在于多个服务器(多个tomcat进程)中,再使用 synchronized 就无法解决分布式服务下的高并发问题了。

我们来模拟一下分布式环境下的高并发问题,我本地开启两个扣减库存的服务,一个8080端口,一个8090端口,使用Nginx做反向代理:

网络异常,图片无法展示
|

我的Nginx服务器的IP地址为 192.168.134.135 ,我开启 200个线程循环4次 ,相当于一共有 800 次访问
http://192.168.134.135/stock/v2/reduce
,如此就可以模拟出分布式下的高并发场景了。

网络异常,图片无法展示
|

开始压测:

网络异常,图片无法展示
|

压测结果:

服务1:

网络异常,图片无法展示
|

服务2:

网络异常,图片无法展示
|

发现 synchronized 根本没用,很多线程还是重复扣减了库存!!!

这种时候就需要 分布式锁 来解决这个问题了。

使用ZooKeeper实现分布式锁

本案例采用zk自己的api实现分布式锁。当然还可以使用第三方提供的api实现zk分布式锁,比如 Curator (现在Curator也归属于Apache了)。

Curator官网:curator.apache.org/

后续会考虑出一个Curator版本的zk分布式锁的实现。本文提供的方式掌握了,其他版本就不在话下了,哈哈!

为什么zk能实现分布式锁呢?我们之前已经了解过zk拥有类似于Linux文件系统的数据结构,还有一套事件监听机制。

网络异常,图片无法展示
|

zk的数据结构中,每个节点还可以存数据,这个就比较厉害了。还可以创建顺序节点,节点带编号。临时节点还有随session存在而存在的特性,所以当客户端失去连接的时候session消亡,临时节点也就消失了。

还有,zk的事件监听机制。zk上所有的节点,持久节点,顺序节点,临时顺序节点等都可以对它进行监听,当某一个节点上发生了变化,比如当节点被创建了、数据更新、 节点被删除 等,这些 事件都会被监听 到,同时 触发回调 ,那么我们在代码中就能够做一些相应的处理。

网络异常,图片无法展示
|

这里其实有个问题,如果我们只关注 /lock 节点的话,并发量一高会带来通信压力,因为很多client都watch了 /lock 节点,当 /lock 节点发生变化,这些client一窝蜂的进行事件回调争抢锁,压力就出现了。

zk的 临时顺序节点 能帮我们解决这个问题,我们只监听顺序节点的前一个节点,看它是不是顺序最小的,让顺序最小的获得锁!

网络异常,图片无法展示
|

因此,基于zk的 临时顺序节点事件监听 ,我们就可以实现 分布式锁

代码实现

总体框架,三大步:

  1. 争抢锁
  2. 做自己爱做的事情
  3. 释放锁

先把这个架子搭起来:

@RequestMapping("/v3/reduce")
public String reduceStockV3() {
    try {
        //1. 抢锁
        tryLock
        //2. 做自己爱做的事
        //比如减库存
        //3. 释放锁
        releaseLock
    } catch (Exception e) {
        e.printStackTrace();
    }
    return port + ": reduce stock end";
}
复制代码

zk API的响应式编程很爽,前文 使用ZooKeeper客户端原生API实现分布式配置中心 就是用的是响应式编程。

响应式编程很好理解,就是对事件加监听,当完成某个事件的时候,就出发相应的回调函数,zk的很多api都提供了方法的异步调用版本。

前文分析,实现分布式锁的流程是: 抢锁就是创建临时有序节点,监听创建成功后,获取根节点(连接zk集群时候的根节点)下的所有孩子节点,然后比较当前节点时候为第一个(要排好序),若是第一个,则抢锁成功,做自己的业务,然后释放锁-删除节点,当然删除节点也会触发回调。

因此,核心代码逻辑都在 监听回调 里,抽象出一个 WatchAndCallback 出来:

public class WatchAndCallback implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    //肯定得有zookeeper
    private final ZooKeeper zk;
    //创建的节点名称,节点监听它的前一个顺序节点时需要用到
    private String pathName;
    //也得有CountDownLatch,用来保证程序阻塞与继续执行
    private final CountDownLatch latch = new CountDownLatch(1);
    //线程名称,辅助观察
    private final String threadName;
    public WatchAndCallback(ZooKeeper zk, String threadName) {
        this.zk = zk;
        this.threadName = threadName;
    }
    //Children2Callback
    //此回调用于检索节点的子节点和stat
    //处理异步调用的结果
    // List<String> children 给定路径上节点的子节点的无序数组,基于watch前一个节点,最小节点获得锁的机制,需要给children排个序
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        //实现分布式锁的核心
    }
    //StatCallback
//    处理异步调用的结果
//    成功, rc is KeeperException.Code.OK.
//    失败, rc is set to the corresponding failure code in KeeperException.
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        //TODO
    }
    //创建节点时的回调 StringCallback
    //此回调用于检索节点的名称
    // name: 创建的znode的名称。如果成功,名称和路径通常相等,除非创建了顺序节点。
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
    }
    //Watcher
    //如果某个线程释放锁了,也就是节点被删除了,也要触发监听
    //如果不是释放锁而是某个节点本身出问题了,zk也会删除node,也需要一个监听
    @Override
    public void process(WatchedEvent watchedEvent) {
    }
}
复制代码

我把分布式锁的代码逻辑都放在 WatchAndCallback 类中。

  1. 抢锁,tryLock
/**
 * 抢锁---zk节点具有互斥性,当已存在该节点时会创建失败,所以创建临时节点可以知道是否抢锁成功
 * @author 行百里者
 * @create 2020/9/18 15:37
 **/
public void tryLock() {
    System.out.println(threadName + " 试图抢锁");
    //创建临时序列节点,data就设置为当前线程名字即可,实际业务可设置为用户id
    //创建节点也有callback,传一个this即可,会触发调用processResult(int rc, String path, Object ctx, String name)
    zk.create("/stock",
            threadName.getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL_SEQUENTIAL,
            this,
            "create node ctx");
    //异步的,要await一下
    try {
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
复制代码
  1. 减库存,保存lucky(实际场景中的订单信息)

在 controller 中,也就是说当一个客户抢锁成功,就要对数据做相应的改变:

//2. 做自己爱做的事
int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
if (stock > 0) {
    int realStock = stock - 1;
    redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
    //同时lucky+1
    redisTemplate.opsForValue().increment("lucky");
} else {
    System.out.println("库存不足");
}
复制代码
  1. 给别人机会,释放锁,releaseLock
//也就是删除临时节点
public void releaseLock() {
    try {
        zk.delete(pathName, -1);
        System.out.println(threadName + " 流程走完了,释放锁");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (KeeperException e) {
        e.printStackTrace();
    }
}
复制代码

压力测试

还是用JMeter模拟高并发场景,一次并发800个请求打到nginx上,nginx反向代理到两个tomcat。

首先开启两个tomcat:

网络异常,图片无法展示
|

初始化库存仍设置为288(redis),同时在设置一个 lucky ,表示有多少人抢到了库存中的数据,这个数据最终为288才是正确的:

127.0.0.1:6379> set stock 288
OK
127.0.0.1:6379> get stock
"288"
127.0.0.1:6379> set lucky 0
OK
127.0.0.1:6379> get lucky
"0"
复制代码

初始化zk lock根节点:

[zk: localhost:2181(CONNECTED) 5] ls /lock
[]
复制代码

压力测试:

网络异常,图片无法展示
|

看一下后台redis中存储的stock和lucky数据:

127.0.0.1:6379> get stock
"0"
127.0.0.1:6379> get lucky
"288"
127.0.0.1:6379> 
复制代码

stock为0说明没有超卖,lucky=288说明也没有同一个库存扣减多次的情况。

ok,zk实现分布式锁就是这么完美!

小结

zk实现分布式锁:

  1. 争抢锁,只有一个能获得锁
  2. 获得锁的人,如果故障了,导致死锁,用 zookeeper 实现分布式锁解决,基于zk的特征,可以创建临时节点,产生一个session,它如果挂了,session会消失,然后释放锁,因此 ZooKeeper能规避死锁
  3. 获得锁的人成功了,释放锁
  4. 锁被释放/删除,别人怎么知道?方式1:for循环,主动轮询,发送心跳,这种方式的弊端:延迟(实时性不强),压力(服务很多,都去轮询访问某一把锁)方式2:watch 解决延迟问题,此法也有弊端:通信压力(watch完了很多服务器回调去抢锁)方式3: 临时顺序节点 + 事件监听机制 (序列节点+watch) watch谁?watch前一个节点,最小的获得锁,一旦最小的释放锁,此法的效率成本是 ZooKeeper只给第二个node发事件回调

网络异常,图片无法展示
|

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
1月前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
420 2
|
6天前
|
运维 Cloud Native 应用服务中间件
阿里云微服务引擎 MSE 及 API 网关 2024 年 05 月产品动态
阿里云微服务引擎 MSE 面向业界主流开源微服务项目, 提供注册配置中心和分布式协调(原生支持 Nacos/ZooKeeper/Eureka )、云原生网关(原生支持Higress/Nginx/Envoy,遵循Ingress标准)、微服务治理(原生支持 Spring Cloud/Dubbo/Sentinel,遵循 OpenSergo 服务治理规范)能力。API 网关 (API Gateway),提供 APl 托管服务,覆盖设计、开发、测试、发布、售卖、运维监测、安全管控、下线等 API 生命周期阶段。帮助您快速构建以 API 为核心的系统架构.满足新技术引入、系统集成、业务中台等诸多场景需要
|
5天前
|
存储 监控 负载均衡
Zookeeper 详解:分布式协调服务的核心概念与实践
Zookeeper 详解:分布式协调服务的核心概念与实践
11 0
|
6天前
|
API
阿里云微服务引擎及 API 网关 2024 年 5 月产品动态
阿里云微服务引擎及 API 网关 2024 年 5 月产品动态。
阿里云微服务引擎及 API 网关 2024 年 5 月产品动态
|
1月前
|
前端开发 JavaScript 算法
分布式系统的一致性级别划分及Zookeeper一致性级别分析
分布式系统的一致性级别划分及Zookeeper一致性级别分析
|
1月前
|
人工智能 API
阿里云微服务引擎及 API 网关 2024 年 4 月产品动态
阿里云微服务引擎及 API 网关 2024 年 4 月产品动态。
|
1月前
|
运维 Cloud Native 应用服务中间件
阿里云微服务引擎 MSE 及 API 网关 2024 年 04 月产品动态
阿里云微服务引擎 MSE 面向业界主流开源微服务项目, 提供注册配置中心和分布式协调(原生支持 Nacos/ZooKeeper/Eureka )、云原生网关(原生支持Higress/Nginx/Envoy,遵循Ingress标准)、微服务治理(原生支持 Spring Cloud/Dubbo/Sentinel,遵循 OpenSergo 服务治理规范)能力。API 网关 (API Gateway),提供 APl 托管服务,覆盖设计、开发、测试、发布、售卖、运维监测、安全管控、下线等 API 生命周期阶段。帮助您快速构建以 API 为核心的系统架构.满足新技术引入、系统集成、业务中台等诸多场景需要。
|
1月前
|
存储 大数据 Apache
深入理解ZooKeeper:分布式协调服务的核心与实践
【5月更文挑战第7天】ZooKeeper是Apache的分布式协调服务,确保大规模分布式系统中的数据一致性与高可用性。其特点包括强一致性、高可用性、可靠性、顺序性和实时性。使用ZooKeeper涉及安装配置、启动服务、客户端连接及执行操作。实际应用中,面临性能瓶颈、不可伸缩性和单点故障等问题,可通过水平扩展、集成其他服务和多集群备份来解决。理解ZooKeeper原理和实践,有助于构建高效分布式系统。
|
1月前
|
存储 Shell 数据安全/隐私保护
ZooKeeper【基础知识 04】控制权限ACL(原生的 Shell 命令)
【4月更文挑战第11天】ZooKeeper【基础知识 04】控制权限ACL(原生的 Shell 命令)
35 7
|
1月前
|
Java API Apache
ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
【4月更文挑战第11天】ZooKeeper【基础 03】Java 客户端 Apache Curator 基础 API 使用举例(含源代码)
41 11

热门文章

最新文章