感受
这个类的代码除去注释差不多有千多行,要想把所有代码都读完,然后按照作者的思路给理解完,是不容易的.这里我仔仔细细读了差不多一半的代码,说难倒不是很难.
虽然没有完全看完,但是基本上理解了作者代码的意图..说得简单些,就是操作一个双向链表.而链表中的每个节点有多种状态.AQS就是要保证整个双向链表和节点的状态的正确性.
连续看了好几天的JUC相关的源码,现在脑袋真有点晕乎乎的.加上这个类的代码确实有点多,这个就不在整理这个类的分析结果,就直接把代码copy了,并附上一张图.(这里只贴出分析过的那部分代码) 如果有朋友需要一起探讨的,留言就好了.
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
protected AbstractQueuedSynchronizer() {
}
//链接节点类
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
//表示当前节点的线程已经被取消
static final int CANCELLED = 1;
//表示当前节点的后继节点应该被唤醒
static final int SIGNAL = -1;
//表示当前节点的线程正在等待某一个条件
static final int CONDITION = -2;
static final int PROPAGATE = -3;
//上面四个状态之一或者0
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
//返回前继节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
*
* @param node the node to insert
* @return node's predecessor 返回插入节点的前继节点
*/
//返回插入节点的前继节点
//是线程安全的方法,试想多个线程同时执行这个方法,分析代码可知是不会出现数据不一致的情况的
//假如多个线程同时执行这个方法,且此时tail为null,那么可能出现他们同时执行a行代码的情况,
//但是a行代码是一个CAS操作,所以只会有一个线程执行成功,也就是说只会有一个线程才会执行b行代码.
//然后所有的线程又会进入循环.然会它们可能又同时执行了c行代码
// (也就是所有的线程都把各自要插入的节点的前继节点设置为了队列的尾节点,初看上去不应该这样,但是莫慌)
//然后所有的线程可能同时执行d代码,但是d行代码事CAS操作,所以也只会有个线程执行成功,也就是说只会有一个线程执行e行并返回.
//而剩余的线程又会进入循环.此时尾节点已经更新,所以不会造成数据的不一致
private Node enq(final Node node) {
for (; ; ) { //
Node t = tail; //
if (t == null) { // Must initialize //
if (compareAndSetHead(new Node())) //a
tail = head; //b
} else {
node.prev = t; //c
if (compareAndSetTail(t, node)) { //d
t.next = node; //e
return t;
}
}
}
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
//参数的值那么是 Node.EXCLUSIVE 要么是 Node.SHARED
private Node addWaiter(Node mode) {
//这行代码不存在竞争资源,所以不存在线程是否安全的问题
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 下面的代码会先尝试以最快的方式入队列,同时如果失败就会以常规方式入队列作为后备计划
Node pred = tail;
if (pred != null) {
node.prev = pred;
//如果多个线程同时执行下面的代码,那么只会有个线程会返回,所以其他线程则执行a代码
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//以常规方式入队列
enq(node); //a
return node;
}
/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
//如果输入节点存在后继节点,则唤醒之.如果不存在,则唤醒队列中第一个非CANCELLED状态的节点(如果有的话).
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
//如果当且节点对应的线程正处于 Node.SIGNAL OR Node.CONDITION OR Node.PROPAGATE 三个状态之一
//则通过CAS操作把状态更改为0 如果更改失败也没有关系
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//得到后继节点
Node s = node.next;
//如果后继节点不存在或者后继节点对应的线程已经被取消掉,则从尾节点向头节点遍历(排除输入节点),
//只要遍历的节点的状态不是CANCELLED,则记录之.最后的效果就相当于从前往后找到整个队列中第一个非CANCELLED状态的节点(排除输入节点)
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//如果s!=null,则s可能是输入节点的后继节点,也可能是队列中第一个非CANCELLED状态的节点(排除输入节点)
//取消阻塞s节点
//需要注意的是:如果n个线程同时执行unparkSuccessor(Node)方法,且最后的s不为null,那么就会在s对应的线程上执行n次unpart操作.
//这样的话,后面的n-1次unpark操作可能都是在非阻塞情况下执行.需要注意对非阻塞线程进行unpark操作的影响.
LockSupport.unpark(s.thread);
}
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
//建议先不要看这段总结,直接看方法内部的说明.内部看完后再来看这段总结
//
//整个方法总结下来就是:要求整个方法执行过程中没有其他线程更改头节点的值,否则线程一直在循环体中不出来.
//假设整个方法的执行过程中,头节点的值没有发生变化.
//
//那么如果有n个线程同时执行这个方法,那么最后的效果就是
//有一个线程把头节点的值更改为了0,同时执行了unpartSuccessor(h)操作.
//而其余的线程中又会有一个线程会在头几点的状态更改为0之后又把头节点的状态更改为PROPAGATE
//然后剩余的n-2个线程则不会更改任何东西就直接结束了方法的执行
//
//上面的分析是在多个线程,头节点开始状态是SIGNAL的情况下,当然还有其他情况下的执行结果.
//这里就不在一一说明.比如
// 多线程,开始状态为0的情况 :有一个线程把状态更改为了PROPAGATE,然后退出方法.其他线程不做任何改变然后退出方法
// 多线程,开始状态既不是SIGNAL也不是0的情况 :所有线程不做什么改变然后退出方法
//
// 单线程,开始状态为SIGNAL的情况 :头节点状态被更改为0,且执行了unparkSuccessor操作
// 单线程,开始状态为0的情况; :头节点状态被更改为PROPAGATE
// 单线程,开始状态既不是SIGNAL也不是0的情况 :线程不做什么改变然后退出方法
//
//再次总结:(下面是单线程情况下.多线程可以看做是下面的流程执行了多次)
//在头节点不改变的前提下
// 如果头节点的状态是SIGNAL,则把状态更改为0,同时执行unparkSuccessor操作
// 如果头节点状态是0,则把状态更改为PROPAGATE
// 如果是其他状态,什么也不改变
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (; ; ) {
//获取此时的头节点
Node h = head;
//如果h节点(不能说头节点,因为这个过程中,可能有其他线程更改了头节点的值)不为null,且h节点不是尾节点(即队列长度大于等于2)
if (h != null && h != tail) {
//获取h节点的状态
int ws = h.waitStatus;
//如果h节点的状态是SIGNAL
if (ws == Node.SIGNAL) {
//把h节点的状态更改为0 如果有多个线程同时执行到这里,且获取到的头节点都是同一个节点,那么只会有一个线程执行成功.
//成功更改状态的那个线程就会执行 取消阻塞h节点的后继节点 的操作,然后执行a行代码.失败的线程则又会进入循环
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//如果h节点的状态是0,则把状态更改为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果线程执行到这里,如果没有其他线程更改头节点,则跳出循环结束方法.
//而如果有其他线程更改了头节点的值,则又进入循环
if (h == head) // loop if head changed //a
break;
}
}
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
//这个条件没有看明白,如果输入node为null,那后面的代码岂不报错?
if (propagate > 0
|| h == null
|| h.waitStatus < 0
|| (h = head) == null
|| h.waitStatus < 0
) {
//得到输入节点的后继节点
Node s = node.next;
//如果不存在后继节点或者后继节点是分享模式,则执行doReleaseShared();
if (s == null || s.isShared())
doReleaseShared();
}
}
// Utilities for various versions of acquire
/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
//看这个方法的时候配合后面的图也许好理解些.
//这个图只描述了一种情况,其他情况的图就没有画了.
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
//要求输入node不为null
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
// 图A
//循环条件是 状态为CANCELLED.
//跳出循环后的结果:
// pred 指向的是 从输入节点的前继节点开始向头节点遍历,遇到的第一个非CANCELLED状态的节点
// 输入节点的前继节点指向 pred 节点
//
// 如果多线程同时执行这个循环,会导致node.prev的值不一致,
// 所以AQS框架应该会在外部进行限制,使得不会有多个线程同时执行这个方法.呆会会看到这个限制
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//图B
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
//图C
// If we are the tail, remove ourselves.
//
//如果输入节点是尾节点,就把尾节点设置为pred节点,同时把pred节点的后继节点设置为null
// 上面的分析得出不会有多个线程同时执行这个方法,但是有可能多个线程同时在执行compareAndSetTail操作.
// 所以如果执行该方法的线程在执行compareAndSetTail时有可能返回false.而如果返回false,则进入到else
if (node == tail && compareAndSetTail(node, pred)) { //a
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
//图C
//使得线程执行这部分代码,有两种情况:
// 输入节点不是尾节点
// 输入节点是尾节点,但更新尾节点的值时失败.(这种情况的话,node节点又变为了不是尾节点)
// 所以无论是上述哪一种情况,执行到这里时,node节点肯定不是尾节点了(而此时的尾节点到底是在node节点前还是后呢?)
int ws;
//我勒个天,这个条件好难理
if (pred != head
&& ( (ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
&& pred.thread != null
) {
//执行这段代码的条件:
// pred不是头节点 且 pred节点的状态是SIGNAL(如果不是SIGNAL,则要求成功把状态更改为SIGNAL) 且 pred节点的线程不是null
Node next = node.next;
if (next != null && next.waitStatus <= 0)
//执行条件是 此时的node节点有后继节点 且 后继节点的状态是非CANCELLED状态
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
}
附图
欢迎关注订阅号: