Redis分布式锁常见坑点分析

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Redis分布式锁常见坑点分析

image.png

开启掘金成长之旅!这是我参与「掘金日新计划 · 2 月更文挑战」的第 4 天,点击查看活动详情

相信在日常开发中,基于 Redis 天然支持分布式锁,大家在线上分布式项目中都使用过 Redis 锁。本文主要针对日常开发中加锁过程中某些异常场景进行讲解与分析。本文讲解示例代码都在 github.com/wayn111/new… 项目 test 目录下 RedisLockTest 类中。

版本声明:

一、任务超时,锁已经过期

这个异常场景说实话发生概率很低,大部分情况下加锁时任务执行都会很快,锁还没到期,任务自己就会删除锁。除非说任务调用第三方接口不稳定导致超时、数据库查询突然变得非常慢就可能会产生这个异常场景。

那怎么处理这个异常嘞?大部分人可能都会回答添加一个定时任务,在定时任务内检测锁快过期时,进行续期操作。OK,这么做好像是可以解决这个异常,那么博主在这里给出自己的见解。

1.1 先说一个暴论:如果料想到有这类异常产生,为什么不在加锁时,就把加锁过期时间设置大一点

不管所续期还是增大加锁时长,都会导致一个问题,其他线程会迟迟获取不到锁,一直被阻塞。那结果都一样,为什么不直接增大加锁时间?

想法是好的,但是实际上,加锁时间的设置是我们主观臆断的,我们无法保证这个加锁代码的执行时间一定在我们的锁过期时间内。作为一个严谨的程序员,我们需要对我们的代码有客观认知,任务执行可能几千上亿万次都是正常,但就是那么一次它执行超时了,可能由于外部依赖、当前运行环境的异常导致。

1.2 直接不设置过期时间,任务不执行完,不释放锁

如果在加锁时就不设置过期时间的话,理论上好像是可以解决这个问题,任务不执行完,锁就不会释放。但是作为程序员,总觉得哪里怪怪的,任务不执行完,锁就不会释放!

仔细想想,我们一般在 try 中进行加锁 在 finally 进行锁释放,这个好像也没毛病哦。但是实际针对一些极端异常场景下,如果任务执行过程中,服务器宕机、程序突然被杀掉、网络断连等都可能造成这个锁释放不了,另一个任务就一直获取不到锁。

这个方案程序正常的情况下,可以满足我们的要求,但是一旦发生异常将导致锁无法释放的后果,也就是说只要我们解决这个锁在异常场景下无法释放的问题,这个方案还是OK的。博主这里直接给出方案:

在不设置过期时间的加锁操作成功时,给一个默认过期时间比如三十秒,同时启动一个定时任务,给我们的锁进行自动续期,每隔 默认过期时间 / 3 秒后执行一次续期操作,发生锁剩余时长小于 默认过期时间 / 2 就重新赋值过期时长为三十秒。这样的话,可以保证锁必须由任务执行完才能释放,当程序异常发生时,仍然能保证锁会在三十秒内释放。

1.3 设置过期时间,任务不执行完,不释放锁

这个方案本质上与方案二的解决方案相同,还是启动定时任务进行续期操作,流程这里不做多余讲述。需要注意的就是加锁指定过期时间会比较符合我们的客观认知。实际上他的底层逻辑跟方案二相同,无非就是定时任务执行间隔,锁剩余时长续期判断要根据过期时间来计算。


综合来看:方案三会最合适,符合我们的客观认知,跟我们之前对 Redis 的使用逻辑较为相近。

二、线程B加锁执行中未释放锁,线程A释放了线程B的锁

说实话我仔细思考了一下这个异常场景,发现这个异常是个伪命题,如果线程 B 正在执行时,线程 A 怎么能获取到线程B的锁!线程 A 获取不到线程 B 的锁,谈何来去释放线程 B 的锁!如果线程 A 能获取到线程 B 的锁那么这个分布式锁的代码一开始就已经错了。

