从零开始学Redis之分布式锁

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 前言文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820…种一棵树最好的时间是十年前,其次是现在

絮叨


其实前面我去年11月份,把Redis的很多基础知识写了一遍,虽然说没有啥深度,但是入门总是可以的,最近我去面试,竟然被问到了,还是自己平时没有用心准备,痛定思痛,我就得把Redis 分布式锁弄清楚点。所以有了今天的文章

🔥从零开始学Redis之金刚凡境

🔥从零开始学Redis之自在地境

🔥从零开始学Redis之逍遥天境

🔥从零开始学Redis之半步神游

🔥从零开始学Redis之神游玄境


引题


一般面试的时候,问完一些Redis的问题之后,有时候可能会问你分布式锁是怎么实现的,然后大家可能就搭用它的 set NX EX命令 然后用lua脚本做成一个原子性操作来实现分布式锁。其实这么搭也可以吧,然后我们一般在生产环境的话,可能会用一些开源框架,你不如说Redisson来实现分布式锁,那可能就会问到你Redisson是怎么实现的

我们先来看看下面几个简单的代码

RLock lock = redisson.getLock(LOCKER_PREFIX + resourceName);
 success = lock.tryLock(100, lockTime, TimeUnit.SECONDS);
lock.unlock();
复制代码


以上简单的代码就能实现分布式锁 是不是很简单,并且还支持分布式 主从 哨兵等模式


源码解析


通过 getLock 方法获取锁对象

@Override
    public RLock getLock(String name) {
/**
*  构造并返回一个 RedissonLock 对象
* commandExecutor: 与 Redis 节点通信并发送指令的真正实现。需要说明一下,CommandExecutor 实现是通过 eval 命令来执行 Lua 脚本
* name: 锁的全局名称
* id: Redisson 客户端唯一标识,实际上就是一个 UUID.randomUUID()
*/
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }
复制代码


通过tryLock方法尝试获取锁


