Redis分布式锁深入分析

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 其实这里收获最大的,应该是自旋锁,虽然说在最后使用了发布订阅来完成异步唤醒,但还是有一些缺点,比如这个锁是否公平,如果说,这里要让你实现公平锁,读者你又打算如何解决?想到这里,我又想到了Reactor模型,其实我们可以做一个BossGroup来存放一下阻塞线程ID,其实就是一个阻塞队列再用一个WorkerGroup来对每个ThreadID进行处理,当然这里提供的是一个思路,如果要完成的话,相当于是写一个小型中间件,也挺有意思,后面打算试一试,hhh。

最原始的Redis分布式锁

最开始大家刚学分布式锁的时候,用的是这个指令,

setnx key value

然后使用expire给他设置过期时间

看似没有问题

image-20230616021643706

难道真的没问题吗?

试想一下,在高并发下,redis出现了雪崩,那么你设置了setnx,但是在设置expire之前崩了,呃呃呃~

没错,要解决这个问题,得实现原子性,原子性,我们在MySQL里面通过学习了事务来解决

image-20230616022013037

那么redis,能不能类似实现事务呢?其实redis本身是有事务的,但是这种简单的语句,用Lua也行(没错,就是你打游戏开脚本哪个)

但是在这里我们不讲Lua,主要说一下思想,其实就是通过lua将两个原子语句封装在一起,再发送给redis服务器进行执行

lua-redis快速入门直接看最后

这个分布式锁实现过于简单,就不在这里说了,hhh~

Redis官方针对SETNX的改动

其实Redis官方在后面也看出了SETNX的缺点,所以他在2.6.12版本开始,加入了一个新的指令

set key value EX|PX nx|ex

EX|PX是expire和pexpire,nx是不存在则执行,ex是存在则执行

简单说下,然后RedisTemplate.setValue().setIfAbsent()方法也进行了重写

这样就保证了原子性,这个方法在我之前的文章里面也用过。

并且我参考Redisson的思想制作了分布式锁看门狗机制

当时其实是在想续期问题如何解决,解决之后就感觉自己

image-20230616022948257

直到最近,群佬看博客,指出了一个问题,这个方法是否可重入呢?

image-20230616023100917

说到这里,你可能对可重入有点迷惑,那么现在,我们来介绍一下可重入锁

可重入锁

什么是可重入锁?3ymnuu3uzghks_20230616_61e653dea1bd4e86bbbbfc37b4effb80.jpeg

来看看介绍吧。

可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁。

简单的例子,这里我用伪代码来解释

syn(this){
    sout("加锁成功A")
    syn(this){
        sout("加锁成功B")
    }
}

那么这里,我们会发生什么呢?

按照没接触可重入锁的情况或者没有这样试过的情况来说,执行完 sout("加锁成功A")后便会产生死锁问题

而可重入锁,就是说,在此时,你依然可以进入并执行sout("加锁成功B")

那么应用场景?

最容易想到的是递归调用,但是还有其他的业务方面可以说一说,

比如你要调用业务方法A,业务A中有操纵了要上锁业务B,同时业务A又需要全局上锁,那么这个地方就需要可重入了

基于Redis-Hash的可重入锁实现

在Redisson中,采用的是hash进行锁的存储,然后对hash设置一个过期时间

大概的数据结构是这样的

image-20230616025322273

hashname为key,hashkey为thread1,value是锁的重入次数

但是这里我要提一点,这里的thread1,可不仅仅是threadId,使用分布式锁通常是在分布式、微服务i项目下,不同的服务中也有可能出现线程ID相同的问题,所以这里加一个服务名,其实生成个UUID就可以了

大概的格式就是这样:

mylock:HEX(uuid+theadId):num

但是还有个设置过期时间的问题,如何设置?

我这里跟着我之前的帖子来讲,在那里我是使用的RedisTemplate来实现分布式锁+看门狗机制

但是没有考虑可重入的问题,那么我这次就加上

我们要加过期时间,同时又要确保原子性,那么就用Lua

加锁

