Java JUC PriorityBlockingQueue解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 无界阻塞队列 PriorityBlockingQueue

无界阻塞队列 PriorityBlockingQueue


介绍

PriorityBlockingQueue 是一个带有优先级无界阻塞队列,每次出队返回的都是优先级最高或者最低的元素。在内部是使用平衡二叉树堆实现,所以遍历元素不保证有序

默认使用对象的 compareTo 方法进行比较,如果需要自定义比较规则可以自定义 comparators。

1654830069798.png

该类图可以看到,PriorityBlockingQueue 内部有一个数组 queue,用来存放队列元素;size 用来存放元素个数;allocationSpinLock 是个自旋锁,使用CAS操作来保证同时只有一个线程来进行扩容队列,状态只有 0 和 1,0表示当前没有进行扩容,1表示正在扩容。由于是优先级队列,所以有一个比较器 comparator 用来比较大小,另外还有 lock 独占锁,notEmpty 条件变量来实现 take 方法的阻塞,由于是无界队列所以没有 notFull 条件变量,所以 put 是非阻塞的

//二叉树最小堆的实现
private transient Object[] queue;
private transient int size;
private transient volatile int allocationSpinLock;
private transient Comparator<? super E> comparator;
private final ReentrantLock lock;
private final Condition notEmpty;

在构造函数中,默认队列容量为11,默认比较器为 null,也就是默认使用元素的 compareTo 方法来确定优先级,所以队列元素必须实现 Comparable 接口。

private static final int DEFAULT_INITIAL_CAPACITY = 11;
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

offer 操作

offer 操作的作用是在队列中插入一个元素,由于是无界队列,所以一直返回 true。

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    //1. 如果当前元素个数 >= 队列容量 则扩容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        //2. 默认比较器为null
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            //3. 自定义比较器
            siftUpUsingComparator(n, e, array, cmp);
        //4. 队列元素数量增加1,并唤醒notEmpty条件队列中的一个阻塞线程
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

如上代码并不复杂,我们主要看看如何进行扩容和在内部建堆。


我们先看扩容逻辑:

private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        //1. CAS成功则扩容
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                //oldCap<64则扩容执行oldCap+2,否则扩容50%,并且最大值为MAX_ARRAY_SIZE
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
       //2. 第一个线程CAS成功后,第二线程进入这段代码,然后第二个线程让出CPU,尽量让第一个线程获取到锁,但得不到保证
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
}

tryGrow 的作用就是扩容,但是为什么要在扩容前释放锁,然后使用 CAS 控制只有一个线程可以扩容成功?


其实不释放锁也是 ok 的,也就是在扩容期间一直持有该锁,但是扩容需要时间,这段时间内占用锁的话那么其他线程在这个时候就不能进行出队和入队操作,降低了并发性。所以为了提高性能,使用 CAS 来控制只有一个线程可以进行扩容,并且在扩容前释放锁,进而让其他线程可以进行入队和出队操作。


扩容线程扩容完毕后会重置自旋锁变量 allocationSpinLock 为 0,这里并没有使用 UNSAFE 方法的 CAS 进行设置是因为同时只可能有一个线程获取到该锁,并且 allocationSpinLock 被修饰为了 volatile 的。


我们接着看建堆算法:

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    // 队列元素个数 > 0 则判断插入位置,否则直接入队
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

熟悉二叉堆的话,该段代码并不复杂,我们看下图具体结构:

image.png

首先我们看parent = (k - 1) >>> 1,首先 k - 1 就是拿到当前真正的下标的位置,随后 >>> 1拿到父节点的位置,该图我们得知,k = 7,执行(k - 1) >>> 1之后得到的parent = 3,根据下标我们知道是元素 6。


PriorityQueue 是一个完全二叉树,且不允许出现 null 节点,其父节点都比叶子节点小,这个是堆排序中的最小堆。二叉树存入数组的方式很简单,就是从上到下,从左到右。完全二叉树可以和数组中的位置一一对应:


  • 左叶子节点 = 父节点下标 * 2 + 1
  • 右叶子节点 = 父节点下标 * 2 + 2
  • 父节点 = (叶子节点 - 1) / 2


实际上就是将要插入的元素 x 和它的父节点元素 6 做对比,如果比父节点大就一直向上移动。


poll 操作

poll 操作的作用是获取队列内部堆树的根节点元素,如果队列为空,则返回 null。

public E poll() {
    final ReentrantLock lock = this.lock;
    //获取独占锁
    lock.lock();
    try {
        return dequeue();
    } finally {
        //释放独占锁
        lock.unlock();
    }
}

我们主要看一下 dequeue 方法。

private E dequeue() {
    int n = size - 1;
    //队列为空,返回null
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        //1.获取头部元素
        E result = (E) array[0];
        //2. 获取队尾元素,并赋值为null
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)//3.
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n; //4.
        return result;
    }
}

