【📕分布式锁通关指南 12】源码剖析redisson如何利用Redis数据结构实现Semaphore和CountDownLatch

简介: 本文解析 Redisson 如何通过 Redis 实现分布式信号量(RSemaphore)与倒数闩(RCountDownLatch),利用 Lua 脚本与原子操作保障分布式环境下的同步控制,帮助开发者更好地理解其原理与应用。

引言

本期我们继续探索 Redisson 的分布式同步组件。本篇聚焦在 Semaphore(信号量) 和 CountDownLatch(倒数闩),看看它们在单机环境中常用的同步思想是如何“穿越”到分布式场景,并依赖 Redis 来保证一致性的。

在普通的 Java 线程编程中,Semaphore 常用于控制对特定资源的同时访问数,而 CountDownLatch 则常用于等待其他线程把某些工作执行完毕后再继续。但若应用部署在分布式环境,单机版本的对象自然约束在某一 JVM 中,无法跨节点协同控制。此时我们可以借助 Redisson 提供的 RSemaphoreRCountDownLatch,让这两种同步模式在分布式下也能“有法可依”。

介绍

RSemaphore

  • 使用场景

可以限制同时访问共享资源(如数据库连接、缓存访问、某段关键逻辑等)的并发数量。

  • 关键特性

1.可动态调整可用许可数量;

2.在分布式集群环境下,任何一个节点都能正确感知许可的剩余数量。

RCountDownLatch

  • 使用场景

类似“倒计时”,需要等待某些操作完成后才能继续。例如:只有当分布式系统中多个子任务都执行完成后,主流程才能继续执行。

  • 关键特性

1.可设置初始倒计数值,不同节点根据执行进度减少计数。

2.当计数归零时,就会唤醒等待该闩的节点。

接下来,让我们直击它们在源码中的实现方式。

Semaphore 的实现思路

Redisson 提供的分布式信号量接口 RSemaphore 在底层主要依赖 Redis 的 Lua 脚本Redis key 自增/自减 这类能力:

  1. 获取许可 (acquire)

    Lua 脚本会检查是否存在足够的许可值(通常是 Redis 中的某个 key 记录当前可用许可数);

    如果足够,则进行减操作;否则阻塞或返回失败。

  2. 释放许可 (release)

    将 Redis 中存储的许可数 +1,并通知阻塞挂起中的线程(如果有)。

  3. 可重入机制(可选场景):
    一些分布式信号量场景还会考虑同一个线程可否重复获取许可,但多数情况下信号量都是面向线程独立的计数控制,所以可能不涉及可重入。

核心结构

RedissonSemaphore 源码示意,整体只展示关键思路:

public class RedissonSemaphore extends RedissonBaseDistributedObject implements RSemaphore {
   

    public RedissonSemaphore(CommandAsyncExecutor commandExecutor, String name) {
   
        super(commandExecutor, name);
    }

    @Override
    public void acquire() throws InterruptedException {
   
        tryAcquire(-1, TimeUnit.MILLISECONDS); 
    }

    @Override
    public boolean tryAcquire(long waitTime, TimeUnit unit) throws InterruptedException {
   
        return get(tryAcquireAsync(waitTime, unit, 1));
    }

    @Override
    public void release() {
   
        get(releaseAsync(1));
    }

    // ...
}

RedissonSemaphore 依托一些异步方法,例如 tryAcquireAsyncreleaseAsync 等,这些方法内部最终会组装 Lua 脚本并发送到 Redis,来做自增自减和阻塞通知。

获取许可

tryAcquireAsync 中,一般会:

  1. 先检查 Redis 中的可用许可数(key 的数值)是否足够;
  2. 若足够,做 key = key - 1
  3. 若不足则阻塞或直接返回 false(根据 tryAcquire 的超时时间和模式)。
private RFuture<Boolean> tryAcquireAsync(long waitTime, TimeUnit unit, int permits) {
   
    // Lua脚本核心:判断key是否 >= permits,否则阻塞/返回
    // 执行成功后减去相应数量
    return commandExecutor.evalWriteAsync(
      getName(), 
      RedisCommands.EVAL_BOOLEAN, 
      "local currVal = redis.call('get', KEYS[1]); " + 
      "if (currVal ~= false and tonumber(currVal) >= tonumber(ARGV[1])) then " +
          "redis.call('decrby', KEYS[1], ARGV[1]); " + 
          "return 1; " +
      "end; " +
      "return 0;", 
      Collections.<Object>singletonList(getName()),
      permits);
}

