Java JUC PriorityBlockingQueue解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 无界阻塞队列 PriorityBlockingQueue

无界阻塞队列 PriorityBlockingQueue


介绍

PriorityBlockingQueue 是一个带有优先级无界阻塞队列,每次出队返回的都是优先级最高或者最低的元素。在内部是使用平衡二叉树堆实现,所以遍历元素不保证有序

默认使用对象的 compareTo 方法进行比较,如果需要自定义比较规则可以自定义 comparators。

1654830069798.png

该类图可以看到,PriorityBlockingQueue 内部有一个数组 queue,用来存放队列元素;size 用来存放元素个数;allocationSpinLock 是个自旋锁,使用CAS操作来保证同时只有一个线程来进行扩容队列,状态只有 0 和 1,0表示当前没有进行扩容,1表示正在扩容。由于是优先级队列,所以有一个比较器 comparator 用来比较大小,另外还有 lock 独占锁,notEmpty 条件变量来实现 take 方法的阻塞,由于是无界队列所以没有 notFull 条件变量,所以 put 是非阻塞的

//二叉树最小堆的实现
private transient Object[] queue;
private transient int size;
private transient volatile int allocationSpinLock;
private transient Comparator<? super E> comparator;
private final ReentrantLock lock;
private final Condition notEmpty;

在构造函数中,默认队列容量为11,默认比较器为 null,也就是默认使用元素的 compareTo 方法来确定优先级,所以队列元素必须实现 Comparable 接口。

private static final int DEFAULT_INITIAL_CAPACITY = 11;
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

offer 操作

offer 操作的作用是在队列中插入一个元素,由于是无界队列,所以一直返回 true。

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    //1. 如果当前元素个数 >= 队列容量 则扩容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        //2. 默认比较器为null
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            //3. 自定义比较器
            siftUpUsingComparator(n, e, array, cmp);
        //4. 队列元素数量增加1,并唤醒notEmpty条件队列中的一个阻塞线程
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

如上代码并不复杂,我们主要看看如何进行扩容和在内部建堆。


我们先看扩容逻辑:

private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        //1. CAS成功则扩容
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                //oldCap<64则扩容执行oldCap+2,否则扩容50%,并且最大值为MAX_ARRAY_SIZE
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
       //2. 第一个线程CAS成功后,第二线程进入这段代码,然后第二个线程让出CPU,尽量让第一个线程获取到锁,但得不到保证
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
}

tryGrow 的作用就是扩容,但是为什么要在扩容前释放锁,然后使用 CAS 控制只有一个线程可以扩容成功?


其实不释放锁也是 ok 的,也就是在扩容期间一直持有该锁,但是扩容需要时间,这段时间内占用锁的话那么其他线程在这个时候就不能进行出队和入队操作,降低了并发性。所以为了提高性能,使用 CAS 来控制只有一个线程可以进行扩容,并且在扩容前释放锁,进而让其他线程可以进行入队和出队操作。


扩容线程扩容完毕后会重置自旋锁变量 allocationSpinLock 为 0,这里并没有使用 UNSAFE 方法的 CAS 进行设置是因为同时只可能有一个线程获取到该锁,并且 allocationSpinLock 被修饰为了 volatile 的。


我们接着看建堆算法:

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    // 队列元素个数 > 0 则判断插入位置,否则直接入队
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

熟悉二叉堆的话,该段代码并不复杂,我们看下图具体结构:

image.png

首先我们看parent = (k - 1) >>> 1,首先 k - 1 就是拿到当前真正的下标的位置,随后 >>> 1拿到父节点的位置,该图我们得知,k = 7,执行(k - 1) >>> 1之后得到的parent = 3,根据下标我们知道是元素 6。


PriorityQueue 是一个完全二叉树,且不允许出现 null 节点,其父节点都比叶子节点小,这个是堆排序中的最小堆。二叉树存入数组的方式很简单,就是从上到下,从左到右。完全二叉树可以和数组中的位置一一对应:


  • 左叶子节点 = 父节点下标 * 2 + 1
  • 右叶子节点 = 父节点下标 * 2 + 2
  • 父节点 = (叶子节点 - 1) / 2


实际上就是将要插入的元素 x 和它的父节点元素 6 做对比,如果比父节点大就一直向上移动。


poll 操作

poll 操作的作用是获取队列内部堆树的根节点元素,如果队列为空,则返回 null。

public E poll() {
    final ReentrantLock lock = this.lock;
    //获取独占锁
    lock.lock();
    try {
        return dequeue();
    } finally {
        //释放独占锁
        lock.unlock();
    }
}

我们主要看一下 dequeue 方法。

private E dequeue() {
    int n = size - 1;
    //队列为空,返回null
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        //1.获取头部元素
        E result = (E) array[0];
        //2. 获取队尾元素,并赋值为null
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)//3.
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n; //4.
        return result;
    }
}

