5 源码分析(以非公平锁为例)
5.1 线程A先抢占锁
构造方法:
public ReentrantLock() { sync = new NonfairSync(); }
lock():
public void lock() { sync.lock(); }
假设现在有两个线程A和B,如下:
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; // 线程A进来 // 线程B进来 final void lock() { // 线程A执行成功,将AQS.state置为1,表示已抢占该锁 // 线程B,由于线程A已将AQS.state置为1,所以线程B执行CAS操作为false if (compareAndSetState(0, 1)) // 线程A,将AbstractOwnableSynchronizer.exclusiveOwnerThread设置为当前线程 setExclusiveOwnerThread(Thread.currentThread()); else // 线程B执行该部分,尝试获取一个锁 acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
其中acquire(1)方法的源码如下:
// 线程B调用:arg = 1 public final void acquire(int arg) { // 线程B // 尝试获得非公平锁,由于已被线程A抢到,所以tryAcquire(arg) = false // addWaiter方法,构建承载线程B的Node,然后添加到队列末尾,返回该node // acquireQueued方法 if (!tryAcquire(arg) && acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg)) { // 设置当前线程的中断标识 selfInterrupt(); } }
尝试抢锁的方法 tryAcquire(arg) 最终调用的:
/** * 尝试抢锁 * * 处理内容: * 1 如果抢到锁,返回true * 1.1 如果当前线程第一次抢到锁: * AQS.status由0变为1 * AQS.exclusiveOwnerThread = Thread.currentThread() * 返回true * 1.2 如果当前线程再次抢到锁(重入) * AQS.status++ * 返回true * 2 没抢到锁,返回false * */ // 线程B acquires = 1 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 由于线程A已将state设为1,所以c=1 int c = getState(); if (c == 0) { // 线程B,不满足不执行 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 线程B,不满足不执行 /** * 获得当前独享线程,如果就是当前线程,那么执行重入操作 * 执行tryLock()时: * 如果第二次进入,则nextc = 0 + 1 = 1 * 如果第三次进入,则nextc = 1 + 1 = 2 * 如果第四次进入,则nextc = 2 + 1 = 3 */ int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 线程B,返回false return false; }
回到 acquire 代码中 addWaiter 方法:
// 线程B:mode = Node.EXCLUSIVE = null private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 线程B:由于线程A并没有进入链表,所以tail = null不会进入if方法 if (pred != null) { node.prev = pred; // (老的尾部node) <--- 新node.prev if (compareAndSetTail(pred, node)) { pred.next = node; // (老的尾部node) next ---> (新node) return node; } } // 线程B:node = 承载线程B的node enq(node); return node; }
addWaiter 方法使用到了 Node 的构造方法:
// 线程B:thread = Thread.currentThread() mode = null Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; }
对照下面的图来看,nextWaiter 是null,Node的构造方法里没有对 waitStatus 赋值,所以默认为0
回到addWaiter方法,其中还用到了enq方法:
/** * 将node节点插入队列末尾 * 1 如果是空队列,则初始化一个空内容node作为第一个节点,然后将入参node加到队列末尾 * 2 如果不是空队列,则直接入参node加到队列末尾 * * @param node * @return */ // 线程B:node = 承载线程B的node private Node enq(final Node node) { for (;;) { // 线程B:第一次循环 tail = null // 线程B:第二次循环 tail = head = 空内容node Node t = tail; /** * 第一次进入由于队列为null,所以t = null * 第二次进入,由于队列已经被初始化1个节点,故 t != null */ if (t == null) { // 线程B:第一次循环 满足t == null,进入该模块内 if (compareAndSetHead(new Node())) // 初始化一个空内容节点,作为AQS.head节点 tail = head; } else { // 线程B:第二次循环 满足 t != null进入该模块内,即空内容node <-> 新节点 node.prev = t; // 老的尾部node <--- 入参node.prev if (compareAndSetTail(t, node)) { // 将AQS.tailOffset内容更新为入参node t.next = node; // 老的尾部node ---> 入参node return t; } } } }
此时addWaiter执行完毕,又回到acquire方法,addWaiter返回了承载B的node,现在要执行acquireQueued方法:
// 线程B:node = 承载B的节点,arg = 1 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 线程B:会一直循环,直到拿到锁 for (;;) { // 线程B:p=空内容node(head) final Node p = node.predecessor(); // 线程B:p == head等于true;但是由于线程A先抢到锁,所以tryAcquire(arg)=false,所以不会进入下面的if if (p == head && tryAcquire(arg)) { // tryAcquire(arg):抢锁操作 setHead(node); // 更新头结点为入参node p.next = null; // help GC failed = false; return interrupted; } // 线程B:shouldParkAfterFailedAcquire方法,设置p节点的waitStatus = Node.SIGNAL(原先是0),如果自旋两次没抢到锁的话会返回true挂起线程 // 线程B:parkAndCheckInterrupt方法,挂起线程B if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
5.2 线程A释放锁
如果A调用unlock操作:
public void unlock() { sync.release(1); }
释放锁的方法如下:
// 线程A:arg = 1 public final boolean release(int arg) { if (tryRelease(arg)) { // 线程A:AQS.state = 0,AQS.exclusiveOwnerThread = null Node h = head; // 线程A:满足条件 h.waitStatus == SIGNAL(值为-1) if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
其中调用了tryRelease方法:
// 线程A:release = 1 protected final boolean tryRelease(int releases) { // 线程A:c = 1 - 1 = 0 int c = getState() - releases; // 如果当前线程不是之前抢占的线程,则抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 线程A:c == 0 为 true if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
对应:
tryRelease执行完毕之后又回到了release方法中,运行unparkSuccessor方法
private void unparkSuccessor(Node node) { // 线程A:node.waitStatus == SIGNAL(-1) int ws = node.waitStatus; if (ws < 0) { // 线程A:设置 waitStatus == 0 compareAndSetWaitStatus(node, ws, 0); } Node s = node.next; // 线程A:s == 线程B,s.waitStatus == SIGNAL(-1),所以不满足 if (s == null || s.waitStatus > 0) { // node是tail 或者 node的tail节点waitStatus > 0 s = null; // 断开node与node.next的连接 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 线程A:s == 线程B,激活线程B if (s != null) LockSupport.unpark(s.thread); }
释放完毕之后,在线程B的lock方法的一些列调用中有 parkAndCheckInterrupt() 方法,即线程B被挂起了,A释放之后又回到被挂起那个位置继续执行,所以线程B的 acquireQueued() 方法可以抢到锁
对应: