【JUC】JDK1.8源码分析之AbstractQueuedSynchronizer(二)(2)

简介: 简介:  在锁框架中,AbstractQueuedSynchronizer抽象类可以毫不夸张的说,占据着核心地位,它提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。所以很有必要好好分析

3.5 类的核心函数


  1. acquire函数


  该函数以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。源码如下 


public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           selfInterrupt();
}

 由上述源码可以知道,当一个线程调用acquire时,调用方法流程如下。

image.png

说明:


  ① 首先调用tryAcquire函数,调用此方法的线程会试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。在AbstractQueuedSynchronizer源码中默认会抛出一个异常,即需要子类去重写此函数完成自己的逻辑。之后会进行分析。


  ② 若tryAcquire失败,则调用addWaiter函数,addWaiter函数完成的功能是将调用此方法的线程封装成为一个结点并放入Sync queue。


  ③ 调用acquireQueued函数,此函数完成的功能是Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。


  由于tryAcquire默认实现是抛出异常,所以此时,不进行分析,之后会结合一个例子进行分析。


  首先分析addWaiter函数

// 添加等待者
    private Node addWaiter(Node mode) {
        // 新生成一个结点,默认为独占模式
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 保存尾结点
        Node pred = tail;
        if (pred != null) { // 尾结点不为空,即已经被初始化
            // 将node结点的prev域连接到尾结点
            node.prev = pred; 
            if (compareAndSetTail(pred, node)) { // 比较pred是否为尾结点,是则将尾结点设置为node 
                // 设置尾结点的next域为node
                pred.next = node;
                return node; // 返回新生成的结点
            }
        }
        enq(node); // 尾结点为空(即还没有被初始化过),或者是compareAndSetTail操作失败,则入队列
        return node;
    }

 说明:addWaiter函数使用快速添加的方式往sync queue尾部添加结点,如果sync queue队列还没有初始化,则会使用enq插入队列中,enq方法源码如下 

// 入队列
    private Node enq(final Node node) {
        for (;;) { // 无限循环,确保结点能够成功入队列
            // 保存尾结点
            Node t = tail;
            if (t == null) { // 尾结点为空,即还没被初始化
                if (compareAndSetHead(new Node())) // 头结点为空,并设置头结点为新生成的结点
                    tail = head; // 头结点与尾结点都指向同一个新生结点
            } else { // 尾结点不为空,即已经被初始化过
                // 将node结点的prev域连接到尾结点
                node.prev = t; 
                if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node
                    // 设置尾结点的next域为node
                    t.next = node; 
                    return t; // 返回尾结点
                }
            }
        }
    }

说明:enq函数会使用无限循环来确保节点的成功插入。

  现在,分析acquireQueue函数。其源码如下

