前言
Semaphore是用于限制指定个数线程控制访问资源的同步工具,也是基于AQS实现的共享锁机制。它能控制每个时刻最多有多少个线程能访问共享资源,实现限流的功能。
使用效果
创建一个Semaphore,令牌数是3个,使用5个线程模拟获取令牌
java.util.concurrent.Semaphore semaphore = new java.util.concurrent.Semaphore(3); for (int i=0; i< 5; i++) {
int finalI = i; new Thread(new Runnable() {
@SneakyThrows @Override public void run() {
semaphore.acquire(1); System.out.println(Thread.currentThread().getName() + " 在"+ DateFormatUtils.format(new Date(),"yyyy/MM/dd HH:mm:ss") + " 申请到一个令牌"); Thread.sleep((finalI +1) *3000); semaphore.release(1); } }).start(); }
控制台输出:
从日志发现,同时最多有3个线程获取到令牌,等释放令牌后,其他线程才能获取到令牌,起到限流的作用。
源码分析
AQS实现
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } //非公平方式获取共享锁,不排队,直接获取锁 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } //释放共享锁,归还令牌 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } //减少令牌 final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } //令牌清零 final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } //非公平版本 static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } /** * 公平版本 */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } //公平获取共享锁,一开始就要先判断队列中是否有线程已经在等待, //如果有等待的线程,则直接获取失败。 protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
获取1个令牌:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1); }
获取多个令牌:
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
释放一个令牌:
public void release() {
sync.releaseShared(1); }
释放多个令牌:
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }
总结:
令牌获取,令牌释放是配对的,体现了令牌资源复用。它既支持公平获取令牌,也支持非公平获取令牌,可以根据实际业务场景选择,在接口限流场景中使用很多。