Redis分布式锁深入分析

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 其实这里收获最大的,应该是自旋锁,虽然说在最后使用了发布订阅来完成异步唤醒,但还是有一些缺点,比如这个锁是否公平,如果说,这里要让你实现公平锁,读者你又打算如何解决?想到这里,我又想到了Reactor模型,其实我们可以做一个BossGroup来存放一下阻塞线程ID,其实就是一个阻塞队列再用一个WorkerGroup来对每个ThreadID进行处理,当然这里提供的是一个思路,如果要完成的话,相当于是写一个小型中间件,也挺有意思,后面打算试一试,hhh。

Redis分布式锁深入分析

有关分布式锁的制作我在之前的文章已经提过,感兴趣的可以看一下

针对RedisTemplate实现WatchDog – Karos (wzl1.top)

但是这个方法仍然有问题存在,下面我们来聊一聊

注意,这篇文章本质上只是讨论分布式锁的问题,如果要看源代码分析,可以看看这位大佬的代码,我在这里引入代码只是想要说明一些解决方案:分布式锁 | Joseph's Blog (gitee.io)

最原始的Redis分布式锁

最开始大家刚学分布式锁的时候,用的是这个指令,

setnx key value

然后使用expire给他设置过期时间

看似没有问题

image-20230616021643706

难道真的没问题吗?

试想一下,在高并发下,redis出现了雪崩,那么你设置了setnx,但是在设置expire之前崩了,呃呃呃~

没错,要解决这个问题,得实现原子性,原子性,我们在MySQL里面通过学习了事务来解决

image-20230616022013037

那么redis,能不能类似实现事务呢?其实redis本身是有事务的,但是这种简单的语句,用Lua也行(没错,就是你打游戏开脚本哪个)

但是在这里我们不讲Lua,主要说一下思想,其实就是通过lua将两个原子语句封装在一起,再发送给redis服务器进行执行

lua-redis快速入门直接看最后

这个分布式锁实现过于简单,就不在这里说了,hhh~

Redis官方针对SETNX的改动

其实Redis官方在后面也看出了SETNX的缺点,所以他在2.6.12版本开始,加入了一个新的指令

set key value EX|PX nx|ex

EX|PX是expire和pexpire,nx是不存在则执行,ex是存在则执行

简单说下,然后RedisTemplate.setValue().setIfAbsent()方法也进行了重写

这样就保证了原子性,这个方法在我之前的文章里面也用过。

并且我参考Redisson的思想制作了分布式锁看门狗机制

当时其实是在想续期问题如何解决,解决之后就感觉自己

image-20230616022948257

直到最近,群佬看博客,指出了一个问题,这个方法是否可重入呢?

image-20230616023100917

说到这里,你可能对可重入有点迷惑,那么现在,我们来介绍一下可重入锁

可重入锁

什么是可重入锁?

来看看介绍吧。

可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁。

简单的例子,这里我用伪代码来解释

syn(this){
   
   
    sout("加锁成功A")
    syn(this){
   
   
        sout("加锁成功B")
    }
}

那么这里,我们会发生什么呢?

按照没接触可重入锁的情况或者没有这样试过的情况来说,执行完 sout("加锁成功A")后便会产生死锁问题

而可重入锁,就是说,在此时,你依然可以进入并执行sout("加锁成功B")

那么应用场景?

最容易想到的是递归调用,但是还有其他的业务方面可以说一说,

比如你要调用业务方法A,业务A中有操纵了要上锁业务B,同时业务A又需要全局上锁,那么这个地方就需要可重入了

基于Redis-Hash的可重入锁实现

在Redisson中,采用的是hash进行锁的存储,然后对hash设置一个过期时间

大概的数据结构是这样的

image-20230616025322273

hashname为key,hashkey为thread1,value是锁的重入次数

但是这里我要提一点,这里的thread1,可不仅仅是threadId,使用分布式锁通常是在分布式、微服务i项目下,不同的服务中也有可能出现线程ID相同的问题,所以这里加一个服务名,其实生成个UUID就可以了

大概的格式就是这样:

mylock:HEX(uuid+theadId):num

但是还有个设置过期时间的问题,如何设置?

我这里跟着我之前的帖子来讲,在那里我是使用的RedisTemplate来实现分布式锁+看门狗机制

但是没有考虑可重入的问题,那么我这次就加上

我们要加过期时间,同时又要确保原子性,那么就用Lua

加锁