@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        //取得最大等待时间
        long time = unit.toMillis(waitTime);
        //记录下当前时间
        long current = System.currentTimeMillis();
        //取得当前线程id(判断是否可重入锁的关键)
        final long threadId = Thread.currentThread().getId();
        //1.尝试申请锁,返回还剩余的锁过期时间
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        //2.如果返回的锁的过期时间为空,表示申请锁成功
        if (ttl == null) {
            return true;
        }
        //3.申请锁的耗时如果大于等于最大等待时间,则申请锁失败
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        //订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的
        //问题:基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel  
        //订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争
        current = System.currentTimeMillis();
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        //await 方法内部是用CountDownLatch来实现阻塞,
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }
        try {
                //计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        //收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁
        //获取锁成功,则立马返回true
        //若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回false结束循环
            while (true) {
                long currentTime = System.currentTimeMillis();
                 // 再次尝试申请锁
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
               // 成功获取锁则直接返回true结束循环
                if (ttl == null) {
                    return true;
                }
                //超过最大等待时间则返回false结束循环,获取锁失败
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                 //如果剩余时间(ttl)小于wait time ,就时间内,
                 //从Entry的信号量获取一个许可(除非被中断或用的许可)。
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                 //则就在wait time 时间范围内等待可以通过信号量
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
             //7.更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间)
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
          //7.无论是否获得锁,都要取消订阅解锁消息
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }
复制代码


加锁的流程图


<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
// * 通过 EVAL 命令执行 Lua 脚本获取锁,保证了原子性
        internalLockLeaseTime = unit.toMillis(leaseTime);
   // 1.如果缓存中的key不存在,则执行 hset 命令(hset key UUID+threadId 1),然后通过 pexpire 命令设置锁的过期时间(即锁的租约时间)
   / 返回空值 nil ,表示获取锁成功
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                   // 如果key已经存在,并且value也匹配,表示是当前线程持有的锁,则执行 hincrby 命令,重入次数加1,并且设置失效时间
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                   //如果key已经存在,但是value不匹配,说明锁已经被其他线程持有,通过 pttl 命令获取锁的剩余存活时间并返回,至此获取锁失败
                  "return redis.call('pttl', KEYS[1]);",
                  //这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
复制代码


LUA脚本

  • KEYS[1]就是Collections.singletonList(getName()),表示分布式锁的key;
  • ARGV[1]就是internalLockLeaseTime,即锁的租约时间(持有锁的有效时间),默认30s;
  • ARGV[2]就是getLockName(threadId),是获取锁时set的唯一值 value,即UUID+threadId-


总结一下加锁的流程

  • 第一步先尝试去加锁,返回过期时间,如果为空则可以获得锁 (返回获取锁成功)(,在lua脚本里面会判断你的key和value是不是已经持有锁了,如果是,就是给你重试次数加,然后获取锁也是失败)
  • 如果第一次加锁失败之后,就会去判断你最大等待时间,如果走到这的时候已经超过最大等待时间(直接返回获取锁失败,)
  • 接下来就是说我要去订阅redis解锁这个事件,一旦有人把锁释放就会继续通知所有的线程去竞争锁
  • 然后是一个死循环的去获取锁,当时每次执行这个循环的时候,每次去获取锁之前都要去判断当前是否已经超过最大的等待时间,如果超过了就直接释放锁。只有当获得锁,或者是最大的等待时间超过之后才会返回是否成功获取锁的标志。
  • 通过 Redisson 实现分布式可重入锁,比纯自己通过set key value px milliseconds nx +lua 实现(实现一)的效果更好些,虽然基本原理都一样,因为通过分析源码可知,RedissonLock 是可重入的,并且考虑了失败重试,可以设置锁的最大等待时间, 在实现上也做了一些优化,减少了无效的锁申请,提升了资源的利用率。


一个简单的小案例 业务代码

@Autowired
private RedissonClient redissonClient;
//商品秒杀核心业务逻辑的处理-redisson的分布式锁
@Override
public Boolean killItemV4(Integer killId, Integer userId) throws Exception {
Boolean result=false;
     final String lockKey=new StringBuffer().append(killId).append(userId).append("-RedissonLock").toString();
    RLock lock=redissonClient.getLock(lockKey);
     try {
       //TODO:第一个参数30s=表示尝试获取分布式锁,并且最大的等待获取锁的时间为30s
        //TODO:第二个参数10s=表示上锁之后,10s内操作完毕将自动释放锁
        Boolean cacheRes=lock.tryLock(30,10,TimeUnit.SECONDS);
        if (cacheRes){
            //TODO:核心业务逻辑的处理
            if (itemKillSuccessMapper.countByKillUserId(killId,userId) <= 0){
                ItemKill itemKill=itemKillMapper.selectByIdV2(killId);
                if (itemKill!=null && 1==itemKill.getCanKill() && itemKill.getTotal()>0){
                    int res=itemKillMapper.updateKillItemV2(killId);
                    if (res>0){
                        commonRecordKillSuccessInfo(itemKill,userId);                         result=true;
                    }
                }
            }else{
                throw new Exception("redisson-您已经抢购过该商品了!");
            }
        }
}finally {
        //TODO:释放锁
        lock.unlock();
}
return result;
}
复制代码


redis的zSet

工作中其实也有用到这个数据结构,如果说用它来做一些用户排名统计啥的,但是呢,当面试官问到跳表的时候,我竟然没听过,这不得好好补补


底层数据结构

有序集合对象的编码可以是ziplist或者skiplist。同时满足以下条件时使用ziplist编码:

元素数量小于128个 所有member的长度都小于64字节 以上两个条件的上限值可通过zset-max-ziplist-entries和zset-max-ziplist-value来修改。

ziplist编码的有序集合使用紧挨在一起的压缩列表节点来保存,第一个节点保存member,第二个保存score。ziplist内的集合元素按score从小到大排序,score较小的排在表头位置。

skiplist编码的有序集合底层是一个命名为zset的结构体,而一个zset结构同时包含一个字典和一个跳跃表。跳跃表按score从小到大保存所有集合元素。而字典则保存着从member到score的映射,这样就可以用O(1)的复杂度来查找member对应的score值。虽然同时使用两种结构,但它们会通过指针来共享相同元素的member和score,因此不会浪费额外的内存。


skiplist介绍

跳表(skip List)是一种随机化的数据结构,基于并联的链表,实现简单,插入、删除、查找的复杂度均为O(logN)。简单说来跳表也是链表的一种,只不过它在链表的基础上增加了跳跃功能,正是这个跳跃的功能,使得在查找元素时,跳表能够提供O(logN)的时间复杂度。

先来看一个有序链表,如下图(最左侧的灰色节点表示一个空的头结点):


在这样一个链表中,如果我们要查找某个数据,那么需要从头开始逐个进行比较,直到找到包含数据的那个节点,或者找到第一个比给定数据大的节点为止(没找到)。也就是说,时间复杂度为O(n)。同样,当我们要插入新数据的时候,也要经历同样的查找过程,从而确定插入位置。

假如我们每相邻两个节点增加一个指针,让指针指向下下个节点,如下图:


这样所有新增加的指针连成了一个新的链表,但它包含的节点个数只有原来的一半(上图中是7, 19, 26)。现在当我们想查找数据的时候,可以先沿着这个新链表进行查找。

当碰到比待查数据大的节点时,再回到原来的链表中进行查找。比如,我们想查找23,查找的路径是沿着下图中标红的指针所指向的方向进行的:


  • 23首先和7比较,再和19比较,比它们都大,继续向后比较。
  • 但23和26比较的时候,比26要小,因此回到下面的链表(原链表),与22比较。 23比22要大,沿下面的指针继续向后和26比较。23比26小,说明待查数据23在原链表中不存在,而且它-的插入位置应该在22和26之间。

在这个查找过程中,由于新增加的指针,我们不再需要与链表中每个节点逐个进行比较了。需要比较的节点数大概只有原来的一半。

利用同样的方式,我们可以在上层新产生的链表上,继续为每相邻的两个节点增加一个指针,从而产生第三层链表。如下图:


在这个新的三层链表结构上,如果我们还是查找23,那么沿着最上层链表首先要比较的是19,发现23比19大,接下来我们就知道只需要到19的后面去继续查找,从而一下子跳过了19前面的所有节点。可以想象,当链表足够长的时候,这种多层链表的查找方式能让我们跳过很多下层节点,大大加快查找的速度。

skiplist正是受这种多层链表的想法的启发而设计出来的。实际上,按照上面生成链表的方式,上面每一层链表的节点个数,是下面一层的节点个数的一半,这样查找过程就非常类似于一个二分查找,使得查找的时间复杂度可以降低到O(log n)。但是,这种方法在插入数据的时候有很大的问题。新插入一个节点之后,就会打乱上下相邻两层链表上节点个数严格的2:1的对应关系。如果要维持这种对应关系,就必须把新插入的节点后面的所有节点(也包括新插入的节点)重新进行调整,这会让时间复杂度重新蜕化成O(n)。删除数据也有同样的问题。


结尾


算是对redis知识的小小补充吧。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
27天前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
54 16
|
2月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
64 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
2月前
|
NoSQL Redis 数据库
计数器 分布式锁 redis实现
【10月更文挑战第5天】
51 1
|
2月前
|
NoSQL 算法 关系型数据库
Redis分布式锁
【10月更文挑战第1天】分布式锁用于在多进程环境中保护共享资源,防止并发冲突。通常借助外部系统如Redis或Zookeeper实现。通过`SETNX`命令加锁,并设置过期时间防止死锁。为避免误删他人锁,加锁时附带唯一标识,解锁前验证。面对锁提前过期的问题,可使用守护线程自动续期。在Redis集群中,需考虑主从同步延迟导致的锁丢失问题,Redlock算法可提高锁的可靠性。
83 4
|
2月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
57 3
|
2月前
|
缓存 NoSQL 算法
面试题:Redis如何实现分布式锁!
面试题:Redis如何实现分布式锁!
|
机器学习/深度学习 缓存 NoSQL
|
缓存 NoSQL Java
为什么分布式一定要有redis?
1、为什么使用redis 分析:博主觉得在项目中使用redis,主要是从两个角度去考虑:性能和并发。当然,redis还具备可以做分布式锁等其他功能,但是如果只是为了分布式锁这些其他功能,完全还有其他中间件(如zookpeer等)代替,并不是非要使用redis。
1367 0
|
2月前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
78 6