前言
目前正在出一个Java多线程专题
长期系列教程,从入门到进阶含源码解读
, 篇幅会较多, 喜欢的话,给个关注❤️ ~
Java提供了一些非常好用的并发工具类,不需要我们重复造轮子,本节我们讲解Semaphore
,一起来看下吧~
Semaphore
它就是我们之前在讲源码的时候提到的信号量
,下面看下它的构造函数
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } 复制代码
从构造函数可以看出,它可以传入指定数量
的资源和指定公平和非公平锁
,公平和非公平就不多阐述了,可以参考之前的文章。
我们重点关注的是acquire()和release()
, 这两个方法字面意思很好理解, Semaphore
往往用于资源有限的场景,比如我们需要限制某个操作的线程数量。下面通过例子感受一下
public class SemaphoreTest { public static final class Task implements Runnable { private int num; private Semaphore semaphore; public Task(int num, Semaphore semaphore) { this.num = num; this.semaphore = semaphore; } @Override public void run() { try { // 获取 semaphore.acquire(); System.out.println(String.format("num: %d, 剩余%d个资源, 还有%d个线程在等待", num, semaphore.availablePermits(), semaphore.getQueueLength())); System.out.println(System.currentTimeMillis()); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放 System.out.println("释放资源"); semaphore.release(); } } } public static void main(String[] args) { Semaphore semaphore = new Semaphore(2); IntStream.range(0, 20).forEach(i -> new Thread(new Task(i, semaphore)).start()); } } 复制代码
实际输出:
num: 1, 剩余0个资源, 还有0个线程在等待 1657591518171 num: 0, 剩余1个资源, 还有0个线程在等待 1657591518172 释放资源 .... 释放资源 num: 18, 剩余0个资源, 还有1个线程在等待 1657591545235 num: 19, 剩余0个资源, 还有0个线程在等待 1657591545236 释放资源 释放资源 进程已结束,退出代码0 复制代码
源码剖析
我们重点看下acquire()
源码实现.
从这个信号量获取一个许可,阻塞直到有一个可用,或者线程被中断。获得一个许可,如果一个可用并立即返回,将可用许可的数量减少一个
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } 复制代码
重点是这个sync
// 首先它继承 AbstractQueuedSynchronizer 这个大家肯定不陌生了 就是AQS abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; // 初始化的时候会写入一个状态 Sync(int permits) { setState(permits); } // 获取当前的状态 final int getPermits() { return getState(); } // 非公平方式获取信号量 final int nonfairTryAcquireShared(int acquires) { for (;;) { // 当前可获取的 int available = getState(); // 计算剩余数量 int remaining = available - acquires; // 如果剩余数量大于0 就是进行cas修改 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 释放信号量 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); // 释放后剩余的数量 int next = current + releases; // 如果超出就 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; // 超出最大限量抛异常 if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } 复制代码
在构造函数中FairSync
和NonfairSync
他们都继承Sync
sync = fair ? new FairSync(permits) : new NonfairSync(permits); 复制代码
默认情况下非公平的Semaphore
会去调用Sync
的nonfairTryAcquireShared
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } 复制代码
公平的Semaphore
内部实现了tryAcquireShared()
static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } 复制代码
下面我们再回过头看下acquire()
, 内部方法acquireSharedInterruptibly
是AQS的内部方法
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } 复制代码
如果线程中断,直接抛异常, 如果没拿到资源就进入排队机制
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 可获取的资源数小于0进入排队 这里的实现在子类,就是上边提到的 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } 复制代码
重点看下这个doAcquireSharedInterruptibly()
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 以共享模式加入到阻塞队列 之前讲源码的时候都讲过 final Node node = addWaiter(Node.SHARED); // 默认失败 boolean failed = true; try { for (;;) { // 获取前置节点 final Node p = node.predecessor(); // 如果上一个节点就是头部节点 再次尝试获取 (原因是头部节点可能释放资源了) if (p == head) { int r = tryAcquireShared(arg); // 如果获取到了 并且还有剩余资源 if (r >= 0) { // 1. 将当前节点设置为头部节点 // 2. 判断后续节点是否是共享等待节点 // 3. 唤醒后续的节点 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 这一步主要是检查未能获取到资源的节点状态 // 如果线程需要阻塞返回true // parkAndCheckInterrupt 如果线程中断了 抛出异常 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { // 如果失败 取消正在进行的获取 if (failed) cancelAcquire(node); } } 复制代码
shouldParkAfterFailedAcquire()
的细节我们也来看下,可能有的同学不大清楚
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 只要前置节点释放锁,就会通知标识为SIGNAL(-1)状态的后续节点的线程 // 如果前置节点为SIGNAL,只需要等待其他前置节点的线程被释放, if (ws == Node.SIGNAL) return true; // 这里的判断指的是取消状态, 如果取消了就讲这个节点移除掉 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // cas 更新 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 复制代码
这里的SIGNAL
一类的常量,大家可以自行到源码查看,这也是细节地方。发现这段代码主要的作用就是检查节点状态,对后续节点做一些操作,这里并没有阻塞
操作,下面我们看下parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } 复制代码
这里我们可以看到加了锁
,所以阻塞发生在这。那么释放锁在哪呢?其实在release
阶段
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } 复制代码
可以看到在unparkSuccessor
中进行了锁的释放,这个过程发生在释放阶段
release()
相对简单一些,大家可以自己对着源码看下,实现有些类似
结束语
其实本节带大家看源码,主要是想给大家讲下共享锁
的知识,Semaphore
其实就是使用了共享锁。另外AQS
这个类很值得大家好好研究一下,你会发现很多的好用的类都是基于它实现,之前我们讲源码的时候也都遇到了,有兴趣可以了解一下。下节给大家讲下Exchanger
~