Redis(三十四)-Redisson分布式锁看门狗

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 上一篇文章我们介绍了用如何用Redis做分布式锁Redis(三十二)-用Redis做分布式锁在文章的末尾留下了个问题:

1. 简介

上一篇文章我们介绍了用如何用Redis做分布式锁Redis(三十二)-用Redis做分布式锁

在文章的末尾留下了个问题:

业务还没执行完,Redis分布式锁就过期了该怎么办?,由于我们给锁指定了过期时间,极有可能会出现业务还还没执行完,分布式锁就过期的情况。针对这种情况,我们该如何处理呢?

可能我们最先想到的方案就是:给分布式锁设置更长的有效时间,但是这只是治标不治本的一种方式。你无法保证业务流程在你设置的过期时间内就一定能执行完成。


针对这种情况,最好的方式就是在分布式锁快要过期,但是,自动延长分布式锁的过期时间。如果要我们来实现这个过程可能有点复杂。

还在已经有大佬帮我们实现了。我们只需要直接拿来用就好了。


现在就隆重请出 Redisson:它独有的看门狗(Watchdog)功能就可以帮我们轻易的实现。

2. Redisson怎么用

2.1. 引入依赖

<!--jedis客户端-->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.6.0</version>
        </dependency>
        <!--jedis客户端-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.16.0</version>
        </dependency>

2.2. 写个简单的demo测试

public class LockDemo {
    private final RedissonClient redissonClient;
    public LockDemo(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }
    public static void main(String[] args) throws InterruptedException {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        LockDemo lockDemo = new LockDemo(Redisson.create(config));
        lockDemo.reentrantLock();
        new Thread(lockDemo::reentrantLock_expire, "线程一").start();
        TimeUnit.SECONDS.sleep(30);
        new Thread(lockDemo::reentrantLock_expire, "线程二").start();
        TimeUnit.SECONDS.sleep(30);
    }
    /**
     * 加锁,默认的时长是30秒,不指定超时时间
     */
    public void reentrantLock() {
        RLock lock = redissonClient.getLock("reentrant-lock-no-expire");
        //没有获取到锁,返回
        lock.lock();
        long startTime = System.currentTimeMillis();
        try {
            // 模拟业务操作耗时
            System.out.println("模拟业务开始");
            TimeUnit.SECONDS.sleep(60);
            System.out.println("模拟业务结束,共耗时=" + (System.currentTimeMillis() - startTime) / 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("手动释放锁,共耗时=" + (System.currentTimeMillis() - startTime) / 1000);
            lock.unlock();
        }
    }
    /**
     * 加锁,指定锁的时长是25秒
     */
    public void reentrantLock_expire() {
        RLock lock = redissonClient.getLock("reentrant-lock-expire");
        long startTime = System.currentTimeMillis();
        lock.lock(25, TimeUnit.SECONDS);
        System.out.println(Thread.currentThread().getName() + "获取到锁");
        try {
            // 模拟业务操作耗时
            System.out.println(Thread.currentThread().getName() + "模拟业务开始");
            TimeUnit.SECONDS.sleep(60);
            System.out.println(Thread.currentThread().getName() + "模拟业务结束,共耗时=" + (System.currentTimeMillis() - startTime) / 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName() + "手动释放锁,共耗时=" + (System.currentTimeMillis() - startTime) / 1000);
            lock.unlock();
        }
    }
    public void release() {
        this.redissonClient.shutdown();
    }
}

运行结果:

从上述运行结果可以看出,如下几个结论。

1.Redisson的lock()方法不指定过期时间的话,默认的过期时间是30秒,当过期时间超过1/3时,看门狗会自动续期(比如过期时间是30秒,则在10s的时候,看门狗就会自动续期),续期后的锁的时长重新变成30s

2.Redisson的lock(long leaseTime, TimeUnit unit)方法指定过期时间时,当到达过期时间时锁会自动释放,也就是说在这种情况下,看门狗失效。

3.Redisson的锁与线程相关,每个线程只能释放自己的锁,不能释放别的线程的锁。

3.源码分析

3.1. 获取锁

//不指定过期时间
  @Override
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }
    //指定过期时间
    @Override
    public void lock(long leaseTime, TimeUnit unit) {
        try {
            lock(leaseTime, unit, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }

这两个访问最终调用的都是lock(long leaseTime, TimeUnit unit, boolean interruptibly) 方法,唯一的区别是lock()方法传入的是参数leaseTime值是-1,而lock(long leaseTime, TimeUnit unit)传入的参数值是leaseTime。

让我们接着看看lock(long leaseTime, TimeUnit unit, boolean interruptibly)方法。

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        //获取锁,获取锁时带入线程ID
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired (获取到锁直接返回)
        if (ttl == null) {
            return;
        }
        //订阅锁,这样锁释放时会被通知到
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }
         //没获取到锁,死循环等待
        .....

这个方法主要tryAcquire(-1, leaseTime, unit, threadId) 方法来获取锁,获取时传入了当前线程ID。而tryAcquire方法内部主逻辑优势调用tryAcquireAsync方法。下面就直接查看tryAcquireAsync方法。

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime != -1) {
            //指定过期时间
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            //不指定过期时间时,过期时间设为看门狗超时时间internalLockLeaseTime,然后由看门狗一直续期,直到锁释放
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        //
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }
            // lock acquired(获取到锁)
            if (ttlRemaining == null) {
                if (leaseTime != -1) {
                    //指定过期时间的话,则不续期
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                     // 未指定过期时间,需要开启Watchdog自动续期
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

从这里面我们可以看出两个重要的信息,第一个信息是指定过期时间的话不续期,未指定过期时间的话,则会开启Watchdog自动续期。internalLockLeaseTime的默认时间是30秒。private long lockWatchdogTimeout = 30 * 1000;

首先看下尝试获取锁的实现,tryLockInnerAsync方法通过EVAL执行LUA脚本,代码如下:

return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));

