一、入题
Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。它的用法如下:
// 创建一个计数阈值为5的信号量对象 // 只能5个线程同时访问 Semaphore semp = new Semaphore(5); try { // 申请许可 semp.acquire(); try { // 业务逻辑 } catch (Exception e) { } finally { // 释放许可 semp.release(); } } catch (InterruptedException e) { }那么Semaphore内部是如何实现的呢?本文我们将以acquire()方法为例进行详细研究。
二、主体结构
同ReentrantLock一样,Semaphore内部也是依靠一个继承自AbstractQueuedSynchronizer的Sync抽象类型的类成员变量sync来实现主要功能的,如下:
/** All mechanics via AbstractQueuedSynchronizer subclass */ private final Sync sync;同时,Semaphore也是由公平性和非公平性两种实现模式,对应Sync的两个实现类FairSync和NonfairSync。而acquire()方法实现的主要逻辑为:
它的主要处理流程是:
1、通过Semaphore的acquire()方法申请许可;
2、调用类成员变量sync的acquireSharedInterruptibly(1)方法处理,实际上是父类AbstractQueuedSynchronizer的acquireSharedInterruptibly()方法处理;
3、AbstractQueuedSynchronizer的acquireSharedInterruptibly()方法会先在当前线程未中断的情况下先调用tryAcquireShared()方法尝试获取许可,未获取到则调用doAcquireSharedInterruptibly()方法将当前线程加入等待队列。acquireSharedInterruptibly()代码如下:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
至于如何加入等待队列,还有等待队列的线程如何竞争获取许可,我会在专门分析AbstractQueuedSynchronizer的文章中进行详细描述,本文目前仅关注Semaphore层面。
4、接下来竞争许可信号的tryAcquireShared()方法则分别由公平性FairSync和非公平性NonfairSync各自实现。
三、非公平性NonfairSync
非公平性的tryAcquireShared()方法调用的是其父类Sync的nonfairTryAcquireShared()方法,代码如下:
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }在一个无限循环内:
1、首先通过getState()获取状态,这个状态在ReentrantLock中也讲到过,那里为0表示尚未有任何线程持有锁,为正表示持有该锁的线程重入次数,而这里则表示当前可用许可数available;
2、然后通过当前可用许可数available减去本次申请许可数acquires,得到假如本次申请许可得到满足后的剩余许可数remaining;
3、如果remaining小于0,则本次申请的许可数得不到满足,直接返回(后续将当前线程加入到等待队列),或者remaining大于等于0时,即本次申请的许可数能够得到满足时,则尝试通过CAS操作,即compareAndSetState(available, remaining)修改状态,修改成功则获取许可成功,否则也是会在后续将当前线程加入到等待队列。
可以看到,非公平性NonfairSync无视等待队列的存在,不管现在有没有现成排队等待申请许可,上来先抢,剩余许可数不足或抢不到再被加入等待队列,太不公平了。
四、公平性FairSync
公平性FairSync的tryAcquireShared()方法实现如下:
protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }也是在一个无限循环内:
1、它会先判断当前线程之前等待队列内是否存在其它线程排队请求许可,有的话直接返回-1,后续会将该线程加入到等待队列,这部分逻辑判断是通过AbstractQueuedSynchronizer的hasQueuedPredecessors()方法实现的,以后再做分析;
2、剩下的就是和非公平性NonfairSync中调用的nonfairTryAcquireShared()方法一样了,判断当前状态,通过CAS抢占等,不再赘述。
五、默认实现
Semaphore的默认实现是非公平性,如下:
/** * Creates a {@code Semaphore} with the given number of * permits and nonfair fairness setting. * * @param permits the initial number of permits available. * This value may be negative, in which case releases * must occur before any acquires will be granted. */ public Semaphore(int permits) { sync = new NonfairSync(permits); }
你也可以通过另外一个构造函数生成指定实现方式的Semaphore对象,如下:
/** * Creates a {@code Semaphore} with the given number of * permits and the given fairness setting. * * @param permits the initial number of permits available. * This value may be negative, in which case releases * must occur before any acquires will be granted. * @param fair {@code true} if this semaphore will guarantee * first-in first-out granting of permits under contention, * else {@code false} */ public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
六、其它
Semaphore也提供了boolean tryAcquire(long timeout, TimeUnit unit)、tryAcquire()等限制时间内阻塞或非阻塞实现方式,比较简单,但是有一点,公平模式下的tryAcquire()、tryAcquire(int permits)会打破原先的公平性,因为其是通过调用sync的nonfairTryAcquireShared()方法的方式实现的,需要另外使用tryAcquire(long timeout, TimeUnit unit)、tryAcquire(int permits, long timeout, TimeUnit unit)来保持公平性。tryAcquire()代码如下:
public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; }而tryAcquire(long timeout, TimeUnit unit)等则是通过Sync父类AbstractQueuedSynchronizer的tryAcquireSharedNanos()方法实现的,其实现为:
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }也是通过tryAcquireShared()和doAcquireSharedNanos()方法实现的,模式与上面一致。