// sync队列中的结点在独占且忽略中断的模式下获取(资源)
    final boolean acquireQueued(final Node node, int arg) {
        // 标志
        boolean failed = true;
        try {
            // 中断标志
            boolean interrupted = false;
            for (;;) { // 无限循环
                // 获取node节点的前驱结点
                final Node p = node.predecessor(); 
                if (p == head && tryAcquire(arg)) { // 前驱为头结点并且成功获得锁
                    setHead(node); // 设置头结点
                    p.next = null; // help GC
                    failed = false; // 设置标志
                    return interrupted; 
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 说明:首先获取当前节点的前驱节点,如果前驱节点是头结点并且能够获取(资源),代表该当前节点能够占有锁,设置头结点为当前节点,返回。否则,调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt函数,首先,我们看shouldParkAfterFailedAcquire函数,代码如下

// 当获取(资源)失败后,检查并且更新结点状态
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前驱结点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // 状态为SIGNAL,为-1
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            // 可以进行park操作
            return true; 
        if (ws > 0) { // 表示状态为CANCELLED,为1
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0); // 找到pred结点前面最近的一个状态不为CANCELLED的结点
            // 赋值pred结点的next域
            pred.next = node; 
        } else { // 为PROPAGATE -3 或者是0 表示无状态,(为CONDITION -2时,表示此节点在condition queue中) 
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 比较并设置前驱结点的状态为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 
        }
        // 不能进行park操作
        return false;
    }

说明:只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。再看parkAndCheckInterrupt函数,源码如下

// 进行park操作并且返回该线程是否被中断
    private final boolean parkAndCheckInterrupt() {
        // 在许可可用之前禁用当前线程,并且设置了blocker
        LockSupport.park(this);
        return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位
    }

 说明:parkAndCheckInterrupt函数里的逻辑是首先执行park操作,即禁用当前线程,然后返回该线程是否已经被中断。再看final块中的cancelAcquire函数,其源码如下

// 取消继续获取(资源)
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        // node为空,返回
        if (node == null)
            return;
        // 设置node结点的thread为空
        node.thread = null;
        // Skip cancelled predecessors
        // 保存node的前驱结点
        Node pred = node.prev;
        while (pred.waitStatus > 0) // 找到node前驱结点中第一个状态小于0的结点,即不为CANCELLED状态的结点
            node.prev = pred = pred.prev;
        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        // 获取pred结点的下一个结点
        Node predNext = pred.next;
        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        // 设置node结点的状态为CANCELLED
        node.waitStatus = Node.CANCELLED;
        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) { // node结点为尾结点,则设置尾结点为pred结点
            // 比较并设置pred结点的next节点为null
            compareAndSetNext(pred, predNext, null); 
        } else { // node结点不为尾结点,或者比较设置不成功
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) { // (pred结点不为头结点,并且pred结点的状态为SIGNAL)或者 
                                    // pred结点状态小于等于0,并且比较并设置等待状态为SIGNAL成功,并且pred结点所封装的线程不为空
                // 保存结点的后继
                Node next = node.next;
                if (next != null && next.waitStatus <= 0) // 后继不为空并且后继的状态小于等于0
                    compareAndSetNext(pred, predNext, next); // 比较并设置pred.next = next;
            } else {
                unparkSuccessor(node); // 释放node的前一个结点
            }
            node.next = node; // help GC
        }
    }

说明:该函数完成的功能就是取消当前线程对资源的获取,即设置该结点的状态为CANCELLED,接着我们再看unparkSuccessor函数,源码如下


// 释放后继结点
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        // 获取node结点的等待状态
        int ws = node.waitStatus;
        if (ws < 0) // 状态值小于0,为SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
            // 比较并且设置结点等待状态,设置为0
            compareAndSetWaitStatus(node, ws, 0);
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        // 获取node节点的下一个结点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) { // 下一个结点为空或者下一个节点的等待状态大于0,即为CANCELLED
            // s赋值为空
            s = null; 
            // 从尾结点开始从后往前开始遍历
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0) // 找到等待状态小于等于0的结点,找到最前的状态小于等于0的结点
                    // 保存结点
                    s = t;
        }
        if (s != null) // 该结点不为为空,释放许可
            LockSupport.unpark(s.thread);
    }

 说明:该函数的作用就是为了释放node节点的后继结点。

  对于cancelAcquire与unparkSuccessor函数,如下示意图可以清晰的表示。

image.png

说明:其中node为参数,在执行完cancelAcquire函数后的效果就是unpark了s结点所包含的t4线程。


  现在,再来看acquireQueued函数的整个的逻辑。逻辑如下

  ① 判断结点的前驱是否为head并且是否成功获取(资源)。

  ② 若步骤①均满足,则设置结点为head,之后会判断是否finally模块,然后返回。

  ③ 若步骤①不满足,则判断是否需要park当前线程,是否需要park当前线程的逻辑是判断结点的前驱结点的状态是否为SIGNAL,若是,则park当前结点,否则,不进行park操作。


  ④ 若park了当前线程,之后某个线程对本线程unpark后,并且本线程也获得机会运行。那么,将会继续进行步骤①的判断。


2. release

  以独占模式释放对象,其源码如下

public final boolean release(int arg) {
        if (tryRelease(arg)) { // 释放成功
            // 保存头结点
            Node h = head; 
            if (h != null && h.waitStatus != 0) // 头结点不为空并且头结点状态不为0
                unparkSuccessor(h); //释放头结点的后继结点
            return true;
        }
        return false;
    }

 说明:其中,tryRelease的默认实现是抛出异常,需要具体的子类实现,如果tryRelease成功,那么如果头结点不为空并且头结点的状态不为0,则释放头结点的后继结点,unparkSuccessor函数已经分析过,不再累赘。


  对于其他函数我们也可以分析,与前面分析的函数大同小异,所以,不再累赘。

