【📕分布式锁通关指南 08】源码剖析redisson可重入锁之释放及阻塞与非阻塞获取

简介: 本文深入剖析了Redisson中可重入锁的释放锁Lua脚本实现及其获取锁的两种方式(阻塞与非阻塞)。释放锁流程包括前置检查、重入计数处理、锁删除及消息发布等步骤。非阻塞获取锁(tryLock)通过有限时间等待返回布尔值,适合需快速反馈的场景;阻塞获取锁(lock)则无限等待直至成功,适用于必须获取锁的场景。两者在等待策略、返回值和中断处理上存在显著差异。本文为理解分布式锁实现提供了详实参考。

引言

有加锁自然就有解锁,本篇则将围绕锁的释放锁Lua脚本进行深入剖析,另外,还将对阻塞和非阻塞两张方式分别如何获取锁进行比较。

可重入锁之释放锁

这里我们依然是按照步骤来看看释放锁是如何执行的。

1.首先从入口方法开始:

public void unlock() {
   
    try {
   
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
   
        if (e.getCause() instanceof IllegalMonitorStateException) {
   
            throw (IllegalMonitorStateException) e.getCause();
        } else {
   
            throw e;
        }
    }
}

// 异步解锁方法
public RFuture<Void> unlockAsync(long threadId) {
   
    // 调用解锁的 Lua 脚本
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    return future.thenAccept((opStatus) -> {
   
        // 解锁成功,取消看门狗续期
        cancelExpirationRenewal(threadId);

        // 如果解锁的不是自己的锁,抛出异常
        if (!opStatus) {
   
            throw new IllegalMonitorStateException(
                "attempt to unlock lock, not locked by current thread by node id: "
                + id + " thread-id: " + threadId);
        }
    });
}

2.核心解锁Lua脚本实现:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
   
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
        // 检查锁是否存在
        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
            "return nil;" +
        "end; " +

        // 计算当前线程的重入次数
        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +

        // 如果重入次数还大于0,则更新过期时间
        "if (counter > 0) then " +
            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
            "return 0; " +
        "end; " +

        // 重入次数为0,删除锁
        "redis.call('del', KEYS[1]); " +
        // 发布锁释放的消息
        "redis.call('publish', KEYS[2], ARGV[1]); " +
        "return 1; ",

        // 脚本参数
        Arrays.asList(
            getName(),                // KEYS[1] 锁名称
            getChannelName(),         // KEYS[2] 发布订阅的channel名称
            RedissonLockEntry.UNLOCK_MESSAGE,    // ARGV[1] 解锁消息
            internalLockLeaseTime,    // ARGV[2] 锁过期时间
            getLockName(threadId)     // ARGV[3] 线程标识
        ));
}

3.梳理流程

  • 首先进行解锁的前置检查:检查是否存在对应线程的锁,如果不存在,则返回nil。
  • 如果获取锁成功,则:处理重入计数,即将当前线程的重入计数减1;如果重入计数还大于0,表示还有重入,则重新设置过期时间,返回0则表示锁还未完全释放。
  • 完全释放锁,即:当计数器为0,删除整个锁并发布锁释放的消息,通知等待的线程,返回1则表示锁已完全释放。
  • 后续处理,需要:解锁成功后取消看门狗续期和处理异常情况。

可重入锁之阻塞和非阻塞获取锁

redisson提供了两种不同方式获取锁的封装,我们这里比较讲下:

1.非阻塞获取锁 (tryLock)

public boolean tryLock() {
   
    return tryLock(-1, -1, TimeUnit.MILLISECONDS);
}

// 带超时的非阻塞获取锁
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
   
    return tryLock(waitTime, -1, unit);
}

// 核心实现
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
   
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();

    // 第一次尝试获取锁
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // ttl为空表示获取成功
    if (ttl == null) {
   
        return true;
    }

    // 如果没有等待时间,直接返回失败
    if (time <= 0) {
   
        return false;
    }

    // 计算剩余等待时间
    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
   
        return false;
    }

    // 订阅锁释放通知
    RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
   
        return false;
    }

    try {
   
        while (true) {
   
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(leaseTime, unit, threadId);

            // 获取成功
            if (ttl == null) {
   
                return true;
            }

            // 超过等待时间,返回失败
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
   
                return false;
            }

            // 等待锁释放通知
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
   
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
   
                getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }

            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
   
                return false;
            }
        }
    } finally {
   
        // 取消订阅
        unsubscribe(subscribeFuture, threadId);
    }
}

2.阻塞获取锁 (Lock)

public void lock() {
   
    try {
   
        lock(-1, null, false);
    } catch (InterruptedException e) {
   
        throw new IllegalStateException();
    }
}

// 带超时的阻塞获取锁
public void lock(long leaseTime, TimeUnit unit) {
   
    try {
   
        lock(leaseTime, unit, false);
    } catch (InterruptedException e) {
   
        throw new IllegalStateException();
    }
}

