一、Semaphore简介
Semaphore,即信号量,是操作系统中的一个经典概念。在Java并发包(java.util.concurrent,简称JUC)中,Semaphore类实现了这一概念,用于控制同时访问特定资源的线程数量。与synchronized关键字和ReentrantLock类不同,Semaphore可以实现更为复杂的线程同步需求,允许一定数量的线程同时访问共享资源。
二、Semaphore的工作原理
Semaphore内部维护了一组许可证(permits)。每个线程在访问共享资源之前,必须先获取一个许可证。如果许可证不足,则线程将被阻塞,直到有许可证可用。当线程释放资源时,它会归还一个许可证,从而允许其他等待的线程获取资源。
通过控制许可证的数量,Semaphore
可以实现对共享资源访问的精细控制。例如,如果有一个需要限制并发访问次数的资源池,就可以使用Semaphore
来实现。
三、Semaphore的特性
- 公平性:
Semaphore
可以配置为公平的或非公平的。公平的Semaphore
将按照线程请求许可证的顺序来分配它们,而非公平的Semaphore
则不保证这种顺序。 - 可重用性:与
CountDownLatch
等一次性使用的同步工具不同,Semaphore
可以多次使用。一旦线程释放了许可证,其他线程就可以再次获取它。 - 资源池控制:通过调整许可证的数量,
Semaphore
可以灵活地控制对资源池的并发访问。这对于保护有限资源或实现流量控制非常有用。
四、Semaphore源码分析
首先,Semaphore类中的主要组成部分是一个继承自AbstractQueuedSynchronizer(AQS)的内部类Sync。AbstractQueuedSynchronizer是Java并发包中提供的一个用于构建锁和其他同步组件的基础框架。
public class Semaphore implements java.io.Serializable { // 当前的许可数 private final Sync sync; // 构造函数,创建一个具有给定许可数的Semaphore,默认非公平策略 public Semaphore(int permits) { sync = new Sync(permits); } // 构造函数,创建一个具有给定许可数的Semaphore,并指定公平策略 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } // 获取一个许可,如果当前没有可用许可,则等待 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 释放一个许可 public void release() { sync.releaseShared(1); } // ... 其他方法,如acquire(int), release(int), tryAcquire(), tryRelease(), drainPermits()等 // 同步器抽象类 abstract static class Sync extends AbstractQueuedSynchronizer { // 返回当前许可数 final int getPermits() { return getState(); } // ... 其他方法,如tryAcquireShared(), tryReleaseShared()等 // 提供给子类实现的公平锁和非公平锁 // 这些方法需要由子类根据公平策略来实现 protected abstract boolean tryAcquireShared(int acquires); protected abstract boolean tryReleaseShared(int releases); // 减少许可数 final void reducePermits(int reductions) { // ... 实现细节 } // ... 其他方法 } // 非公平同步器 static final class NonfairSync extends Sync { // ... 实现tryAcquireShared()等方法 } // 公平同步器 static final class FairSync extends Sync { // ... 实现tryAcquireShared()等方法,保证FIFO顺序 } }
源码解读:
状态管理:Semaphore使用AQS的同步状态来管理许可的数量。在Semaphore中,同步状态表示当前可用的许可数。getState()方法返回当前的许可数。
许可的获取与释放:当线程调用acquire()方法时,它会尝试通过Sync的acquireSharedInterruptibly()方法获取一个许可。如果当前没有可用许可,线程将被阻塞。类似地,当线程调用release()方法时,它会通过Sync的releaseShared()方法释放一个许可,这可能会唤醒正在等待的线程。
公平性:Semaphore可以在创建时指定为公平或非公平。公平Semaphore确保等待时间最长的线程优先获得许可,而非公平Semaphore则不保证等待线程的获取顺序。这是通过FairSync和NonfairSync两个内部类来实现的,它们分别实现了tryAcquireShared()和tryReleaseShared()方法以提供不同的获取和释放策略。
等待队列:AQS维护了一个等待队列,用于管理等待获取许可的线程。当线程尝试获取许可但失败时(即许可不足),它将被添加到等待队列中。当其他线程释放许可时,等待队列中的线程可能会被唤醒并尝试再次获取许可。这个过程是由AQS的内部机制自动管理的。
自定义同步器:Semaphore通过扩展AQS并实现必要的方法来定义自己的同步语义。这包括实现tryAcquireShared()和tryReleaseShared()等方法,这些方法根据Semaphore的当前状态和公平性策略来决定是否允许线程获取或释放许可。
五、Semaphore的主要方法
Semaphore
类提供了几个主要的方法来操作许可证:
- acquire(): 获取一个许可证。如果当前没有可用的许可证,则线程将被阻塞,直到有许可证可用。
- release(): 释放一个许可证。这将增加可用许可证的数量,并可能唤醒正在等待的线程。
- acquire(int permits): 获取指定数量的许可证。如果当前没有足够的许可证,则线程将被阻塞。
- release(int permits): 释放指定数量的许可证。
此外,Semaphore
还提供了带有超时参数的acquire
方法,允许线程在等待指定时间后放弃获取许可证。
六、Semaphore实现控制资源的访问
我们使用Semaphore
来控制对资源访问,代码如下:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreExample { // 定义一个Semaphore,初始许可数为3 private static final Semaphore semaphore = new Semaphore(3); public static void main(String[] args) { // 创建一个固定大小的线程池 ExecutorService executor = Executors.newFixedThreadPool(5); // 模拟5个线程尝试访问一个只允许3个并发访问的资源 for (int i = 0; i < 5; i++) { final int threadId = i; executor.execute(() -> { try { // 获取一个许可 semaphore.acquire(); accessResource(threadId); // 访问完成后释放许可 semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }); } // 关闭线程池(这在实际应用中通常会在所有任务完成后进行) // executor.shutdown(); } // 模拟访问资源的方法 private static void accessResource(int threadId) throws InterruptedException { System.out.println("线程 " + threadId + " 获取到许可,开始访问资源。"); // 模拟资源访问时间 Thread.sleep(1000); System.out.println("线程 " + threadId + " 访问资源结束,释放许可。"); } }
代码解释:
- 我们首先定义了一个
Semaphore
对象,其初始许可数设置为3。这意味着最多允许3个线程同时访问某个资源。 - 然后,我们创建了一个固定大小为5的线程池,用于模拟5个线程尝试并发访问资源。
- 在循环中,我们为每个线程提交了一个任务到线程池。每个任务首先尝试通过调用
semaphore.acquire()
来获取一个许可。如果许可可用,线程将继续执行并访问资源;如果许可不可用(即已达到最大并发数),线程将被阻塞,直到有许可可用。 - 在
accessResource
方法中,我们模拟了线程访问资源的过程,通过Thread.sleep(1000)
来模拟资源访问所需的时间。 - 访问资源完成后,线程通过调用
semaphore.release()
来释放许可,这样其他等待的线程就有机会获取许可并访问资源。
七、适用场景
Semaphore
适用于以下场景:
- 保护有限资源:当需要限制对某个资源池的并发访问次数时,可以使用
Semaphore
。例如,数据库连接池或线程池。 - 流量控制:在需要限制对某个服务的并发请求数时,可以使用
Semaphore
作为限流器。这可以防止系统过载并保持稳定的性能。 - 实现复杂的同步模式:与其他同步工具结合使用,
Semaphore
可以实现更复杂的线程同步模式,如读写锁等。
总之,Semaphore
是Java并发编程中一个强大而灵活的同步工具。通过掌握其工作原理和特性,开发人员可以更好地应对多线程环境中的并发挑战。