对于加锁的Lua如下

    -- 如果Lock存在
    if redis.call("exists",KEYS[1]) ~= 0 then
        -- 如果不是自己的锁
        if redis.call("exists",KEYS[1],ARGS[1]) == 0 then
            -- 不是自己的锁,返回剩余时间
            return redis.call("pttl",KEYS[1]);
        end
        -- 如果是自己的锁就记录次数
        redis.call("hincrby",KEYS[1],ARGS[1],1);
        -- 延期
        redis.call("pexpire",KEYS[1],ARGS[2]);
    else
        redis.call("hset",KEYS[1],ARGS[1],1);
        -- 设置默认延期
        redis.call("pexpire",KEYS[1],ARGS[2]);
    end
    -- 如果Lock不存在,那么就直接加上就可以了,hhh
    return nil;

这里解释一下KEY和ARG,key是hash名,args是指命令携带参数

key1:索命

args1:服务线程唯一ID

args2:过期时间

然后在代码里面的实现

image-20230616025942863

解锁

解锁也差不多

--    解锁的逻辑和加锁相似
    -- 如果Lock存在
    if redis.call("exists",KEYS[1]) ~= 0 then
        -- 如果是自己的锁
        if redis.call("hexists",KEYS[1],ARGS[1]) ~= 0 then
            -- 如果是最后一层 直接delete
            if redis.call("hget",KEYS[1],ARGS[1]) == 0 then
                redis.call("del",KEYs[1]);
                a=0
            else
            -- 如果不是,那么久锁层数减一
                a=redis.call("hincrby",KEYS[1],ARGS[1],-1);
            end
        end
        return a;
    end
    -- 如果Lock不存在,那么就return,hhh
    return nil;

image-20230616030009479

续期的话本来就是一条语句,不变就可以了

然后我和之前的代码相比,自旋锁改了一下,hhh

image-20230616030117535

看门狗机制实现

之前其实已经实现过,这里就再来看看吧,这里我为了方便一点,用的Hutool来演示,但是实际用的时候还是用Netty等框架比较好,毕竟Redission也是用的Netty

image-20230616030317267

目前还存在的问题+Reddisson源码分析 —— 自旋锁

没错,别以为这样就完了,细心的话会发现我上面的代码里面,写的是最暴力的自旋锁(图一个方便,hhh)

如果说一直循环下去,那么无疑是非常浪费CPU的

站着茅坑不拉屎是我的错,但是看着别人蹲上了,我心里又特别难过表情包图片gif动图 - 求表情网,斗图从此不求人!

那么如何解决?

解决方案

细心的同学已经发现了,在我加锁失败的时候,会返回一个ttl,也就是当前key还有多久失效

那么我们是不是可以在while里面是指一个阻塞,然后等过了这么久再唤醒线程就可以了?

没错,Reddisson底层也是这样实现的,基于Redis发布订阅,但是这里我给大家简单引个路子

你可以理解为把阻塞的线程ID放进一个阻塞队列里面,而我们的服务器就去订阅这个队列,其实这个队列在Redis里面叫做Channel,感兴趣的可以去看看。

那么是如何订阅的呢?

其实在源代码中,Redisson是放了一个“消息检测器”来进行监听

下面来看看Redisson加锁的代码

阻塞加锁源码 lock()

