之前看很多人手写分布式锁,其实 Spring Boot 现在已经做的足够好了,开箱即用,支持主流的 Redis、Zookeeper 中间件,另外还支持 JDBC。
本篇栈长以 Redis 为例(这也是用得最多的方案),教大家如何利用 Spring Boot 集成 Redis 实现缓存,如何简单、快速实现 Redis 分布式锁。
分布式锁介绍
Spring Boot 实现 Redis 分布式锁在 spring-integration 这个项目中,参考:
首先来看下 LockRegistry 锁注册接口的所有实现类结构图:
DefaultLockRegistry 就是纯单机的可重入锁,PassThruLockRegistry 是一个空实现类,也都没有什么利用价值。
Spring Integration 4.0 引入了基于 Redis 的分布式锁:RedisLockRegistry,并且从 5.0 开始实现了 ExpirableLockRegistry 接口,用来移除超时且没有用的锁。
分布式锁实战
添加依赖
上面提到 Spring Boot 实现 Redis 分布式锁在 spring-integration 这个项目中,所以需要这三个依赖:
spring-boot-starter-data-redis
spring-boot-starter-integration
spring-integration-redis
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency>
Spring Boot 基础知识就不介绍了,不熟悉的可以关注公众号Java技术栈,在后台回复:boot,可以阅读我写的历史实战教程。
配置分布式锁
@Bean(destroyMethod = "destroy") public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) { return new RedisLockRegistry(redisConnectionFactory, "lock"); }
使用示例
@GetMapping("/redis/lock") public String lock(@RequestParam("key") String key) { for (int i = 0; i < 10; i++) { new Thread(() -> { redisLockService.lock(key); try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")); redisLockService.unlock(key); } ).start(); } return "OK"; }
RedisLockService 是我封装了的一个 Redis 锁服务,代码有点多,这里就不贴了,完整的代码示例在 Github 上,大家可以 Star 一下:
https://github.com/javastacks/spring-boot-best-practice
测试:
http://localhost:8080/redis/lock?key=yeah
输出:
2020-06-23 11:15:34 2020-06-23 11:15:37 2020-06-23 11:15:40 2020-06-23 11:15:43 2020-06-23 11:15:46 2020-06-23 11:15:49 2020-06-23 11:15:52 2020-06-23 11:15:55 2020-06-23 11:15:58 2020-06-23 11:16:01
可以看到每个线程需要等上一个线程休眠 3 秒后才能获取到锁。
源码分析
集成完了,会使用了,还得研究下 RedisLockRegistry
的源码,不然遇到什么坑还得再踩一篇。
RedisLockRegistry
有两个类构造器:connectionFactory:Redis 连接工厂
registryKey:锁前缀
expireAfter:失效时间(非必须项,默认60秒)
所以,我们要注册 expireAfter 这个选项,默认 60 秒是否满足业务需要,如果超过默认的 60 少时间,否则将导致锁失效。
还有两个和 RedisLockRegistry 相关且很重要的成员变量:
private final String clientId = UUID.randomUUID().toString(); private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();
- clientId:
首先用来标识当前 RedisLockRegistry
实例ID,并且在设置、移除锁的时候都会要用到,用来判断是不是当前的锁注册实例。
- locks:
用来在内存中缓存当前锁注册实例所有锁对象。
获取锁对象
如下面获取锁对象源码所示:
每个 key 在内存中(ConcurrentHashMap)都对应一个锁对象,锁对象有生成过就直接返回,没有就生成再返回,有了这个锁对象才能进行上锁和解锁操作。
这个锁对象(RedisLock)其实也是实现了 Java 中的 java.util.concurrent.locks.Lock 锁接口:
锁对象(RedisLock)也有两个很重要的成员变量:
private final ReentrantLock localLock = new ReentrantLock(); private volatile long lockedAt;
localLock:
localLock 是一个本地内存可重入锁,每次去 Redis 上锁前,都要用本地 localLock 上锁先,这样能做到尽可能的少往 Redis 上锁,也能从一方面提升锁的性能。
lockedAt:
lockedAt 上锁时间,移除过时锁会用到。
阻塞上锁
RedisLock#lock():
每隔 100 毫秒尝试获取一次锁,直到获取锁成功为止,不接受打断异常,遇到其他异常会释放本地锁返回异常并结束。
主要看下设置 Redis 锁的 Lua 脚本:
根据 key 查询其对应的值:clientId,如果和当前 clientId 一致则延长失效时间,如果 clientId 不存在就直接上锁,以上都不成立返回 false。
这样做的好处是,可以将整个 Redis Lua 脚本作为一个原子执行,而不用考虑并发及事务影响。
好了,核心的源码分析完了,其实也很简单,大家还不懂的或者有兴趣的可以再研究下。