此处脚本只是示意,真实的 Redisson 源码会更全面,比如要处理阻塞和等待通知的逻辑。

释放许可

与获取许可对立,release 时关键就是将 Redis 中存储的许可数 +1,或一次性释放多个许可:

private RFuture<Long> releaseAsync(int permits) {
   
    // Lua脚本:对key执行incr操作
    return commandExecutor.evalWriteAsync(
      getName(),
      RedisCommands.EVAL_LONG,
      "return redis.call('incrby', KEYS[1], ARGV[1]);",
      Collections.<Object>singletonList(getName()),
      permits);
}

当可用许可数从 0 变为正数,会通知等待获取许可的线程。

CountDownLatch 的实现思路

Redisson 的 RCountDownLatch 和 Semaphore 类似,同样通过 Redis 来维护一个“倒计数值”。当这个值大于 0 时,需要阻塞调用 await() 的线程;当计数归零后,所有阻塞线程会被唤醒。

核心结构

源码示意:

public class RedissonCountDownLatch extends RedissonBaseDistributedObject implements RCountDownLatch {
   

    public RedissonCountDownLatch(CommandAsyncExecutor commandExecutor, String name) {
   
        super(commandExecutor, name);
    }

    @Override
    public void await() throws InterruptedException {
   
        get(awaitAsync());
    }

    @Override
    public void countDown() {
   
        get(countDownAsync());
    }

    @Override
    public boolean trySetCount(long count) {
   
        return get(trySetCountAsync(count));
    }

    // ...
}

我们可以看到几个核心方法:

  • trySetCount:初始化或重置倒计数值。
  • countDown:使计数值 -1。
  • await:阻塞等待,直到计数值清零。

初始化计数

trySetCount 负责在 Redis 中设置 key 的初始计数值:

private RFuture<Boolean> trySetCountAsync(long count) {
   
    // 如果key不存在,则设置它为 count
    // 如果已经存在,则返回false 表示无法再次设置
    return commandExecutor.evalWriteAsync(
        getName(),
        RedisCommands.EVAL_BOOLEAN,
        "if redis.call('exists', KEYS[1]) == 0 then " +
            "redis.call('set', KEYS[1], ARGV[1]); " +
            "return 1; " +
        "end; " +
        "return 0;",
        Collections.singletonList(getName()),
        count
    );
}

若 key 已存在,这次设置就会失败,说明已经有倒计数在进行中。

递减计数

countDown 的 Lua 脚本则做 decrement 操作。如果计数归零,就发布一个消息或事件,唤醒所有 await() 中阻塞的线程(它们往往在 Redis 中通过订阅频道或等待队列的方式实现阻塞唤醒)。

private RFuture<Long> countDownAsync() {
   
    return commandExecutor.evalWriteAsync(
        getName(),
        RedisCommands.EVAL_LONG,
        "local val = redis.call('decr', KEYS[1]); " + 
        // 如果归零,发布消息到特定 channel,唤醒等待方
        "if val <= 0 then " +
            "redis.call('publish', KEYS[2], '0'); " +
        "end;" +
        "return val;",
        Arrays.asList(getName(), getChannelName(getName()))
    );
}

在真实代码中,getChannelName 通常会加工出一个类似 redisson_countdownlatch:{...} 的频道名称,用于发布/订阅机制来进行分布式唤醒。

阻塞等待

await 在分布式场景下,会订阅 Redis 的特定频道。当倒计数归零时,服务器端执行 PUBLISH,客户端这边接收到消息后就会解除阻塞,完成 CountDownLatch 的分布式同步。

一致性与原子性保障

与之前说到的分布式读写锁类似,Semaphore 与 CountDownLatch 的实现同样依赖 Redis 原子操作 + Lua 脚本

  1. 原子操作:例如 INCRBYDECRBYPUBLISH 等,都能够在服务器端一次性完成。
  2. Lua脚本:复杂逻辑(判断是否够许可、是否应该发布通知等)都打包到 Lua 脚本中执行,保证操作的“不可分割性”。
  3. 发布/订阅:当许可数或倒计数发生关键性改变时,会 publish 到对应的 channel,唤醒在不同节点上阻塞的线程。