这里回到这个异常场景本身,我们可以给每个线程设置请求ID,加锁成功将请求ID设置为加锁 key 的对应 value,线程释放锁时需要判断当前线程的请求ID与 加锁 key 的对应 value 是否相同,相同则可以释放锁,不相同则不允许释放。

三、线程加锁成功后继续申请加锁

这个场景主要发生在加锁代码内部调用栈过深,比如说加锁成功执行方法 a,在方法 a 内又重复申请了同一把锁,导致线程把自己锁住了,这个业界的主流叫法是叫锁的可重入性。

解决方式有两种,一是修改方法内的加锁逻辑,不要加同一把锁,修改方法 a 内的加锁 key 名称。二是针对加锁逻辑做修改,实现可重入性。

这里简单介绍如何实现可重入性,给每个线程设置请求ID,加锁成功将请求ID设置为加锁 key 的对应 value,针对同一个线程的重复加锁,判断当前线程已存在请求ID的情况下,请求ID直接与加锁 key 的对应 value 相比较,相同则直接返回加锁成功。

四、 代码实践

4.1 加锁自动续期实践

设置锁过期时间为10秒,然后该任务执行15秒,代码如下:

ps: 以下代码都可以在 github.com/wayn111/new… 项目 test 目录下 RedisLockTest 类中找到

