redis分布式锁案例分析

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: redis分布式锁案例分析

Case 1

未使用锁:


    @RequestMapping("/deduct_stock1")
    public String deductStock1() {
        //获取库存值
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
            System.out.println("扣减成功,剩余库存:" + realStock);
        } else {
            System.out.println("扣减失败,库存不足");
        }
        return "end";
    }


假设:key = stock ; value = 500

存在并发问题:会发现如果大量线程同时访问,扣减库存的方法时。在某个很小的时间内。获取的库存都是相同的值500.如果此时有10线程调用该方法时。库存为500, 那么这10个线程执行完过后。库存量就为499.

这就出现超卖问题了。

Case 2


添加Jvm级别的锁: 
    @RequestMapping("/deduct_stock2")
    public String deductStock2() {
        synchronized (this){
            //获取库存值
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        }
        return "end";
    }


synchronized 此时如果当前的项目是部署在单机上的(只部署在一台服务器上),那就可以实现一个。如果是集群,锁的生效只有在当前服务器的进程上生效。

Case 3

使用redis中的setnx();设计一个简单的入门级别分布式


    /**
     *  使用redis中的setnx();设计一个简单的入门级别分布式锁
     *  
     * @return
     */
    @RequestMapping("/deduct_stock3")
    public String deductStock3() {
        String localKey = "lock:product:0001";
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true");
        if (!aBoolean){
            return "当前系统繁忙";
        }
        try{
            //获取库存值
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            stringRedisTemplate.delete(localKey);
        }
        return "end";
    }


存在的问题:

如果中间的任何一个一部分逻辑抛出了异常,那么就不会执行delete(localKey);的操作。那之后所有的线程都将加锁不成功。也就不会执行后面的业务代码。

优化:

在finally{}中进行delete(localKey)操作。

存在问题:

锁没有释放,宕机了的情况

Case 4

解决case3中存在的宕机没有释放锁的问题


    @RequestMapping("/deduct_stock4")
    public String deductStock4() {
        String localKey = "lock:product:0001";
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true");
        stringRedisTemplate.expire(localKey,10,TimeUnit.SECONDS);
        if (!aBoolean){
            return "当前系统繁忙";
        }
        try{
            //获取库存值
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            stringRedisTemplate.delete(localKey);
        }
        return "end";
    }


设置一个过期时间:

存在的问题:存在原子性问题。

原因:还没有执行到expire()时就宕机了


Case 5

解决枷锁时的原子性问题

解决办法:在枷锁时就设置超时时间,也就是枷锁和设置超时时间是原子操作


    @RequestMapping("/deduct_stock5")
    public String deductStock5() {
        String localKey = "lock:product:0001";
        //这条命令能够保证原子性
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true",10,TimeUnit.SECONDS);
        if (!aBoolean){
            return "当前系统繁忙";
        }
        try{
            //获取库存值
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            stringRedisTemplate.delete(localKey);
        }
        return "end";
    }


存在问题:如果系统并发量不是特别的大,问题不大。并发特别大的时候依然存在超卖问题。

高并发(每秒几千上万访问量)的场景下存在严重的并发问题:

lock-------------- > -----------delete

假设某个请求A的时间超过了超时时间(10s)(锁失效了),此时该线程A还没有执行delete方法。

另一个线程B这时候就可以加锁成功了,但是这时候线程A执行了delete方法。但是这时候线程A释放的锁是线程B的。

这时候在极端情况下就会出现 请求A释放请求B的锁,B释放C的,C释放D的,… 最后就会导致大量的超卖问题。

Case 6

该如何解决 deductStock5()中存在的问题。

分析:问题存在的根本原因就是在执行delete方法的时候。自己的锁被其他的线程释放了。

解决办法:给每个线程生成一个唯一id.例如使用uuid. 在最后释放锁的时候判断是否是自己的锁。如果是自己的才释放。

注意:不要使用线程id,不同的服务器可能有相同的线程id


    @RequestMapping("/deduct_stock6")
    public String deductStock6() {
        String localKey = "lock:product:0001";
        String uuid = UUID.randomUUID().toString();
        //这条命令能够保证原子性
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, uuid,10,TimeUnit.SECONDS);
        if (!aBoolean){
            return "当前系统繁忙";
        }
        try{
            //获取库存值
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            if (uuid.equals(stringRedisTemplate.opsForValue().get(localKey))){
                stringRedisTemplate.delete(localKey);
            }
        }
        return "end";
    }


