【JUC】JDK1.8源码分析之AbstractQueuedSynchronizer(二)(3)

简介: 简介: 简介:  在锁框架中,AbstractQueuedSynchronizer抽象类可以毫不夸张的说,占据着核心地位,它提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。所以很有必要好好分析

 2. 示例二


  下面我们结合Condition实现生产者与消费者,来进一步分析AbstractQueuedSynchronizer的内部工作机制。


  Depot(仓库)类

package com.hust.grid.leesf.reentrantLock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Depot {
    private int size;
    private int capacity;
    private Lock lock;
    private Condition fullCondition;
    private Condition emptyCondition;
    public Depot(int capacity) {
        this.capacity = capacity;    
        lock = new ReentrantLock();
        fullCondition = lock.newCondition();
        emptyCondition = lock.newCondition();
    }
    public void produce(int no) {
        lock.lock();
        int left = no;
        try {
            while (left > 0) {
                while (size >= capacity)  {
                    System.out.println(Thread.currentThread() + " before await");
                    fullCondition.await();
                    System.out.println(Thread.currentThread() + " after await");
                }
                int inc = (left + size) > capacity ? (capacity - size) : left;
                left -= inc;
                size += inc;
                System.out.println("produce = " + inc + ", size = " + size);
                emptyCondition.signal();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void consume(int no) {
        lock.lock();
        int left = no;
        try {            
            while (left > 0) {
                while (size <= 0) {
                    System.out.println(Thread.currentThread() + " before await");
                    emptyCondition.await();
                    System.out.println(Thread.currentThread() + " after await");
                }
                int dec = (size - left) > 0 ? left : size;
                left -= dec;
                size -= dec;
                System.out.println("consume = " + dec + ", size = " + size);
                fullCondition.signal();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

 测试类

package com.hust.grid.leesf.reentrantLock;
class Consumer {
    private Depot depot;
    public Consumer(Depot depot) {
        this.depot = depot;
    }
    public void consume(int no) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                depot.consume(no);
            }
        }, no + " consume thread").start();
    }
}
class Producer {
    private Depot depot;
    public Producer(Depot depot) {
        this.depot = depot;
    }
    public void produce(int no) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                depot.produce(no);
            }
        }, no + " produce thread").start();
    }
}
public class ReentrantLockDemo {
    public static void main(String[] args) throws InterruptedException {
        Depot depot = new Depot(500);
        new Producer(depot).produce(500);
        new Producer(depot).produce(200);
        new Consumer(depot).consume(500);
        new Consumer(depot).consume(200);
    }
}

 运行结果(可能的一种):

produce = 500, size = 500
Thread[200 produce thread,5,main] before await
consume = 500, size = 0
Thread[200 consume thread,5,main] before await
Thread[200 produce thread,5,main] after await
produce = 200, size = 200
Thread[200 consume thread,5,main] after await
consume = 200, size = 0

说明:根据结果,我们猜测一种可能的时序如下


image.png

说明:p1代表produce 500的那个线程,p2代表produce 200的那个线程,c1代表consume 500的那个线程,c2代表consume 200的那个线程。


  1. p1线程调用lock.lock,获得锁,继续运行,函数调用顺序在前面已经给出。


  2. p2线程调用lock.lock,由前面的分析可得到如下的最终状态。


image.png

 说明:p2线程调用lock.lock后,会禁止p2线程的继续运行,因为执行了LockSupport.park操作。

  3. c1线程调用lock.lock,由前面的分析得到如下的最终状态。


image.png

 说明:最终c1线程会在sync queue队列的尾部,并且其结点的前驱结点(包含p2的结点)的waitStatus变为了SIGNAL。


  4. c2线程调用lock.lock,由前面的分析得到如下的最终状态。

image.png


说明:最终c1线程会在sync queue队列的尾部,并且其结点的前驱结点(包含c1的结点)的waitStatus变为了SIGNAL。


  5. p1线程执行emptyCondition.signal,其函数调用顺序如下,只给出了主要的函数调用


image.png

说明:AQS.CO表示AbstractQueuedSynchronizer.ConditionObject类。此时调用signal方法不会产生任何其他效果。


  6. p1线程执行lock.unlock,根据前面的分析可知,最终的状态如下。


image.png

说明:此时,p2线程所在的结点为头结点,并且其他两个线程(c1、c2)依旧被禁止,所以,此时p2线程继续运行,执行用户逻辑。


  7. p2线程执行fullCondition.await,其函数调用顺序如下,只给出了主要的函数调用。

