分布式锁Redission

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 分布式锁Redission

Redisson 作为分布式锁

官方文档:https://github.com/redisson/redisson/wiki

  1. 引入依赖
<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.11.1</version>
</dependency>
  1. 2.配置redission
@Configuration
public class MyRedissonConfig {
    /**
     * 所有对 Redisson 的使用都是通过 RedissonClient
     *
     * @return
     * @throws IOException
     */
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redisson() throws IOException {
        // 1、创建配置
        Config config = new Config();
        // Redis url should start with redis:// or rediss://
        config.useSingleServer().setAddress("redis://192.168.163.131:6379");
        // 2、根据 Config 创建出 RedissonClient 实例
        return Redisson.create(config);
    }
}
  1. 3.测试
@Autowired
    RedissonClient redissonClient;
    @Test
    public  void redission()
    {
        System.out.println(redissonClient);
    }
  1. 4.使用
@ResponseBody
    @GetMapping("/hello")
    public  String hello()
    {
        // 1. 获取一把锁
        RLock lock = redisson.getLock("my-lock");
        // 2. 加锁, 阻塞式等待
        lock.lock();
        try {
            System.out.println("加锁成功,执行业务...");
            Thread.sleep(15000);
        } catch (Exception e) {
        } finally {
            // 3. 解锁 假设解锁代码没有运行,Redisson 会出现死锁吗?(不会)
            System.out.println("释放锁"+Thread.currentThread().getId());
            lock.unlock();
        }
        return "hello";
    }

假设解锁代码没有运行,Redisson 会出现死锁吗?

不会

  • 锁的自动续期,如果业务时间很长,运行期间自动给锁续期 30 s,不用担心业务时间过长,锁自动过期被删掉;
  • 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动续期,默认也会在 30 s 后解锁

源码分析-Redission如何解决死锁

Ctrl+Alt查看方法实现

1f3b090f6db2f15b3b3b1bbc3a153fe3_202110061204074.png

这是一个加锁方法,不传过期时间

public void lock() {
        try {
            //这里过期时间自动赋值成-1
            this.lock(-1L, (TimeUnit)null, false);
        } catch (InterruptedException var2) {
            throw new IllegalStateException();
        }
    }

然后会调用 this.lock(-1L, (TimeUnit)null, false)方法

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        //得到线程ID
        long threadId = Thread.currentThread().getId();
       //通过线程ID获取到锁
        Long ttl = this.tryAcquire(leaseTime, unit, threadId);
        //如果没有获取到锁
        if (ttl != null) {
            RFuture<RedissonLockEntry> future = this.subscribe(threadId);
            this.commandExecutor.syncSubscription(future);
            try {
                while(true) {
                    ttl = this.tryAcquire(leaseTime, unit, threadId);
                    if (ttl == null) {
                        return;
                    }
                    if (ttl >= 0L) {
                        try {
                            this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException var13) {
                            if (interruptibly) {
                                throw var13;
                            }
                            this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        }
                    } else if (interruptibly) {
                        this.getEntry(threadId).getLatch().acquire();
                    } else {
                        this.getEntry(threadId).getLatch().acquireUninterruptibly();
                    }
                }
            } finally {
                this.unsubscribe(future, threadId);
            }
        }
    }

获取锁方法

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
    }

里面又调用了tryAcquireAsync

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    //如果传了过期时间    
    if (leaseTime != -1L) {
            return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } 
    //没有传过期时间
    else {
            RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e == null) {
                    if (ttlRemaining == null) {
                        this.scheduleExpirationRenewal(threadId);
                    }
                }
            });
            return ttlRemainingFuture;
        }
    }

有指定过期时间走tryLockInnerAsync方法,尝试用异步加锁

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        //先把时间转换成internalLockLeaseTime
        this.internalLockLeaseTime = unit.toMillis(leaseTime);
        //然后执行lua脚本 发给redis执行
        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
    }

没有指定过期时间调用getLockWatchdogTimeout()方法,获取锁的默认看门狗时间,30秒

public long getLockWatchdogTimeout() {
    return this.lockWatchdogTimeout;
}
this.lockWatchdogTimeout = 30000L;

还是调用tryLockInnerAsyncredis发送命令,占锁成功返回一个以不变异步编排的RFuture对象,来进行监听,里面有两个参数ttlRemaining, e

ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e == null) {
                    if (ttlRemaining == null) {
                        this.scheduleExpirationRenewal(threadId);
                    }
                }
            });

里面有个scheduleExpirationRenewal方法

private void scheduleExpirationRenewal(long threadId) {
        RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
        RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            //重新设置过期时间
            this.renewExpiration();
        }
    }

里面的关键方法renewExpiration执行定时任务,

private void renewExpiration() {
        RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
        if (ee != null) {
            //里面会执行一个定时任务
            Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                public void run(Timeout timeout) throws Exception {
                    RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                    if (ent != null) {
                        Long threadId = ent.getFirstThreadId();
                        if (threadId != null) {
                            RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                            future.onComplete((res, e) -> {
                                if (e != null) {
                                    RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                                } else {
                                    if (res) {
                                        RedissonLock.this.renewExpiration();
                                    }
                                }
                            });
                        }
                    }
                }
                //看门狗时间/3 10秒钟重试一次
            }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
            ee.setTimeout(task);
        }
    }

主要是来运行renewExpirationAsync这个方法

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
    }

里面传入了一个internalLockLeaseTime时间参数

da905ed2c52c38e48fe2283b62345ba9_202110061633585.png

又是获取看门狗时间