该方法如果队列为空则直接返回 null,否则执行代码(1)获取数组第一个元素作为返回值存放到变量 Result 中,这里需要注意,数组里面的第一个元素是优先级最小或者最大的元素,出队操作就是返回这个元素。然后代码(2)获取队列尾部元素并存放到变量 x 中,且置空尾部节点,然后执行代码(3)将变量 x 插入到数组下标为 0 的位置,之后重新调整堆为最大或者最小堆,然后返回。这里重要的是,去掉堆的根节点后,如何使用剩下的节点重新调整一个最大或者最小堆。下面我们看下 siftDownComparable 的实现。

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

由于队列数组第 0 个元素为根,因此出队时要移除它。这时数组就不再是最小的堆了,所以需要调整堆。具体是从被移除的树根的左右子树中找一个最小的值来当树根,左右子树又会找自己左右子树里面那个最小值,这是一个递归过程,直到叶子节点结束递归。

假设目前队列内容如下图:

image.png

上图中树根的 leftChildVal = 4; rightChildVal = 6;由于4 < 6,所以c = 4。然后由于11 > 4,也就是key > c,所以使用元素 4 覆盖树根节点的值。


然后树根的左子树树根的左右孩子节点中的 leftChildVal = 8; rightChildVal = 10;由于8 < 10,所以c = 8。然后由于11 > 8,也就是 key > c,所以元素 8 作为树根左子树的根节点,现在树的形状如下图第三步所示。这时候判断是否k < half,结果为 false,所以退出循环。然后把x = 11的元素设置到数组下标为3的地方,这时候堆树如下图第四步所示,至此调整堆完毕。

image.png

put 操作

put 操作内部调用的是 offer 操作,由于是无界队列,所以不需要阻塞。

public void put(E e) {
    offer(e); // never need to block
}

take 操作

take 操作的作用是获取队列内部堆树的根节点元素,如果队列为空则阻塞。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

size 操作

获取队列元素个数。如下代码在返回 size 前加了锁,以保证在调用 size 方法时不会有其他线程进行入队和出队操作。另外,由于 size 变量没有被修饰为 volatie 的,所以这里加锁也保证了在多线程下 size 变量的内存可见性。

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return size;
    } finally {
        lock.unlock();
    }
}

总结

PriorityBlockingQueue 类似于 ArrayBlockingQueue,在内部使用一个独占锁来控制同时只有一个线程可以进行入队和出队操作。另外,PriorityBlockingQueue 只使用了一个 notEmpty 条件变量而没有使用 notFull,因为是无界队列,执行 put 操作时永远不会处于 await 状态,所以也不需要被唤醒。而 take 方法是阻塞方法,并且是可被中断的。当需要存放有优先级的元素时该队列比较有用。

相关文章
|
3天前
|
自然语言处理 Java 应用服务中间件
Java 18 新特性解析
Java 18 新特性解析
|
4天前
|
存储 设计模式 Java
Java中的if-else语句:深入解析与应用实践
Java中的if-else语句:深入解析与应用实践
|
3天前
|
设计模式 存储 Java
掌握Java设计模式的23种武器(全):深入解析与实战示例
掌握Java设计模式的23种武器(全):深入解析与实战示例
|
3天前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践
|
8天前
|
安全 Java 数据处理
Java并发编程:解锁多线程的潜力
在数字化时代的浪潮中,Java作为一门广泛使用的编程语言,其并发编程能力是提升应用性能和响应速度的关键。本文将带你深入理解Java并发编程的核心概念,探索如何通过多线程技术有效利用计算资源,并实现高效的数据处理。我们将从基础出发,逐步揭开高效并发编程的面纱,让你的程序运行得更快、更稳、更强。
|
7天前
|
Java 开发者
奇迹时刻!探索 Java 多线程的奇幻之旅:Thread 类和 Runnable 接口的惊人对决
【8月更文挑战第13天】Java的多线程特性能显著提升程序性能与响应性。本文通过示例代码详细解析了两种核心实现方式:Thread类与Runnable接口。Thread类适用于简单场景,直接定义线程行为;Runnable接口则更适合复杂的项目结构,尤其在需要继承其他类时,能保持代码的清晰与模块化。理解两者差异有助于开发者在实际应用中做出合理选择,构建高效稳定的多线程程序。
28 7
|
6天前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
6天前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
1天前
|
Java
java开启线程的四种方法
这篇文章介绍了Java中开启线程的四种方法,包括继承Thread类、实现Runnable接口、实现Callable接口和创建线程池,每种方法都提供了代码实现和测试结果。
java开启线程的四种方法
|
3天前
|
存储 缓存 安全
深度剖析Java HashMap:源码分析、线程安全与最佳实践
深度剖析Java HashMap:源码分析、线程安全与最佳实践

推荐镜像

更多