redis分布式锁案例分析

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
云原生内存数据库 Tair,内存型 2GB
简介: redis分布式锁案例分析

Case 1

未使用锁:


    @RequestMapping("/deduct_stock1")
    public String deductStock1() {
        //获取库存值
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
            System.out.println("扣减成功,剩余库存:" + realStock);
        } else {
            System.out.println("扣减失败,库存不足");
        }
        return "end";
    }


假设:key = stock ; value = 500

存在并发问题:会发现如果大量线程同时访问,扣减库存的方法时。在某个很小的时间内。获取的库存都是相同的值500.如果此时有10线程调用该方法时。库存为500, 那么这10个线程执行完过后。库存量就为499.

这就出现超卖问题了。

Case 2


添加Jvm级别的锁: 
    @RequestMapping("/deduct_stock2")
    public String deductStock2() {
        synchronized (this){
            //获取库存值
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        }
        return "end";
    }


synchronized 此时如果当前的项目是部署在单机上的(只部署在一台服务器上),那就可以实现一个。如果是集群,锁的生效只有在当前服务器的进程上生效。

Case 3

使用redis中的setnx();设计一个简单的入门级别分布式


    /**
     *  使用redis中的setnx();设计一个简单的入门级别分布式锁
     *  
     * @return
     */
    @RequestMapping("/deduct_stock3")
    public String deductStock3() {
        String localKey = "lock:product:0001";
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true");
        if (!aBoolean){
            return "当前系统繁忙";
        }
        try{
            //获取库存值
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            stringRedisTemplate.delete(localKey);
        }
        return "end";
    }


存在的问题:

如果中间的任何一个一部分逻辑抛出了异常,那么就不会执行delete(localKey);的操作。那之后所有的线程都将加锁不成功。也就不会执行后面的业务代码。

优化:

在finally{}中进行delete(localKey)操作。

存在问题:

锁没有释放,宕机了的情况

Case 4

解决case3中存在的宕机没有释放锁的问题


    @RequestMapping("/deduct_stock4")
    public String deductStock4() {
        String localKey = "lock:product:0001";
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true");
        stringRedisTemplate.expire(localKey,10,TimeUnit.SECONDS);
        if (!aBoolean){
            return "当前系统繁忙";
        }
        try{
            //获取库存值
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            stringRedisTemplate.delete(localKey);
        }
        return "end";
    }


设置一个过期时间:

存在的问题:存在原子性问题。

原因:还没有执行到expire()时就宕机了


Case 5

解决枷锁时的原子性问题

解决办法:在枷锁时就设置超时时间,也就是枷锁和设置超时时间是原子操作


    @RequestMapping("/deduct_stock5")
    public String deductStock5() {
        String localKey = "lock:product:0001";
        //这条命令能够保证原子性
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true",10,TimeUnit.SECONDS);
        if (!aBoolean){
            return "当前系统繁忙";
        }
        try{
            //获取库存值
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            stringRedisTemplate.delete(localKey);
        }
        return "end";
    }


存在问题:如果系统并发量不是特别的大,问题不大。并发特别大的时候依然存在超卖问题。

高并发(每秒几千上万访问量)的场景下存在严重的并发问题:

lock-------------- > -----------delete

假设某个请求A的时间超过了超时时间(10s)(锁失效了),此时该线程A还没有执行delete方法。

另一个线程B这时候就可以加锁成功了,但是这时候线程A执行了delete方法。但是这时候线程A释放的锁是线程B的。

这时候在极端情况下就会出现 请求A释放请求B的锁,B释放C的,C释放D的,… 最后就会导致大量的超卖问题。

Case 6

该如何解决 deductStock5()中存在的问题。

分析:问题存在的根本原因就是在执行delete方法的时候。自己的锁被其他的线程释放了。

解决办法:给每个线程生成一个唯一id.例如使用uuid. 在最后释放锁的时候判断是否是自己的锁。如果是自己的才释放。

注意:不要使用线程id,不同的服务器可能有相同的线程id


    @RequestMapping("/deduct_stock6")
    public String deductStock6() {
        String localKey = "lock:product:0001";
        String uuid = UUID.randomUUID().toString();
        //这条命令能够保证原子性
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, uuid,10,TimeUnit.SECONDS);
        if (!aBoolean){
            return "当前系统繁忙";
        }
        try{
            //获取库存值
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            if (uuid.equals(stringRedisTemplate.opsForValue().get(localKey))){
                stringRedisTemplate.delete(localKey);
            }
        }
        return "end";
    }


存在问题:存在原子性问题


            if (uuid.equals(stringRedisTemplate.opsForValue().get(localKey))){
            // 
                stringRedisTemplate.delete(localKey);
            }


上面的代码中不是原子的。在当前线程执行完if判断却还没有执行delete操作的时候。当前锁过期了。

又可能会出现超卖问题。当前的线程释放了其他线程的锁