存在问题:存在原子性问题


            if (uuid.equals(stringRedisTemplate.opsForValue().get(localKey))){
            // 
                stringRedisTemplate.delete(localKey);
            }


上面的代码中不是原子的。在当前线程执行完if判断却还没有执行delete操作的时候。当前锁过期了。

又可能会出现超卖问题。当前的线程释放了其他线程的锁


解决方式:

1.锁续命(实现不容易)

使用一个分线程,使用定时任务,每过一段时间,判断业务的主线程有没有结束(是否还加着锁)。如果还加着锁,将锁的超时时间重新设置。

2.使用现成的 例如redisson

Case 7


    @RequestMapping("/deduct_stock7")
    public String deductStock7() {
        String lockKey = "lock:product:0001";
        //获取锁对象
        RLock redissonLock = redisson.getLock(lockKey);
        //加分布式锁
        redissonLock.lock();
        try {
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        } finally {
            //解锁
            redissonLock.unlock();
        }
        return "end";
    }


核心使用lua脚本


    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }


Redis Lua脚本


Redis在2.6推出了脚本功能,允许开发者使用Lua语言编写脚本传到Redis中执行。使用脚本的好处如下:


1、减少网络开销:本来5次网络请求的操作,可以用一个请求完成,原先5次请求的逻辑放在redis服务器上完成。使用脚本,减少了网络往返时延。这点跟管道类似。

2、原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。管道不是原子的,不过redis的批量操作命令(类似mset)是原子的。

3、替代redis的事务功能:redis自带的事务功能很鸡肋,而redis的lua脚本几乎实现了常规的事务功能,官方推荐如果要使用redis的事务功能可以用redis lua替代。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
236 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
2月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
|
2月前
|
数据采集 存储 NoSQL
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
219 67
|
22天前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
335 7
|
2月前
|
存储 数据挖掘
Vsan数据恢复——Vsan分布式文件系统数据恢复案例
一台采用VsSAN分布式文件系统的存储设备由于未知原因关机重启。管理员发现上层的虚拟机不可用,存储内的数据丢失。
85 30
|
2月前
|
NoSQL 算法 安全
redis分布式锁在高并发场景下的方案设计与性能提升
本文探讨了Redis分布式锁在主从架构下失效的问题及其解决方案。首先通过CAP理论分析,Redis遵循AP原则,导致锁可能失效。针对此问题,提出两种解决方案:Zookeeper分布式锁(追求CP一致性)和Redlock算法(基于多个Redis实例提升可靠性)。文章还讨论了可能遇到的“坑”,如加从节点引发超卖问题、建议Redis节点数为奇数以及持久化策略对锁的影响。最后,从性能优化角度出发,介绍了减少锁粒度和分段锁的策略,并结合实际场景(如下单重复提交、支付与取消订单冲突)展示了分布式锁的应用方法。
177 3
|
2月前
|
缓存 监控 NoSQL
Redis设计与实现——分布式Redis
Redis Sentinel 和 Cluster 是 Redis 高可用与分布式架构的核心组件。Sentinel 提供主从故障检测与自动切换,通过主观/客观下线判断及 Raft 算法选举领导者完成故障转移,但存在数据一致性和复杂度问题。Cluster 支持数据分片和水平扩展,基于哈希槽分配数据,具备自动故障转移和节点发现机制,适合大规模高并发场景。复制机制包括全量同步和部分同步,通过复制积压缓冲区优化同步效率,但仍面临延迟和资源消耗挑战。两者各有优劣,需根据业务需求选择合适方案。
|
2月前
|
存储 NoSQL Java
从扣减库存场景来讲讲redis分布式锁中的那些“坑”
本文从一个简单的库存扣减场景出发,深入分析了高并发下的超卖问题,并逐步优化解决方案。首先通过本地锁解决单机并发问题,但集群环境下失效;接着引入Redis分布式锁,利用SETNX命令实现加锁,但仍存在死锁、锁过期等隐患。文章详细探讨了通过设置唯一标识、续命机制等方法完善锁的可靠性,并最终引出Redisson工具,其内置的锁续命和原子性操作极大简化了分布式锁的实现。最后,作者剖析了Redisson源码,揭示其实现原理,并预告后续关于主从架构下分布式锁的应用与性能优化内容。
113 0
|
4月前
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
399 0
分布式爬虫框架Scrapy-Redis实战指南
|
5月前
|
NoSQL Java 中间件
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
768 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁

热门文章

最新文章