// 核心实现
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
   
    long threadId = Thread.currentThread().getId();

    // 第一次尝试获取锁
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    if (ttl == null) {
   
        return;
    }

    // 订阅锁释放通知
    RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    if (interruptibly) {
   
        subscribeFuture.syncUninterruptibly();
    } else {
   
        subscribeFuture.sync();
    }

    try {
   
        while (true) {
   
            ttl = tryAcquire(leaseTime, unit, threadId);
            // 获取成功
            if (ttl == null) {
   
                break;
            }

            // 等待锁释放通知
            if (ttl >= 0) {
   
                try {
   
                    getEntry(threadId).getLatch().await(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
   
                    if (interruptibly) {
   
                        throw e;
                    }
                    getEntry(threadId).getLatch().await();
                }
            } else {
   
                if (interruptibly) {
   
                    getEntry(threadId).getLatch().await();
                } else {
   
                    getEntry(threadId).getLatch().awaitUninterruptibly();
                }
            }
        }
    } finally {
   
        // 取消订阅
        unsubscribe(subscribeFuture, threadId);
    }
}

3.两种方式的关键区别:

  • 等待策略:

    tryLock:有限时间等待,超时返回false

    lock:无限等待直到获取到锁

  • 返回值:

    tryLock:返回boolean,表示是否获取成功

    lock:无返回值,要么获取成功,要么一直等待

  • 中断处理:

    tryLock:支持中断

    lock:默认不响应中断,但可以通过lockInterruptibly方法支持中断

小结

关于可重入锁的相关源码刨析就告一段落了,在接下来的篇章中我们将继续分析不同类型锁的实现。

目录
相关文章
|
7月前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
631 0
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
8月前
|
NoSQL Redis
分布式锁设计吗,你是如何实现锁类型切换、锁策略切换基于限流的?
本方案基于自定义注解与AOP实现分布式锁,支持锁类型(如可重入锁、公平锁等)与加锁策略(如重试、抛异常等)的灵活切换,并结合Redisson实现可重入、自动续期等功能,通过LUA脚本保障原子性,兼顾扩展性与实用性。
184 0
|
9月前
|
缓存 NoSQL Java
【📕分布式锁通关指南 11】源码剖析redisson之读写锁的实现
Redisson 的 `RedissonReadWriteLock` 提供了高效的分布式读写锁实现,适用于读多写少的场景。通过 Redis 与 Lua 脚本结合,确保读锁并行、写锁互斥,以及读写之间的互斥,保障了分布式环境下的数据一致性。它支持可重入、自动过期和锁释放机制,提升了系统并发性能与资源控制能力。
229 0
|
9月前
|
NoSQL Java Redis
基于Redisson和自定义注解的分布式锁实现策略。
在实现分布式锁时,保证各个组件配置恰当、异常处理充足、资源清理彻底是至关重要的。这样保障了在分布布局场景下,锁的正确性和高效性,使得系统的稳健性得到增强。通过这种方式,可以有效预防并发环境下的资源冲突问题。
423 29
|
NoSQL 安全 调度
【📕分布式锁通关指南 10】源码剖析redisson之MultiLock的实现
Redisson 的 MultiLock 是一种分布式锁实现,支持对多个独立的 RLock 同时加锁或解锁。它通过“整锁整放”机制确保所有锁要么全部加锁成功,要么完全回滚,避免状态不一致。适用于跨多个 Redis 实例或节点的场景,如分布式任务调度。其核心逻辑基于遍历加锁列表,失败时自动释放已获取的锁,保证原子性。解锁时亦逐一操作,降低死锁风险。MultiLock 不依赖 Lua 脚本,而是封装多锁协调,满足高一致性需求的业务场景。
398 0
【📕分布式锁通关指南 10】源码剖析redisson之MultiLock的实现
|
7月前
|
缓存 NoSQL 关系型数据库
Redis缓存和分布式锁
Redis 是一种高性能的键值存储系统,广泛用于缓存、消息队列和内存数据库。其典型应用包括缓解关系型数据库压力,通过缓存热点数据提高查询效率,支持高并发访问。此外,Redis 还可用于实现分布式锁,解决分布式系统中的资源竞争问题。文章还探讨了缓存的更新策略、缓存穿透与雪崩的解决方案,以及 Redlock 算法等关键技术。
|
8月前
|
存储 负载均衡 NoSQL
【赵渝强老师】Redis Cluster分布式集群
Redis Cluster是Redis的分布式存储解决方案,通过哈希槽(slot)实现数据分片,支持水平扩展,具备高可用性和负载均衡能力,适用于大规模数据场景。
546 2
|
8月前
|
存储 缓存 NoSQL
【📕分布式锁通关指南 12】源码剖析redisson如何利用Redis数据结构实现Semaphore和CountDownLatch
本文解析 Redisson 如何通过 Redis 实现分布式信号量(RSemaphore)与倒数闩(RCountDownLatch),利用 Lua 脚本与原子操作保障分布式环境下的同步控制,帮助开发者更好地理解其原理与应用。
539 6
|
9月前
|
存储 缓存 NoSQL
Redis核心数据结构与分布式锁实现详解
Redis 是高性能键值数据库,支持多种数据结构,如字符串、列表、集合、哈希、有序集合等,广泛用于缓存、消息队列和实时数据处理。本文详解其核心数据结构及分布式锁实现,帮助开发者提升系统性能与并发控制能力。
|
9月前
|
NoSQL Redis
Lua脚本协助Redis分布式锁实现命令的原子性
利用Lua脚本确保Redis操作的原子性是分布式锁安全性的关键所在,可以大幅减少由于网络分区、客户端故障等导致的锁无法正确释放的情况,从而在分布式系统中保证数据操作的安全性和一致性。在将这些概念应用于生产环境前,建议深入理解Redis事务与Lua脚本的工作原理以及分布式锁的可能问题和解决方案。
327 8
下一篇
开通oss服务