开启掘金成长之旅!这是我参与「掘金日新计划 · 2 月更文挑战」的第 4 天,点击查看活动详情
相信在日常开发中,基于 Redis 天然支持分布式锁,大家在线上分布式项目中都使用过 Redis 锁。本文主要针对日常开发中加锁过程中某些异常场景进行讲解与分析。本文讲解示例代码都在 github.com/wayn111/new… 项目 test
目录下 RedisLockTest
类中。
版本声明:
Spring Boot
版本 3.0.2- 演示项目地址:github.com/wayn111/new…
- github地址:github.com/wayn111 欢迎大家关注,点个star
一、任务超时,锁已经过期
这个异常场景说实话发生概率很低,大部分情况下加锁时任务执行都会很快,锁还没到期,任务自己就会删除锁。除非说任务调用第三方接口不稳定导致超时、数据库查询突然变得非常慢就可能会产生这个异常场景。
那怎么处理这个异常嘞?大部分人可能都会回答添加一个定时任务,在定时任务内检测锁快过期时,进行续期操作。OK,这么做好像是可以解决这个异常,那么博主在这里给出自己的见解。
1.1 先说一个暴论:如果料想到有这类异常产生,为什么不在加锁时,就把加锁过期时间设置大一点
不管所续期还是增大加锁时长,都会导致一个问题,其他线程会迟迟获取不到锁,一直被阻塞。那结果都一样,为什么不直接增大加锁时间?
想法是好的,但是实际上,加锁时间的设置是我们主观臆断的,我们无法保证这个加锁代码的执行时间一定在我们的锁过期时间内。作为一个严谨的程序员,我们需要对我们的代码有客观认知,任务执行可能几千上亿万次都是正常,但就是那么一次它执行超时了,可能由于外部依赖、当前运行环境的异常导致。
1.2 直接不设置过期时间,任务不执行完,不释放锁
如果在加锁时就不设置过期时间的话,理论上好像是可以解决这个问题,任务不执行完,锁就不会释放。但是作为程序员,总觉得哪里怪怪的,任务不执行完,锁就不会释放!
仔细想想,我们一般在 try 中进行加锁 在 finally 进行锁释放,这个好像也没毛病哦。但是实际针对一些极端异常场景下,如果任务执行过程中,服务器宕机、程序突然被杀掉、网络断连等都可能造成这个锁释放不了,另一个任务就一直获取不到锁。
这个方案程序正常的情况下,可以满足我们的要求,但是一旦发生异常将导致锁无法释放的后果,也就是说只要我们解决这个锁在异常场景下无法释放的问题,这个方案还是OK的。博主这里直接给出方案:
在不设置过期时间的加锁操作成功时,给一个默认过期时间比如三十秒,同时启动一个定时任务,给我们的锁进行自动续期,每隔 默认过期时间 / 3
秒后执行一次续期操作,发生锁剩余时长小于 默认过期时间 / 2
就重新赋值过期时长为三十秒。这样的话,可以保证锁必须由任务执行完才能释放,当程序异常发生时,仍然能保证锁会在三十秒内释放。
1.3 设置过期时间,任务不执行完,不释放锁
这个方案本质上与方案二的解决方案相同,还是启动定时任务进行续期操作,流程这里不做多余讲述。需要注意的就是加锁指定过期时间会比较符合我们的客观认知。实际上他的底层逻辑跟方案二相同,无非就是定时任务执行间隔,锁剩余时长续期判断要根据过期时间来计算。
综合来看:方案三会最合适,符合我们的客观认知,跟我们之前对 Redis 的使用逻辑较为相近。
二、线程B加锁执行中未释放锁,线程A释放了线程B的锁
说实话我仔细思考了一下这个异常场景,发现这个异常是个伪命题,如果线程 B 正在执行时,线程 A 怎么能获取到线程B的锁!线程 A 获取不到线程 B 的锁,谈何来去释放线程 B 的锁!如果线程 A 能获取到线程 B 的锁那么这个分布式锁的代码一开始就已经错了。
这里回到这个异常场景本身,我们可以给每个线程设置请求ID,加锁成功将请求ID设置为加锁 key 的对应 value,线程释放锁时需要判断当前线程的请求ID与 加锁 key 的对应 value 是否相同,相同则可以释放锁,不相同则不允许释放。
三、线程加锁成功后继续申请加锁
这个场景主要发生在加锁代码内部调用栈过深,比如说加锁成功执行方法 a,在方法 a 内又重复申请了同一把锁,导致线程把自己锁住了,这个业界的主流叫法是叫锁的可重入性。
解决方式有两种,一是修改方法内的加锁逻辑,不要加同一把锁,修改方法 a 内的加锁 key 名称。二是针对加锁逻辑做修改,实现可重入性。
这里简单介绍如何实现可重入性,给每个线程设置请求ID,加锁成功将请求ID设置为加锁 key 的对应 value,针对同一个线程的重复加锁,判断当前线程已存在请求ID的情况下,请求ID直接与加锁 key 的对应 value 相比较,相同则直接返回加锁成功。
四、 代码实践
4.1 加锁自动续期实践
设置锁过期时间为10秒,然后该任务执行15秒,代码如下:
ps: 以下代码都可以在 github.com/wayn111/new… 项目
test
目录下RedisLockTest
类中找到
@Slf4j @SpringBootTest @RunWith(SpringRunner.class) public class RedisLockTest { @Autowired private RedisLock redisLock; @Test @Test public void redisLockNeNewTest() { String key = "test"; try { log.info("---申请加锁"); if (redisLock.lock(key, 10)) { // 模拟任务执行15秒 log.info("---加锁成功"); Thread.sleep(15000); log.info("---执行完毕"); } } catch (Exception e) { log.error(e.getMessage(), e); } finally { redisLock.unLock(key); } } }
执行如下:
可以看出就算任务执行超过过期时间也能通过自动续期让代码正常执行。
4.2 多线程下其他线程无法共同申请到同一把锁实践
启动两个线程,线程 A 先加锁, 线程 B 后枷锁
@Test public void redisLockReleaseSelfTest() throws IOException { new Thread(() -> { String key = "test"; try { log.info("---申请加锁"); if (redisLock.lock(key, 10)) { // 模拟任务执行15秒 log.info("---加锁成功"); Thread.sleep(15000); log.info("---执行完毕"); } else { log.info("---加锁失败"); } } catch (Exception e) { log.error(e.getMessage(), e); } finally { redisLock.unLock(key); } }, "thread-A").start(); new Thread(() -> { String key = "test"; try { Thread.sleep(100L); log.info("---申请加锁"); if (redisLock.lock(key, 10)) { // 模拟任务执行15秒 log.info("---加锁成功"); Thread.sleep(15000); log.info("---执行完毕"); } else { log.info("---加锁失败"); } } catch (Exception e) { log.error(e.getMessage(), e); } finally { redisLock.unLock(key); } }, "thread-B").start(); System.in.read(); }
结果如下:
可以看到,线程 A 先申请到锁,线程 B 后申请锁,结果线程 B 申请加锁失败。
4.3 锁得可重入性实践
当前线程加锁成功后,在线程执行中继续申请同一把锁,代码如下:
@Test public void redisLockReEntryTest() { String key = "test"; try { log.info("---申请加锁"); if (redisLock.lock(key, 10)) { // 模拟任务执行15秒 log.info("---加锁第一次成功"); if (redisLock.lock(key, 10)) { // 模拟任务执行15秒 log.info("---加锁第二次成功"); Thread.sleep(15000); log.info("---加锁第二次执行完毕"); } else { log.info("---加锁第二次失败"); } Thread.sleep(15000); log.info("---加锁第一次执行完毕"); } else { log.info("---加锁第一次失败"); } } catch (Exception e) { log.error(e.getMessage(), e); } finally { redisLock.unLock(key); } }
结果如下:
可以看到,线程在第一次申请加锁成功后,继续申请同一把锁,仍然加锁成功,因此这个操作是实现了可重入性的,虽然没那么细致。
4.4 加解锁逻辑讲解
直接贴出本文最核心 RedisLock 类全部代码:
@Slf4j @Component public class RedisLock { @Autowired public RedisTemplate redisTemplate; /** * 默认锁过期时间20秒 */ public static final Integer DEFAULT_TIME_OUT = 30; /** * 保存线程id-ThreadLocal */ private ThreadLocal<String> stringThreadLocal = new ThreadLocal<>(); /** * 保存定时任务(watch-dog)-ThreadLocal */ private ThreadLocal<ExecutorService> executorServiceThreadLocal = new ThreadLocal<>(); /** * 加锁,不指定过期时间 * * @param key key名称 * @return boolean */ public boolean lock(String key) { return lock(key, null); } /** * 加锁 * * @param key key名称 * @param timeout 过期时间 * @return boolean */ public boolean lock(String key, Integer timeout) { Integer timeoutTmp; if (timeout == null) { timeoutTmp = DEFAULT_TIME_OUT; } else { timeoutTmp = timeout; } String nanoId; if (stringThreadLocal.get() != null) { nanoId = stringThreadLocal.get(); } else { nanoId = IdUtil.nanoId(); stringThreadLocal.set(nanoId); } RedisScript<Long> redisScript = new DefaultRedisScript<>(buildLuaLockScript(), Long.class); Long execute = (Long) redisTemplate.execute(redisScript, Collections.singletonList(key), nanoId, timeoutTmp); boolean flag = execute != null && execute == 1; if (flag) { ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); executorServiceThreadLocal.set(scheduledExecutorService); scheduledExecutorService.scheduleWithFixedDelay(() -> { RedisScript<Long> renewRedisScript = new DefaultRedisScript<>(buildLuaRenewScript(), Long.class); Long result = (Long) redisTemplate.execute(renewRedisScript, Collections.singletonList(key), nanoId, timeoutTmp); if (result != null && result == 2) { ThreadUtil.shutdownAndAwaitTermination(scheduledExecutorService); } }, 0, timeoutTmp / 3, TimeUnit.SECONDS); } return flag; } /** * 释放锁 * * @param key key名称 * @return boolean */ public boolean unLock(final String key) { String nanoId = stringThreadLocal.get(); RedisScript<Long> redisScript = new DefaultRedisScript<>(buildLuaUnLockScript(), Long.class); Long execute = (Long) redisTemplate.execute(redisScript, Collections.singletonList(key), nanoId); boolean flag = execute != null && execute == 1; if (flag) { if (executorServiceThreadLocal.get() != null) { ThreadUtil.shutdownAndAwaitTermination(executorServiceThreadLocal.get()); } } return flag; } private String buildLuaLockScript() { return """ local key = KEYS[1] local value = ARGV[1] local time_out = ARGV[2] local result = redis.call('get', key) if result == value then return 1; end local lock_result = redis.call('setnx', key, value) if tonumber(lock_result) == 1 then redis.call('expire', key, time_out) return 1; else return 0; end """; } private String buildLuaUnLockScript() { return """ local key = KEYS[1] local value = ARGV[1] local result = redis.call('get', key) if result ~= value then return 0; else redis.call('del', key) end return 1; """; } private String buildLuaRenewScript() { return """ local key = KEYS[1] local value = ARGV[1] local timeout = ARGV[2] local result = redis.call('get', key) if result ~= value then return 2; end local ttl = redis.call('ttl', key) if tonumber(ttl) < tonumber(timeout) / 2 then redis.call('expire', key, timeout) return 1; else return 0; end """; } }
加锁逻辑:这里我把加锁逻辑分解成三步展示给大家
- 加锁前:先判断当前线程是否存在请求ID,不存在则生成,存在就直接使用。
- 加锁中:通过 lua 脚本执行原子加锁操作, 加锁时先判断当前线程ID与加锁 key 得 value 是否相等,相等则是同一个线程的锁重入,直接返加锁成功。不相等则设置加锁 value 为请求ID以及过期时间。
- 加锁后:启动一个定时任务,每隔
过期时间 / 3
秒后执行一次续期操作,发现锁剩余时间不足过期时间 / 2
秒后,通过 lua 脚本进行续期操作。
解锁逻辑:这里我把解锁逻辑分解成两步展示给大家
- 解锁中:通过 lua 脚本执行解锁操作,先判断加锁 key 的 value 是否与自身请求ID相同,相同则让解锁,不相同则不让解锁。
- 解锁后:删除续期定时任务。
五、总结
其实本文得核心逻辑有许多都是参考 Redission 客户端而写,对于这些常见得坑点,博主结合自身思考,业界知识总结并自己实现一个分布式锁得工具类。希望大家看了有所收获,对日常业务中 Redis 分布式锁的使用能有更深的理解。