Redisson 作为分布式锁
官方文档:https://github.com/redisson/redisson/wiki
- 引入依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.11.1</version> </dependency>
- 2.配置redission
@Configuration public class MyRedissonConfig { /** * 所有对 Redisson 的使用都是通过 RedissonClient * * @return * @throws IOException */ @Bean(destroyMethod = "shutdown") public RedissonClient redisson() throws IOException { // 1、创建配置 Config config = new Config(); // Redis url should start with redis:// or rediss:// config.useSingleServer().setAddress("redis://192.168.163.131:6379"); // 2、根据 Config 创建出 RedissonClient 实例 return Redisson.create(config); } }
- 3.测试
@Autowired RedissonClient redissonClient; @Test public void redission() { System.out.println(redissonClient); }
- 4.使用
@ResponseBody @GetMapping("/hello") public String hello() { // 1. 获取一把锁 RLock lock = redisson.getLock("my-lock"); // 2. 加锁, 阻塞式等待 lock.lock(); try { System.out.println("加锁成功,执行业务..."); Thread.sleep(15000); } catch (Exception e) { } finally { // 3. 解锁 假设解锁代码没有运行,Redisson 会出现死锁吗?(不会) System.out.println("释放锁"+Thread.currentThread().getId()); lock.unlock(); } return "hello"; }
假设解锁代码没有运行,Redisson 会出现死锁吗?
不会
- 锁的自动续期,如果业务时间很长,运行期间自动给锁续期 30 s,不用担心业务时间过长,锁自动过期被删掉;
- 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动续期,默认也会在 30 s 后解锁
源码分析-Redission如何解决死锁
Ctrl+Alt查看方法实现
这是一个加锁方法,不传过期时间
public void lock() { try { //这里过期时间自动赋值成-1 this.lock(-1L, (TimeUnit)null, false); } catch (InterruptedException var2) { throw new IllegalStateException(); } }
然后会调用 this.lock(-1L, (TimeUnit)null, false)方法
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { //得到线程ID long threadId = Thread.currentThread().getId(); //通过线程ID获取到锁 Long ttl = this.tryAcquire(leaseTime, unit, threadId); //如果没有获取到锁 if (ttl != null) { RFuture<RedissonLockEntry> future = this.subscribe(threadId); this.commandExecutor.syncSubscription(future); try { while(true) { ttl = this.tryAcquire(leaseTime, unit, threadId); if (ttl == null) { return; } if (ttl >= 0L) { try { this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException var13) { if (interruptibly) { throw var13; } this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else if (interruptibly) { this.getEntry(threadId).getLatch().acquire(); } else { this.getEntry(threadId).getLatch().acquireUninterruptibly(); } } } finally { this.unsubscribe(future, threadId); } } }
获取锁方法
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId)); }
里面又调用了tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { //如果传了过期时间 if (leaseTime != -1L) { return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } //没有传过期时间 else { RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { if (ttlRemaining == null) { this.scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } }
有指定过期时间走tryLockInnerAsync
方法,尝试用异步加锁
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { //先把时间转换成internalLockLeaseTime this.internalLockLeaseTime = unit.toMillis(leaseTime); //然后执行lua脚本 发给redis执行 return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)}); }
没有指定过期时间调用getLockWatchdogTimeout()方法,获取锁的默认看门狗时间,30秒
public long getLockWatchdogTimeout() { return this.lockWatchdogTimeout; } this.lockWatchdogTimeout = 30000L;
还是调用tryLockInnerAsync
给redis
发送命令,占锁成功返回一个以不变异步编排的RFuture
对象,来进行监听,里面有两个参数ttlRemaining, e
ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { if (ttlRemaining == null) { this.scheduleExpirationRenewal(threadId); } } });
里面有个scheduleExpirationRenewal
方法
private void scheduleExpirationRenewal(long threadId) { RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry(); RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); //重新设置过期时间 this.renewExpiration(); } }
里面的关键方法renewExpiration
执行定时任务,
private void renewExpiration() { RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName()); if (ee != null) { //里面会执行一个定时任务 Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName()); if (ent != null) { Long threadId = ent.getFirstThreadId(); if (threadId != null) { RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e); } else { if (res) { RedissonLock.this.renewExpiration(); } } }); } } } //看门狗时间/3 10秒钟重试一次 }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); ee.setTimeout(task); } }
主要是来运行renewExpirationAsync
这个方法
protected RFuture<Boolean> renewExpirationAsync(long threadId) { return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)}); }
里面传入了一个internalLockLeaseTime
时间参数
又是获取看门狗时间
总结
- 如果传了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
- 如果未指定锁的超时时间,就是使用lockWatchdogTimeout的默认时间30秒,只要占锁成功就会启动一个定时任务【重新给所设置时间,新的过期时间就是
lockWatchdogTimeout
的默认时间】
最佳实践使用自定义过期时间,省掉了自动续期时间,自动加锁
读写锁测试
@GetMapping("/write") @ResponseBody public String writeValue() { RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock"); String s=""; RLock rLock=readWriteLock.writeLock(); try{ //加写锁 rLock.lock(); s= UUID.randomUUID().toString(); Thread.sleep(30000); redisTemplate.opsForValue().set("writeValue",s); }catch (Exception e){ e.printStackTrace(); } finally { rLock.unlock(); } return s; } @GetMapping("/read") @ResponseBody public String readValue() { RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock"); String s=""; //加读锁 RLock rLock=readWriteLock.readLock(); rLock.lock(); try{ s=redisTemplate.opsForValue().get("writeValue"); }catch (Exception e){ e.printStackTrace(); } finally { rLock.unlock(); } return s; }
写锁没释放读锁就必须等待,没有写锁读锁都可以读
保证数据的一致性,写锁是一个排他锁、互斥锁,读锁是共享锁。
读读共享、读写互斥、写写互斥、写读互斥,只要有写的存在都必须等待
信号量测试
像车库停车,每进来一辆车,车库减少一个车位,只有当车库还有车位才可以停车
@GetMapping("/park") @ResponseBody public String park() throws InterruptedException { RSemaphore park = redisson.getSemaphore("park"); //获取一个信号 占一个值 park.acquire(); return "ok"; } @GetMapping("/go") @ResponseBody public String go(){ RSemaphore park = redisson.getSemaphore("park"); //释放一个车位 park.release(); return "ok"; }
访问:
信号量可以用作分布式的限流
闭锁
只有等待所有活动都完成才发生,例如当所有班级放学走完才关闭学校大门
@GetMapping("/lockdoor") @ResponseBody public String lockDoor() throws InterruptedException { RCountDownLatch door = redisson.getCountDownLatch("door"); door.trySetCount(5); door.await();//等待闭锁都完成 return "放假啦...."; } @GetMapping("/gogo/{id}") @ResponseBody public String gogogo(@PathVariable("id") Long id) throws InterruptedException { RCountDownLatch door = redisson.getCountDownLatch("door"); door.countDown(); return id+"班都走了"; }
缓存一致性解决
在我们读缓存的时候可能会有数据被修改过,为了让我们能够读到最新的数据,有两种处理方法:
双写模式
在把数据写入数据库的时候,同时写入到缓存中
问题:在写的过程中,可能会在第一个线程缓存还没写进,但是第二个查询到缓存又开始写数据,读到的最新数据有延迟,导致产生脏数据
失效模式
在把数据写入数据更新的时候,把缓存删除,下次查询没有缓存再添加缓存
问题:在线程1更新数据的时候消耗大量时间,还没删缓存,线程2进来也没有缓存,读取到原来老的数据,然后更新缓存
我们系统的一致性解决方案:
1、缓存的所有数据都有过期时间,数据过期下一次查询触发主动更新
2、读写数据的时候,加上分布式的读写锁。
3、遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。