图解AQS源码分析(下)

简介: AbstractQueuedSynchronizer抽象的队列式同步器(抽象类)。提供了一个FIFO队列,可以看成是一个用来实现同步锁以及其他涉及到同步功能的核心组件,常见的有:ReentrantLock、CountDownLatch等。

5 源码分析(以非公平锁为例)


5.1 线程A先抢占锁


构造方法:


public ReentrantLock() {
    sync = new NonfairSync();
}


lock():


public void lock() {
    sync.lock();
}


假设现在有两个线程A和B,如下:


static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    // 线程A进来
    // 线程B进来
    final void lock() {
        // 线程A执行成功,将AQS.state置为1,表示已抢占该锁
        // 线程B,由于线程A已将AQS.state置为1,所以线程B执行CAS操作为false
        if (compareAndSetState(0, 1))
            // 线程A,将AbstractOwnableSynchronizer.exclusiveOwnerThread设置为当前线程
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // 线程B执行该部分,尝试获取一个锁
            acquire(1);
    }
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}


其中acquire(1)方法的源码如下:


// 线程B调用:arg = 1
public final void acquire(int arg) {
    // 线程B
    // 尝试获得非公平锁,由于已被线程A抢到,所以tryAcquire(arg) = false
    // addWaiter方法,构建承载线程B的Node,然后添加到队列末尾,返回该node
    // acquireQueued方法
    if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg)) {
        // 设置当前线程的中断标识
        selfInterrupt();
    }
}


尝试抢锁的方法 tryAcquire(arg) 最终调用的:


/**
 * 尝试抢锁
 *
 * 处理内容:
 *  1 如果抢到锁,返回true
 *      1.1 如果当前线程第一次抢到锁:
 *          AQS.status由0变为1
 *          AQS.exclusiveOwnerThread = Thread.currentThread()
 *          返回true
 *      1.2 如果当前线程再次抢到锁(重入)
 *          AQS.status++
 *          返回true
 *  2 没抢到锁,返回false
 *
 */
// 线程B acquires = 1
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 由于线程A已将state设为1,所以c=1
    int c = getState();
    if (c == 0) {   // 线程B,不满足不执行
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {    // 线程B,不满足不执行
        /**
         * 获得当前独享线程,如果就是当前线程,那么执行重入操作
         * 执行tryLock()时:
         *      如果第二次进入,则nextc = 0 + 1 = 1
         *      如果第三次进入,则nextc = 1 + 1 = 2
         *      如果第四次进入,则nextc = 2 + 1 = 3
         */
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 线程B,返回false
    return false;
}


回到 acquire 代码中 addWaiter 方法:


// 线程B:mode = Node.EXCLUSIVE = null
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail; 
    // 线程B:由于线程A并没有进入链表,所以tail = null不会进入if方法
    if (pred != null) {
        node.prev = pred;   // (老的尾部node) <--- 新node.prev 
        if (compareAndSetTail(pred, node)) {
            pred.next = node;   // (老的尾部node) next ---> (新node)
            return node;
        }
    }
    // 线程B:node = 承载线程B的node
    enq(node);
    return node;
}


addWaiter 方法使用到了 Node 的构造方法:


// 线程B:thread = Thread.currentThread()  mode = null
Node(Thread thread, Node mode) {     
    this.nextWaiter = mode;
    this.thread = thread;
}


对照下面的图来看,nextWaiter 是null,Node的构造方法里没有对 waitStatus 赋值,所以默认为0


20200916111016772.png


回到addWaiter方法,其中还用到了enq方法:


/**
 * 将node节点插入队列末尾
 * 1 如果是空队列,则初始化一个空内容node作为第一个节点,然后将入参node加到队列末尾
 * 2 如果不是空队列,则直接入参node加到队列末尾
 *
 * @param node
 * @return
 */
// 线程B:node = 承载线程B的node
private Node enq(final Node node) {
    for (;;) {
        // 线程B:第一次循环 tail = null
        // 线程B:第二次循环 tail = head = 空内容node
        Node t = tail;
        /**
         * 第一次进入由于队列为null,所以t = null
         * 第二次进入,由于队列已经被初始化1个节点,故 t != null
         */
        if (t == null) {    // 线程B:第一次循环 满足t == null,进入该模块内
            if (compareAndSetHead(new Node()))  // 初始化一个空内容节点,作为AQS.head节点
                tail = head;
        } else {    // 线程B:第二次循环 满足 t != null进入该模块内,即空内容node <-> 新节点
            node.prev = t;  // 老的尾部node <--- 入参node.prev
            if (compareAndSetTail(t, node)) {   // 将AQS.tailOffset内容更新为入参node
 t.next = node;  // 老的尾部node ---> 入参node
                return t;
            }
        }
    }
}


