基于redis的分布式锁

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 基于redis的分布式锁

为什么要使用分布式锁?


因为服务器使用了集群方案。词穷。。。


怎么使用分布式锁?


需求


实现一个查询数据库,在大于0的情况下减库存这样小小的功能。

测试:模拟100并发并看结果


基础代码


没有任何锁


    @RequestMapping("/reduce_stock")
    public String reduceStock() {
        //查数据库(redis)中库存数量
        Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        //判断库存
        if (stock > 0) {
            System.out.println("消费库存成功--->" + stock);
            //更新库存
            stock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
        } else {
            System.out.println("消费库存失败。。。");
        }
        return "helloworld";
    }



用测压工具测压结果:出现并发问题


9.png


有锁:给方法添加synchronized关键字


    @RequestMapping("/reduce_stock")
    public synchronized    String  reduceStock() {
        //查数据库(redis)中库存数量
        Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        //判断库存
        if (stock > 0) {
            System.out.println("消费库存成功--->" + stock);
            //更新库存
            stock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
        } else {
            System.out.println("消费库存失败。。。");
        }
        return "helloworld";
    }


单机测试结果:没有问题

分布式测试结果:出现线程安全问题

分析,如下图所示:

两个微服务,synchronized关键字只能锁住一个微服务,跨微服务是锁不住的。

就像你家的屋子A复制一份为B,A是否锁门和B是否锁门是没有关系的。


10.png


基于redis的分布式锁(理论+实操)

理论


11.png


基于redis的setnx命令实现分布式锁


setnx命令的特点是:当你第一次设置的时候会返回1,后面在设置的时候就会返回0(即修改失败),如下图所示


12.png


手写基于redis分布式锁(此处逻辑、理论大于实操)


一代代码


分析


---逻辑:先获取锁,如果获取锁,就继续;否则就不执行


---问题:容易出现死锁。如果我获取锁成功后在执行业务逻辑的过程中出现异常,则释放锁的过程就没有了,不释放锁就会引起死锁


   @RequestMapping("/reduce_stock")
    public String reduceStock() {
        //key的名称
        String lockKey = "lock";
        //setnx key value    加锁逻辑
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
        if (!aBoolean){
            return "fail";
        }
        //查数据库(redis)中库存数量
        Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        //判断库存
        if (stock > 0) {
            System.out.println("消费库存成功--->" + stock);
            //更新库存
            stock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
        } else {
            System.out.println("消费库存失败。。。");
        }
        // del key  释放锁逻辑
        stringRedisTemplate.delete(lockKey);
        return "helloworld";
    }


二代代码


分析:


---优点:在finally中释放锁,解决了死锁的问题