解决方式:

1.锁续命(实现不容易)

使用一个分线程,使用定时任务,每过一段时间,判断业务的主线程有没有结束(是否还加着锁)。如果还加着锁,将锁的超时时间重新设置。

2.使用现成的 例如redisson

Case 7


    @RequestMapping("/deduct_stock7")
    public String deductStock7() {
        String lockKey = "lock:product:0001";
        //获取锁对象
        RLock redissonLock = redisson.getLock(lockKey);
        //加分布式锁
        redissonLock.lock();
        try {
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        } finally {
            //解锁
            redissonLock.unlock();
        }
        return "end";
    }


核心使用lua脚本


    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }


Redis Lua脚本


Redis在2.6推出了脚本功能,允许开发者使用Lua语言编写脚本传到Redis中执行。使用脚本的好处如下:


1、减少网络开销:本来5次网络请求的操作,可以用一个请求完成,原先5次请求的逻辑放在redis服务器上完成。使用脚本,减少了网络往返时延。这点跟管道类似。

2、原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。管道不是原子的,不过redis的批量操作命令(类似mset)是原子的。

3、替代redis的事务功能:redis自带的事务功能很鸡肋,而redis的lua脚本几乎实现了常规的事务功能,官方推荐如果要使用redis的事务功能可以用redis lua替代。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
4天前
|
存储 监控 固态存储
【vSAN分布式存储服务器数据恢复】VMware vSphere vSAN 分布式存储虚拟化平台VMDK文件1KB问题数据恢复案例
在一例vSAN分布式存储故障中,因替换故障闪存盘后磁盘组失效,一台采用RAID0策略且未使用置备的虚拟机VMDK文件受损,仅余1KB大小。经分析发现,该VMDK文件与内部虚拟对象关联失效导致。恢复方案包括定位虚拟对象及组件的具体物理位置,解析分配空间,并手动重组RAID0结构以恢复数据。此案例强调了深入理解vSAN分布式存储机制的重要性,以及定制化数据恢复方案的有效性。
19 5
|
1天前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解、如何添加锁解决缓存击穿问题?分布式情况下如何添加分布式锁
这篇文章介绍了如何在SpringBoot项目中整合Redis,并探讨了缓存穿透、缓存雪崩和缓存击穿的问题以及解决方法。文章还提供了解决缓存击穿问题的加锁示例代码,包括存在问题和问题解决后的版本,并指出了本地锁在分布式情况下的局限性,引出了分布式锁的概念。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解、如何添加锁解决缓存击穿问题?分布式情况下如何添加分布式锁
|
1天前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
|
4天前
|
存储 固态存储 虚拟化
【vSAN分布式存储服务器数据恢复】VMware vSphere vSAN ESXi超融合HCI分布式存储数据恢复案例
近期,我司处理了一个由10台华为OceanStor存储组成的vSAN超融合架构,其中一台存储闪存盘出现故障,用户取下后用新的闪存盘代替,然后对该闪存盘所在的磁盘组进行重建,导致集群中一台使用0置备策略的虚拟机数据丢失。
22 6
|
1天前
|
NoSQL 安全 Java
nicelock--一个注解即可使用Redis分布式锁!
Nicelock的引入为分布式系统中的资源同步访问提供了一个简单高效和可靠的解决方案。通过注解的方式,简化了锁的实现和使用,使开发人员可以将更多精力专注于业务逻辑的实现,而不是锁的管理。此外,Nicelock在保持简单易用的同时,也提供了足够的灵活性和可靠性,满足了不同应用场景下对分布式锁的需求。
8 1
|
5天前
|
NoSQL Java Redis
Redis字符串数据类型之INCR命令,通常用于统计网站访问量,文章访问量,实现分布式锁
这篇文章详细解释了Redis的INCR命令,它用于将键的值增加1,通常用于统计网站访问量、文章访问量,以及实现分布式锁,同时提供了Java代码示例和分布式锁的实现思路。
13 0
|
1月前
|
NoSQL Java Redis
实现基于Redis的分布式锁机制
实现基于Redis的分布式锁机制
|
27天前
|
存储 缓存 NoSQL
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
redis分布式锁、redisson、可重入、主从一致性、WatchDog、Redlock红锁、zookeeper;Redis集群、主从复制,全量同步、增量同步;哨兵,分片集群,Redis为什么这么快,I/O多路复用模型——用户空间和内核空间、阻塞IO、非阻塞IO、IO多路复用,Redis网络模型
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
|
30天前
|
NoSQL Java Redis
分布式锁实现原理问题之使用Redis的setNx命令来实现分布式锁问题如何解决
分布式锁实现原理问题之使用Redis的setNx命令来实现分布式锁问题如何解决
|
1月前
|
canal 缓存 NoSQL
Redis常见面试题(一):Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;先删除缓存还是先修改数据库,双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
Redis常见面试题(一):Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略