对于加锁的Lua如下

    -- 如果Lock存在
    if redis.call("exists",KEYS[1]) ~= 0 then
        -- 如果不是自己的锁
        if redis.call("exists",KEYS[1],ARGS[1]) == 0 then
            -- 不是自己的锁,返回剩余时间
            return redis.call("pttl",KEYS[1]);
        end
        -- 如果是自己的锁就记录次数
        redis.call("hincrby",KEYS[1],ARGS[1],1);
        -- 延期
        redis.call("pexpire",KEYS[1],ARGS[2]);
    else
        redis.call("hset",KEYS[1],ARGS[1],1);
        -- 设置默认延期
        redis.call("pexpire",KEYS[1],ARGS[2]);
    end
    -- 如果Lock不存在,那么就直接加上就可以了,hhh
    return nil;

这里解释一下KEY和ARG,key是hash名,args是指命令携带参数

key1:索命

args1:服务线程唯一ID

args2:过期时间

然后在代码里面的实现

image-20230616025942863

解锁

解锁也差不多

--    解锁的逻辑和加锁相似
    -- 如果Lock存在
    if redis.call("exists",KEYS[1]) ~= 0 then
        -- 如果是自己的锁
        if redis.call("hexists",KEYS[1],ARGS[1]) ~= 0 then
            -- 如果是最后一层 直接delete
            if redis.call("hget",KEYS[1],ARGS[1]) == 0 then
                redis.call("del",KEYs[1]);
                a=0
            else
            -- 如果不是,那么久锁层数减一
                a=redis.call("hincrby",KEYS[1],ARGS[1],-1);
            end
        end
        return a;
    end
    -- 如果Lock不存在,那么就return,hhh
    return nil;

image-20230616030009479

续期的话本来就是一条语句,不变就可以了

然后我和之前的代码相比,自旋锁改了一下,hhh

image-20230616030117535

看门狗机制实现

之前其实已经实现过,这里就再来看看吧,这里我为了方便一点,用的Hutool来演示,但是实际用的时候还是用Netty等框架比较好,毕竟Redission也是用的Netty

image-20230616030317267

目前还存在的问题+Reddisson源码分析 —— 自旋锁

没错,别以为这样就完了,细心的话会发现我上面的代码里面,写的是最暴力的自旋锁(图一个方便,hhh)

如果说一直循环下去,那么无疑是非常浪费CPU的

站着茅坑不拉屎是我的错,但是看着别人蹲上了,我心里又特别难过表情包图片gif动图 - 求表情网,斗图从此不求人!

那么如何解决?

解决方案

细心的同学已经发现了,在我加锁失败的时候,会返回一个ttl,也就是当前key还有多久失效

那么我们是不是可以在while里面是指一个阻塞,然后等过了这么久再唤醒线程就可以了?

没错,Reddisson底层也是这样实现的,基于Redis发布订阅,但是这里我给大家简单引个路子

你可以理解为把阻塞的线程ID放进一个阻塞队列里面,而我们的服务器就去订阅这个队列,其实这个队列在Redis里面叫做Channel,感兴趣的可以去看看。

那么是如何订阅的呢?

其实在源代码中,Redisson是放了一个“消息检测器”来进行监听

下面来看看Redisson加锁的代码

阻塞加锁源码 lock()