---问题:引起锁失效问题。看下面的代码,先加锁,如果加锁失败,返回,但是此时代码也会去执行finally中释放锁的功能,从而使别人加的锁失效。


    @RequestMapping("/reduce_stock")
    public String reduceStock() {
        //key的名称
        String lockKey = "lock";
        try {
            //setnx key value    加锁逻辑
            Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
            if (!aBoolean) {
                return "fail";
            }
            //查数据库(redis)中库存数量
            Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            //判断库存
            if (stock > 0) {
                System.out.println("消费库存成功--->" + stock);
                //更新库存
                stock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            } else {
                System.out.println("消费库存失败。。。");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // del key  释放锁逻辑
            stringRedisTemplate.delete(lockKey);
        }
        return "helloworld";
    }


三代代码


分析:


--优点:解决了锁失效问题


--问题:没有解决因为宕机而引起的死锁,如下图所示,微服务8082获取锁后在执行业务逻辑时系统宕机后就会引起死锁


13.png


    @RequestMapping("/reduce_stock")
    public String reduceStock() {
        //key的名称
        String lockKey = "lock";
        //setnx key value    加锁逻辑
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
        if (!aBoolean) {
            return "fail";
        }
        try {
            //查数据库(redis)中库存数量
            Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            //判断库存
            if (stock > 0) {
                System.out.println("消费库存成功--->" + stock);
                //更新库存
                stock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            } else {
                System.out.println("消费库存失败。。。");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // del key  释放锁逻辑
            stringRedisTemplate.delete(lockKey);
        }
        return "helloworld";
    }


四代代码


分析:


--优点:加锁逻辑时设置过期时间,可以解决三代代码的死锁问题,系统中断了我到时间就自动释放锁


14.png


@RequestMapping("/reduce_stock")
    public String reduceStock() {
        //key的名称
        String lockKey = "lock";
        //setnx key value    加锁逻辑
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1",30, TimeUnit.SECONDS);
        if (!aBoolean) {
            return "fail";
        }
        try {
            //查数据库(redis)中库存数量
            Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            //判断库存
            if (stock > 0) {
                System.out.println("消费库存成功--->" + stock);
                //更新库存
                stock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            } else {
                System.out.println("消费库存失败。。。");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // del key  释放锁逻辑
            stringRedisTemplate.delete(lockKey);
        }
        return "helloworld";
    }


五代代码


分析:


--优点:解决了四代代码的锁失效问题


--缺点:如下图所示,如果我设置失效时间是30,而我业务逻辑时间是35,在30-35之间是有两个线程同时访问,这与独占锁是矛盾的,所以此处存在问题。


15.png


@RequestMapping("/reduce_stock")
    public String reduceStock() {
        //key的名称
        String lockKey = "lock";
        //value的值
        String clientId = UUID.randomUUID().toString();
        //setnx key value    加锁逻辑
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
        if (!aBoolean) {
            return "fail";
        }
        try {
            //查数据库(redis)中库存数量
            Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            //判断库存
            if (stock > 0) {
                System.out.println("消费库存成功--->" + stock);
                //更新库存
                stock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            } else {
                System.out.println("消费库存失败。。。");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //只能释放自己加的锁
            if (clientId.equals(stringRedisTemplate.opsForValue().get("lock"))) {
                // del key  释放锁逻辑
                stringRedisTemplate.delete(lockKey);
            }
        }
        return "helloworld";
    }


瓶颈


我们现在的瓶颈就是超时时间的设置。


如果设置短了会出现五代代码的问题;如果设置长了,你不能保证业务逻辑一定会比你设置的时间短,就算你设置的时间长,10分钟,那万一系统中断10分钟内不能有业务处理,也是不可取的。


如果我们能动态修改这个超时时间,那就无敌了


其实还有一个问题,这短代码的逻辑是获取锁失败后直接返回,其实应该继续尝试获取


基于redisson的分布式锁


原理


16.png


实践

<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.10.0</version>
</dependency>


@Bean
    public Redisson redisson() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.10.30.146:6379").setDatabase(0).setPassword("123456");
        return (Redisson) Redisson.create(config);
    }


@RestController
public class DistributedLockController {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private Redisson redisson;
    @RequestMapping("/reduce_stock")
    public String reduceStock() {
        //key的名称
        String lockKey = "lock";
        RLock lock = redisson.getLock(lockKey);
        lock.lock();
        try {
            //查数据库(redis)中库存数量
            Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            //判断库存
            if (stock > 0) {
                System.out.println("消费库存成功--->" + stock);
                //更新库存
                stock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            } else {
                System.out.println("消费库存失败。。。");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return "helloworld";
    }
}


问题


向redis集群写数据的步骤是:

1)向master节点写数据

2) master节点返回

3)master节点同步到子节点


如果 线程t1 获取锁,写入一个数据    1)  2)成功后 此时master 节点掉线了, 在子节点中选一个master,但是这个master是没有t1写的数据,此时此刻t2是可以获取到锁的,这个是redis做分布式锁的瑕疵。


redis是高性能分布式锁,zk是高可靠分布式锁,看你看重性能还是一致性了。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
1月前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
121 5
|
2月前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
76 8
|
2月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
64 16
|
2月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
47 5
|
3月前
|
NoSQL Redis 数据库
计数器 分布式锁 redis实现
【10月更文挑战第5天】
59 1
|
3月前
|
NoSQL 算法 关系型数据库
Redis分布式锁
【10月更文挑战第1天】分布式锁用于在多进程环境中保护共享资源,防止并发冲突。通常借助外部系统如Redis或Zookeeper实现。通过`SETNX`命令加锁,并设置过期时间防止死锁。为避免误删他人锁,加锁时附带唯一标识,解锁前验证。面对锁提前过期的问题,可使用守护线程自动续期。在Redis集群中,需考虑主从同步延迟导致的锁丢失问题,Redlock算法可提高锁的可靠性。
90 4
|
3月前
|
缓存 NoSQL 算法
面试题:Redis如何实现分布式锁!
面试题:Redis如何实现分布式锁!
|
缓存 NoSQL Java
为什么分布式一定要有redis?
1、为什么使用redis 分析:博主觉得在项目中使用redis,主要是从两个角度去考虑:性能和并发。当然,redis还具备可以做分布式锁等其他功能,但是如果只是为了分布式锁这些其他功能,完全还有其他中间件(如zookpeer等)代替,并不是非要使用redis。
1371 0
|
机器学习/深度学习 缓存 NoSQL
下一篇
开通oss服务