【Redis】6、Redisson 分布式锁的简单使用(可重入、重试机制...)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 【Redis】6、Redisson 分布式锁的简单使用(可重入、重试机制...)


零、自己通过 set nx ex 实现的分布式锁存在的问题

✏️ 不可重入 同一个线程无法多次获取同一把锁

✏️ 不可重试 获取锁只尝试一次就返回 false,没有重试机制

✏️ 超时释放 锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患

一、Redisson 介绍

✏️ Redisson 是一个在 Redis 基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)

✏️ 它提供了一系列分布式的 Java 常用对象

✏️ 提供了许多分布式服务,其中包含了各种分布式锁的实现

官网地址 https://redisson.org

GitHub 地址 https://github.com/redisson/redisson

二、Redisson 基本使用(改造业务)

(1) 依赖

<dependency>
       <groupId>org.redisson</groupId>
       <artifactId>redisson</artifactId>
       <version>3.13.6</version>
   </dependency>

(2) 配置 Redisson 客户端

@Configuration
@SuppressWarnings("all")
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.88.130:6379")
                .setPassword("root");
        return Redisson.create(config);
    }
}

✏️ 配置 redis 地址也可以使用 config.useClusterServers() 添加集群地址

✏️ useSingleServer() 是添加单点地址

(3) 使用 Redisson 的可重入锁

@Test
 public void tesRedisson() throws InterruptedException {
     // 获取可重入锁, 指定锁的名称
     RLock lock = redissonClient.getLock("anLock");
     // 尝试获取锁
     // 参数1:获取锁的最大等待时间(期间会多次重试获取锁)
     // 参数2:锁自动释放时间
     // 参数3:时间单位
     boolean isGetLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
     if (isGetLock) {
         try {
             System.out.println("执行业务");
         } finally {
             lock.unlock();
         }
     }
 }

三、Redisson 可重入锁原理

✏️ 通过 Redis 的 Hash 数据结构实现

✏️ 存入线程标识和 counter【记录锁被重入的次数(被使用的次数),被使用(获取)一次就递增1,被释放则 counter 减1】

✏️ 当 counter 为 0 的时候,整个锁会被释放

@Resource
    private RedissonClient redissonClient;
    @SuppressWarnings("all")
    @Test
    public void tRedisson() {
        RLock lock = redissonClient.getLock("lock");
        boolean getLock = lock.tryLock();
        if (!getLock) {
            System.out.println("tRedisson getLock Failed");
            return;
        }
        try {
            System.out.println("tRedisson getLock Success");
            m(lock);
        } finally {
            System.out.println("tRedisson unlock");
            lock.unlock();
        }
    }
    public void m(RLock lock) {
        boolean getLock = lock.tryLock();
        if (!getLock) {
            System.out.println("m() getLock Failed");
            return;
        }
        try {
            System.out.println("m() getLock Success");
        } finally {
            System.out.println("m() unlock");
            lock.unlock();
        }
    }


✏️ 没有直接使用 hset

✏️ hincrby 命令:当 key 不存在的时候会自动创建 key

✏️ 获取锁成功:返回 nil;获取锁失败:返回当前锁的存活时间

✏️ pttl毫秒为单位返回某个 key 的存活时间

✏️ redis.call('publish', KEYS[2], ARGV[1]); 发布释放锁的消息通知

四、Redisson 可重试原理

下面是 Redisson 的 tryLock() 方法的源码:

@Override
 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
     // 把超时等待时间转换为毫秒
     long time = unit.toMillis(waitTime);
     // 获取当前时间(毫秒)
     long current = System.currentTimeMillis();
     // 获取线程 ID
     long threadId = Thread.currentThread().getId();
     // tryAcquire 尝试获取锁(获取锁成功返回 null, 获取锁失败返回剩余ttl)
     Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
     // lock acquired
     if (ttl == null) { // 成功获取锁
         return true;
     }
     
     // time 是超时释放时间(单位:毫秒)
     // time = time - (System.currentTimeMillis() - current)
     time -= System.currentTimeMillis() - current; // 刷新超时释放时间
     if (time <= 0) { // 超时释放时间到了
         acquireFailed(waitTime, unit, threadId);
         return false; // 获取锁失败
     }
     
     // 获取 当前时间毫秒值
     current = System.currentTimeMillis();
     // 等待释放锁的通知(订阅释放锁的信号)
     RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
  // 等到超时释放时间结束还没有收到释放锁的通知的话, 返回 false
     // 获取锁失败 
     if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
         if (!subscribeFuture.cancel(false)) { // 取消订阅
             subscribeFuture.onComplete((res, e) -> {
                 if (e == null) {
                     unsubscribe(subscribeFuture, threadId);
                 }
             });
         }
     
         acquireFailed(waitTime, unit, threadId);
         return false;
     }
     
     // 接收到释放锁的信号
     try {
      // 判断释放到超时时间
         time -= System.currentTimeMillis() - current;
         if (time <= 0) {
             acquireFailed(waitTime, unit, threadId);
             return false; // 获取锁失败
         }
     
         while (true) { // 反复尝试
             long currentTime = System.currentTimeMillis();
             ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
             // lock acquired
             if (ttl == null) { // 获取锁成功
                 return true;
             }
             time -= System.currentTimeMillis() - currentTime;
             if (time <= 0) {
                 acquireFailed(waitTime, unit, threadId);
                 return false;
             }
             // waiting for message
             // 代码来到这里:没有获取到锁, 超时时间有剩余
             // 等待释放锁的信号
             currentTime = System.currentTimeMillis();
             if (ttl >= 0 && ttl < time) {
                 subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
             } else {
                 subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
             }
             time -= System.currentTimeMillis() - currentTime;
             if (time <= 0) {
                 acquireFailed(waitTime, unit, threadId);
                 return false;
             }
         }
     } finally {
         unsubscribe(subscribeFuture, threadId);
     }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
 }
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
      // 判断超时释放时间是否是 -1
      if (leaseTime != -1) {
          return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
      }
      // 代码来到这里表示超时释放时间是 -1
      // tryLockInnerAsync 异步的方法
      RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                                  commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                                  TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
      ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
          if (e != null) {
              return;
          }
          // lock acquired
          if (ttlRemaining) {
              scheduleExpirationRenewal(threadId);
          }
      });
      return ttlRemainingFuture;
  }