总结

  • 如果传了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
  • 如果未指定锁的超时时间,就是使用lockWatchdogTimeout的默认时间30秒,只要占锁成功就会启动一个定时任务【重新给所设置时间,新的过期时间就是lockWatchdogTimeout的默认时间】
    最佳实践使用自定义过期时间,省掉了自动续期时间,自动加锁

读写锁测试

@GetMapping("/write")
    @ResponseBody
    public  String writeValue()
    {
        RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
        String s="";
        RLock rLock=readWriteLock.writeLock();
        try{
            //加写锁
            rLock.lock();
            s= UUID.randomUUID().toString();
            Thread.sleep(30000);
            redisTemplate.opsForValue().set("writeValue",s);
        }catch (Exception e){
            e.printStackTrace();
        }
        finally {
            rLock.unlock();
        }
        return  s;
    }
    @GetMapping("/read")
    @ResponseBody
    public  String readValue()
    {
        RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
        String s="";
        //加读锁
        RLock rLock=readWriteLock.readLock();
        rLock.lock();
        try{
            s=redisTemplate.opsForValue().get("writeValue");
        }catch (Exception e){
            e.printStackTrace();
        }
        finally {
            rLock.unlock();
        }
        return  s;
    }

写锁没释放读锁就必须等待,没有写锁读锁都可以读

保证数据的一致性,写锁是一个排他锁、互斥锁,读锁是共享锁。

读读共享、读写互斥、写写互斥、写读互斥,只要有写的存在都必须等待

信号量测试

像车库停车,每进来一辆车,车库减少一个车位,只有当车库还有车位才可以停车

@GetMapping("/park")
    @ResponseBody
    public  String park() throws InterruptedException {
        RSemaphore park = redisson.getSemaphore("park");
        //获取一个信号 占一个值
        park.acquire();
        return  "ok";
    }
    @GetMapping("/go")
    @ResponseBody
    public  String go(){
        RSemaphore park = redisson.getSemaphore("park");
        //释放一个车位
        park.release();
        return  "ok";
    }

访问:

gulimall.com/park

gulimall.com/go

信号量可以用作分布式的限流

闭锁

只有等待所有活动都完成才发生,例如当所有班级放学走完才关闭学校大门

@GetMapping("/lockdoor")
    @ResponseBody
    public  String lockDoor() throws InterruptedException {
        RCountDownLatch door = redisson.getCountDownLatch("door");
        door.trySetCount(5);
        door.await();//等待闭锁都完成
        return  "放假啦....";
    }
    @GetMapping("/gogo/{id}")
    @ResponseBody
    public  String gogogo(@PathVariable("id") Long id) throws InterruptedException {
        RCountDownLatch door = redisson.getCountDownLatch("door");
        door.countDown();
        return  id+"班都走了";
    }

缓存一致性解决

在我们读缓存的时候可能会有数据被修改过,为了让我们能够读到最新的数据,有两种处理方法:

双写模式

在把数据写入数据库的时候,同时写入到缓存中

问题:在写的过程中,可能会在第一个线程缓存还没写进,但是第二个查询到缓存又开始写数据,读到的最新数据有延迟,导致产生脏数据

失效模式

在把数据写入数据更新的时候,把缓存删除,下次查询没有缓存再添加缓存

问题:在线程1更新数据的时候消耗大量时间,还没删缓存,线程2进来也没有缓存,读取到原来老的数据,然后更新缓存

我们系统的一致性解决方案

1、缓存的所有数据都有过期时间,数据过期下一次查询触发主动更新

2、读写数据的时候,加上分布式的读写锁

3、遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
存储 NoSQL Java
分布式锁中的王者方案 - Redission
分布式锁中的王者方案 - Redission
初识Redission分布式锁
在微服务系统中,某些场景需要阻塞所有节点的所有线程,对共享资源的访问。比如并发时“超卖”和“余额减为负数”等情况,需要对同一资源进行加锁,这些就需要进行分布式。
初识Redission分布式锁
|
4月前
|
NoSQL Java Redis
jedis 与 redission 实现分布式锁
jedis 与 redission 实现分布式锁
103 4
|
6月前
|
存储 NoSQL Java
分布式锁,Redission,其它实现问题讲解,以及面试题回答案例
分布式锁,Redission,其它实现问题讲解,以及面试题回答案例
242 1
|
6月前
|
缓存 NoSQL 数据库
关于高并发下缓存失效的问题(本地锁 && 分布式锁 && Redission 详解)
关于高并发下缓存失效的问题(本地锁 && 分布式锁 && Redission 详解)
258 0
|
Java Maven
Redission 实现分布式锁
Redission 实现分布式锁
142 1
|
人工智能 NoSQL Java
springboot 高级教程 如何优雅使用redission分布式锁
springboot 高级教程 如何优雅使用redission分布式锁
1160 0
|
存储 NoSQL Java
基于springboot+Redis的前后端分离项目之分布式锁-redission(五)-【黑马点评】
基于setnx实现的分布式锁存在下面的问题: 重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。
Zp
|
运维 NoSQL 前端开发
Redission分布式锁的使用和原理分析
Redission分布式锁的使用和原理分析
Zp
4683 1
Redission分布式锁的使用和原理分析
|
NoSQL 算法 Java
【Redis基础指南】分析一下Redission实现分布式锁的点点滴滴(含加解锁源码)
【Redis基础指南】分析一下Redission实现分布式锁的点点滴滴(含加解锁源码)
395 0
【Redis基础指南】分析一下Redission实现分布式锁的点点滴滴(含加解锁源码)

热门文章

最新文章

下一篇
无影云桌面