一、Redisson概述
什么是Redisson?
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。
其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。
Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
一个基于Redis实现的分布式工具,有基本分布式对象和高级又抽象的分布式服务,为每个试图再造分布式轮子的程序员带来了大部分分布式问题的解决办法。
Redisson和Jedis、Lettuce有什么区别?倒也不是雷锋和雷锋塔
Redisson和它俩的区别就像一个用鼠标操作图形化界面,一个用命令行操作文件。Redisson是更高层的抽象,Jedis和Lettuce是Redis命令的封装。
- Jedis是Redis官方推出的用于通过Java连接Redis客户端的一个工具包,提供了Redis的各种命令支持
- Lettuce是一种可扩展的线程安全的 Redis 客户端,通讯框架基于Netty,支持高级的 Redis 特性,比如哨兵,集群,管道,自动重新连接和Redis数据模型。Spring Boot 2.x 开始 Lettuce 已取代 Jedis 成为首选 Redis 的客户端。
- Redisson是架设在Redis基础上,通讯基于Netty的综合的、新型的中间件,企业级开发中使用Redis的最佳范本
Jedis把Redis命令封装好,Lettuce则进一步有了更丰富的Api,也支持集群等模式。但是两者也都点到为止,只给了你操作Redis数据库的脚手架,而Redisson则是基于Redis、Lua和Netty建立起了成熟的分布式解决方案,甚至redis官方都推荐的一种工具集。
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
二、分布式锁
分布式锁怎么实现?
分布式锁是并发业务下的刚需,虽然实现五花八门:ZooKeeper有Znode顺序节点,数据库有表级锁和乐/悲观锁,Redis有setNx,但是殊途同归,最终还是要回到互斥上来,本篇介绍Redisson,那就以redis为例。
怎么写一个简单的Redis分布式锁?
以Spring Data Redis为例,用RedisTemplate来操作Redis(setIfAbsent已经是setNx + expire的合并命令),如下
// 加锁 public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) { return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit); } // 解锁,防止删错别人的锁,以uuid为value校验是否自己的锁 public void unlock(String lockName, String uuid) { if(uuid.equals(redisTemplate.opsForValue().get(lockName)){ redisTemplate.opsForValue().del(lockName); } } // 结构 if(tryLock){ // todo }finally{ unlock; }
简单1.0版本完成,聪明的小张一眼看出,这是锁没错,但get和del操作非原子性,并发一旦大了,无法保证进程安全。于是小张提议,用Lua脚本
Lua脚本是什么?
Lua脚本是redis已经内置的一种轻量小巧语言,其执行是通过redis的eval /evalsha 命令来运行,把操作封装成一个Lua脚本,如论如何都是一次执行的原子操作。
于是2.0版本通过Lua脚本删除
lockDel.lua如下
if redis.call('get', KEYS[1]) == ARGV[1] then -- 执行删除操作 return redis.call('del', KEYS[1]) else -- 不成功,返回0 return 0 end
delete操作时执行Lua命令
// 解锁脚本 DefaultRedisScript<Object> unlockScript = new DefaultRedisScript(); unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lockDel.lua"))); // 执行lua脚本解锁 redisTemplate.execute(unlockScript, Collections.singletonList(keyName), value);
2.0似乎更像一把锁,但好像又缺少了什么,小张一拍脑袋,synchronized和ReentrantLock都很丝滑,因为他们都是可重入锁,一个线程多次拿锁也不会死锁,我们需要可重入。
怎么保证可重入?
重入就是,同一个线程多次获取同一把锁是允许的,不会造成死锁,这一点synchronized偏向锁提供了很好的思路,synchronized的实现重入是在JVM层面,JAVA对象头MARK WORD中便藏有线程ID和计数器来对当前线程做重入判断,避免每次CAS。
当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储偏向的线程ID,以后该线程在进入和退出同步块时不需要进行CAS操作来加锁和解锁,只需简单测试一下对象头的Mark Word里是否存储着指向当前线程的偏向锁。如果测试成功,表示线程已经获得了锁。如果测试失败,则需要再测试一下Mark Word中偏向锁标志是否设置成1:没有则CAS竞争;设置了,则CAS将对象头偏向锁指向当前线程。
再维护一个计数器,同个线程进入则自增1,离开再减1,直到为0才能释放
可重入锁
仿造该方案,我们需改造Lua脚本:
1.需要存储 锁名称lockName 、获得该锁的线程id 和对应线程的进入次数count
2.加锁
每次线程获取锁时,判断是否已存在该锁
- 不存在
- 设置hash的key为线程id,value初始化为1
- 设置过期时间
- 返回获取锁成功true
- 存在
- 继续判断是否存在当前线程id的hash key
- 存在,线程key的value + 1,重入次数增加1,设置过期时间
- 不存在,返回加锁失败
3.解锁
每次线程来解锁时,判断是否已存在该锁
- 存在
- 是否有该线程的id的hash key,有则减1,无则返回解锁失败
- 减1后,判断剩余count是否为0,为0则说明不再需要这把锁,执行del命令删除
1.存储结构
为了方便维护这个对象,我们用Hash结构来存储这些字段。Redis的Hash类似Java的HashMap,适合存储对象。
hset lockname1 threadId 1
设置一个名字为lockname1 的hash结构,该hash结构key为threadId ,值value为1
hget lockname1 threadId
获取lockname1的threadId的值
存储结构为
lockname 锁名称 key1: threadId 唯一键,线程id value1: count 计数器,记录该线程获取锁的次数
redis中的结构
2.计数器的加减
当同一个线程获取同一把锁时,我们需要对对应线程的计数器count做加减
判断一个redis key是否存在,可以用exists
,而判断一个hash的key是否存在,可以用hexists
而redis也有hash自增的命令hincrby
每次自增1时 hincrby lockname1 threadId 1
,自减1时 hincrby lockname1
3.解锁的判断
当一把锁不再被需要了,每次解锁一次,count减1,直到为0时,执行删除
综合上述的存储结构和判断流程,加锁和解锁Lua如下
加锁 lock.lua
local key = KEYS[1]; local threadId = ARGV[1]; local releaseTime = ARGV[2]; -- lockname不存在 if(redis.call('exists', key) == 0) then redis.call('hset', key, threadId, '1'); redis.call('expire', key, releaseTime); return 1; end; -- 当前线程已id存在 if(redis.call('hexists', key, threadId) == 1) then redis.call('hincrby', key, threadId, '1'); redis.call('expire', key, releaseTime); return 1; end; return 0;
解锁 unlock.lua
local key = KEYS[1]; local threadId = ARGV[1]; -- lockname、threadId不存在 if (redis.call('hexists', key, threadId) == 0) then return nil; end; -- 计数器-1 local count = redis.call('hincrby', key, threadId, -1); -- 删除lock if (count == 0) then redis.call('del', key); return nil; end;
代码
/** * @description 原生redis实现分布式锁 **/ @Getter @Setter public class RedisLock { private RedisTemplate redisTemplate; private DefaultRedisScript<Long> lockScript; private DefaultRedisScript<Object> unlockScript; public RedisLock(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; // 加载加锁的脚本 lockScript = new DefaultRedisScript<>(); this.lockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua"))); this.lockScript.setResultType(Long.class); // 加载释放锁的脚本 unlockScript = new DefaultRedisScript<>(); this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua"))); } /** * 获取锁 */ public String tryLock(String lockName, long releaseTime) { // 存入的线程信息的前缀 String key = UUID.randomUUID().toString(); // 执行脚本 Long result = (Long) redisTemplate.execute( lockScript, Collections.singletonList(lockName), key + Thread.currentThread().getId(), releaseTime); if (result != null && result.intValue() == 1) { return key; } else { return null; } } /** * 解锁 * @param lockName * @param key */ public void unlock(String lockName, String key) { redisTemplate.execute(unlockScript, Collections.singletonList(lockName), key + Thread.currentThread().getId() ); } }
至此已经完成了一把分布式锁,符合互斥、可重入、防死锁的基本特点。
严谨的小张觉得虽然当个普通互斥锁,已经稳稳够用,可是业务里总是又很多特殊情况的,比如A进程在获取到锁的时候,因业务操作时间太长,锁释放了但是业务还在执行,而此刻B进程又可以正常拿到锁做业务操作,两个进程操作就会存在依旧有共享资源的问题 。
而且如果负责储存这个分布式锁的Redis节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态 。
小张不是杠精,因为库存操作总有这样那样的特殊。
所以我们希望在这种情况时,可以延长锁的releaseTime延迟释放锁来直到完成业务期望结果,这种不断延长锁过期时间来保证业务执行完成的操作就是锁续约。
读写分离也是常见,一个读多写少的业务为了性能,常常是有读锁和写锁的。
而此刻的扩展已经超出了一把简单轮子的复杂程度,光是处理续约,就够小张喝一壶,何况在性能(锁的最大等待时间)、优雅(无效锁申请)、重试(失败重试机制)等方面还要下功夫研究。
在小张苦思冥想时,旁边的小白凑过来看了看小张,很好奇,都2021年了,为什么不直接用redisson呢?
Redisson就有这把你要的锁。
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
三、Redisson分布式锁
号称简单的Redisson分布式锁的使用姿势是什么?
1.依赖
<!-- 原生,本章使用--> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.6</version> </dependency> <!-- 另一种Spring集成starter,本章未使用 --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.13.6</version> </dependency>
2.配置
@Configuration public class RedissionConfig { @Value("${spring.redis.host}") private String redisHost; @Value("${spring.redis.password}") private String password; private int port = 6379; @Bean public RedissonClient getRedisson() { Config config = new Config(); config.useSingleServer(). setAddress("redis://" + redisHost + ":" + port). setPassword(password); config.setCodec(new JsonJacksonCodec()); return Redisson.create(config); } }
3.启用分布式锁
@Resource private RedissonClient redissonClient; RLock rLock = redissonClient.getLock(lockName); try { boolean isLocked = rLock.tryLock(expireTime, TimeUnit.MILLISECONDS); if (isLocked) { // TODO } } catch (Exception e) { rLock.unlock(); }
简洁明了,只需要一个RLock,既然推荐Redisson,就往里面看看他是怎么实现的。