//阻塞加锁
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
   
   
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
   
   
            return;        //这里拿到锁了
        }
        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);    //对当前线程进行消息订阅
        pubSub.timeout(future);        //设置订阅超时
        RedissonLockEntry entry;
        if (interruptibly) {
   
   
            entry = commandExecutor.getInterrupted(future);
        } else {
   
   
            entry = commandExecutor.get(future);
        }

        try {
   
   
            while (true) {
   
   
             // 循环重试获取锁,直至重新获取锁成功才跳出循环
            // 此种做法阻塞进程,一直处于等待锁手动释放或者超时才继续线程 
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
   
   
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
   
   
                    try {
   
   
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
   
   
                        if (interruptibly) {
   
   
                            throw e;
                        }
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
   
   
                    if (interruptibly) {
   
   
                        entry.getLatch().acquire();
                    } else {
   
   
                        entry.getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
   
   
            unsubscribe(entry, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

非阻塞加锁

//不阻塞加锁,waitTime是最大容忍时间,这个概念不做过多解释,就是等待你自选的时间
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
   
   
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
   
   
            return true;
        }
        // 计算第一次尝试获取锁后剩余的时间
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
   
   
            acquireFailed(waitTime, unit, threadId);    //获取失败
            return false;
        }

        current = System.currentTimeMillis();
        //消息订阅
        CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        try {
   
   
            subscribeFuture.get(time, TimeUnit.MILLISECONDS);    //设置一个最多订阅时间
        } catch (TimeoutException e) {
   
   
            if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
                    "Unable to acquire subscription lock after " + time + "ms. " +
                            "Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
   
   
                subscribeFuture.whenComplete((res, ex) -> {
   
   
                    if (ex == null) {
   
   
                        unsubscribe(res, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        } catch (ExecutionException e) {
   
   
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        try {
   
   
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
   
   
                acquireFailed(waitTime, unit, threadId);
                return false;
            }

            while (true) {
   
   
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
   
   
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
   
   
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
   
   
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);//阻塞,等待消息
                } else {
   
   
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);//阻塞,等待消息
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
   
   
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
   
   
            unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

消息订阅

其上述类容中的订阅,都通过下面的方法进行回调,在解锁的时候会发布消息

package org.redisson.pubsub;

import org.redisson.RedissonLockEntry;

import java.util.concurrent.CompletableFuture;

/**
 * LockPubSub类是一个用于锁的发布-订阅实现。
 * 它继承自PublishSubscribe类,用于处理锁的订阅和消息发布。
 * 锁的订阅者是RedissonLockEntry对象。
 * 当接收到特定的消息时,会执行相应的操作。
 */
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
   
   

    // 解锁消息
    public static final Long UNLOCK_MESSAGE = 0L;
    // 读锁解锁消息
    public static final Long READ_UNLOCK_MESSAGE = 1L;

    public LockPubSub(PublishSubscribeService service) {
   
   
        super(service);
    }

    @Override
    protected RedissonLockEntry createEntry(CompletableFuture<RedissonLockEntry> newPromise) {
   
   
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
   
   
        if (message.equals(UNLOCK_MESSAGE)) {
   
   
            // 获取等待执行的Runnable对象,并执行
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
   
   
                runnableToExecute.run();
            }

            // 释放锁计数器
            value.getLatch().release();
        } else if (message.equals(READ_UNLOCK_MESSAGE)) {
   
   
            // 循环执行等待执行的Runnable对象,并执行
            while (true) {
   
   
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute == null) {
   
   
                    break;
                }
                runnableToExecute.run();
            }

            // 释放锁计数器,释放所有等待的读锁
            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }

}

总结

其实这里收获最大的,应该是自旋锁,虽然说在最后使用了发布订阅来完成异步唤醒,但还是有一些缺点,比如这个锁是否公平,如果说,这里要让你实现公平锁,读者你又打算如何解决?

想到这里,我又想到了Reactor模型,其实我们可以做一个BossGroup来存放一下阻塞线程ID,其实就是一个阻塞队列

再用一个WorkerGroup来对每个ThreadID进行处理,当然这里提供的是一个思路,如果要完成的话,相当于是写一个小型中间件,也挺有意思,后面打算试一试,hhh

Redis-Lua快速学习

当编写 Lua 脚本与 Redis 进行交互时,以下是一些常用的 Lua 脚本指南和技巧:

  1. 命令调用:使用 redis.call 函数来调用 Redis 命令。例如,redis.call('GET', 'mykey') 将调用 Redis 的 GET 命令并返回键为 'mykey' 的值。
  2. 参数访问:可以使用 KEYS 表来访问传递给 Lua 脚本的键列表,使用 ARGV 表来访问传递给 Lua 脚本的额外参数。例如,KEYS[1] 表示第一个键,ARGV[1] 表示第一个额外参数。
  3. 返回结果:Lua 脚本可以通过使用 return 语句来返回结果。例如,return redis.call('GET', 'mykey') 将返回键为 'mykey' 的值。
  4. 循环和条件:Lua 提供了一些基本的循环和条件语句,例如 forwhileif 等,可以在 Lua 脚本中使用。
  5. 容错处理:在编写 Lua 脚本时,可以考虑添加容错处理,例如使用 pcall 函数来捕获 Redis 命令的错误并进行处理。
  6. 事务支持:Redis 的 Lua 脚本支持事务,可以使用 redis.call('MULTI') 开始事务,然后使用 redis.call('EXEC') 执行事务。在事务中,可以执行多个 Redis 命令,并将其作为一个原子操作进行提交或回滚。
  7. 脚本缓存:Redis 可以缓存 Lua 脚本,以提高执行效率。您可以使用 EVALSHA 命令来执行缓存的脚本。在 Java RedisTemplate 中,您可以使用 execute 方法的 execute(script, keys, args) 形式来执行缓存的脚本。

这些指南和技巧可帮助您编写更复杂和灵活的 Lua 脚本与 Redis 进行交互。在编写 Lua 脚本时,请参考 Redis 官方文档以及 Lua 官方文档,以了解更多 Lua 编程语言和 Redis 命令的细节和用法。

当编写 Lua 脚本时,可以使用循环和条件语句来实现逻辑控制。以下是一些示例:

  1. 使用 for 循环:
for i = 1, 10 do
  -- 执行操作,例如打印循环变量
  print(i)
end
  1. 使用 while 循环:
local i = 1
while i <= 10 do
  -- 执行操作,例如打印循环变量
  print(i)
  i = i + 1
end
  1. 使用 if-else 条件:
local num = 5
if num < 0 then
  print("Number is negative")
elseif num == 0 then
  print("Number is zero")
else
  print("Number is positive")
end
  1. 使用 repeat-until 循环:
local i = 1
repeat
  -- 执行操作,例如打印循环变量
  print(i)
  i = i + 1
until i > 10

这些示例展示了在 Lua 脚本中使用循环和条件语句的基本用法。您可以根据自己的需求和逻辑在 Lua 脚本中编写更复杂的循环和条件控制结构。请注意,在 Lua 中,条件语句使用 if-elseif-else 结构,而不是像其他编程语言中的 if-else 结构。此外,Lua 的索引从 1 开始,而不是从 0 开始,这与一些其他编程语言有所不同。

请确保根据您的实际需求和逻辑编写正确的循环和条件控制结构,并根据 Redis 脚本的要求将其集成到您的 Lua 脚本中。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
4天前
|
存储 消息中间件 缓存
Redis的高性能使得它非常适合用于实时分析场景
【5月更文挑战第15天】Redis在Python Web开发中扮演关键角色,常用于缓存系统,提高数据读取速度;会话管理,存储用户信息;分布式锁,确保数据一致性;排行榜和计数,利用有序集合和哈希结构;消息队列,基于列表结构实现异步处理;实时分析,高效处理实时数据。其丰富的数据结构和高性能使其在多种场景下应用广泛。
12 3
|
1天前
|
前端开发 JavaScript 算法
分布式系统的一致性级别划分及Zookeeper一致性级别分析
分布式系统的一致性级别划分及Zookeeper一致性级别分析
|
4天前
|
存储 监控 NoSQL
【Redis】分布式锁及其他常见问题
【Redis】分布式锁及其他常见问题
16 0
|
4天前
|
NoSQL Java Redis
【Redis】Redis实现分布式锁
【Redis】Redis实现分布式锁
7 0
|
4天前
|
NoSQL 网络协议 Java
Redis客户端Lettuce深度分析介绍(上)
Spring Boot自2.0版本开始默认使用Lettuce作为Redis的客户端(注1)。Lettuce客户端基于Netty的NIO框架实现,对于大多数的Redis操作,只需要维持单一的连接即可高效支持业务端的并发请求 —— 这点与Jedis的连接池模式有很大不同。同时,Lettuce支持的特性更加全面,且其性能表现并不逊于,甚至优于Jedis。本文通过分析Lettuce的特性和内部实现(基于6.0版本),及其与Jedis的对照比较,对这两种客户端,以及Redis服务端进行深度探讨。
|
4天前
|
监控 NoSQL 算法
探秘Redis分布式锁:实战与注意事项
本文介绍了Redis分区容错中的分布式锁概念,包括利用Watch实现乐观锁和使用setnx防止库存超卖。乐观锁通过Watch命令监控键值变化,在事务中执行修改,若键值被改变则事务失败。Java代码示例展示了具体实现。setnx命令用于库存操作,确保无超卖,通过设置锁并检查库存来更新。文章还讨论了分布式锁存在的问题,如客户端阻塞、时钟漂移和单点故障,并提出了RedLock算法来提高可靠性。Redisson作为生产环境的分布式锁实现,提供了可重入锁、读写锁等高级功能。最后,文章对比了Redis、Zookeeper和etcd的分布式锁特性。
134 16
探秘Redis分布式锁:实战与注意事项
|
4天前
|
NoSQL Java 大数据
介绍redis分布式锁
分布式锁是解决多进程在分布式环境中争夺资源的问题,与本地锁相似但适用于不同进程。以Redis为例,通过`setIfAbsent`实现占锁,加锁同时设置过期时间避免死锁。然而,获取锁与设置过期时间非原子性可能导致并发问题,解决方案是使用`setIfAbsent`的超时参数。此外,释放锁前需验证归属,防止误删他人锁,可借助Lua脚本确保原子性。实际应用中还有锁续期、重试机制等复杂问题,现成解决方案如RedisLockRegistry和Redisson。
|
4天前
|
缓存 NoSQL Java
【亮剑】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护,如何使用注解来实现 Redis 分布式锁的功能?
【4月更文挑战第30天】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护。基于 Redis 的分布式锁利用 SETNX 或 SET 命令实现,并考虑自动过期、可重入及原子性以确保可靠性。在 Java Spring Boot 中,可通过 `@EnableCaching`、`@Cacheable` 和 `@CacheEvict` 注解轻松实现 Redis 分布式锁功能。
|
4天前
|
数据采集 存储 运维
如何使用SkyWalking收集分析分布式系统的追踪数据
通过以上步骤,你可以使用 SkyWalking 工具实现对分布式系统的数据采集和可视化。SkyWalking 提供了强大的追踪和度量功能,帮助开发者和运维人员更好地理解系统的性能状况。欢迎关注威哥爱编程,一起学习成长。
|
4天前
|
NoSQL Redis 微服务
分布式锁_redis实现
分布式锁_redis实现