【JUC】信号量Semaphore详解

简介: 【JUC】信号量Semaphore详解

前言


大家应该都用过synchronized 关键字加锁,用来保证某个时刻只允许一个线程运行。那么如果控制某个时刻允许指定数量的线程执行,有什么好的办法呢? 答案就是JUC提供的信号量Semaphore


介绍和使用


  • Semaphore(信号量)可以用来限制能同时访问共享资源的线程上限,它内部维护了一个许可的变量,也就是线程许可的数量
  • Semaphore的许可数量如果小于0个,就会阻塞获取,直到有线程释放许可
  • Semaphore是一个非重入锁


API介绍


  1. 构造方法
  • public Semaphore(int permits)permits 表示许可线程的数量
  • public Semaphore(int permits, boolean fair)fair 表示公平性,如果设为 true,表示是公平,那么等待最久的线程先执行
  1. 常用API
  • public void acquire():表示一个线程获取1个许可,那么线程许可数量相应减少一个
  • public void release():表示释放1个许可,那么线程许可数量相应会增加
  1. 其他API
  • void acquire(int permits):表示一个线程获取n个许可,这个数量由参数permits决定
  • void release(int permits):表示一个线程释放n个许可,这个数量由参数permits决定
  • int availablePermits():返回当前信号量线程许可数量
  • int getQueueLength(): 返回等待获取许可的线程数的预估值


基本使用


public static void main(String[] args) {
        // 1. 创建 semaphore 对象
        Semaphore semaphore = new Semaphore(2);
        // 2. 10个线程同时运行
        for (int i = 0; i < 8; i++) {
            new Thread(() -> {
                // 3. 获取许可
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    log.debug("running...");
                    sleep(1);
                    log.debug("end...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 4. 释放许可
                    semaphore.release();
                }
            }).start();
        }
    }

运行结果:

1671202010245.jpg


原理介绍


1671202031449.jpg


上面是Semaphore的类结构图,其中FairSyncNonfairSync是它的内部类,他们共同继承了AQS类,AQS的共享模式提供了Semaphore的加锁、解锁。

如果对AQS不了解的请移步深入浅出理解Java并发AQS的共享锁模式

为了更好的搞懂原理,我们通过一个例子来帮助我们理解。

假设Semaphorepermits为 3,这时 5 个线程来获取资源,其中Thread-1Thread-2Thread-4CAS 竞争成功,permits 变为 0,而 Thread-0 Thread-3 竞争失败。

1671202039535.jpg


获取许可acquire()


  • acquire()主方法会调用 sync.acquireSharedInterruptibly(1)方法
  • acquireSharedInterruptibly()方法会先调用tryAcquireShared()方法返回许可的数量,如果小于0个,调用doAcquireSharedInterruptibly()方法进入阻塞
// acquire() -> sync.acquireSharedInterruptibly(1),可中断
public final void acquireSharedInterruptibly(int arg) {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试获取通行证,获取成功返回 >= 0的值
    if (tryAcquireShared(arg) < 0)
        // 获取许可证失败,进入阻塞
        doAcquireSharedInterruptibly(arg);
}
  • tryAcquireShared()方法在终会调用到Sync#nonfairTryAcquireShared()方法
  • nonfairTryAcquireShared()方法中会减去获取的许可数量,返回剩余的许可数量
// tryAcquireShared() -> nonfairTryAcquireShared()
// 非公平,公平锁会在循环内 hasQueuedPredecessors()方法判断阻塞队列是否有临头节点(第二个节点)
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // 获取 state ,state 这里【表示通行证】
        int available = getState();
        // 计算当前线程获取通行证完成之后,通行证还剩余数量
        int remaining = available - acquires;
        // 如果许可已经用完, 返回负数, 表示获取失败,
        if (remaining < 0 ||
            // 许可证足够分配的,如果 cas 重试成功, 返回正数, 表示获取成功
            compareAndSetState(available, remaining))
            return remaining;
    }
}
  • 如果剩余的许可数量<0, 会调用doAcquireSharedInterruptibly()方法将当前线程加入到阻塞队列中阻塞
  • 方法中调用parkAndCheckInterrupt()阻塞当前线程
private void doAcquireSharedInterruptibly(int arg) {
    // 将调用 Semaphore.aquire 方法的线程,包装成 node 加入到 AQS 的阻塞队列中
    final Node node = addWaiter(Node.SHARED);
    // 获取标记
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 前驱节点是头节点可以再次获取许可
            if (p == head) {
                // 再次尝试获取许可,【返回剩余的许可证数量】
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 成功后本线程出队(AQS), 所在 Node设置为 head
                    // r 表示【可用资源数】, 为 0 则不会继续传播
                    setHeadAndPropagate(node, r); 
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 被打断后进入该逻辑
        if (failed)
            cancelAcquire(node);
    }
}

