1. 前言
在前面的文章我们讲解了CountDownLatch倒计时器的基本使用,本节我们继续来学习Java多线程编程中的一个工具类Semaphore信号量。
2. 什么是Semaphore?
Semaphore 是一个在多线程环境中用于控制对共享资源的访问的同步器(synchronizer),它是 Java 5 中引入的 java.util.concurrent(JUC)包的一部分。Semaphore 维护了一个许可集,线程在执行前必须从 Semaphore 获取一个许可。如果没有许可可用,线程将阻塞等待,直到其他线程释放许可。可以用来限制数据库连接数、限制服务器可处理请求数等。
3. Semaphore源码解读
我们跟进信号量的源码中浏览一圈,发现其实它内部主要的方法就2个:
// 初始共享资源数量 final Semaphore semaphore = new Semaphore(5); // 获取1个许可 semaphore.acquire(); // 释放1个许可 semaphore.release();
3.1 acquire():获取许可
3.1 acquire():获取许可
跟进这个方法后,我们会发现其内部调用了AQS的一个final 方法acquireSharedInterruptibly(),
/** * 共享模式下获取许可证,获取成功则返回,失败则加入阻塞队列,挂起线程 */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取许可证,arg为获取许可证个数,当可用许可证数减当前获取的许可证数结果小于0,则创建一个节点加入阻塞队列,挂起当前线程。 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
我们可以发现这个方法中又调用了tryAcquireShared(arg)方法,作为AQS中的钩子方法,这个方法的实现在Semaphore的两个静态内部类 FairSync(公平模式) 和 NonfairSync(非公平模式) 中。虽然这个方法在AQS中,但它作为钩子方法,最终的实现则回到了Semaphore的内部类中。
我们接下来继续查看在 FairSync(公平模式)中的tryAcquireShared方法
static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; // 获取可用的许可数 int available = getState(); // 计算扣减后的许可数 int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
这里就是获取剩余的许可数值,然后计算扣减后的许可数值,如果扣减后的值小于0,就会直接返回扣减数(||短路或,前者满足就不用判断后者了)。否认就会执行CAS操作compareAndSetState(available, remaining)扣减许可数的值。
这里我们就可知,Semaphore信号量的许可数是存放在AQS中的state变量中的(AQS源码我后续应该也会发文讲解),然后通过CAS来修改state的值,保证了操作的原子性。
3.2 release():释放许可
同样跟入这个方法,里面用了AQS的releaseShared(),而在这个方法内也毫无疑问的用了tryReleaseShared(int arg)这个钩子方法,原理同上,不再冗释,需要注意的是释放共享锁(也就是增加许可数state的值)的同时也会唤醒同步队列中的一个线程。
// 释放一个许可证 public void release() { sync.releaseShared(1); } // 释放共享锁,同时会唤醒同步队列中的一个线程。 public final boolean releaseShared(int arg) { // 尝试释放共享锁(增加许可数) if (tryReleaseShared(arg)) { //唤醒同步队列中的一个线程 doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; // 溢出情况判断 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
4. Semaphore的使用
OK,讲到这里,把信号量中主要的方法解释完了,我们来写一个小demo感受一下它的使用:
public class Test { private final Semaphore semaphore; /*构造一个令牌*/ public Test(int acq){ this.semaphore= new Semaphore(acq); } public void useSemaphore(){ try { semaphore.acquire(); // 使用资源 System.out.println("资源使用 " + Thread.currentThread().getName()); Thread.sleep(1000); // 模拟资源使用时间 } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); System.out.println("资源释放 " + Thread.currentThread().getName()); } } public static void main(String[] args) { Test test = new Test(3); for (int i = 0; i < 5; i++) { new Thread(test::useSemaphore).start(); } } }
输出:
资源使用 Thread-0 资源使用 Thread-1 资源使用 Thread-3 资源释放 Thread-0 资源使用 Thread-2 资源使用 Thread-4 资源释放 Thread-1 资源释放 Thread-3 资源释放 Thread-4 资源释放 Thread-2
5. 总结
Semaphore是基于AQS和CAS操作实现的共享锁,利用AQS中被volatile修饰的变量state来代表许可证数(permit),当用户调用acquire()方法来获取许可证数时,他会计算出本次取出操作后的许可证数,如果小于0,那么就会挂起等待,如果大于0,也就是许可证数还够用,那么就会使用CAS操作修改许可证数。当调用release()方法释放许可证数时,会唤醒等待的线程去获取共享锁
如果本文对你有帮助,点个小赞吧,后续有时间就多出出这方面的源码解读