tryLockInnerAsync()、tryAcquire(): ① 尝试获取锁。获取锁成功,返回 nil;获取锁失败,返回超时是否时间(lessTime)

② 该方法是异步的

① 获取锁失败后,不会立刻重新尝试获取锁

② 会等待释放锁的通知

五、Redisson 超时释放(锁的 ttl)

好😵懵逼啊,随便看看得了

📝 可重入:利用hash结构记录线程id和重入次数

📝 可重试:利用信号量和 PubSub 功能实现等待、唤醒,获取锁失败的重试机制

📝 超时续约:利用 watchDog,每隔一段时间(releaseTime / 3),重置超时时间

六、主从一致(连锁 MultiLock)

✏️主节点支持写请求

✏️从节点支持读请求

✏️主节点写入的数据要同步到从节点

✏️假如向主节点写入了数据,主节点还没有来得及向从节点同步数据自己就宕机了,此时 Redis 的哨兵机制会从剩下的从节点中选一个从节点作为新的主节点

✏️ 该新的主节点是没有上个主节点中的 lock = thread1 数据的

✏️ 锁失效,可能存在线程安全问题


✏️其中有任何一个节点宕机都无法拿到锁

七、锁总结

🎄不可重入 Redis 分布式锁:

原理: 利用 setnx 的互斥性;利用 ex 避免死锁;释放锁时判断线程标

缺陷: 不可重入、无法重试、锁超时失效

🎄可重入的 Redis 分布式锁:

原理: 利用 Hash 结构,记录线程标识和重入次数;利用 watchDog 延续锁过期时间;利用信号量控制锁重试等待

缺陷: redis宕机引起锁失效问题

🎄 Redisson 的 multiLock:

原理: 多个独立的 Redis 节点,必须在所有节点都获取重入锁,才算获取锁成功

缺陷: 运维成本高、实现复杂

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
5天前
|
供应链 NoSQL Java
关于Redisson分布式锁的用法
Redisson分布式锁是实现分布式系统中资源同步的有效工具。通过合理配置和使用Redisson的各种锁机制,可以确保系统的高可用性和数据一致性。本文详细介绍了Redisson分布式锁的配置、基本用法和高级用法,并提供了实际应用示例,希望对您在实际项目中使用Redisson分布式锁有所帮助。c
33 10
|
19天前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
50 5
|
22天前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
39 8
|
1月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
57 16
|
1月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
40 5
|
2月前
|
NoSQL Java API
美团面试:Redis锁如何续期?Redis锁超时,任务没完怎么办?
在40岁老架构师尼恩的读者交流群中,近期有小伙伴在面试一线互联网企业时遇到了关于Redis分布式锁过期及自动续期的问题。尼恩对此进行了系统化的梳理,介绍了两种核心解决方案:一是通过增加版本号实现乐观锁,二是利用watch dog自动续期机制。后者通过后台线程定期检查锁的状态并在必要时延长锁的过期时间,确保锁不会因超时而意外释放。尼恩还分享了详细的代码实现和原理分析,帮助读者深入理解并掌握这些技术点,以便在面试中自信应对相关问题。更多技术细节和面试准备资料可在尼恩的技术文章和《尼恩Java面试宝典》中获取。
美团面试:Redis锁如何续期?Redis锁超时,任务没完怎么办?
|
NoSQL Redis 数据库
用redis实现分布式锁时容易踩的5个坑
云栖号资讯:【点击查看更多行业资讯】在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! 近有不少小伙伴投入短视频赛道,也出现不少第三方数据商,为大家提供抖音爬虫数据。 小伙伴们有没有好奇过,这些数据是如何获取的,普通技术小白能否也拥有自己的抖音爬虫呢? 本文会全面解密抖音爬虫的幕后原理,不需要任何编程知识,还请耐心阅读。
用redis实现分布式锁时容易踩的5个坑
|
NoSQL Java 关系型数据库
浅谈Redis实现分布式锁
浅谈Redis实现分布式锁
|
存储 canal 缓存