一、AQS对锁的底层支持
AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现同步器的基础组件,并发包中锁的底层就是使用AQS实现的。
1、概述
首先看看AQS的类图:
其中Node节点的类图是:
结合两张图和源码,我们可以知道AQS以及Node的特性:
- AQS是一个FIFO的双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node
- Node中的thread变量用来存放进入AQS队列里面的线程
- Node节点内部的SHARED用来标记线程是获取共享资源时被阻塞挂起后放入AQS队列的
- Node节点内部的EXCLUSIVE用来标记线程是获取独占资源时被阻塞挂起后放入AQS队列的
- Node节点内部的waitStatus记录当前线程等待状态,可以为CANCELLED(被取消)/SIGNAL(被唤醒)/CONDITION(在条件队列中等待)/PROPAGATE(释放共享资源时需要通知其他节点)
- Node节点内部的prev记录当前节点的前驱节点,next记录当前节点的后继节点
- Node节点内部的nextWaiter有两种作用:1. 表示下一个在Condition条件队列中等待的节点(共享模式);2. 表示是共享模式或者独占模式。
继续深入学习:
AQS中维护了一个单一的状态信息state,并有getter、setter、cas三个方法获取或修改它的值
protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
对于ReentrantLock的实现来说,state可以用来表示当前线程获取锁的可重入次数:
对于ReentrantReadWriteLock的实现来说,state的高16位线程获取到读锁的可重入次数,低16位表示获取到写锁的可重入次数:
int c = getState();//调用AQS的getState方法来获取state值 static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
对于CountDownLatch的实现来说,state用来表示计数器当前的值:
private static final class Sync extends AbstractQueuedSynchronizer { int getCount() { return getState(); } } public long getCount() { return sync.getCount(); }
- AQS的内部类ConditionObject,用来结合锁实现线程同步。ConditionObject可以直接访问AQS对象内部的变量,比如state状态值和AQS队列。ConditionObject是条件变量,每个条件变量对应一个条件队列(单项链表队列),其用来存放调用条件变量的await方法后被阻塞的线程。
- 对于AQS,线程同步的关键是对状态值state的操作。根据state是否属于同一个线程,操作state的方式分为独占方式和共享方式。在独占模式下,获取和释放资源的方法为:void acquire(int arg)、void acquireInterruptibly(int arg)、boolean release(int arg) 在共享模式下,获取和释放资源的方法为:void acquireShared(int arg)、void acquireSharedInterruptibly(int arg)、boolean releaseShared(int arg)
- 使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是这个线程获取到了,其他线程再尝试操作state获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。比如独占锁ReentrantLock的实现,当一个线程获取了ReentrantLock的锁后,在AQS内部会首先使用CAS操作把state状态值从0变为1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它是锁的持有者,则会把状态值从1变为2,即设置可重入次数。而当另外一个线程获取该锁时发现自己并不是该锁的持有者就会被放入AQS阻塞队列后挂起。
- 使用共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过CAS方式竞争获取资源,当一个线程获取了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则此线程只需要使用CAS方式获取即可。比如Semaphore信号量,当一个线程通过acquire方法获取信号量时,会首先看当前信号量个数是否满需要,不满足则把当前线程放入阻塞队列,如果满足则通过CAS获取信号量。
2、独占模式下线程获取与释放资源
(1)当一个线程调用acquire(int arg)方法获取独占资源时,先调用tryAcquire方法尝试获取资源,成功则返回;失败则将当前线程封装为类型为Node.EXCLUSIVE的Node节点后插入AQS阻塞队列尾部,并调用LockSupport.park(this)方法挂起自己。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
(2)当一个线程调用release(int arg)方法释放独占资源时,先调用tryRelease方法尝试释放资源,这里是设置state的值,然后调用LockSupport.unpark(thread)方法唤醒队列里被阻塞的一个线程thread。被唤醒的线程则使用tryAcquire尝试,看当前状态变量state的值是否满足自己的需要,满足则该线程被唤醒,然后继续向下运行,否则还是会被放入AQS队列中并被挂起。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
3、共享模式下线程获取与释放资源
(1)当线程调用 acquireShared(int arg)方法获取共享资源时,会首先使用 trγAcquireShared尝试获取资源,具体是设置状态变量state值,成功则返回,失败则将当前线程封装为类型为Node.SHARED的Node节点后插入AQS队列的尾部,并使用LockSupport. park(this)方法挂起自己。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
(2)当线程调用releaseShared(int arg)时,首先会尝试使用tryReleaseShared操作释放资源,这里是设置状态变量state的值,然后使用LockSupport.unpark(thread)激活AQS里面被阻塞的一个线程thread.被激活的线程使用tryReleaseShared查看当前状态变量state的值是否能满足自己的需要,满足该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
值得注意的是,以上tryAcquire
、tryRelease
方法,AQS未给出可用的方法,而是需要子类去根据自身的特点去重写,此外,还需重写isHeldExclusively
方法,以判断锁是被当前线程独占还是被共享。
4、Interruptibly关键字
acquire方法对应的有acquireInterruptibly方法,acquireShared方法对应的有acquireInterruptiblyShared方法,那Interruptibly修饰的作用又是什么,来看看:
不带Interruptibly的方法表示不对中断进行响应,也就是线程在调用方法获取资源或者获取资源失败被挂起时,其他线程中断了该线程,那么该线程也不会因为中断而抛出异常,它还是会继续获取资源或者被挂起,即忽略中断。
带Interruptibly的方法表示对中断进行响应,即其他线程中断了该线程,会抛出InterruptedException异常。
5、AQS中队列的入队操作
维护AQS提供的队列,我们主要看看其入队操作。
当一个线程获取锁失败后,就会被封装成Node节点,如果队列未被创建,则调用下面的方法将节点入队:
private Node enq(final Node node) { for (;;) { Node t = tail;//(1) if (t == null) { // Must initialize if (compareAndSetHead(new Node()))//(2) tail = head; } else { node.prev = t;//(3) if (compareAndSetTail(t, node)) {//(4) t.next = node; return t; } } } }
在第一次循环中,由于队列未初始化,所以队列头head和队列尾tail都指向null;当执行完代码(1)后,节点t指向了尾部节点,如图中的状态(I);这是由于t为null,故执行代码(2),使用CAS算法设置一个哨兵节点为头节点,如果CAS成功,则让tail也指向哨兵节点,如图中的状态(II)。到此为止,队列初始化完毕,这时队列只有一个哨兵节点。
在第二次循环中,执行到代码(1),这时队列状态如图(III)所示;然后执行代码(3)设置node的前驱节点为tail,这时队列状态如图(IV)所示;然后执行代码(4),使用CAS算法设置node节点为tail,CAS成功后队列状态如图(V);CAS成功后再设置原来的tail的后继节点为node,这就完成了双向链表的插入,如图(VI)。
6、小结
通过本期的学习,大家应该对AQS有了一定的认识,我主要是写了一些偏思想方面的东西,源码的话大家有兴趣的话可以自己阅读。AQS是JUC中众多类的父类,可以说它是实现线程同步和锁的标准,JUC中的ReentrantLock、ReentrantReadWriteLock等都是AQS的实现;线程对资源的占有主要有独占和共享模式,AQS在这两种模式下,线程获取资源、释放资源的方式又略有不同;然后再讲解了下线程被阻塞挂起后被插入到AQS维护的队列的一些细节。本期的学习就到这里,知识点多而杂,还需要多加消化。