- 依赖包引入
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency>
- 配置信息
#redis集群连接配置 spring.redis.cluster.nodes=192.168.0.15:6379,192.168.0.15:6380,192.168.0.15:6381,192.168.0.15:6382,192.168.0.15:6383,192.168.0.15:6384 #redis spring.redis.cluster.max-redirects=6 spring.redis.jedis.pool.max-active=80 spring.redis.jedis.pool.max-idle=30 spring.redis.jedis.pool.max-wait=2000s spring.redis.jedis.pool.min-idle=10
- JedisCluster配置
/** * @author m7 * @date 2018年12月19日 **/ @Configuration public class RedisDistributeLockConfig { @Value("${spring.redis.cluster.nodes}") String redisNodes; @Bean //定义分布式锁对象 public RedisDistributeLock redisDistributeLock(JedisCluster jedisCluster){ return new RedisDistributeLock(jedisCluster); } @Bean //定义JedisCluster操作bean public JedisCluster jedisCluster(){ return new JedisCluster(pharseHostAnport()); } private Set<HostAndPort> pharseHostAnport(){ if (StringUtils.isEmpty(redisNodes)){ throw new RuntimeException("redis nodes can't be null or empty"); } String[] hps = redisNodes.split(","); Set<HostAndPort> hostAndPorts = new HashSet<>(); for (String hp : hps) { String[] hap = hp.split(":"); hostAndPorts.add(new HostAndPort(hap[0], Integer.parseInt(hap[1]))); } return hostAndPorts; } }
- 分布式锁实现
/** * JedisCluster + lua脚本实现分布式锁 * @author m7 * @date 2018年12月19日 **/ public class RedisDistributeLock { private Logger logger = LoggerFactory.getLogger(RedisDistributeLock.class); private JedisCluster jedisCluster; /** * lua脚本:判断锁住值是否为当前线程持有,是的话解锁,不是的话解锁失败 */ private static final String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL = "if" + " redis.call('get', KEYS[1]) == ARGV[1]" + " then" + " return redis.call('del', KEYS[1])" + " else" + " return 0" + " end"; private volatile String unlockSha1 = ""; private static final Long UNLOCK_SUCCESS_CODE = 1L; private static final String LOCK_SUCCESS_CODE = "ok"; public RedisDistributeLock(JedisCluster jedisCluster) { this.jedisCluster = jedisCluster; } /** * 根据loopTryTime循环重试 * @param lockKey 锁key * @param lockVal 锁值,用于解锁校验 * @param expiryTime 锁过期时间 * @param loopTryTime 获取失败时,循环重试获取锁的时长 * @return 是否获得锁 */ public boolean tryLock(String lockKey, String lockVal, long expiryTime, long loopTryTime){ Long endTime = System.currentTimeMillis() + loopTryTime; while (System.currentTimeMillis() < endTime){ if (tryLock(lockKey, lockVal, expiryTime)){ return true; } } return false; } /** * 根据loopTryTime循环重试 * @param lockKey 锁key * @param lockVal 锁值,用于解锁校验 * @param expiryTime 锁过期时间 * @param retryTimes 重试次数 * @param setpTime 每次重试间隔 mills * @return 是否获得锁 */ public boolean tryLock(String lockKey, String lockVal, long expiryTime, int retryTimes, long setpTime){ while (retryTimes > 0){ if (tryLock(lockKey, lockVal, expiryTime)){ return true; } retryTimes--; try { Thread.sleep(setpTime); } catch (InterruptedException e) { logger.error("get distribute lock error" +e.getLocalizedMessage()); } } return false; } /** * 一次尝试,快速失败。不支持重入 * @param lockKey 锁key * @param lockVal 锁值,用于解锁校验 * @param expiryTime 锁过期时间 MILLS * @return 是否获得锁 */ public boolean tryLock(String lockKey, String lockVal, long expiryTime){ //相比一般的分布式锁,这里把setNx和setExpiry操作合并到一起,jedis保证原子性,避免连个命令之间出现宕机等问题 //这里也可以我们使用lua脚本实现 String result = jedisCluster.set(lockKey, lockVal, "NX", "PX", expiryTime); return LOCK_SUCCESS_CODE.equalsIgnoreCase(result); } /** * 释放分布式锁,释放失败最可能是业务执行时间长于lockKey过期时间,应当结合业务场景调整过期时间 * @param lockKey 锁key * @param lockVal 锁值 * @return 是否释放成功 */ public boolean tryUnLock(String lockKey, String lockVal){ List<String> keys = new ArrayList<>(); keys.add(lockKey); List<String> argv = new ArrayList<>(); argv.add(lockVal); try { Object result = jedisCluster.evalsha(unlockSha1, keys, argv); return UNLOCK_SUCCESS_CODE.equals(result); }catch (JedisNoScriptException e){ //没有脚本缓存时,重新发送缓存 logger.info("try to store script......"); storeScript(lockKey); Object result = jedisCluster.evalsha(unlockSha1, keys, argv); return UNLOCK_SUCCESS_CODE.equals(result); }catch (Exception e){ e.printStackTrace(); return false; } } /** * 由于使用redis集群,因此每个节点都需要各自缓存一份脚本数据 * @param slotKey 用来定位对应的slot的slotKey */ public void storeScript(String slotKey){ if (StringUtils.isEmpty(unlockSha1) || !jedisCluster.scriptExists(unlockSha1, slotKey)){ //redis支持脚本缓存,返回哈希码,后续可以继续用来调用脚本 unlockSha1 = jedisCluster.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL, slotKey); } } }
分析
- 加锁操作
比一般的redis分布式锁,这里操作jedis的操作方式进行加锁,好处就是Jedis保证set与设置有效期两个操作之间的原子性,避免在set值之后,程序宕机,导致没有设置过期时间,锁就一直被锁住。
这一步操作我们单独使用lua脚本实现也可以,但是幸好jedis已经帮我们进行实现。
/** * 一次尝试,快速失败。不支持重入 * @param lockKey 锁key * @param lockVal 锁值,用于解锁校验 * @param expiryTime 锁过期时间 MILLS * @return 是否获得锁 */ public boolean tryLock(String lockKey, String lockVal, long expiryTime){ //相比一般的分布式锁,这里把setNx和setExpiry操作合并到一起,jedis保证原子性,避免连个命令之间出现宕机等问题 //这里也可以我们使用lua脚本实现 //NX表示setNX操作,PX表示过期时间是mills String result = jedisCluster.set(lockKey, lockVal, "NX", "PX", expiryTime); return LOCK_SUCCESS_CODE.equalsIgnoreCase(result); }
同时加锁操作也有几个简单的重载实现,分别是重试获取和循环获取锁的重载,根据业务场景适当调整使用。
- 解锁操作
这里的分布式锁的解锁操作使用lua脚本帮助实现。
我们知道,分布式锁在解锁时一定需要验证是不是锁的持有者,这种情况下,我们需要进行的操作就有获取key的对应value,然后验证value的值,这个过程,存在一种情况,导致误删别的持有者的锁。分析如下的操作顺序图
上面的操作顺序可能出错的情况就是当lock1尝试释放时,先获取值,判断是否是锁的持有者,如果是,就再发指令删除锁。这个过程可能存在问题就是,lock1在获取值之后,刚好到了有效期了,那么锁可能会在此时被锁竞争者2获得,并且设置锁lock2,然而这时锁竞争者1删除锁的指令刚好重新发送到redis-server,就会误删lock2,导致后续会被其他锁竞争者3获取,发送不可知业务错误。
使用lua脚本的好处就是保证redis指令之间执行的原子性,把get和del执行放在脚本中,保证不会误删别的锁竞争者的锁,假如刚好出现get之后锁值过期,最多就是del操作结果为0,不会出现误删结果。
/** * 释放分布式锁,释放失败最可能是业务执行时间长于lockKey过期时间,应当结合业务场景调整过期时间 * @param lockKey 锁key * @param lockVal 锁值 * @return 是否释放成功 */ public boolean tryUnLock(String lockKey, String lockVal){ List<String> keys = new ArrayList<>(); keys.add(lockKey); List<String> argv = new ArrayList<>(); argv.add(lockVal); try { Object result = jedisCluster.evalsha(unlockSha1, keys, argv); return UNLOCK_SUCCESS_CODE.equals(result); }catch (JedisNoScriptException e){ //没有脚本缓存时,重新发送脚本并缓存 logger.info("try to store script......"); storeScript(lockKey); //重试获取 Object result = jedisCluster.evalsha(unlockSha1, keys, argv); return UNLOCK_SUCCESS_CODE.equals(result); }catch (Exception e){ e.printStackTrace(); return false; } }
- 解锁脚本
/** * lua脚本:判断锁住值是否为当前线程持有,是的话解锁,不是的话解锁失败 */ private static final String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL = "if" + " redis.call('get', KEYS[1]) == ARGV[1]" + " then" + " return redis.call('del', KEYS[1])" + " else" + " return 0" + " end";
- lua脚本缓存
在redis集群中,为了避免重复发送脚本数据浪费网络资源,可以使用script load命令进行脚本数据缓存,并且返回一个哈希码作为脚本的调用句柄,每次调用脚本只需要发送哈希码来调用即可。
127.0.0.1:6381> script load "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" "e9f69f2beb755be68b5e456ee2ce9aadfbc4ebf4"
- 上面是在redis-cli中缓存脚本的方式,在程序中,存储lua脚本的方式是如下所示。使用jedis可以很方便就完成脚本缓存,先判断脚本缓存是否存在,不存在就进行脚本数据缓存并且保存哈希码,以备接下来调用脚本。
注意:需要注意的是,在redis集群环境下,每个节点都需要进行一份脚本缓存,否则就会出现
NOSCRIPT No matching script. Please use EVAL.
- 错误,因此我在程序中加了处理。
/** * 由于使用redis集群,因此每个节点都需要各自缓存一份脚本数据 * @param slotKey 用来定位对应的slot的slotKey */ public void storeScript(String slotKey){ if (StringUtils.isEmpty(unlockSha1) || !jedisCluster.scriptExists(unlockSha1, slotKey)){ //redis支持脚本缓存,返回哈希码,后续可以继续用来调用脚本 unlockSha1 = jedisCluster.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL, slotKey); } }
- slotKey就是我们set值时的key,redis根据crc16函数 计算key应该对应哪一个slot,如果slot所在的redis节点没有缓存脚本数据就会报处
NOSCRIPT No matching script. Please use EVAL.
异常,因此当捕捉到这个异常时,我们在代码中重新发送脚本数据进行缓存即可。
/** * 释放分布式锁,释放失败最可能是业务执行时间长于lockKey过期时间,应当结合业务场景调整过期时间 * @param lockKey 锁key * @param lockVal 锁值 * @return 是否释放成功 */ public boolean tryUnLock(String lockKey, String lockVal){ List<String> keys = new ArrayList<>(); keys.add(lockKey); List<String> argv = new ArrayList<>(); argv.add(lockVal); try { Object result = jedisCluster.evalsha(unlockSha1, keys, argv); return UNLOCK_SUCCESS_CODE.equals(result); }catch (JedisNoScriptException e){ //没有脚本缓存时,重新发送脚本并缓存 //根据lockkey计算slot,在对应redis节点重新缓存一份脚本数据 logger.info("try to store script......"); storeScript(lockKey); //重试获取 Object result = jedisCluster.evalsha(unlockSha1, keys, argv); return UNLOCK_SUCCESS_CODE.equals(result); }catch (Exception e){ e.printStackTrace(); return false; } }
- 测试用例
@RunWith(SpringRunner.class) @SpringBootTest(classes = {ActivityServiceApplication.class}) @Slf4j public class ActivityServiceApplicationTests { @Resource private RedisDistributeLock redisDistributeLock; @Test public void testRedislock() throws InterruptedException { for(int i=0;i < 50;i++){ int finalI = i; new Thread(() ->{ if (redisDistributeLock.tryLock("TEST_LOCK_KEY", "TEST_LOCK_VAL_"+ finalI, 1000* 100, 1000*20)){ try { log.warn("get lock successfully with lock value:-----" + "TEST_LOCK_VAL_"+ finalI); Thread.sleep(2000); if (!redisDistributeLock.tryUnLock("TEST_LOCK_KEY", "TEST_LOCK_VAL_"+ finalI)){ throw new RuntimeException("release lock fail"); } log.warn("release lock successfully with lock value:-----" + "TEST_LOCK_VAL_"+ finalI); } catch (InterruptedException e) { e.printStackTrace(); } }else { log.warn("get lock fail with lock value:-----" + "TEST_LOCK_VAL_"+ finalI); } }).start(); } Thread.sleep(1000*1000); } }
设置50个线程尝试获取分布式锁,每个线程尝试时间为20秒;获取到锁的线程,sleep2秒,然后释放锁。
最终会出现,10个线程能够依次获得锁,40个线程获取锁超时失败。