它的主要逻辑是:

1.若锁不存在,则设置锁,并设置过期时间,然后返回nil。

2.若锁存在且由本线程持有,则锁计数加一,并重设过期时间,然后返回nil;

3.否则返回锁的过期时间;

然后,看下看门狗是如何给锁续期的呢?直接查看scheduleExpirationRenewal方法。

protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
           //重入加锁
            oldEntry.addThreadId(threadId);
        } else {
           //第一次加锁,触发定时任务。
            entry.addThreadId(threadId);
            renewExpiration();
        }
    }

接着看看renewExpiration这个方法,

private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
     // 借助Netty的Timeout实现自动续期
    // 超时时间为1/3过期时间,确保在过期前能够重设过期时间
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    if (res) {
                        // reschedule itself 
                        renewExpiration();
                    }
                });
            }
            //过期时间超过1/3的话则会需求
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        ee.setTimeout(task);
    }

续期的方法是renewExpirationAsync方法。这个方法也是一个LUA脚本,这个脚本的主要逻辑是,如果锁存在的话,则将过期时间重新设置为30s。

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }

参考

Redisson的“看门狗”机制,一个关于分布式锁的非比寻常的BUG

Redis学习之Redisson分布式锁看门狗

Redisson的看门狗watchDog机制是怎么实现的?

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
3天前
|
供应链 NoSQL Java
关于Redisson分布式锁的用法
Redisson分布式锁是实现分布式系统中资源同步的有效工具。通过合理配置和使用Redisson的各种锁机制,可以确保系统的高可用性和数据一致性。本文详细介绍了Redisson分布式锁的配置、基本用法和高级用法,并提供了实际应用示例,希望对您在实际项目中使用Redisson分布式锁有所帮助。c
29 10
|
17天前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
47 5
|
21天前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
39 8
|
1月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
57 16
|
1月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
39 5
|
4月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
131 2
基于Redis的高可用分布式锁——RedLock
|
4月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
|
2月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
69 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
2月前
|
NoSQL Redis 数据库
计数器 分布式锁 redis实现
【10月更文挑战第5天】
51 1
下一篇
DataWorks