redis分布式锁redisson
redisson使用
@Configuration public class RedissonConfig { @Bean public Redisson redisson(){ Config config = new Config(); //单机版 config.useSingleServer().setAddress("redis://192.168.56.102:8001").setDatabase(0); //集群版 //config.useClusterServers().addNodeAddress(""); return (Redisson) Redisson.create(config); } }
//redisson使用 @Resource private Redisson redisson; public void test(){ String lockKey = "lock:product_1001"; RLock lock = redisson.getLock(lockKey); //加锁 lock.lock(); try{ //业务逻辑 }catch (Exception e){ }finally { //释放锁 lock.unlock(); } }
redisson原理
lock.lock();
- 底层会先尝试加锁;如果加锁失败,线程会订阅锁释放消息,并按锁的剩余过期时间(TTL)阻塞等待,被唤醒或等待超时后在循环中重新尝试加锁,直到获取到锁为止。
- 加锁成功后,会注册一个定时任务(延迟 internalLockLeaseTime / 3 后执行,续期成功后再次调度自身),在后台进行锁续命操作(看门狗机制)。
redisson while循环加锁
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = this.tryAcquire(leaseTime, unit, threadId); if (ttl != null) { //订阅消息,当锁释放后,唤醒睡眠的线程 RFuture<RedissonLockEntry> future = this.subscribe(threadId); this.commandExecutor.syncSubscription(future); try { while(true) { ttl = this.tryAcquire(leaseTime, unit, threadId); //加锁成功会返回null if (ttl == null) { return; } //返回锁剩余的超时时间,按照这个时间去休眠 if (ttl >= 0L) { //根据信号量会阻塞ttl时间 //这里如果线程主动释放了锁?但是获取的ttl时间未到,那剩下的线程都会等着吗? //有休眠就会有唤醒逻辑。在unlock逻辑里面有发布订阅模式,当锁释放后,会发布消息。 this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { //重新去获取一次锁 this.getEntry(threadId).getLatch().acquire(); } } } finally { this.unsubscribe(future, threadId); } } }
redisson加锁源码
// Runs the reentrant lock-acquisition Lua script on Redis.
//
//   KEYS[1] - key of the distributed lock (this.getName())
//   ARGV[1] - internalLockLeaseTime, assigned as
//             this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
//             i.e. the lock watchdog timeout (30 seconds by default)
//   ARGV[2] - this.getLockName(threadId), defined as
//             String getLockName(long threadId) {
//                 return this.id + ":" + threadId; // a UUID plus the current thread id
//             }
//
// The script returns nil when the lock was acquired (or re-entered) and
// otherwise the remaining TTL of the lock in milliseconds.
//
// Fix: the comma separating the script literal from the key list was missing,
// which made the statement a syntax error.
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
        "if (redis.call('exists', KEYS[1]) == 0) " +              // the lock key does not exist yet
        "then redis.call('hset', KEYS[1], ARGV[2], 1); " +        // create the hash field; the 1 is the reentrancy count
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +             // set the expiry
        "return nil; " +
        "end; " +
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) " +    // the same holder already owns the lock
        "then redis.call('hincrby', KEYS[1], ARGV[2], 1); " +     // bump the reentrancy count
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +             // refresh the expiry
        "return nil; " +
        "end; " +
        "return redis.call('pttl', KEYS[1]);",                    // held by someone else: report remaining TTL
        Collections.singletonList(this.getName()),
        new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
} // presumably closes the enclosing method, whose header is not part of this excerpt
redisson锁续命
当加锁成功后会立即执行 RedissonLock.this.scheduleExpirationRenewal(threadId);
private void scheduleExpirationRenewal(final long threadId) { if (!expirationRenewalMap.containsKey(this.getEntryName())) { Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { //判断主线程是否结束,也就是主线程加的那把锁是否还存在,如果还存在,则续命30秒。 RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)}); future.addListener(new FutureListener<Boolean>() { public void operationComplete(Future<Boolean> future) throws Exception { RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName()); if (!future.isSuccess()) { RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause()); } else { if ((Boolean)future.getNow()) { //自旋方式实现锁续命 RedissonLock.this.scheduleExpirationRenewal(threadId); } } } }); } }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) { task.cancel(); } } }
lock.unlock
/**
 * Runs the unlock Lua script for the lock held under the given thread id.
 *
 * <p>Script arguments: {@code KEYS[1]} is the lock key, {@code KEYS[2]} the
 * channel name, {@code ARGV[1]} the unlock message, {@code ARGV[2]} the
 * internal lease time and {@code ARGV[3]} the lock name for {@code threadId}.
 *
 * <p>Script result: {@code 1} when the lock key is absent or was deleted (an
 * unlock message is published in both cases), {@code 0} when the reentrancy
 * counter is still positive after the decrement, and {@code nil} when the
 * hash has no field for this lock name.
 *
 * @param threadId id of the thread whose hold on the lock is released
 * @return the future produced by {@code evalWriteAsync} for the script
 */
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    final String unlockScript =
            // Lock key is already gone: still notify waiters.
            "if (redis.call('exists', KEYS[1]) == 0) "
            + "then redis.call('publish', KEYS[2], ARGV[1]); "
            + "return 1; "
            + "end;"
            // The hash has no field for this lock name: nothing to release.
            + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) "
            + "then return nil;"
            + "end;"
            // Decrement the reentrancy counter.
            + " local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); "
            // Still held re-entrantly: only refresh the expiry.
            + "if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); "
            + "return 0; "
            // Fully released: delete the key and publish the unlock message.
            + "else redis.call('del', KEYS[1]); "
            + "redis.call('publish', KEYS[2], ARGV[1]); return 1; "
            + "end; "
            + "return nil;";

    return this.commandExecutor.evalWriteAsync(
            this.getName(),
            LongCodec.INSTANCE,
            RedisCommands.EVAL_BOOLEAN,
            unlockScript,
            Arrays.asList(this.getName(), this.getChannelName()),
            new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
}