此时addWaiter执行完毕,又回到acquire方法,addWaiter返回了承载B的node,现在要执行acquireQueued方法:


20200916111120168.png


// 线程B:node = 承载B的节点,arg = 1
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 线程B:会一直循环,直到拿到锁
        for (;;) {
            // 线程B:p=空内容node(head)
            final Node p = node.predecessor();
            // 线程B:p == head等于true;但是由于线程A先抢到锁,所以tryAcquire(arg)=false,所以不会进入下面的if
            if (p == head && tryAcquire(arg)) { // tryAcquire(arg):抢锁操作
                setHead(node);  // 更新头结点为入参node
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 线程B:shouldParkAfterFailedAcquire方法,设置p节点的waitStatus = Node.SIGNAL(原先是0),如果自旋两次没抢到锁的话会返回true挂起线程
            // 线程B:parkAndCheckInterrupt方法,挂起线程B
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


5.2 线程A释放锁


如果A调用unlock操作:


public void unlock() {
    sync.release(1);
}


释放锁的方法如下:


// 线程A:arg = 1
public final boolean release(int arg) {
    if (tryRelease(arg)) {  // 线程A:AQS.state = 0,AQS.exclusiveOwnerThread = null
        Node h = head;
        // 线程A:满足条件 h.waitStatus  == SIGNAL(值为-1)
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}


其中调用了tryRelease方法:


// 线程A:release = 1
protected final boolean tryRelease(int releases) {
    // 线程A:c = 1 - 1 = 0
    int c = getState() - releases;
    // 如果当前线程不是之前抢占的线程,则抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 线程A:c == 0 为 true
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}


对应:


20200916111326654.png


tryRelease执行完毕之后又回到了release方法中,运行unparkSuccessor方法


private void unparkSuccessor(Node node) {
    // 线程A:node.waitStatus == SIGNAL(-1)
    int ws = node.waitStatus;
    if (ws < 0) {
        // 线程A:设置 waitStatus == 0
        compareAndSetWaitStatus(node, ws, 0);
    }
    Node s = node.next;
    // 线程A:s == 线程B,s.waitStatus == SIGNAL(-1),所以不满足
    if (s == null || s.waitStatus > 0) {    // node是tail 或者 node的tail节点waitStatus > 0
        s = null;   // 断开node与node.next的连接
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 线程A:s == 线程B,激活线程B
    if (s != null)
        LockSupport.unpark(s.thread);
}


释放完毕之后,在线程B的lock方法的一些列调用中有 parkAndCheckInterrupt() 方法,即线程B被挂起了,A释放之后又回到被挂起那个位置继续执行,所以线程B的 acquireQueued() 方法可以抢到锁


对应:


20200916111411633.png

相关文章
AQS源码解读之一
AQS源码解读之一
50 0
|
Java 程序员 API
AQS 原理解读
AQS 原理解读
图解ReentrantReadWriteLock读写锁的实现原理(下)
图解ReentrantReadWriteLock读写锁的实现原理
136 0
图解ReentrantReadWriteLock读写锁的实现原理(下)
|
Java API
图解ReentrantReadWriteLock读写锁的实现原理(上)
图解ReentrantReadWriteLock读写锁的实现原理
220 0
图解ReentrantReadWriteLock读写锁的实现原理(上)
|
机器学习/深度学习 Java 调度
浅谈AQS原理
经典八股文之AQS原理
208 0
|
安全 Java
java并发原理实战(9)--手动实现一个可重入锁
java并发原理实战(9)--手动实现一个可重入锁
135 0
java并发原理实战(9)--手动实现一个可重入锁
|
存储 安全 Java
《从面试题来看源码》-LinkedBlockingQueue 源码分析
《从面试题来看源码》-LinkedBlockingQueue 源码分析
《从面试题来看源码》-LinkedBlockingQueue 源码分析
|
存储 Java
Java并发之AQS源码分析(一)
AQS 全称是 AbstractQueuedSynchronizer,顾名思义,是一个用来构建锁和同步器的框架,它底层用了 CAS 技术来保证操作的原子性,同时利用 FIFO 队列实现线程间的锁竞争,将基础的同步相关抽象细节放在 AQS,这也是 ReentrantLock、CountDownLatch 等同步工具实现同步的底层实现机制。它能够成为实现大部分同步需求的基础,也是 J.U.C 并发包同步的核心基础组件。
129 0
Java并发之AQS源码分析(一)
|
安全 Java
Java并发之AQS源码分析(二)
我在 Java并发之AQS源码分析(一)这篇文章中,从源码的角度深度剖析了 AQS 独占锁模式下的获取锁与释放锁的逻辑,如果你把这部分搞明白了,再看共享锁的实现原理,思路就会清晰很多。下面我们继续从源码中窥探共享锁的实现原理。
159 0
Java并发之AQS源码分析(二)