@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class RedisLockTest {
    @Autowired
    private RedisLock redisLock;
    @Test
    @Test
    public void redisLockNeNewTest() {
        String key = "test";
        try {
            log.info("---申请加锁");
            if (redisLock.lock(key, 10)) {
                // 模拟任务执行15秒
                log.info("---加锁成功");
                Thread.sleep(15000);
                log.info("---执行完毕");
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            redisLock.unLock(key);
        }
    }
}

执行如下:

image.png

可以看出就算任务执行超过过期时间也能通过自动续期让代码正常执行。

4.2 多线程下其他线程无法共同申请到同一把锁实践

启动两个线程,线程 A 先加锁, 线程 B 后枷锁

@Test
public void redisLockReleaseSelfTest() throws IOException {
    new Thread(() -> {
        String key = "test";
        try {
            log.info("---申请加锁");
            if (redisLock.lock(key, 10)) {
                // 模拟任务执行15秒
                log.info("---加锁成功");
                Thread.sleep(15000);
                log.info("---执行完毕");
            } else {
                log.info("---加锁失败");
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            redisLock.unLock(key);
        }
    }, "thread-A").start();
    new Thread(() -> {
        String key = "test";
        try {
            Thread.sleep(100L);
            log.info("---申请加锁");
            if (redisLock.lock(key, 10)) {
                // 模拟任务执行15秒
                log.info("---加锁成功");
                Thread.sleep(15000);
                log.info("---执行完毕");
            } else {
                log.info("---加锁失败");
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            redisLock.unLock(key);
        }
    }, "thread-B").start();
    System.in.read();
}

结果如下:

image.png

可以看到,线程 A 先申请到锁,线程 B 后申请锁,结果线程 B 申请加锁失败。

4.3 锁得可重入性实践

当前线程加锁成功后,在线程执行中继续申请同一把锁,代码如下:

@Test
public void redisLockReEntryTest() {
    String key = "test";
    try {
        log.info("---申请加锁");
        if (redisLock.lock(key, 10)) {
            // 模拟任务执行15秒
            log.info("---加锁第一次成功");
            if (redisLock.lock(key, 10)) {
                // 模拟任务执行15秒
                log.info("---加锁第二次成功");
                Thread.sleep(15000);
                log.info("---加锁第二次执行完毕");
            } else {
                log.info("---加锁第二次失败");
            }
            Thread.sleep(15000);
            log.info("---加锁第一次执行完毕");
        } else {
            log.info("---加锁第一次失败");
        }
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    } finally {
        redisLock.unLock(key);
    }
}

结果如下:

image.png

可以看到,线程在第一次申请加锁成功后,继续申请同一把锁,仍然加锁成功,因此这个操作是实现了可重入性的,虽然没那么细致。

4.4 加解锁逻辑讲解

直接贴出本文最核心 RedisLock 类全部代码:

@Slf4j
@Component
public class RedisLock {
    @Autowired
    public RedisTemplate redisTemplate;
    /**
     * 默认锁过期时间20秒
     */
    public static final Integer DEFAULT_TIME_OUT = 30;
    /**
     * 保存线程id-ThreadLocal
     */
    private ThreadLocal<String> stringThreadLocal = new ThreadLocal<>();
    /**
     * 保存定时任务(watch-dog)-ThreadLocal
     */
    private ThreadLocal<ExecutorService> executorServiceThreadLocal = new ThreadLocal<>();
    /**
     * 加锁,不指定过期时间
     *
     * @param key key名称
     * @return boolean
     */
    public boolean lock(String key) {
        return lock(key, null);
    }
    /**
     * 加锁
     *
     * @param key     key名称
     * @param timeout 过期时间
     * @return boolean
     */
    public boolean lock(String key, Integer timeout) {
        Integer timeoutTmp;
        if (timeout == null) {
            timeoutTmp = DEFAULT_TIME_OUT;
        } else {
            timeoutTmp = timeout;
        }
        String nanoId;
        if (stringThreadLocal.get() != null) {
            nanoId = stringThreadLocal.get();
        } else {
            nanoId = IdUtil.nanoId();
            stringThreadLocal.set(nanoId);
        }
        RedisScript<Long> redisScript = new DefaultRedisScript<>(buildLuaLockScript(), Long.class);
        Long execute = (Long) redisTemplate.execute(redisScript, Collections.singletonList(key), nanoId, timeoutTmp);
        boolean flag = execute != null && execute == 1;
        if (flag) {
            ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
            executorServiceThreadLocal.set(scheduledExecutorService);
            scheduledExecutorService.scheduleWithFixedDelay(() -> {
                RedisScript<Long> renewRedisScript = new DefaultRedisScript<>(buildLuaRenewScript(), Long.class);
                Long result = (Long) redisTemplate.execute(renewRedisScript, Collections.singletonList(key), nanoId, timeoutTmp);
                if (result != null && result == 2) {
                    ThreadUtil.shutdownAndAwaitTermination(scheduledExecutorService);
                }
            }, 0, timeoutTmp / 3, TimeUnit.SECONDS);
        }
        return flag;
    }
    /**
     * 释放锁
     *
     * @param key key名称
     * @return boolean
     */
    public boolean unLock(final String key) {
        String nanoId = stringThreadLocal.get();
        RedisScript<Long> redisScript = new DefaultRedisScript<>(buildLuaUnLockScript(), Long.class);
        Long execute = (Long) redisTemplate.execute(redisScript, Collections.singletonList(key), nanoId);
        boolean flag = execute != null && execute == 1;
        if (flag) {
            if (executorServiceThreadLocal.get() != null) {
                ThreadUtil.shutdownAndAwaitTermination(executorServiceThreadLocal.get());
            }
        }
        return flag;
    }
    private String buildLuaLockScript() {
        return """
                local key = KEYS[1]
                local value = ARGV[1]
                local time_out = ARGV[2]
                local result = redis.call('get', key)
                if result == value then
                    return 1;
                end
                local lock_result = redis.call('setnx', key, value)
                if tonumber(lock_result) == 1 then
                    redis.call('expire', key, time_out)
                    return 1;
                else
                    return 0;
                end
                """;
    }
    private String buildLuaUnLockScript() {
        return """
                local key = KEYS[1]
                local value = ARGV[1]
                local result = redis.call('get', key)
                if result ~= value then
                    return 0;
                else
                    redis.call('del', key)
                end
                return 1;
                """;
    }
    private String buildLuaRenewScript() {
        return """
                local key = KEYS[1]
                local value = ARGV[1]
                local timeout = ARGV[2]
                local result = redis.call('get', key)
                if result ~= value then
                    return 2;
                end
                local ttl = redis.call('ttl', key)
                if tonumber(ttl) < tonumber(timeout) / 2 then
                    redis.call('expire', key, timeout)
                    return 1;
                else
                    return 0;
                end
                """;
    }
}

加锁逻辑:这里我把加锁逻辑分解成三步展示给大家

  • 加锁前:先判断当前线程是否存在请求ID,不存在则生成,存在就直接使用。
  • 加锁中:通过 lua 脚本执行原子加锁操作, 加锁时先判断当前线程ID与加锁 key 得 value 是否相等,相等则是同一个线程的锁重入,直接返加锁成功。不相等则设置加锁 value 为请求ID以及过期时间。
  • 加锁后:启动一个定时任务,每隔 过期时间 / 3 秒后执行一次续期操作,发现锁剩余时间不足 过期时间 / 2 秒后,通过 lua 脚本进行续期操作。

解锁逻辑:这里我把解锁逻辑分解成两步展示给大家

  • 解锁中:通过 lua 脚本执行解锁操作,先判断加锁 key 的 value 是否与自身请求ID相同,相同则让解锁,不相同则不让解锁。
  • 解锁后:删除续期定时任务。

五、总结

其实本文得核心逻辑有许多都是参考 Redission 客户端而写,对于这些常见得坑点,博主结合自身思考,业界知识总结并自己实现一个分布式锁得工具类。希望大家看了有所收获,对日常业务中 Redis 分布式锁的使用能有更深的理解。



相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
2月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
2月前
|
存储 NoSQL Redis
Redis 新版本引入多线程的利弊分析
【10月更文挑战第16天】Redis 新版本引入多线程是一个具有挑战性和机遇的改变。虽然多线程带来了一些潜在的问题和挑战,但也为 Redis 提供了进一步提升性能和扩展能力的可能性。在实际应用中,我们需要根据具体的需求和场景,综合评估多线程的利弊,谨慎地选择和使用 Redis 的新版本。同时,Redis 开发者也需要不断努力,优化和完善多线程机制,以提供更加稳定、高效和可靠的 Redis 服务。
51 1
|
27天前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
54 16
|
1月前
|
缓存 监控 NoSQL
Redis 缓存穿透的检测方法与分析
【10月更文挑战第23天】通过以上对 Redis 缓存穿透检测方法的深入探讨,我们对如何及时发现和处理这一问题有了更全面的认识。在实际应用中,我们需要综合运用多种检测手段,并结合业务场景和实际情况进行分析,以确保能够准确、及时地检测到缓存穿透现象,并采取有效的措施加以解决。同时,要不断优化和改进检测方法,提高检测的准确性和效率,为系统的稳定运行提供有力保障。
50 5
|
2月前
|
程序员
后端|一个分布式锁「失效」的案例分析
小猿最近很苦恼:明明加了分布式锁,为什么并发还是会出问题呢?
33 2
|
2月前
|
NoSQL Redis 数据库
计数器 分布式锁 redis实现
【10月更文挑战第5天】
51 1
|
2月前
|
NoSQL 算法 关系型数据库
Redis分布式锁
【10月更文挑战第1天】分布式锁用于在多进程环境中保护共享资源,防止并发冲突。通常借助外部系统如Redis或Zookeeper实现。通过`SETNX`命令加锁,并设置过期时间防止死锁。为避免误删他人锁,加锁时附带唯一标识,解锁前验证。面对锁提前过期的问题,可使用守护线程自动续期。在Redis集群中,需考虑主从同步延迟导致的锁丢失问题,Redlock算法可提高锁的可靠性。
83 4
|
机器学习/深度学习 缓存 NoSQL
|
缓存 NoSQL Java
为什么分布式一定要有redis?
1、为什么使用redis 分析:博主觉得在项目中使用redis,主要是从两个角度去考虑:性能和并发。当然,redis还具备可以做分布式锁等其他功能,但是如果只是为了分布式锁这些其他功能,完全还有其他中间件(如zookpeer等)代替,并不是非要使用redis。
1367 0
|
2月前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
78 6