四、示例分析


  1. 示例一


  借助下面示例来分析AbstractQueuedSyncrhonizer内部的工作机制。示例源码如下  

package com.hust.grid.leesf.abstractqueuedsynchronizer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class MyThread extends Thread {
    private Lock lock;
    public MyThread(String name, Lock lock) {
        super(name);
        this.lock = lock;
    }
    public void run () {
        lock.lock();
        try {
            System.out.println(Thread.currentThread() + " running");
        } finally {
            lock.unlock();
        }
    }
}
public class AbstractQueuedSynchonizerDemo {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        MyThread t1 = new MyThread("t1", lock);
        MyThread t2 = new MyThread("t2", lock);
        t1.start();
        t2.start();    
    }
}

运行结果(可能的一种):


Thread[t1,5,main] running
Thread[t2,5,main] running

结果分析:从示例可知,线程t1与t2共用了一把锁,即同一个lock。可能会存在如下一种时序。

image.png


说明:首先线程t1先执行lock.lock操作,然后t2执行lock.lock操作,然后t1执行lock.unlock操作,最后t2执行lock.unlock操作。基于这样的时序,分析AbstractQueuedSynchronizer内部的工作机制。


  ① t1线程调用lock.lock函数,其函数调用顺序如下,只给出了主要的函数调用。


image.png


说明:其中,前面的部分表示哪个类,后面是具体的类中的哪个方法,AQS表示AbstractQueuedSynchronizer类,AOS表示AbstractOwnableSynchronizer类。

  ② t2线程调用lock.lock函数,其函数调用顺序如下,只给出了主要的函数调用。

image.png

说明:经过一系列的函数调用,最后达到的状态是禁用t2线程,因为调用了LockSupport.lock。

  ③ t1线程调用lock.unlock,其函数调用顺序如下,只给出了主要的函数调用。

image.png

说明:t1线程中调用lock.unlock后,经过一系列的调用,最终的状态是释放了许可,因为调用了LockSupport.unpark。这时,t2线程就可以继续运行了。此时,会继续恢复t2线程运行环境,继续执行LockSupport.park后面的语句,即进一步调用如下。


image.png


 说明:在上一步调用了LockSupport.unpark后,t2线程恢复运行,则运行parkAndCheckInterrupt,之后,继续运行acquireQueued函数,最后达到的状态是头结点head与尾结点tail均指向了t2线程所在的结点,并且之前的头结点已经从sync队列中断开了。

  ④ t2线程调用lock.unlock,其函数调用顺序如下,只给出了主要的函数调用。


image.png


说明:t2线程执行lock.unlock后,最终达到的状态还是与之前的状态一样。

目录
相关文章
|
4月前
|
存储 安全 Java
【JDK 源码分析】ConcurrentHashMap 底层结构
【1月更文挑战第27天】【JDK 源码分析】ConcurrentHashMap 底层结构
|
4月前
|
Java
【JDK 源码分析】HashMap 操作方法
【1月更文挑战第27天】【JDK 源码分析】HashMap Put 元素 初始化
|
4月前
|
存储 网络协议 Java
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)(二)
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)
60 0
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)(二)
|
4月前
|
Java
Integer类【JDK源码分析】
Integer类【JDK源码分析】
38 0
|
3月前
|
存储 算法 安全
JDK源码分析-HashMap
JDK源码分析-HashMap
|
4月前
|
存储 数据库
享元模式、在 JDK-Interger 的应用源码分析
享元模式(案例解析)、在 JDK-Interger 的应用源码分析
|
4月前
|
网络协议 Java Unix
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)(一)
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)
89 0
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)(一)
|
4月前
|
存储 网络协议 Java
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)
28 0
|
4月前
|
安全 Java
【JDK 源码分析】HashMap 线程安全问题分析
【1月更文挑战第27天】【JDK 源码分析】HashMap 线程安全问题分析
|
4月前
|
存储 Java
【JDK 源码分析】HashMap 底层结构
【1月更文挑战第27天】【JDK 源码分析】HashMap 底层结构