利用这些特性,Redisson 让一台 JVM 中的 tryAcquire()countDown() 等操作,可以对远程 Redis 的数据产生影响,并同步反馈给其他 JVM 中等待的线程,实现分布式级别的互联和协作。

小结

Semaphore 与 CountDownLatch 是高并发编程中两种常见的“并发工具类”,Redisson 在其基础上通过以下手段将它们延伸到分布式维度:

  1. 借助 Redis 存储计数或许可值,全局共享。
  2. 使用 Lua 脚本保证操作的原子性
  3. 利用 Redis 订阅发布(Pub/Sub)机制 为阻塞等待的线程发出唤醒通知。

这样,我们就能在分布式环境中“像用本地工具类一样”去使用信号量或倒数闩。通过阅读源码,我们可以更好地理解其内部原理,从而在分布式系统设计和排错中游刃有余。希望本文能带给你新的收获,下一期再见!

目录
相关文章
|
5月前
|
存储 负载均衡 NoSQL
【赵渝强老师】Redis Cluster分布式集群
Redis Cluster是Redis的分布式存储解决方案,通过哈希槽(slot)实现数据分片,支持水平扩展,具备高可用性和负载均衡能力,适用于大规模数据场景。
412 2
|
4月前
|
消息中间件 缓存 NoSQL
Redis各类数据结构详细介绍及其在Go语言Gin框架下实践应用
这只是利用Go语言和Gin框架与Redis交互最基础部分展示;根据具体业务需求可能需要更复杂查询、事务处理或订阅发布功能实现更多高级特性应用场景。
323 86
|
4月前
|
存储 消息中间件 NoSQL
Redis数据结构:别小看这5把“瑞士军刀”,用好了性能飙升!
Redis提供5种基础数据结构及多种高级结构,如String、Hash、List、Set、ZSet,底层通过SDS、跳表等实现高效操作。灵活运用可解决缓存、计数、消息队列、排行榜等问题,结合Bitmap、HyperLogLog、GEO更可应对签到、UV统计、地理位置等场景,是高性能应用的核心利器。
|
4月前
|
存储 缓存 NoSQL
Redis基础命令与数据结构概览
Redis是一个功能强大的键值存储系统,提供了丰富的数据结构以及相应的操作命令来满足现代应用程序对于高速读写和灵活数据处理的需求。通过掌握这些基础命令,开发者能够高效地对Redis进行操作,实现数据存储和管理的高性能方案。
142 12
|
4月前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
310 0
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
4月前
|
缓存 NoSQL 关系型数据库
Redis缓存和分布式锁
Redis 是一种高性能的键值存储系统,广泛用于缓存、消息队列和内存数据库。其典型应用包括缓解关系型数据库压力,通过缓存热点数据提高查询效率,支持高并发访问。此外,Redis 还可用于实现分布式锁,解决分布式系统中的资源竞争问题。文章还探讨了缓存的更新策略、缓存穿透与雪崩的解决方案,以及 Redlock 算法等关键技术。
|
4月前
|
存储 消息中间件 NoSQL
【Redis】常用数据结构之List篇:从常用命令到典型使用场景
本文将系统探讨 Redis List 的核心特性、完整命令体系、底层存储实现以及典型实践场景,为读者构建从理论到应用的完整认知框架,助力开发者在实际业务中高效运用这一数据结构解决问题。
|
4月前
|
存储 缓存 NoSQL
【Redis】 常用数据结构之String篇:从SET/GET到INCR的超全教程
无论是需要快速缓存用户信息,还是实现高并发场景下的精准计数,深入理解String的特性与最佳实践,都是提升Redis使用效率的关键。接下来,让我们从基础命令开始,逐步揭开String数据结构的神秘面纱。
|
8月前
|
存储 NoSQL 算法
Redis设计与实现——数据结构与对象
Redis 是一个高性能的键值存储系统,其数据结构设计精妙且高效。主要包括以下几种核心数据结构:SDS、链表、字典、跳跃表、整数集合、压缩列表。此外,Redis 对象通过类型和编码方式动态转换,优化内存使用,并支持引用计数、共享对象和淘汰策略(如 LRU/LFU)。这些特性共同确保 Redis 在性能与灵活性之间的平衡。
|
存储 消息中间件 NoSQL
Redis 数据结构与对象
【10月更文挑战第15天】在实际应用中,需要根据具体的业务需求和数据特点来选择合适的数据结构,并合理地设计数据模型,以充分发挥 Redis 的优势。
256 64