AbstractQueuedSychronizer(抽象队列同步器,简称AQS):
1.JDK的并发包(包名:java.util.concurrent,以下简称JUC)下面提供了很多并发操作的工具类,如:ReentrantLock,CountDownLatch等。这些并发操作工具类的基础是AbstractQueuedSychronizer
2.*AQS内部维护了一个共享资源和两个队列:*一个是同步队列;一个是条件队列。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //同步队列的头结点 private transient volatile Node head; //同步队列的尾结点 private transient volatile Node tail; //同步队列的共享资源,队列的同步状态 private volatile int state; }
3.Node类的主要信息;
static final class Node { //静态变量,标识节点以共享模式等待资源 static final Node SHARED = new Node(); //标识节点以独占模式等待资源 static final Node EXCLUSIVE = null; //等待状态,节点取消等待资源 static final int CANCELLED = 1; //等待状态,标识后继节点需要唤醒 static final int SIGNAL = -1; //等待状态,标识线程处于条件等待状态 static final int CONDITION = -2; //等待状态,标识线程以共享模式获取资源,释放锁的行为将传播到后续节点,该状态作用于头节点 static final int PROPAGATE = -3; //节点等待状态,是上述4中状态之一,或者为0 volatile int waitStatus; //节点的前驱节点 volatile Node prev; //节点的后继节点 volatile Node next; //节点对应的线程 volatile Thread thread; //条件等待时标识下一个等待条件的节点,指向条件队列中的下一个节点 //或者,标识共享模式 Node nextWaiter; }
4.同步队列是用双向链表实现的,主要用于记录等待获取共享资源的线程
5.条件队列是一个单向链表的结构,链表中的元素也是Node,只不过条件队列中的元素使用Node的nextWaiter指向下一个元素。
6.AQS对外提供的protected类型的方法入手分析一下AQS的工作原理:
/** *尝试以独占模式获取共享资源 *@param arg 表示需要获取资源的个数 *@return true表示获取成功,false获取失败 **/ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /** *尝试以独占模式释放共享资源 *@param arg 表示释放资源的个数 *@return true表示释放成功,false释放失败 **/ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } /** *尝试以共享模式获取共享资源 *@param arg 表示获取资源的个数 *@return true表示获取成功,false获取失败 **/ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } /** *尝试以共享模式释放共享资源 *@param arg 表示释放资源的个数 *@return true表示释放成功,false释放失败 **/ protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } /** * 当前线程是否以独占模式获取了共享资源,该方法是在条件对象ConditionObject内部使用的,如果不需要条件等待,则无需实现该方法 *@return true表示是,false表示否 **/ protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
AQS以模板方法的模式,提供了多个线程对共享资源(state)操作的算法框架,上面的五个protected类型的方法主要用于尝试获取和释放共享资源,并不会阻塞当前线程,是AQS留给具体的业务操作类(如:ReentrantLock)来实现的。
7.AQS获取资源,释放资源等方法的具体代码:
/** *以独占模式获取共享资源,获取成功则返回,否则阻塞当前线程,并将当前线程放入同步队列等待获取资源 *@param arg 表示需要获取资源的个数 **/ public final void acquire(int arg) { //首先调用子类实现的tryAcquire方法,如果该方法返回true则表示获取成功,不进行后续判断 //否则,调用acquireQueued方法将当前线程放入同步队列排队等待获取资源 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** *将当前线程包装成Node,并放入同步队列 *@param mode 模式,共享模式或者独占模式 *@return 当前线程所在的节点 **/ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 先尝试将节点放入队列的尾部,如果成功则返回,否则将节点入队 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //将当前节点放入同步队列,cas操作设置头和尾节点 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /** *在队列中不断尝试获取资源 *@param node 当前线程所在节点 *@param arg 获取资源的个数 *@return 等待资源过程中线程是否被中断 **/ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取当前节点的前驱节点 final Node p = node.predecessor(); //只有前驱是头节点的情况下才尝试获取锁,因为头结点是当前持有资源的线程所在的节点,如果前驱不是头节点那么没有必要尝试获取 if (p == head && tryAcquire(arg)) { //获取成功后将当前节点设置为头结点 setHead(node); //释放原来的头节点 p.next = null; // help GC failed = false; return interrupted; } //如果节点前驱不是头结点或者获取资源失败则阻塞当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** *以独占模式释放共享资源 *@param arg 表示释放资源的个数 *@return true表示释放成功,false释放失败 **/ public final boolean release(int arg) { //尝试释放资源,如果失败则直接返回 if (tryRelease(arg)) { //释放成功后,唤醒后继节点 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
AQS中独占模式获取和释放资源的方法,这两个方法可以用于实现锁的功能,事实上ReentrantLock就是基于以上方法实现的。以共享模式获取和释放资源的方法,与独占模式类似
8.条件对象的等待和唤醒方法:
public class ConditionObject implements Condition, java.io.Serializable { //第一个条件等待的节点 private transient Node firstWaiter; //最后一个条件等待的节点 private transient Node lastWaiter; /** * 唤醒条件队列中的第一个等待的线程,此时该线程将进入同步队列重新等待获取资源 */ public final void signal() { //判断当前线程是否以独占模式占有资源 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) //将条件队列中的第一个线程,重新放入同步队列 doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { //如果CAS操作失败,说明线程取消获取共享资源,此时返回false,doSignal会尝试将下一个节点放入同步队列 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //将节点放入同步队列 Node p = enq(node); int ws = p.waitStatus; //设置节点的前驱节点的状态为Node.SIGNAL if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } //使获取共享资源的线程等待并进入条件队列,如果当前线程被中断则退出 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //在条件队列中添加一个节点 Node node = addConditionWaiter(); //释放当前线程获取的共享资源 int savedState = fullyRelease(node); int interruptMode = 0; //判断当前节点是否在同步队列中,如果不在则暂停当前线程 while (!isOnSyncQueue(node)) { //暂停当前线程,该方法响应中断;当调用signal()方法的线程释放共享资源时,会从该处继续执行 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //重新等待获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } }
9.总结:当线程获取共享资源成功时返回,否则进入同步队列等待前驱节点唤醒,此时当前线程处于阻塞状态(LockSupport.park方法使线程阻塞);前驱节点释放共享资源后会唤醒(LockSupport.unpark方法唤醒线程)后继节点,需要说明的是获取共享资源成功的线程必定是头节点所在的线程。当获取共享资源的线程,调用Condition.await()方法时,当前线程会进入条件队列等待;当其他线程调用Condition.signal()方法,并释放共享资源时当前线程会重新进入同步队列等待获取共享资源。