面试官: Semaphore在项目中有使用过吗?说说看(源码剖析)

简介: 面试官: Semaphore在项目中有使用过吗?说说看(源码剖析)

前言

目前正在出一个Java多线程专题长期系列教程,从入门到进阶含源码解读, 篇幅会较多, 喜欢的话,给个关注❤️ ~


Java提供了一些非常好用的并发工具类,不需要我们重复造轮子,本节我们讲解Semaphore,一起来看下吧~


Semaphore

它就是我们之前在讲源码的时候提到的信号量,下面看下它的构造函数

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
复制代码


从构造函数可以看出,它可以传入指定数量的资源和指定公平和非公平锁,公平和非公平就不多阐述了,可以参考之前的文章。


我们重点关注的是acquire()和release(), 这两个方法字面意思很好理解, Semaphore往往用于资源有限的场景,比如我们需要限制某个操作的线程数量。下面通过例子感受一下

public class SemaphoreTest {
    public static final class Task implements Runnable {
        private int num;
        private Semaphore semaphore;
        public Task(int num, Semaphore semaphore) {
            this.num = num;
            this.semaphore = semaphore;
        }
        @Override
        public void run() {
            try {
                // 获取
                semaphore.acquire();
                System.out.println(String.format("num: %d, 剩余%d个资源, 还有%d个线程在等待", num, semaphore.availablePermits(), semaphore.getQueueLength()));
                System.out.println(System.currentTimeMillis());
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 释放
                System.out.println("释放资源");
                semaphore.release();
            }
        }
    }
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);
        IntStream.range(0, 20).forEach(i -> new Thread(new Task(i, semaphore)).start());
    }
}
复制代码


实际输出:

num: 1, 剩余0个资源, 还有0个线程在等待
1657591518171
num: 0, 剩余1个资源, 还有0个线程在等待
1657591518172
释放资源
....
释放资源
num: 18, 剩余0个资源, 还有1个线程在等待
1657591545235
num: 19, 剩余0个资源, 还有0个线程在等待
1657591545236
释放资源
释放资源
进程已结束,退出代码0
复制代码


源码剖析

我们重点看下acquire()源码实现.


从这个信号量获取一个许可,阻塞直到有一个可用,或者线程被中断。获得一个许可,如果一个可用并立即返回,将可用许可的数量减少一个

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
复制代码


重点是这个sync

// 首先它继承 AbstractQueuedSynchronizer 这个大家肯定不陌生了 就是AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    // 初始化的时候会写入一个状态
    Sync(int permits) {
        setState(permits);
    }
    // 获取当前的状态
    final int getPermits() {
        return getState();
    }
    // 非公平方式获取信号量
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // 当前可获取的
            int available = getState();
            // 计算剩余数量
            int remaining = available - acquires;
            // 如果剩余数量大于0 就是进行cas修改
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    // 释放信号量
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            // 释放后剩余的数量
            int next = current + releases;
            // 如果超出就
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            // 超出最大限量抛异常
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }
    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}
复制代码


在构造函数中FairSyncNonfairSync他们都继承Sync

sync = fair ? new FairSync(permits) : new NonfairSync(permits);
复制代码


默认情况下非公平的Semaphore会去调用SyncnonfairTryAcquireShared

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
        NonfairSync(int permits) {
            super(permits);
        }
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
复制代码


公平的Semaphore内部实现了tryAcquireShared()

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
    FairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
复制代码


下面我们再回过头看下acquire(), 内部方法acquireSharedInterruptibly是AQS的内部方法

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
复制代码


如果线程中断,直接抛异常, 如果没拿到资源就进入排队机制

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
        // 可获取的资源数小于0进入排队 这里的实现在子类,就是上边提到的
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
复制代码