该方法如果队列为空则直接返回 null,否则执行代码(1)获取数组第一个元素作为返回值存放到变量 Result 中,这里需要注意,数组里面的第一个元素是优先级最小或者最大的元素,出队操作就是返回这个元素。然后代码(2)获取队列尾部元素并存放到变量 x 中,且置空尾部节点,然后执行代码(3)将变量 x 插入到数组下标为 0 的位置,之后重新调整堆为最大或者最小堆,然后返回。这里重要的是,去掉堆的根节点后,如何使用剩下的节点重新调整一个最大或者最小堆。下面我们看下 siftDownComparable 的实现。

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

由于队列数组第 0 个元素为根,因此出队时要移除它。这时数组就不再是最小的堆了,所以需要调整堆。具体是从被移除的树根的左右子树中找一个最小的值来当树根,左右子树又会找自己左右子树里面那个最小值,这是一个递归过程,直到叶子节点结束递归。

假设目前队列内容如下图:

image.png

上图中树根的 leftChildVal = 4; rightChildVal = 6;由于4 < 6,所以c = 4。然后由于11 > 4,也就是key > c,所以使用元素 4 覆盖树根节点的值。


然后树根的左子树树根的左右孩子节点中的 leftChildVal = 8; rightChildVal = 10;由于8 < 10,所以c = 8。然后由于11 > 8,也就是 key > c,所以元素 8 作为树根左子树的根节点,现在树的形状如下图第三步所示。这时候判断是否k < half,结果为 false,所以退出循环。然后把x = 11的元素设置到数组下标为3的地方,这时候堆树如下图第四步所示,至此调整堆完毕。

image.png

put 操作

put 操作内部调用的是 offer 操作,由于是无界队列,所以不需要阻塞。

public void put(E e) {
    offer(e); // never need to block
}

take 操作

take 操作的作用是获取队列内部堆树的根节点元素,如果队列为空则阻塞。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

size 操作

获取队列元素个数。如下代码在返回 size 前加了锁,以保证在调用 size 方法时不会有其他线程进行入队和出队操作。另外,由于 size 变量没有被修饰为 volatie 的,所以这里加锁也保证了在多线程下 size 变量的内存可见性。

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return size;
    } finally {
        lock.unlock();
    }
}

总结

PriorityBlockingQueue 类似于 ArrayBlockingQueue,在内部使用一个独占锁来控制同时只有一个线程可以进行入队和出队操作。另外,PriorityBlockingQueue 只使用了一个 notEmpty 条件变量而没有使用 notFull,因为是无界队列,执行 put 操作时永远不会处于 await 状态,所以也不需要被唤醒。而 take 方法是阻塞方法,并且是可被中断的。当需要存放有优先级的元素时该队列比较有用。

相关文章
|
12天前
|
Java 编译器
Java 泛型详细解析
本文将带你详细解析 Java 泛型,了解泛型的原理、常见的使用方法以及泛型的局限性,让你对泛型有更深入的了解。
26 2
Java 泛型详细解析
|
10天前
|
存储 算法 Java
Java内存管理深度解析####
本文深入探讨了Java虚拟机(JVM)中的内存分配与垃圾回收机制,揭示了其高效管理内存的奥秘。文章首先概述了JVM内存模型,随后详细阐述了堆、栈、方法区等关键区域的作用及管理策略。在垃圾回收部分,重点介绍了标记-清除、复制算法、标记-整理等多种回收算法的工作原理及其适用场景,并通过实际案例分析了不同GC策略对应用性能的影响。对于开发者而言,理解这些原理有助于编写出更加高效、稳定的Java应用程序。 ####
|
10天前
|
存储 监控 算法
Java虚拟机(JVM)垃圾回收机制深度解析与优化策略####
本文旨在深入探讨Java虚拟机(JVM)的垃圾回收机制,揭示其工作原理、常见算法及参数调优方法。通过剖析垃圾回收的生命周期、内存区域划分以及GC日志分析,为开发者提供一套实用的JVM垃圾回收优化指南,助力提升Java应用的性能与稳定性。 ####
|
12天前
|
Java 数据库连接 开发者
Java中的异常处理机制:深入解析与最佳实践####
本文旨在为Java开发者提供一份关于异常处理机制的全面指南,从基础概念到高级技巧,涵盖try-catch结构、自定义异常、异常链分析以及最佳实践策略。不同于传统的摘要概述,本文将以一个实际项目案例为线索,逐步揭示如何高效地管理运行时错误,提升代码的健壮性和可维护性。通过对比常见误区与优化方案,读者将获得编写更加健壮Java应用程序的实用知识。 --- ####
|
13天前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
11天前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
13天前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####
|
7天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
7天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
24 3
|
8天前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####

推荐镜像

更多