3.5 类的核心函数
1. acquire函数
该函数以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。源码如下
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
由上述源码可以知道,当一个线程调用acquire时,调用方法流程如下。
说明:
① 首先调用tryAcquire函数,调用此方法的线程会试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。在AbstractQueuedSynchronizer源码中默认会抛出一个异常,即需要子类去重写此函数完成自己的逻辑。之后会进行分析。
② 若tryAcquire失败,则调用addWaiter函数,addWaiter函数完成的功能是将调用此方法的线程封装成为一个结点并放入Sync queue。
③ 调用acquireQueued函数,此函数完成的功能是Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。
由于tryAcquire默认实现是抛出异常,所以此时,不进行分析,之后会结合一个例子进行分析。
首先分析addWaiter函数
// 添加等待者 private Node addWaiter(Node mode) { // 新生成一个结点,默认为独占模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 保存尾结点 Node pred = tail; if (pred != null) { // 尾结点不为空,即已经被初始化 // 将node结点的prev域连接到尾结点 node.prev = pred; if (compareAndSetTail(pred, node)) { // 比较pred是否为尾结点,是则将尾结点设置为node // 设置尾结点的next域为node pred.next = node; return node; // 返回新生成的结点 } } enq(node); // 尾结点为空(即还没有被初始化过),或者是compareAndSetTail操作失败,则入队列 return node; }
说明:addWaiter函数使用快速添加的方式往sync queue尾部添加结点,如果sync queue队列还没有初始化,则会使用enq插入队列中,enq方法源码如下
// 入队列 private Node enq(final Node node) { for (;;) { // 无限循环,确保结点能够成功入队列 // 保存尾结点 Node t = tail; if (t == null) { // 尾结点为空,即还没被初始化 if (compareAndSetHead(new Node())) // 头结点为空,并设置头结点为新生成的结点 tail = head; // 头结点与尾结点都指向同一个新生结点 } else { // 尾结点不为空,即已经被初始化过 // 将node结点的prev域连接到尾结点 node.prev = t; if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node // 设置尾结点的next域为node t.next = node; return t; // 返回尾结点 } } } }
说明:enq函数会使用无限循环来确保节点的成功插入。
现在,分析acquireQueue函数。其源码如下
// sync队列中的结点在独占且忽略中断的模式下获取(资源) final boolean acquireQueued(final Node node, int arg) { // 标志 boolean failed = true; try { // 中断标志 boolean interrupted = false; for (;;) { // 无限循环 // 获取node节点的前驱结点 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 前驱为头结点并且成功获得锁 setHead(node); // 设置头结点 p.next = null; // help GC failed = false; // 设置标志 return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
说明:首先获取当前节点的前驱节点,如果前驱节点是头结点并且能够获取(资源),代表该当前节点能够占有锁,设置头结点为当前节点,返回。否则,调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt函数,首先,我们看shouldParkAfterFailedAcquire函数,代码如下
// 当获取(资源)失败后,检查并且更新结点状态 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前驱结点的状态 int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 状态为SIGNAL,为-1 /* * This node has already set status asking a release * to signal it, so it can safely park. */ // 可以进行park操作 return true; if (ws > 0) { // 表示状态为CANCELLED,为1 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 找到pred结点前面最近的一个状态不为CANCELLED的结点 // 赋值pred结点的next域 pred.next = node; } else { // 为PROPAGATE -3 或者是0 表示无状态,(为CONDITION -2时,表示此节点在condition queue中) /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 比较并设置前驱结点的状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 不能进行park操作 return false; }
说明:只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。再看parkAndCheckInterrupt函数,源码如下
// 进行park操作并且返回该线程是否被中断 private final boolean parkAndCheckInterrupt() { // 在许可可用之前禁用当前线程,并且设置了blocker LockSupport.park(this); return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位 }
说明:parkAndCheckInterrupt函数里的逻辑是首先执行park操作,即禁用当前线程,然后返回该线程是否已经被中断。再看final块中的cancelAcquire函数,其源码如下
// 取消继续获取(资源) private void cancelAcquire(Node node) { // Ignore if node doesn't exist // node为空,返回 if (node == null) return; // 设置node结点的thread为空 node.thread = null; // Skip cancelled predecessors // 保存node的前驱结点 Node pred = node.prev; while (pred.waitStatus > 0) // 找到node前驱结点中第一个状态小于0的结点,即不为CANCELLED状态的结点 node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. // 获取pred结点的下一个结点 Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. // 设置node结点的状态为CANCELLED node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { // node结点为尾结点,则设置尾结点为pred结点 // 比较并设置pred结点的next节点为null compareAndSetNext(pred, predNext, null); } else { // node结点不为尾结点,或者比较设置不成功 // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // (pred结点不为头结点,并且pred结点的状态为SIGNAL)或者 // pred结点状态小于等于0,并且比较并设置等待状态为SIGNAL成功,并且pred结点所封装的线程不为空 // 保存结点的后继 Node next = node.next; if (next != null && next.waitStatus <= 0) // 后继不为空并且后继的状态小于等于0 compareAndSetNext(pred, predNext, next); // 比较并设置pred.next = next; } else { unparkSuccessor(node); // 释放node的前一个结点 } node.next = node; // help GC } }
说明:该函数完成的功能就是取消当前线程对资源的获取,即设置该结点的状态为CANCELLED,接着我们再看unparkSuccessor函数,源码如下
// 释放后继结点 private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ // 获取node结点的等待状态 int ws = node.waitStatus; if (ws < 0) // 状态值小于0,为SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3 // 比较并且设置结点等待状态,设置为0 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 获取node节点的下一个结点 Node s = node.next; if (s == null || s.waitStatus > 0) { // 下一个结点为空或者下一个节点的等待状态大于0,即为CANCELLED // s赋值为空 s = null; // 从尾结点开始从后往前开始遍历 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) // 找到等待状态小于等于0的结点,找到最前的状态小于等于0的结点 // 保存结点 s = t; } if (s != null) // 该结点不为为空,释放许可 LockSupport.unpark(s.thread); }
说明:该函数的作用就是为了释放node节点的后继结点。
对于cancelAcquire与unparkSuccessor函数,如下示意图可以清晰的表示。
说明:其中node为参数,在执行完cancelAcquire函数后的效果就是unpark了s结点所包含的t4线程。
现在,再来看acquireQueued函数的整个的逻辑。逻辑如下
① 判断结点的前驱是否为head并且是否成功获取(资源)。
② 若步骤①均满足,则设置结点为head,之后会判断是否finally模块,然后返回。
③ 若步骤①不满足,则判断是否需要park当前线程,是否需要park当前线程的逻辑是判断结点的前驱结点的状态是否为SIGNAL,若是,则park当前结点,否则,不进行park操作。
④ 若park了当前线程,之后某个线程对本线程unpark后,并且本线程也获得机会运行。那么,将会继续进行步骤①的判断。
2. release
以独占模式释放对象,其源码如下
public final boolean release(int arg) { if (tryRelease(arg)) { // 释放成功 // 保存头结点 Node h = head; if (h != null && h.waitStatus != 0) // 头结点不为空并且头结点状态不为0 unparkSuccessor(h); //释放头结点的后继结点 return true; } return false; }
说明:其中,tryRelease的默认实现是抛出异常,需要具体的子类实现,如果tryRelease成功,那么如果头结点不为空并且头结点的状态不为0,则释放头结点的后继结点,unparkSuccessor函数已经分析过,不再累赘。
对于其他函数我们也可以分析,与前面分析的函数大同小异,所以,不再累赘。
四、示例分析
1. 示例一
借助下面示例来分析AbstractQueuedSyncrhonizer内部的工作机制。示例源码如下
package com.hust.grid.leesf.abstractqueuedsynchronizer; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class MyThread extends Thread { private Lock lock; public MyThread(String name, Lock lock) { super(name); this.lock = lock; } public void run () { lock.lock(); try { System.out.println(Thread.currentThread() + " running"); } finally { lock.unlock(); } } } public class AbstractQueuedSynchonizerDemo { public static void main(String[] args) { Lock lock = new ReentrantLock(); MyThread t1 = new MyThread("t1", lock); MyThread t2 = new MyThread("t2", lock); t1.start(); t2.start(); } }
运行结果(可能的一种):
Thread[t1,5,main] running Thread[t2,5,main] running
结果分析:从示例可知,线程t1与t2共用了一把锁,即同一个lock。可能会存在如下一种时序。
说明:首先线程t1先执行lock.lock操作,然后t2执行lock.lock操作,然后t1执行lock.unlock操作,最后t2执行lock.unlock操作。基于这样的时序,分析AbstractQueuedSynchronizer内部的工作机制。
① t1线程调用lock.lock函数,其函数调用顺序如下,只给出了主要的函数调用。
说明:其中,前面的部分表示哪个类,后面是具体的类中的哪个方法,AQS表示AbstractQueuedSynchronizer类,AOS表示AbstractOwnableSynchronizer类。
② t2线程调用lock.lock函数,其函数调用顺序如下,只给出了主要的函数调用。
说明:经过一系列的函数调用,最后达到的状态是禁用t2线程,因为调用了LockSupport.lock。
③ t1线程调用lock.unlock,其函数调用顺序如下,只给出了主要的函数调用。
说明:t1线程中调用lock.unlock后,经过一系列的调用,最终的状态是释放了许可,因为调用了LockSupport.unpark。这时,t2线程就可以继续运行了。此时,会继续恢复t2线程运行环境,继续执行LockSupport.park后面的语句,即进一步调用如下。
说明:在上一步调用了LockSupport.unpark后,t2线程恢复运行,则运行parkAndCheckInterrupt,之后,继续运行acquireQueued函数,最后达到的状态是头结点head与尾结点tail均指向了t2线程所在的结点,并且之前的头结点已经从sync队列中断开了。
④ t2线程调用lock.unlock,其函数调用顺序如下,只给出了主要的函数调用。
说明:t2线程执行lock.unlock后,最终达到的状态还是与之前的状态一样。