//阻塞加锁
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;        //这里拿到锁了
        }
        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);    //对当前线程进行消息订阅
        pubSub.timeout(future);        //设置订阅超时
        RedissonLockEntry entry;
        if (interruptibly) {
            entry = commandExecutor.getInterrupted(future);
        } else {
            entry = commandExecutor.get(future);
        }

        try {
            while (true) {
             // 循环重试获取锁,直至重新获取锁成功才跳出循环
            // 此种做法阻塞进程,一直处于等待锁手动释放或者超时才继续线程 
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    try {
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        entry.getLatch().acquire();
                    } else {
                        entry.getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(entry, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

非阻塞加锁

//不阻塞加锁,waitTime是最大容忍时间,这个概念不做过多解释,就是等待你自选的时间
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        // 计算第一次尝试获取锁后剩余的时间
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);    //获取失败
            return false;
        }
        
        current = System.currentTimeMillis();
        //消息订阅
        CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        try {
            subscribeFuture.get(time, TimeUnit.MILLISECONDS);    //设置一个最多订阅时间
        } catch (TimeoutException e) {
            if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
                    "Unable to acquire subscription lock after " + time + "ms. " +
                            "Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
                subscribeFuture.whenComplete((res, ex) -> {
                    if (ex == null) {
                        unsubscribe(res, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        } catch (ExecutionException e) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);//阻塞,等待消息
                } else {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);//阻塞,等待消息
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

消息订阅

其上述类容中的订阅,都通过下面的方法进行回调,在解锁的时候会发布消息

package org.redisson.pubsub;

import org.redisson.RedissonLockEntry;

import java.util.concurrent.CompletableFuture;

/**
 * LockPubSub类是一个用于锁的发布-订阅实现。
 * 它继承自PublishSubscribe类,用于处理锁的订阅和消息发布。
 * 锁的订阅者是RedissonLockEntry对象。
 * 当接收到特定的消息时,会执行相应的操作。
 */
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    // 解锁消息
    public static final Long UNLOCK_MESSAGE = 0L;
    // 读锁解锁消息
    public static final Long READ_UNLOCK_MESSAGE = 1L;

    public LockPubSub(PublishSubscribeService service) {
        super(service);
    }
    
    @Override
    protected RedissonLockEntry createEntry(CompletableFuture<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(UNLOCK_MESSAGE)) {
            // 获取等待执行的Runnable对象,并执行
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }

            // 释放锁计数器
            value.getLatch().release();
        } else if (message.equals(READ_UNLOCK_MESSAGE)) {
            // 循环执行等待执行的Runnable对象,并执行
            while (true) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute == null) {
                    break;
                }
                runnableToExecute.run();
            }

            // 释放锁计数器,释放所有等待的读锁
            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }

}

总结

其实这里收获最大的,应该是自旋锁,虽然说在最后使用了发布订阅来完成异步唤醒,但还是有一些缺点,比如这个锁是否公平,如果说,这里要让你实现公平锁,读者你又打算如何解决?

想到这里,我又想到了Reactor模型,其实我们可以做一个BossGroup来存放一下阻塞线程ID,其实就是一个阻塞队列

再用一个WorkerGroup来对每个ThreadID进行处理,当然这里提供的是一个思路,如果要完成的话,相当于是写一个小型中间件,也挺有意思,后面打算试一试,hhh

Redis-Lua快速学习

当编写 Lua 脚本与 Redis 进行交互时,以下是一些常用的 Lua 脚本指南和技巧:

  1. 命令调用:使用 redis.call 函数来调用 Redis 命令。例如,redis.call('GET', 'mykey') 将调用 Redis 的 GET 命令并返回键为 'mykey' 的值。
  2. 参数访问:可以使用 KEYS 表来访问传递给 Lua 脚本的键列表,使用 ARGV 表来访问传递给 Lua 脚本的额外参数。例如,KEYS[1] 表示第一个键,ARGV[1] 表示第一个额外参数。
  3. 返回结果:Lua 脚本可以通过使用 return 语句来返回结果。例如,return redis.call('GET', 'mykey') 将返回键为 'mykey' 的值。
  4. 循环和条件:Lua 提供了一些基本的循环和条件语句,例如 forwhileif 等,可以在 Lua 脚本中使用。
  5. 容错处理:在编写 Lua 脚本时,可以考虑添加容错处理,例如使用 pcall 函数来捕获 Redis 命令的错误并进行处理。
  6. 事务支持:Redis 的 Lua 脚本支持事务,可以使用 redis.call('MULTI') 开始事务,然后使用 redis.call('EXEC') 执行事务。在事务中,可以执行多个 Redis 命令,并将其作为一个原子操作进行提交或回滚。
  7. 脚本缓存:Redis 可以缓存 Lua 脚本,以提高执行效率。您可以使用 EVALSHA 命令来执行缓存的脚本。在 Java RedisTemplate 中,您可以使用 execute 方法的 execute(script, keys, args) 形式来执行缓存的脚本。

这些指南和技巧可帮助您编写更复杂和灵活的 Lua 脚本与 Redis 进行交互。在编写 Lua 脚本时,请参考 Redis 官方文档以及 Lua 官方文档,以了解更多 Lua 编程语言和 Redis 命令的细节和用法。

当编写 Lua 脚本时,可以使用循环和条件语句来实现逻辑控制。以下是一些示例:

  1. 使用 for 循环:
for i = 1, 10 do
  -- 执行操作,例如打印循环变量
  print(i)
end
  1. 使用 while 循环:
local i = 1
while i <= 10 do
  -- 执行操作,例如打印循环变量
  print(i)
  i = i + 1
end
  1. 使用 if-else 条件:
local num = 5
if num < 0 then
  print("Number is negative")
elseif num == 0 then
  print("Number is zero")
else
  print("Number is positive")
end
  1. 使用 repeat-until 循环:
local i = 1
repeat
  -- 执行操作,例如打印循环变量
  print(i)
  i = i + 1
until i > 10

这些示例展示了在 Lua 脚本中使用循环和条件语句的基本用法。您可以根据自己的需求和逻辑在 Lua 脚本中编写更复杂的循环和条件控制结构。请注意,在 Lua 中,条件语句使用 if-elseif-else 结构,而不是像其他编程语言中的 if-else 结构。此外,Lua 的索引从 1 开始,而不是从 0 开始,这与一些其他编程语言有所不同。

请确保根据您的实际需求和逻辑编写正确的循环和条件控制结构,并根据 Redis 脚本的要求将其集成到您的 Lua 脚本中。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
2月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
2月前
|
存储 NoSQL Redis
Redis 新版本引入多线程的利弊分析
【10月更文挑战第16天】Redis 新版本引入多线程是一个具有挑战性和机遇的改变。虽然多线程带来了一些潜在的问题和挑战,但也为 Redis 提供了进一步提升性能和扩展能力的可能性。在实际应用中,我们需要根据具体的需求和场景,综合评估多线程的利弊,谨慎地选择和使用 Redis 的新版本。同时,Redis 开发者也需要不断努力,优化和完善多线程机制,以提供更加稳定、高效和可靠的 Redis 服务。
56 1
|
16天前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
47 5
|
20天前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
38 8
|
25天前
|
机器学习/深度学习 分布式计算 算法
【大数据分析&机器学习】分布式机器学习
本文主要介绍分布式机器学习基础知识,并介绍主流的分布式机器学习框架,结合实例介绍一些机器学习算法。
156 5
|
1月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
57 16
|
29天前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
39 5
|
1月前
|
缓存 监控 NoSQL
Redis 缓存穿透的检测方法与分析
【10月更文挑战第23天】通过以上对 Redis 缓存穿透检测方法的深入探讨,我们对如何及时发现和处理这一问题有了更全面的认识。在实际应用中,我们需要综合运用多种检测手段,并结合业务场景和实际情况进行分析,以确保能够准确、及时地检测到缓存穿透现象,并采取有效的措施加以解决。同时,要不断优化和改进检测方法,提高检测的准确性和效率,为系统的稳定运行提供有力保障。
55 5
|
2月前
|
程序员
后端|一个分布式锁「失效」的案例分析
小猿最近很苦恼:明明加了分布式锁,为什么并发还是会出问题呢?
35 2
|
NoSQL Redis 数据库
用redis实现分布式锁时容易踩的5个坑
云栖号资讯:【点击查看更多行业资讯】在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! 近有不少小伙伴投入短视频赛道,也出现不少第三方数据商,为大家提供抖音爬虫数据。 小伙伴们有没有好奇过,这些数据是如何获取的,普通技术小白能否也拥有自己的抖音爬虫呢? 本文会全面解密抖音爬虫的幕后原理,不需要任何编程知识,还请耐心阅读。
用redis实现分布式锁时容易踩的5个坑