1. 简介
上一篇文章我们介绍了用如何用Redis做分布式锁Redis(三十二)-用Redis做分布式锁
在文章的末尾留下了个问题:
业务还没执行完,Redis分布式锁就过期了该怎么办?,由于我们给锁指定了过期时间,极有可能会出现业务还还没执行完,分布式锁就过期的情况。针对这种情况,我们该如何处理呢?
可能我们最先想到的方案就是:给分布式锁设置更长的有效时间,但是这只是治标不治本的一种方式。你无法保证业务流程在你设置的过期时间内就一定能执行完成。
针对这种情况,最好的方式就是在分布式锁快要过期,但是,自动延长分布式锁的过期时间。如果要我们来实现这个过程可能有点复杂。
还在已经有大佬帮我们实现了。我们只需要直接拿来用就好了。
现在就隆重请出 Redisson:它独有的看门狗(Watchdog)功能就可以帮我们轻易的实现。
2. Redisson怎么用
2.1. 引入依赖
<!--jedis客户端--> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.6.0</version> </dependency> <!--jedis客户端--> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.16.0</version> </dependency>
2.2. 写个简单的demo测试
public class LockDemo { private final RedissonClient redissonClient; public LockDemo(RedissonClient redissonClient) { this.redissonClient = redissonClient; } public static void main(String[] args) throws InterruptedException { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); LockDemo lockDemo = new LockDemo(Redisson.create(config)); lockDemo.reentrantLock(); new Thread(lockDemo::reentrantLock_expire, "线程一").start(); TimeUnit.SECONDS.sleep(30); new Thread(lockDemo::reentrantLock_expire, "线程二").start(); TimeUnit.SECONDS.sleep(30); } /** * 加锁,默认的时长是30秒,不指定超时时间 */ public void reentrantLock() { RLock lock = redissonClient.getLock("reentrant-lock-no-expire"); //没有获取到锁,返回 lock.lock(); long startTime = System.currentTimeMillis(); try { // 模拟业务操作耗时 System.out.println("模拟业务开始"); TimeUnit.SECONDS.sleep(60); System.out.println("模拟业务结束,共耗时=" + (System.currentTimeMillis() - startTime) / 1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("手动释放锁,共耗时=" + (System.currentTimeMillis() - startTime) / 1000); lock.unlock(); } } /** * 加锁,指定锁的时长是25秒 */ public void reentrantLock_expire() { RLock lock = redissonClient.getLock("reentrant-lock-expire"); long startTime = System.currentTimeMillis(); lock.lock(25, TimeUnit.SECONDS); System.out.println(Thread.currentThread().getName() + "获取到锁"); try { // 模拟业务操作耗时 System.out.println(Thread.currentThread().getName() + "模拟业务开始"); TimeUnit.SECONDS.sleep(60); System.out.println(Thread.currentThread().getName() + "模拟业务结束,共耗时=" + (System.currentTimeMillis() - startTime) / 1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(Thread.currentThread().getName() + "手动释放锁,共耗时=" + (System.currentTimeMillis() - startTime) / 1000); lock.unlock(); } } public void release() { this.redissonClient.shutdown(); } }
运行结果:
从上述运行结果可以看出,如下几个结论。
1.Redisson的lock()方法不指定过期时间的话,默认的过期时间是30秒,当过期时间超过1/3时,看门狗会自动续期(比如过期时间是30秒,则在10s的时候,看门狗就会自动续期),续期后的锁的时长重新变成30s
2.Redisson的lock(long leaseTime, TimeUnit unit)方法指定过期时间时,当到达过期时间时锁会自动释放,也就是说在这种情况下,看门狗失效。
3.Redisson的锁与线程相关,每个线程只能释放自己的锁,不能释放别的线程的锁。
3.源码分析
3.1. 获取锁
//不指定过期时间 @Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } //指定过期时间 @Override public void lock(long leaseTime, TimeUnit unit) { try { lock(leaseTime, unit, false); } catch (InterruptedException e) { throw new IllegalStateException(); } }
这两个访问最终调用的都是lock(long leaseTime, TimeUnit unit, boolean interruptibly) 方法,唯一的区别是lock()方法传入的是参数leaseTime值是-1,而lock(long leaseTime, TimeUnit unit)传入的参数值是leaseTime。
让我们接着看看lock(long leaseTime, TimeUnit unit, boolean interruptibly)方法。
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); //获取锁,获取锁时带入线程ID Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired (获取到锁直接返回) if (ttl == null) { return; } //订阅锁,这样锁释放时会被通知到 RFuture<RedissonLockEntry> future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } //没获取到锁,死循环等待 .....
这个方法主要tryAcquire(-1, leaseTime, unit, threadId) 方法来获取锁,获取时传入了当前线程ID。而tryAcquire方法内部主逻辑优势调用tryAcquireAsync方法。下面就直接查看tryAcquireAsync方法。
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime != -1) { //指定过期时间 ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //不指定过期时间时,过期时间设为看门狗超时时间internalLockLeaseTime,然后由看门狗一直续期,直到锁释放 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } // ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired(获取到锁) if (ttlRemaining == null) { if (leaseTime != -1) { //指定过期时间的话,则不续期 internalLockLeaseTime = unit.toMillis(leaseTime); } else { // 未指定过期时间,需要开启Watchdog自动续期 scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
从这里面我们可以看出两个重要的信息,第一个信息是指定过期时间的话不续期,未指定过期时间的话,则会开启Watchdog自动续期。internalLockLeaseTime的默认时间是30秒。private long lockWatchdogTimeout = 30 * 1000;
首先看下尝试获取锁的实现,tryLockInnerAsync方法通过EVAL执行LUA脚本,代码如下:
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
它的主要逻辑是:
1.若锁不存在,则设置锁,并设置过期时间,然后返回nil。
2.若锁存在且由本线程持有,则锁计数加一,并重设过期时间,然后返回nil;
3.否则返回锁的过期时间;
然后,看下看门狗是如何给锁续期的呢?直接查看scheduleExpirationRenewal方法。
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { //重入加锁 oldEntry.addThreadId(threadId); } else { //第一次加锁,触发定时任务。 entry.addThreadId(threadId); renewExpiration(); } }
接着看看renewExpiration这个方法,
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 借助Netty的Timeout实现自动续期 // 超时时间为1/3过期时间,确保在过期前能够重设过期时间 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } if (res) { // reschedule itself renewExpiration(); } }); } //过期时间超过1/3的话则会需求 }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
续期的方法是renewExpirationAsync方法。这个方法也是一个LUA脚本,这个脚本的主要逻辑是,如果锁存在的话,则将过期时间重新设置为30s。
protected RFuture<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }