用万字长文来讲讲本地锁至分布式锁的演进和Redis实现,扩展 Redlock 红锁1:https://developer.aliyun.com/article/1394703
3.2、解决死锁问题
最容易想到也是最简单的方式就是给这个分布式锁加个过期时间,比如3s、5s
之类,使用EXPIRE
设置一个过期时间
但如果想到的是这种:
redis> SETNX mykey "Hello" (integer) 1 redis> EXPIRE mykey 10 (integer) 1 redis> SETNX mykey "World" (integer) 0 redis> TTL mykey (integer) 8 redis> GET mykey "Hello"
如果是这么实现的话,咱们就还是对于 Redis 了解太少了。
首先说说这样操作会出现的问题:
- 我成功设置了
key
,但是还没执行到设置时间那一步,应用程序突然挂了,导致死锁问题产生。 - 或者是成功设置了
key
后,Redis 服务突然崩溃了,不干活了,这也导致了后面的EXPIRE
无法执行成功,同样会产生死锁问题。
【重点】:因为加锁和设置时间不是一个原子性操作
怎么才能将加锁和设置锁时间变成一个原子操作呢?
其实这一步Redis已经帮我们做好啦~
Redis 中的set
命令是可以添加参数的,一条set命令即可实现SETNX+EXPIRE
效果,将加锁和设置时间变成一个原子性操作,要么一起成功,要么一起失败。
完整命令参数:文档链接
SET key value [EX seconds|PX milliseconds|KEEPTTL] [NX|XX] [GET]
EX
seconds – 设置键key的过期时间,单位时秒PX
milliseconds – 设置键key的过期时间,单位时毫秒NX
– 只有键key不存在的时候才会设置key的值XX
– 只有键key存在的时候才会设置key的值KEEPTTL
-- 获取 key 的过期时间- GET -- 返回 key 存储的值,如果 key 不存在返回空
注意: 由于SET
命令加上选项已经可以完全取代SETNX
, SETEX, PSETEX, GETSET,的功能,所以在将来的版本中,redis可能会不推荐使用并且最终抛弃这几个命令。
返回值
字符串: 如果SET
命令正常执行那么回返回OK
多行字符串: 使用 GET 选项,返回 key 存储的值,如果 key 不存在返回空: 否则如果加了NX
或者 XX
选项,SET 没执行,那么会返回nil。
例子:
从这个小案例中可以看出,这是符合我们要的命令的~
Java 代码实现:这一步的代码实现和上一小节相比,仅改动了一行代码:
上一小节:
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(SET_NX_EX_MENU_LIST_LOCK, SET_NX_EX_MENU_LIST_LOCK_VALUE);
加上过期时间:
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(SET_NX_EX_MENU_LIST_LOCK, SET_NX_EX_MENU_LIST_LOCK_VALUE, 5L, TimeUnit.SECONDS);
就这样,再那样,再这样,你看死锁问题就被解决啦😜
死锁问题确实被解决了,现在你觉得还有问题吗?
我们把它的每一个步骤都拆分来看,就会出现下面的这样一个场景:
假设我们约定锁过期时间为5s
,但是执行这个业务时突然卡起来,此业务执行时间超过了我们预估的 5s
,那么就可能出现以下情况:
第一条线程抢到锁,业务执行超时,第一条线程所持有的锁被自动释放;此时第二条线程拿到锁,准备执行业务,刚好第一条线程业务执行完成,照常执行了释放锁的过程,导致第二条线程持有的锁被第一条线程所释放,锁被其他人释放。
这个情况中存在两个问题:
- 业务执行超时,锁被自动释放
- 锁被其他人释放,导致业务出现问题
关于第一个问题,就是常说的锁续期问题,这点之后在使用 Redission 时再细谈。
第二个问题,就比较好解决了,我们每次加锁的时候,带上自己的身份标识,在解锁的时候,进行一次判断即可。
接着往下看吧 👇
3.3、锁被其他人释放,该怎么办?
我们每次加锁的时候,带上自己的身份标识,在解锁的时候,进行一次判断即可。
比如:加锁的时候,生成一个UUID
作为 KEY
,释放锁时,获取一下锁,判断一下UUID
是否相等,相等则执行删除,否则不执行删除。
在 Redis 中命令演示如下:
设置锁:set uuid "lock" NX EX 60
释放锁:1、get uuid
,身份标识相等,则执行2;2、 del uuid
Java 代码如下:
/** * @description: * @author: Ning Zaichun * @date: 2022年09月20日 20:59 */ @Service public class SetNxExLockServiceImpl implements ISetNxExLockService { @Autowired StringRedisTemplate stringRedisTemplate; @Autowired private MenuMapper menuMapper; private static final String SET_NX_EX_MENU_LIST = "set:nx:ex:menu:list"; private static final String SET_NX_EX_MENU_LIST_LOCK = "set:nx:ex:menu:list:lock"; private static final String SET_NX_EX_MENU_LIST_LOCK_VALUE = "lock"; @Override public List<MenuEntity> getList() { // 判断缓存是否有数据 String menuJson = stringRedisTemplate.opsForValue().get(SET_NX_EX_MENU_LIST); if (menuJson != null) { System.out.println("缓存中有,直接返回缓存中的数据"); List<MenuEntity> menuEntityList = JSON.parseObject(menuJson, new TypeReference<List<MenuEntity>>() { }); return menuEntityList; } // 从数据库中查询 List<MenuEntity> result = getMenuJsonFromDbWithRedisLock(); return result; } /** * 问题:锁被其他人释放,这该如何处理。 * 加锁的时候,将值给定位一个唯一的标识符(我这里使用的是 UUID ) * 1、解锁之前,先判断是不是自己获取的那把锁, * 2、确定是一把锁就执行 解锁锁操作 * * @return */ public List<MenuEntity> getMenuJsonFromDbWithRedisLock() { List<MenuEntity> result = new ArrayList<>(); System.out.println("缓存中没有,加锁,重新从数据中查询~==>"); // 给锁设定一个时间 String uuid = UUID.randomUUID().toString(); Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(SET_NX_EX_MENU_LIST_LOCK, uuid, 5L, TimeUnit.SECONDS); if (lock) { System.out.println("获取分布式锁成功..."); try { //加锁成功...执行业务 result = menuMapper.selectList(new QueryWrapper<MenuEntity>()); stringRedisTemplate.opsForValue().set(SET_NX_EX_MENU_LIST, JSON.toJSONString(result)); } finally { // 获取锁,判断是不是当前线程的锁 String token = stringRedisTemplate.opsForValue().get(SET_NX_EX_MENU_LIST_LOCK); if (uuid.equals(token)) { // 确定是同一把锁, 才释放锁 stringRedisTemplate.delete(SET_NX_EX_MENU_LIST_LOCK); } } return result; /** * 那这样就没有问题了吗? * 并不是。 * 这里存在的问题也很明显,删除操作已经不在是一个原子性操作了。 * 1、一个是查询判断 * 2、第二个才是解锁操作 * 那么又会拆分成,如果我第一步执行成功,第二步执行失败的场景,所以我们要把它变成原子操作才行。 */ } else { System.out.println("获取分布式锁失败...等待重试..."); //加锁失败...重试机制 //休眠一百毫秒 try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return getMenuJsonFromDbWithRedisLock(); } } @Override public Boolean updateMenuById(MenuEntity menu) { // return updateMenuByIdWithLock(menu); // return updateMenuWithExpireLock(menu); return updateMenuWithRedisLock(menu); } public Boolean updateMenuWithExpireLock(MenuEntity menu) { // 给锁设定一个时间 Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(SET_NX_EX_MENU_LIST_LOCK, SET_NX_EX_MENU_LIST_LOCK_VALUE, 5L, TimeUnit.SECONDS); Boolean update = false; if (lock) { System.out.println("更新操作:获取分布式锁成功===> 清除缓存"); try { // 删除缓存 stringRedisTemplate.delete(SET_NX_EX_MENU_LIST); // 更新数据库 update = menuMapper.updateById(menu) > 0; } finally { // 一定要释放锁,以免造成死锁问题 stringRedisTemplate.delete(SET_NX_EX_MENU_LIST_LOCK); } return update; } else{ try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } return updateMenuWithExpireLock(menu); } } }
那么这就没有问题了吗?
并不是。这里存在的问题也很明显,删除操作又已经不再是一个原子性操作了。
- 第一步是查询判断
- 第二步才是解锁操作
那么继而又会出现,我第一步执行成功,第二步执行失败的场景,所以我们必须要把它变成原子性操作才行。
这个时就要用到了 Redis 中的 lua 脚本啦~
让它去帮助我们实现将判断和解锁变成一步原子性操作~ 接着看吧
用万字长文来讲讲本地锁至分布式锁的演进和Redis实现,扩展 Redlock 红锁3:https://developer.aliyun.com/article/1394710