重点看下这个doAcquireSharedInterruptibly()

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 以共享模式加入到阻塞队列 之前讲源码的时候都讲过     
    final Node node = addWaiter(Node.SHARED);
    // 默认失败
    boolean failed = true;
    try {
        for (;;) {
            // 获取前置节点
            final Node p = node.predecessor();
            // 如果上一个节点就是头部节点 再次尝试获取 (原因是头部节点可能释放资源了)
            if (p == head) {
                int r = tryAcquireShared(arg);
                // 如果获取到了 并且还有剩余资源
                if (r >= 0) {
                    // 1. 将当前节点设置为头部节点
                    // 2. 判断后续节点是否是共享等待节点
                    // 3. 唤醒后续的节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 这一步主要是检查未能获取到资源的节点状态
            // 如果线程需要阻塞返回true
            // parkAndCheckInterrupt 如果线程中断了 抛出异常
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 如果失败 取消正在进行的获取
        if (failed)
            cancelAcquire(node);
    }
}
复制代码


shouldParkAfterFailedAcquire()的细节我们也来看下,可能有的同学不大清楚

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 只要前置节点释放锁,就会通知标识为SIGNAL(-1)状态的后续节点的线程
        // 如果前置节点为SIGNAL,只需要等待其他前置节点的线程被释放,
        if (ws == Node.SIGNAL)
            return true;
        // 这里的判断指的是取消状态, 如果取消了就讲这个节点移除掉
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // cas 更新
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
复制代码


这里的SIGNAL一类的常量,大家可以自行到源码查看,这也是细节地方。发现这段代码主要的作用就是检查节点状态,对后续节点做一些操作,这里并没有阻塞操作,下面我们看下parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
复制代码


这里我们可以看到加了,所以阻塞发生在这。那么释放锁在哪呢?其实在release阶段

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
复制代码


可以看到在unparkSuccessor中进行了锁的释放,这个过程发生在释放阶段

release()相对简单一些,大家可以自己对着源码看下,实现有些类似


结束语

其实本节带大家看源码,主要是想给大家讲下共享锁的知识,Semaphore其实就是使用了共享锁。另外AQS这个类很值得大家好好研究一下,你会发现很多的好用的类都是基于它实现,之前我们讲源码的时候也都遇到了,有兴趣可以了解一下。下节给大家讲下Exchanger ~

相关文章
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
110 2
|
5月前
|
JavaScript 前端开发 应用服务中间件
【Vue面试题三十】、vue项目本地开发完成后部署到服务器后报404是什么原因呢?
这篇文章分析了Vue项目在服务器部署后出现404错误的原因,主要是由于history路由模式下服务器缺少对单页应用的支持,并提供了通过修改nginx配置使用`try_files`指令重定向所有请求到`index.html`的解决方案。
【Vue面试题三十】、vue项目本地开发完成后部署到服务器后报404是什么原因呢?
|
5月前
|
JavaScript 前端开发
【Vue面试题二十五】、你了解axios的原理吗?有看过它的源码吗?
这篇文章主要讨论了axios的使用、原理以及源码分析。 文章中首先回顾了axios的基本用法,包括发送请求、请求拦截器和响应拦截器的使用,以及如何取消请求。接着,作者实现了一个简易版的axios,包括构造函数、请求方法、拦截器的实现等。最后,文章对axios的源码进行了分析,包括目录结构、核心文件axios.js的内容,以及axios实例化过程中的配置合并、拦截器的使用等。
【Vue面试题二十五】、你了解axios的原理吗?有看过它的源码吗?
|
28天前
|
Java 数据库连接 Maven
最新版 | 深入剖析SpringBoot3源码——分析自动装配原理(面试常考)
自动装配是现在面试中常考的一道面试题。本文基于最新的 SpringBoot 3.3.3 版本的源码来分析自动装配的原理,并在文未说明了SpringBoot2和SpringBoot3的自动装配源码中区别,以及面试回答的拿分核心话术。
最新版 | 深入剖析SpringBoot3源码——分析自动装配原理(面试常考)
|
5月前
|
JavaScript 前端开发
【Vue面试题二十七】、你了解axios的原理吗?有看过它的源码吗?
文章讨论了Vue项目目录结构的设计原则和实践,强调了项目结构清晰的重要性,提出了包括语义一致性、单一入口/出口、就近原则、公共文件的绝对路径引用等原则,并展示了单页面和多页面Vue项目的目录结构示例。
|
1月前
|
存储 缓存 Java
Spring面试必问:手写Spring IoC 循环依赖底层源码剖析
在Spring框架中,IoC(Inversion of Control,控制反转)是一个核心概念,它允许容器管理对象的生命周期和依赖关系。然而,在实际应用中,我们可能会遇到对象间的循环依赖问题。本文将深入探讨Spring如何解决IoC中的循环依赖问题,并通过手写源码的方式,让你对其底层原理有一个全新的认识。
65 2
|
4月前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
514 37
|
4月前
|
NoSQL Java Redis
面试官:项目中如何实现分布式锁?
面试官:项目中如何实现分布式锁?
114 6
面试官:项目中如何实现分布式锁?
|
3月前
|
JavaScript 前端开发
vue尚品汇商城项目-day01【8.路由跳转与传参相关面试题】
vue尚品汇商城项目-day01【8.路由跳转与传参相关面试题】
52 0
vue尚品汇商城项目-day01【8.路由跳转与传参相关面试题】
|
5月前
|
JavaScript 安全 前端开发
【Vue面试题二十九】、Vue项目中你是如何解决跨域的呢?
这篇文章介绍了Vue项目中解决跨域问题的方法,包括使用CORS设置HTTP头、通过Proxy代理服务器进行请求转发,以及在vue.config.js中配置代理对象的策略。
【Vue面试题二十九】、Vue项目中你是如何解决跨域的呢?

热门文章

最新文章