image.png

 说明:最终到达的状态是新生成了一个结点,包含了p2线程,此结点在condition queue中;并且sync queue中p2线程被禁止了,因为在执行了LockSupport.park操作。从函数一些调用可知,在await操作中线程会释放锁资源,供其他线程获取。同时,head结点后继结点的包含的线程的许可被释放了,故其可以继续运行。由于此时,只有c1线程可以运行,故运行c1。


  8. 继续运行c1线程,c1线程由于之前被park了,所以此时恢复,继续之前的步骤,即还是执行前面提到的acquireQueued函数,之后,c1判断自己的前驱结点为head,并且可以获取锁资源,最终到达的状态如下。


image.png


 说明:其中,head设置为包含c1线程的结点,c1继续运行。


  9. c1线程执行fullCondtion.signal,其函数调用顺序如下,只给出了主要的函数调用。


image.png


 说明:signal函数达到的最终结果是将包含p2线程的结点从condition queue中转移到sync queue中,之后condition queue为null,之前的尾结点的状态变为SIGNAL。


  10. c1线程执行lock.unlock操作,根据之前的分析,经历的状态变化如下。


image.png

 说明:最终c2线程会获取锁资源,继续运行用户逻辑。


  11. c2线程执行emptyCondition.await,由前面的第七步分析,可知最终的状态如下。


image.png


说明:await操作将会生成一个结点放入condition queue中与之前的一个condition queue是不相同的,并且unpark头结点后面的结点,即包含线程p2的结点。


  12. p2线程被unpark,故可以继续运行,经过CPU调度后,p2继续运行,之后p2线程在AQS:await函数中被park,继续AQS.CO:await函数的运行,其函数调用顺序如下,只给出了主要的函数调用。


image.png

 13. p2继续运行,执行emptyCondition.signal,根据第九步分析可知,最终到达的状态如下。

image.png


说明:最终,将condition queue中的结点转移到sync queue中,并添加至尾部,condition queue会为空,并且将head的状态设置为SIGNAL。


  14. p2线程执行lock.unlock操作,根据前面的分析可知,最后的到达的状态如下。


image.png

 说明:unlock操作会释放c2线程的许可,并且将头结点设置为c2线程所在的结点。

  15. c2线程继续运行,执行fullCondition. signal,由于此时fullCondition的condition queue已经不存在任何结点了,故其不会产生作用。


  16. c2执行lock.unlock,由于c2是sync队列中最后一个结点,故其不会再调用unparkSuccessor了,直接返回true。即整个流程就完成了。


五、总结


  对于AbstractQueuedSynchronizer的分析,最核心的就是sync queue的分析。


  ① 每一个结点都是由前一个结点唤醒


  ② 当结点发现前驱结点是head并且尝试获取成功,则会轮到该线程运行。


  ③ condition queue中的结点向sync queue中转移是通过signal操作完成的。


  ④ 当结点的状态为SIGNAL时,表示后面的结点需要运行。


  当然,此次分析没有涉及到中断操作,如果涉及到中断操作,又会复杂得多,以后遇到这种情况,我们再进行详细分析,AbstractQueuedSynchronizer类的设计令人叹为观止,以后有机会还会进行分析。也谢谢各位园友的观看~

 


目录
相关文章
|
4月前
|
存储 安全 Java
【JDK 源码分析】ConcurrentHashMap 底层结构
【1月更文挑战第27天】【JDK 源码分析】ConcurrentHashMap 底层结构
|
4月前
|
Java
【JDK 源码分析】HashMap 操作方法
【1月更文挑战第27天】【JDK 源码分析】HashMap Put 元素 初始化
|
4月前
|
存储 网络协议 Java
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)(二)
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)
60 0
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)(二)
|
4月前
|
Java
Integer类【JDK源码分析】
Integer类【JDK源码分析】
38 0
|
3月前
|
存储 算法 安全
JDK源码分析-HashMap
JDK源码分析-HashMap
|
4月前
|
存储 数据库
享元模式、在 JDK-Interger 的应用源码分析
享元模式(案例解析)、在 JDK-Interger 的应用源码分析
|
4月前
|
网络协议 Java Unix
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)(一)
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)
89 0
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)(一)
|
4月前
|
存储 网络协议 Java
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)
【Java】BIO源码分析和改造(GraalVM JDK 11.0.19)
28 0
|
4月前
|
安全 Java
【JDK 源码分析】HashMap 线程安全问题分析
【1月更文挑战第27天】【JDK 源码分析】HashMap 线程安全问题分析
|
4月前
|
存储 Java
【JDK 源码分析】HashMap 底层结构
【1月更文挑战第27天】【JDK 源码分析】HashMap 底层结构