简介
Semaphore(信号量)是一个同步工具类,通过Semaphore可以控制同时访问共享资源的线程个数。
应用场景
Semaphore的主要应用场景:
- 资源并发控制:Semaphore可以限制对资源的并发访问。如:管理数据库连接池或线程池中的资源。
- 控制并发线程数:Semaphore可以控制同时执行的线程数量。如:控制同时访问某个接口的请求数量。
- 实现互斥锁:Semaphore可以通过设置参数permits(信号量数量)的值为1来实现互斥锁的功能,保证同一时间只有一个线程可以访问临界区。
- 控制任务流量:Semaphore可以限制任务的执行速率。如:控制某个任务在单位时间内的执行次数。
实现原理
在分析Semaphore的实现原理之前,先介绍一下信号量模型。
信号量模型
信号量模型是一个通用模型(即:与语言无关)。它主要由一个计数器、一个等待队列和三个方法组成,其中计数器和等待队列对外不可见,只能通过信号量模型提供的三个方法来访问它们。
信号量模型整体结构,如图所示:
图中:
init()方法:设置计数器的初始值。
down()方法:计数器的值-1,并根据当前计数器的值进行判断:
-
- 如果当前计数器的值小于0,则阻塞当前线程(即:没有获得信号量)。
- 如果当前计数器的值大于等于0,则执行当前线程(即:获得信号量)。
up()方法:计数器的值+1,并根据当前计数器的值进行判断:
-
- 如果当前计数器的值小于等于0,说明释放信号量之前等待队列中存在处于阻塞状态的线程,则唤醒等待队列中的一个线程,并将其从等待队列中移除。
注意:以上三个方法均是原子性操作。
整体流程
在Java中,信号量模型是通过Semaphore同步工具类实现的。整体流程如图所示:
处理流程:
1)某个线程尝试获取信号量,将当前可用的信号量数量-本次需要的信号量数得到当前最新的信号量数:
-
- 如果当前最新的信号量数大于等于0且通过CAS的方式更新当前最新的信号量数成功,则表示获取信号量成功,执行业务处理。
- 如果当前最新的信号量数小于0,则表示获取信号量失败,将当前线程封装成Node节点追加到等待队列的末尾,进入阻塞,等待被唤醒。
2)业务处理完成,释放信号量,将当前信号量数+本次释放的信号量数得到当前最新的信号量数。如果此时等待队列中存在阻塞的线程,则唤醒等待队列中阻塞的线程。
内部结构
Semaphore类结构定义,如图所示:
可以看到,Semaphore底层是基于AQS来实现。其中:
- AbstractQueuedSynchronizer:抽象队列同步器(简称AQS)。
- Sync:Semaphore的静态内部类,用于实现Semaphore的公共同步逻辑。
- FairSync:Semaphore的静态内部类,继承了Sync,用于实现公平模式。
- NonfairSync:Semaphore的静态内部类,继承了Sync,用于实现非公平模式。
源码如下:
public class Semaphore implements java.io.Serializable {
// 同步变量,类型为Sync
private final Sync sync;
/**
* Sync:Semaphore的静态内部类,它继承了AbstractQueuedSynchronizer,用于实现Semaphore的同步逻辑
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 初始化信号量数量
Sync(int permits) {
setState(permits);
}
// 获取当前信号量数量
final int getPermits() {
return getState();
}
/**
* 以非公平模式尝试获取信号量
*/
final int nonfairTryAcquireShared(int acquires) {
// 自旋
for (;;) {
// 获取当前可用的信号量数量
int available = getState();
// 获取剩余信号量数量(当前可用的信号量数量-本次需要的信号量数量)
int remaining = available - acquires;
// 如果剩余信号量数量小于0或者剩余信号量数量大于等于0且更新剩余信号量数量成功,则返回当前剩余信号量数量
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
/**
* 尝试释放信号量
*/
protected final boolean tryReleaseShared(int releases) {
// 自旋
for (;;) {
// 获取当前信号量数量
int current = getState();
// 当前信号量数量+本次释放的信号量数量
int next = current + releases;
// 当前信号量数量超过int类型的最大值(即:溢出)
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 当前信号量数量未超过int类型的最大值且更新信号量数量成功,则返回释放信号量成功
if (compareAndSetState(current, next))
return true;
}
}
// 扣减指定数量的信号量
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 清空并返回当前信号量数量
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
/**
* NonfairSync:Semaphore的静态内部类,它继承了Sync,用于实现非公平模式
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
// 以非公平模式尝试获取信号量
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* FairSync:Semaphore的静态内部类,它继承了Sync,用于实现公平模式
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
// 以公平模式尝试获取信号量
protected int tryAcquireShared(int acquires) {
// 自旋
for (;;) {
/**
* 当前线程对应的节点不是头节点的下一个节点或者当前线程不是持有信号量的线程,则获取信号量失败
* 其中:
* 当前线程对应的节点不是头节点的下一个节点,则获取信号量失败,体现的是公平原则(即:先到先得)
* 当前线程不是当前持有信号量的线程,则获取信号量失败,体现的是可重入
*/
if (hasQueuedPredecessors())
return -1;
// 获取当前可用的信号量数量
int available = getState();
// 获取剩余信号量数量(当前可用的信号量数量-本次需要的信号量数量)
int remaining = available - acquires;
// 如果剩余信号量数量小于0或者剩余信号量数量大于等于0且更新剩余信号量数量成功,则返回当前剩余信号量数量
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
构造函数
Semaphore的构造函数:
/**
* 构造函数1
* permits:信号量数量,该参数值可能为负数,permits为负数时,必须要释放信号量后其他线程才能获取信号量
*/
public Semaphore(int permits) {
// 默认为非公平模式
sync = new NonfairSync(permits);
}
/**
* 构造函数2
* permits:信号量数量
* fair:true-公平模式,false-非公平模式
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
其中:
- 公平模式:获取信号量时按照先后顺序进行分配,保证等待最久的线程能够优先获得信号量。
- 非公平锁模式:获取信号量时不保证等待最久的线程能够优先获得信号量,而是根据系统调度算法选择合适的线程获取信号量。
核心方法
Semaphore的常用方法:
// 获取一个信号量
public void acquire() throws InterruptedException {
...}
// 获取指定数量的信号量
public void acquire(int permits) throws InterruptedException {
...}
// 获取一个信号量(忽略中断)
public void acquireUninterruptibly() {
...}
// 获取指定数量的信号量(忽略中断)
public void acquireUninterruptibly(int permits) {
...}
// 尝试获取一个信号量
public boolean tryAcquire() {
...}
// 尝试在指定时间内获取一个信号量
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
...}
// 尝试获取指定数量的信号量
public boolean tryAcquire(int permits) {
...}
// 尝试在指定时间内获取指定数量的信号量
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
...}
// 释放一个信号量
public void release() {
...}
// 释放指定数量的信号量
public void release(int permits) {
...}
// 判断等待队列中是否存在阻塞的线程
public final boolean hasQueuedThreads() {
...}
// 获取等待队列中阻塞的线程数量
public final int getQueueLength() {
...}
// 获取等待队列中阻塞的线程集合
protected Collection<Thread> getQueuedThreads() {
...}
// 获取可用的信号量数量
public int availablePermits() {
...}
// 清空并返回当前信号量数量
public int drainPermits() {
...}
其中,最核心的是acquire()方法和release()方法。
acquire方法
Semaphore通过调用Semaphore#acquire()方法获取一个信号量:
- 如果(当前可用的信号量数量-1)大于等于0且通过CAS的方式更新当前最新的信号量数成功,则表示获取信号量成功,执行业务处理。
- 如果(当前可用的信号量数量-1)小于0,则表示获取信号量失败,将当前线程封装成Node节点添加到等待队列中,进入阻塞,等待被唤醒,如果线程阻塞过程中被其他线程中断,则抛出InterruptedException异常。
Semaphore#acquire()方法源码解析:
// java.util.concurrent.Semaphore#acquire()
// 获取一个信号量
public void acquire() throws InterruptedException {
// 以共享模式获取一个信号量
sync.acquireSharedInterruptibly(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
// 以共享模式获取一个信号量(AQS)
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试以共享模式获取一个信号量(由继承AbstractQueuedSynchronizer类的Semaphore.Sync类实现)
if (tryAcquireShared(arg) < 0)
// 如果获取信号量失败(即:当前可用的信号量数量-1<0),则将当前线程封装成Node节点添加到等待队列中
doAcquireSharedInterruptibly(arg);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
// 将当前线程封装成Node节点追加到等待队列的末尾,等待被唤醒(即:进入阻塞)
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 将封装当前线程的Node节点追加到等待队列的末尾(保证追加成功)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 自旋
for (;;) {
// 获取Node节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头节点,则再次尝试获取信号量
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 设置当前节点为head节点,如果存在剩余资源,则唤醒下一个相邻的后继节点(即:向后传播)
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 如果前驱节点不是头节点或者获取信号量失败,则逆序遍历等待队列,找到可以唤醒自己的节点
if (shouldParkAfterFailedAcquire(p, node) &&
// 将自己挂起(即:阻塞)
parkAndCheckInterrupt())
// 如果阻塞的线程被其他线程中断,则抛出InterruptedException异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
release方法
Semaphore通过调用Semaphore#release()方法释放一个信号量:
- 如果(当前可用的信号量数量+1)未超过int类型的最大值,则释放信号量成功。如果此时等待队列中存在阻塞的线程,则唤醒等待队列中的阻塞的线程。
- 如果(当前可用的信号量数量+1)超过int类型的最大值,则释放信号量失败,抛出"Maximum permit count exceeded"异常信息。
Semaphore#release()方法源码解析:
// java.util.concurrent.Semaphore#release()
// 释放一个信号量
public void release() {
// 以共享模式释放一个信号量
sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
// 以共享模式获取一个信号量(AQS)
public final boolean releaseShared(int arg) {
// 尝试释放一个信号量(由继承AbstractQueuedSynchronizer类的Semaphore.Sync类实现)
if (tryReleaseShared(arg)) {
// 释放信号量成功,唤醒后继节点
doReleaseShared();
return true;
}
return false;
}
doReleaseShared()方法源码解析请移步主页查阅->「一文搞懂」AQS(抽象队列同步器)实现原理及源码解析。
使用示例
假设某个停车场有5个停车位,有8辆汽车想要进入停车场停车。
Semaphore示例代码:
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Semaphore;
/**
* @program: xxkfz-study
* @ClassName MyTest.java
* @author: xxkfz
* @create: 2024-02-24 21:18
* @description:
**/
@Slf4j
public class SemaphoreExample {
public static void main(String[] args) {
// 创建信号量,初始化信号量数为5(即:5个停车位)
Semaphore semaphore = new Semaphore(2, true);
// 创建8个线程,模拟8辆汽车进入停车场
for (int i = 0; i < 8; i++) {
int n = i;
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
try {
// 获取信号量
semaphore.acquire();
long time = (long) (Math.random() * 10 + n);
log.info(Thread.currentThread().getName() + "进入停车场,停车时间:{}秒", time);
// 模拟停车时长
Thread.sleep(time * 1000);
} finally {
log.info(Thread.currentThread().getName() + "离开停车场");
// 释放信号量
semaphore.release();
}
}
}, "第" + n + "号汽车").start();
}
}
}
执行结果:
10:04:45.249 [第0号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第0号汽车进入停车场,停车时间:7秒
10:04:45.249 [第1号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第1号汽车进入停车场,停车时间:3秒
10:04:48.264 [第1号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第1号汽车离开停车场
10:04:48.264 [第2号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第2号汽车进入停车场,停车时间:7秒
10:04:52.258 [第0号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第0号汽车离开停车场
10:04:52.258 [第6号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第6号汽车进入停车场,停车时间:14秒
10:04:55.278 [第2号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第2号汽车离开停车场
10:04:55.278 [第7号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第7号汽车进入停车场,停车时间:9秒
10:05:04.286 [第7号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第7号汽车离开停车场
10:05:04.286 [第5号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第5号汽车进入停车场,停车时间:13秒
10:05:06.271 [第6号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第6号汽车离开停车场
10:05:06.271 [第4号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第4号汽车进入停车场,停车时间:13秒
10:05:17.290 [第5号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第5号汽车离开停车场
10:05:17.290 [第3号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第3号汽车进入停车场,停车时间:8秒
10:05:19.273 [第4号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第4号汽车离开停车场
10:05:25.292 [第3号汽车] INFO com.xxkfz.simplememory.SemaphoreExample - 第3号汽车离开停车场