redis的分布式锁实现
1.分布式锁介绍
在计算机系统中,锁作为一种控制并发的机制无处不在。
单机环境下,操作系统能够在进程或线程之间通过本地的锁来控制并发程序的行为。而在如今的大型复杂系统中,通常采用的是分布式架构提供服务。
分布式环境下,基于本地单机的锁无法控制分布式系统中分开部署客户端的并发行为,此时分布式锁就应运而生了。
一个可靠的分布式锁应该具备以下特性:
1.互斥性:作为锁,需要保证任何时刻只能有一个客户端(用户)持有锁
2.可重入: 同一个客户端在获得锁后,可以再次进行加锁
3.高可用:获取锁和释放锁的效率较高,不会出现单点故障
4.自动重试机制:当客户端加锁失败时,能够提供一种机制让客户端自动重试
2.分布式锁api接口
复制代码
/**
- 分布式锁 api接口
*/
public interface DistributeLock {
/**
* 尝试加锁
* @param lockKey 锁的key
* @return 加锁成功 返回uuid
* 加锁失败 返回null
* */
String lock(String lockKey);
/**
* 尝试加锁 (requestID相等 可重入)
* @param lockKey 锁的key
* @param expireTime 过期时间 单位:秒
* @return 加锁成功 返回uuid
* 加锁失败 返回null
* */
String lock(String lockKey, int expireTime);
/**
* 尝试加锁 (requestID相等 可重入)
* @param lockKey 锁的key
* @param requestID 用户ID
* @return 加锁成功 返回uuid
* 加锁失败 返回null
* */
String lock(String lockKey, String requestID);
/**
* 尝试加锁 (requestID相等 可重入)
* @param lockKey 锁的key
* @param requestID 用户ID
* @param expireTime 过期时间 单位:秒
* @return 加锁成功 返回uuid
* 加锁失败 返回null
* */
String lock(String lockKey, String requestID, int expireTime);
/**
* 尝试加锁,失败自动重试 会阻塞当前线程
* @param lockKey 锁的key
* @return 加锁成功 返回uuid
* 加锁失败 返回null
* */
String lockAndRetry(String lockKey);
/**
* 尝试加锁,失败自动重试 会阻塞当前线程 (requestID相等 可重入)
* @param lockKey 锁的key
* @param requestID 用户ID
* @return 加锁成功 返回uuid
* 加锁失败 返回null
* */
String lockAndRetry(String lockKey, String requestID);
/**
* 尝试加锁 (requestID相等 可重入)
* @param lockKey 锁的key
* @param expireTime 过期时间 单位:秒
* @return 加锁成功 返回uuid
* 加锁失败 返回null
* */
String lockAndRetry(String lockKey, int expireTime);
/**
* 尝试加锁 (requestID相等 可重入)
* @param lockKey 锁的key
* @param expireTime 过期时间 单位:秒
* @param retryCount 重试次数
* @return 加锁成功 返回uuid
* 加锁失败 返回null
* */
String lockAndRetry(String lockKey, int expireTime, int retryCount);
/**
* 尝试加锁 (requestID相等 可重入)
* @param lockKey 锁的key
* @param requestID 用户ID
* @param expireTime 过期时间 单位:秒
* @return 加锁成功 返回uuid
* 加锁失败 返回null
* */
String lockAndRetry(String lockKey, String requestID, int expireTime);
/**
* 尝试加锁 (requestID相等 可重入)
* @param lockKey 锁的key
* @param expireTime 过期时间 单位:秒
* @param requestID 用户ID
* @param retryCount 重试次数
* @return 加锁成功 返回uuid
* 加锁失败 返回null
* */
String lockAndRetry(String lockKey, String requestID, int expireTime, int retryCount);
/**
* 释放锁
* @param lockKey 锁的key
* @param requestID 用户ID
* @return true 释放自己所持有的锁 成功
* false 释放自己所持有的锁 失败
* */
boolean unLock(String lockKey, String requestID);
}
复制代码
3.基于redis的分布式锁的简单实现
3.1 基础代码
当前实现版本的分布式锁基于redis实现,使用的是jedis连接池来和redis进行交互,并将其封装为redisClient工具类(仅封装了demo所需的少数接口)
redisClient工具类:
View Code
所依赖的工具类:
View Code
初始化lua脚本 LuaScript.java:
在分布式锁初始化时,使用init方法读取lua脚本
View Code
单例的RedisDistributeLock基础属性
复制代码
public final class RedisDistributeLock implements DistributeLock {
/**
* 无限重试
* */
public static final int UN_LIMIT_RETRY = -1;
private RedisDistributeLock() {
LuaScript.init();
}
private static DistributeLock instance = new RedisDistributeLock();
/**
* 持有锁 成功标识
* */
private static final Long ADD_LOCK_SUCCESS = 1L;
/**
* 释放锁 失败标识
* */
private static final Integer RELEASE_LOCK_SUCCESS = 1;
/**
* 默认过期时间 单位:秒
* */
private static final int DEFAULT_EXPIRE_TIME_SECOND = 300;
/**
* 默认加锁重试时间 单位:毫秒
* */
private static final int DEFAULT_RETRY_FIXED_TIME = 3000;
/**
* 默认的加锁浮动时间区间 单位:毫秒
* */
private static final int DEFAULT_RETRY_TIME_RANGE = 1000;
/**
* 默认的加锁重试次数
* */
private static final int DEFAULT_RETRY_COUNT = 30;
/**
* lockCount Key前缀
* */
private static final String LOCK_COUNT_KEY_PREFIX = "lock_count:";
public static DistributeLock getInstance(){
return instance;
}
}
复制代码
3.2 加锁实现
使用redis实现分布式锁时,加锁操作必须是原子操作,否则多客户端并发操作时会导致各种各样的问题。详情请见:Redis分布式锁的正确实现方式。
由于我们实现的是可重入锁,加锁过程中需要判断客户端ID的正确与否。而redis原生的简单接口没法保证一系列逻辑的原子性执行,因此采用了lua脚本来实现加锁操作。lua脚本可以让redis在执行时将一连串的操作以原子化的方式执行。
加锁lua脚本 lock.lua
复制代码
-- 获取参数
local requestIDKey = KEYS[1]
local lockCountKey = KEYS[2]
local currentRequestID = ARGV[1]
local expireTimeTTL = ARGV[2]
-- setnx 尝试加锁
local lockSet = redis.call('setnx',requestIDKey,currentRequestID)
if lockSet == 1
then
-- 加锁成功 设置过期时间和重入次数
redis.call('expire',requestIDKey,expireTimeTTL)
redis.call('set',lockCountKey,1)
redis.call('expire',lockCountKey,expireTimeTTL)
return 1
else
-- 判断是否是重入加锁
local oldRequestID = redis.call('get',requestIDKey)
if currentRequestID == oldRequestID
then
-- 是重入加锁
redis.call('incr',lockCountKey)
-- 重置过期时间
redis.call('expire',requestIDKey,expireTimeTTL)
redis.call('expire',lockCountKey,expireTimeTTL)
return 1
else
-- requestID不一致,加锁失败
return 0
end
end
复制代码
加锁方法实现:
加锁时,通过判断eval的返回值来判断加锁是否成功。
复制代码
@Override
public String lock(String lockKey) {
String uuid = UUID.randomUUID().toString();
return lock(lockKey,uuid);
}
@Override
public String lock(String lockKey, int expireTime) {
String uuid = UUID.randomUUID().toString();
return lock(lockKey,uuid,expireTime);
}
@Override
public String lock(String lockKey, String requestID) {
return lock(lockKey,requestID,DEFAULT_EXPIRE_TIME_SECOND);
}
@Override
public String lock(String lockKey, String requestID, int expireTime) {
RedisClient redisClient = RedisClient.getInstance();
List<String> keyList = Arrays.asList(
lockKey,
LOCK_COUNT_KEY_PREFIX + lockKey
);
List<String> argsList = Arrays.asList(
requestID,
expireTime + ""
);
Long result = (Long)redisClient.eval(LuaScript.LOCK_SCRIPT, keyList, argsList);
if(result.equals(ADD_LOCK_SUCCESS)){
return requestID;
}else{
return null;
}
}
复制代码
3.3 解锁实现
解锁操作同样需要一连串的操作,由于原子化操作的需求,因此同样使用lua脚本实现解锁功能。
解锁lua脚本 unlock.lua
复制代码
-- 获取参数
local requestIDKey = KEYS[1]
local lockCountKey = KEYS[2]
local currentRequestID = ARGV[1]
-- 判断requestID一致性
if redis.call('get', requestIDKey) == currentRequestID
then
-- requestID相同,重入次数自减
local currentCount = redis.call('decr',lockCountKey)
if currentCount == 0
then
-- 重入次数为0,删除锁
redis.call('del',requestIDKey)
redis.call('del',lockCountKey)
return 1
else
return 0 end
else
return 0 end
复制代码
解锁方法实现:
复制代码
@Override
public boolean unLock(String lockKey, String requestID) {
List<String> keyList = Arrays.asList(
lockKey,
LOCK_COUNT_KEY_PREFIX + lockKey
);
List<String> argsList = Collections.singletonList(requestID);
Object result = RedisClient.getInstance().eval(LuaScript.UN_LOCK_SCRIPT, keyList, argsList);
// 释放锁成功
return RELEASE_LOCK_SUCCESS.equals(result);
}
复制代码
3.4 自动重试机制实现
调用lockAndRetry方法进行加锁时,如果加锁失败,则当前客户端线程会短暂的休眠一段时间,并进行重试。在重试了一定的次数后,会终止重试加锁操作,从而加锁失败。
需要注意的是,加锁失败之后的线程休眠时长是"固定值 + 随机值",引入随机值的主要目的是防止高并发时大量的客户端在几乎同一时间被唤醒并进行加锁重试,给redis服务器带来周期性的、不必要的瞬时压力。
复制代码
@Override
public String lockAndRetry(String lockKey) {
String uuid = UUID.randomUUID().toString();
return lockAndRetry(lockKey,uuid);
}
@Override
public String lockAndRetry(String lockKey, String requestID) {
return lockAndRetry(lockKey,requestID,DEFAULT_EXPIRE_TIME_SECOND);
}
@Override
public String lockAndRetry(String lockKey, int expireTime) {
String uuid = UUID.randomUUID().toString();
return lockAndRetry(lockKey,uuid,expireTime);
}
@Override
public String lockAndRetry(String lockKey, int expireTime, int retryCount) {
String uuid = UUID.randomUUID().toString();
return lockAndRetry(lockKey,uuid,expireTime,retryCount);
}
@Override
public String lockAndRetry(String lockKey, String requestID, int expireTime) {
return lockAndRetry(lockKey,requestID,expireTime,DEFAULT_RETRY_COUNT);
}
@Override
public String lockAndRetry(String lockKey, String requestID, int expireTime, int retryCount) {
if(retryCount <= 0){
// retryCount小于等于0 无限循环,一直尝试加锁
while(true){
String result = lock(lockKey,requestID,expireTime);
if(result != null){
return result;
}
// 休眠一会
sleepSomeTime();
}
}else{
// retryCount大于0 尝试指定次数后,退出
for(int i=0; i<retryCount; i++){
String result = lock(lockKey,requestID,expireTime);
if(result != null){
return result;
}
// 休眠一会
sleepSomeTime();
}
return null;
}
}
复制代码
4.使用注解切面简化redis分布式锁的使用
通过在方法上引入RedisLock注解切面,让对应方法被redis分布式锁管理起来,可以简化redis分布式锁的使用。
切面注解 RedisLock
复制代码
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLock {
/**
* 无限重试
* */
int UN_LIMIT_RETRY = RedisDistributeLock.UN_LIMIT_RETRY;
String lockKey();
int expireTime();
int retryCount();
}
复制代码
RedisLock 切面实现
复制代码
@Component
@Aspect
public class RedisLockAspect {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockAspect.class);
private static final ThreadLocal<String> REQUEST_ID_MAP = new ThreadLocal<>();
@Pointcut("@annotation(annotation.RedisLock)")
public void annotationPointcut() {
}
@Around("annotationPointcut()")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature methodSignature = (MethodSignature)joinPoint.getSignature();
Method method = methodSignature.getMethod();
RedisLock annotation = method.getAnnotation(RedisLock.class);
boolean lockSuccess = lock(annotation);
if(lockSuccess){
Object result = joinPoint.proceed();
unlock(annotation);
return result;
}
return null;
}
/**
* 加锁
* */
private boolean lock(RedisLock annotation){
DistributeLock distributeLock = RedisDistributeLock.getInstance();
int retryCount = annotation.retryCount();
String requestID = REQUEST_ID_MAP.get();
if(requestID != null){
// 当前线程 已经存在requestID
distributeLock.lockAndRetry(annotation.lockKey(),requestID,annotation.expireTime(),retryCount);
LOGGER.info("重入加锁成功 requestID=" + requestID);
return true;
}else{
// 当前线程 不存在requestID
String newRequestID = distributeLock.lockAndRetry(annotation.lockKey(),annotation.expireTime(),retryCount);
if(newRequestID != null){
// 加锁成功,设置新的requestID
REQUEST_ID_MAP.set(newRequestID);
LOGGER.info("加锁成功 newRequestID=" + newRequestID);
return true;
}else{
LOGGER.info("加锁失败,超过重试次数,直接返回 retryCount={}",retryCount);
return false;
}
}
}
/**
* 解锁
* */
private void unlock(RedisLock annotation){
DistributeLock distributeLock = RedisDistributeLock.getInstance();
String requestID = REQUEST_ID_MAP.get();
if(requestID != null){
// 解锁成功
boolean unLockSuccess = distributeLock.unLock(annotation.lockKey(),requestID);
if(unLockSuccess){
// 移除 ThreadLocal中的数据
REQUEST_ID_MAP.remove();
LOGGER.info("解锁成功 requestID=" + requestID);
}
}
}
}
复制代码
使用例子
复制代码
@Service("testService")
public class TestServiceImpl implements TestService {
@Override
@RedisLock(lockKey = "lockKey", expireTime = 100, retryCount = RedisLock.UN_LIMIT_RETRY)
public String method1() {
return "method1";
}
@Override
@RedisLock(lockKey = "lockKey", expireTime = 100, retryCount = 3)
public String method2() {
return "method2";
}
}
复制代码
5.总结
5.1 当前版本缺陷
主从同步可能导致锁的互斥性失效
在redis主从结构下,出于性能的考虑,redis采用的是主从异步复制的策略,这会导致短时间内主库和从库数据短暂的不一致。
试想,当某一客户端刚刚加锁完毕,redis主库还没有来得及和从库同步就挂了,之后从库中新选拔出的主库是没有对应锁记录的,这就可能导致多个客户端加锁成功,破坏了锁的互斥性。
休眠并反复尝试加锁效率较低
lockAndRetry方法在客户端线程加锁失败后,会休眠一段时间之后再进行重试。当锁的持有者持有锁的时间很长时,其它客户端会有大量无效的重试操作,造成系统资源的浪费。
进一步优化时,可以使用发布订阅的方式。这时加锁失败的客户端会监听锁被释放的信号,在锁真正被释放时才会进行新的加锁操作,从而避免不必要的轮询操作,以提高效率。
不是一个公平的锁
当前实现版本中,多个客户端同时对锁进行抢占时,是完全随机的,既不遵循先来后到的顺序,客户端之间也没有加锁的优先级区别。
后续优化时可以提供一个创建公平锁的接口,能指定加锁的优先级,内部使用一个优先级队列维护加锁客户端的顺序。公平锁虽然效率稍低,但在一些场景能更好的控制并发行为。
5.2 经验总结
前段时间看了一篇关于redis分布式锁的技术文章,发现自己对于分布式锁的了解还很有限。纸上得来终觉浅,为了更好的掌握相关知识,决定尝试着自己实现一个demo级别的redis分布式锁,通过这次实践,更进一步的学习了lua语言和redis相关内容。
这篇博客的完整代码在我的github上:https://github.com/1399852153/RedisDistributedLock,存在许多不足之处,请多多指教。
原文地址https://www.cnblogs.com/xiaoxiongcanguan/p/10718202.html