最终的AQS状态如下图所示:

  • Thread-1Thread-2Thread-4正常运行
  • AQS的state也就是等于0
  • Thread-0Thread-3再阻塞队列中

1671202055360.jpg


释放许可release()


现在Thread-4运行完毕,要释放许可,Thread-0Thread-3又是如何恢复执行的呢?

  • 调用release()方法释放许可,最终调用 Sync#releaseShared()方法
  • 如果方法tryReleaseShared(arg)尝试释放许可成功,那么调用doReleaseShared();进行唤醒
// release() -> releaseShared()
public final boolean releaseShared(int arg) {
    // 尝试释放锁
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }    
    return false;
}
  • tryReleaseShared()方法主要是尝试释放许可
  • 获取当前许可数量 + 释放的数量,然后通过cas设置回去
protected final boolean tryReleaseShared(int releases) {    
    for (;;) {
        // 获取当前锁资源的可用许可证数量
        int current = getState();
        int next = current + releases;
        // 索引越界判断
        if (next < current)            
            throw new Error("Maximum permit count exceeded");        
        // 释放锁
        if (compareAndSetState(current, next))            
            return true;    
    }
}
  • 调用doReleaseShared()方法唤醒队列中的线程
  • 其中unparkSuccessor()方法是唤醒的核心操作
// 唤醒
private void doReleaseShared() {
    // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark 
    // 如果 head.waitStatus == 0 ==> Node.PROPAGATE    
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // 防止 unparkSuccessor 被多次执行
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 唤醒后继节点
                unparkSuccessor(h);
            }
            // 如果已经是 0 了,改为 -3,用来解决传播性
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)
            break;
    }
}

最终AQS状态如下图所示:


1671202069778.jpg


  • 许可state变回1
  • 然后Thread-0开始竞争,如果竞争成功,如下图所示:

1671202078983.jpg


  • 由于Thread-0竞争成功,再次获取到许可,许可数量减1,最终又变回0
  • 然后等待队列中剩余Thread-3


总结


Semaphore信号量类基于AQS的共享锁实现,有公平锁和非公平锁两个版本,它用来限制能同时访问共享资源的线程上限,典型的应用场景是可以用来保护有限的公共资源,比如数据库连接等。

目录
相关文章
|
Oracle Java 关系型数据库
Random和ThreadLocalRandom区别
Random和ThreadLocalRandom区别
201 3
|
Kubernetes 容器 Perl
k8s配置hosts域名的几种方式
k8s配置hosts域名的几种方式
2677 0
|
Prometheus 监控 Cloud Native
一文带你吃透MySQL性能监控解决方案:Prometheus+Grafana
MySQL性能监控解决方案:Prometheus+Grafana问题描述 在对MySQL进行主从复制、分库分表等架构之后,MySQL的节点数量变得越来越多,无法实时监控到每一台MySQL节点,此时应当如何处理? 问题分析与解决方案 针对上面的问题,需要用Prometheus + Grafana对服务器进行统一监控、规划与报警,时刻关注服务器的响应情况。当出现宕机或异常时,Grafana可迅速通过短信、钉钉、邮件等方式通知相关人员,进而快速对生产环节进行补救。 Prometheus概述与适用场景 Prometheus 是 一 个 开 源 的 服 务 监 控 系 统 和 时 间 序 列 数 据
2127 0
|
开发框架 架构师 Java
《深入理解分布式事务:原理与实战》,不可错过的精品!
在分布式应用系统中,特别是在金融相关的场景下,分布式事务是大家都关注的核心技术,同样也是系统的技术难点。本书从数据库和服务的分布式基础开始,由浅入深阐述了分布式事务的原理、解决方案。这种以框架开发者视角分享的分布式事务实现的源码和实践用例,对于应用架构师和开发者都有极大的价值。
5304 1
《深入理解分布式事务:原理与实战》,不可错过的精品!
|
7月前
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
Windows中如何查看被占用的端口、杀掉对应的进程
这篇文章介绍了在Windows系统中如何查看被占用的端口号以及如何杀掉占用端口的进程,包括使用命令提示符的`netstat -ano | findstr 端口号`命令查找进程PID,然后通过任务管理器或`taskkill /PID PID号`命令来结束进程。
Windows中如何查看被占用的端口、杀掉对应的进程
idea没有Maven运行界面怎么办
idea没有Maven运行界面怎么办
|
存储 负载均衡 监控
何为微服务、网关、服务发现/注册?
随着互联网业务复杂性慢慢提高,单机服务的架构慢慢出现了生产效率问题 微服务架构带来的有优点也有缺点,使用前需要调研清楚 微服务架构的网关设计、服务注册/发现、配置管理都是关键点
何为微服务、网关、服务发现/注册?
|
设计模式 安全 Java
|
消息中间件 Kafka
MQ 学习日志(八) 消息队列的延时以及过期失效问题处理
消息队列的延时以及过期失效问题处理
494 0