Java JUC PriorityBlockingQueue解析

简介: 无界阻塞队列 PriorityBlockingQueue

无界阻塞队列 PriorityBlockingQueue


介绍

PriorityBlockingQueue 是一个带有优先级无界阻塞队列,每次出队返回的都是优先级最高或者最低的元素。在内部是使用平衡二叉树堆实现,所以遍历元素不保证有序

默认使用对象的 compareTo 方法进行比较,如果需要自定义比较规则可以自定义 comparators。

1654830069798.png

该类图可以看到,PriorityBlockingQueue 内部有一个数组 queue,用来存放队列元素;size 用来存放元素个数;allocationSpinLock 是个自旋锁,使用CAS操作来保证同时只有一个线程来进行扩容队列,状态只有 0 和 1,0表示当前没有进行扩容,1表示正在扩容。由于是优先级队列,所以有一个比较器 comparator 用来比较大小,另外还有 lock 独占锁,notEmpty 条件变量来实现 take 方法的阻塞,由于是无界队列所以没有 notFull 条件变量,所以 put 是非阻塞的

//二叉树最小堆的实现
private transient Object[] queue;
private transient int size;
private transient volatile int allocationSpinLock;
private transient Comparator<? super E> comparator;
private final ReentrantLock lock;
private final Condition notEmpty;

在构造函数中,默认队列容量为11,默认比较器为 null,也就是默认使用元素的 compareTo 方法来确定优先级,所以队列元素必须实现 Comparable 接口。

private static final int DEFAULT_INITIAL_CAPACITY = 11;
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

offer 操作

offer 操作的作用是在队列中插入一个元素,由于是无界队列,所以一直返回 true。

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    //1. 如果当前元素个数 >= 队列容量 则扩容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        //2. 默认比较器为null
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            //3. 自定义比较器
            siftUpUsingComparator(n, e, array, cmp);
        //4. 队列元素数量增加1,并唤醒notEmpty条件队列中的一个阻塞线程
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

如上代码并不复杂,我们主要看看如何进行扩容和在内部建堆。


我们先看扩容逻辑:

private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        //1. CAS成功则扩容
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                //oldCap<64则扩容执行oldCap+2,否则扩容50%,并且最大值为MAX_ARRAY_SIZE
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
       //2. 第一个线程CAS成功后,第二线程进入这段代码,然后第二个线程让出CPU,尽量让第一个线程获取到锁,但得不到保证
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
}

tryGrow 的作用就是扩容,但是为什么要在扩容前释放锁,然后使用 CAS 控制只有一个线程可以扩容成功?


其实不释放锁也是 ok 的,也就是在扩容期间一直持有该锁,但是扩容需要时间,这段时间内占用锁的话那么其他线程在这个时候就不能进行出队和入队操作,降低了并发性。所以为了提高性能,使用 CAS 来控制只有一个线程可以进行扩容,并且在扩容前释放锁,进而让其他线程可以进行入队和出队操作。


扩容线程扩容完毕后会重置自旋锁变量 allocationSpinLock 为 0,这里并没有使用 UNSAFE 方法的 CAS 进行设置是因为同时只可能有一个线程获取到该锁,并且 allocationSpinLock 被修饰为了 volatile 的。


我们接着看建堆算法:

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    // 队列元素个数 > 0 则判断插入位置,否则直接入队
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

熟悉二叉堆的话,该段代码并不复杂,我们看下图具体结构:

image.png

首先我们看parent = (k - 1) >>> 1,首先 k - 1 就是拿到当前真正的下标的位置,随后 >>> 1拿到父节点的位置,该图我们得知,k = 7,执行(k - 1) >>> 1之后得到的parent = 3,根据下标我们知道是元素 6。


PriorityQueue 是一个完全二叉树,且不允许出现 null 节点,其父节点都比叶子节点小,这个是堆排序中的最小堆。二叉树存入数组的方式很简单,就是从上到下,从左到右。完全二叉树可以和数组中的位置一一对应:


  • 左叶子节点 = 父节点下标 * 2 + 1
  • 右叶子节点 = 父节点下标 * 2 + 2
  • 父节点 = (叶子节点 - 1) / 2


实际上就是将要插入的元素 x 和它的父节点元素 6 做对比,如果比父节点大就一直向上移动。


poll 操作

poll 操作的作用是获取队列内部堆树的根节点元素,如果队列为空,则返回 null。

public E poll() {
    final ReentrantLock lock = this.lock;
    //获取独占锁
    lock.lock();
    try {
        return dequeue();
    } finally {
        //释放独占锁
        lock.unlock();
    }
}

我们主要看一下 dequeue 方法。

private E dequeue() {
    int n = size - 1;
    //队列为空,返回null
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        //1.获取头部元素
        E result = (E) array[0];
        //2. 获取队尾元素,并赋值为null
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)//3.
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n; //4.
        return result;
    }
}

该方法如果队列为空则直接返回 null,否则执行代码(1)获取数组第一个元素作为返回值存放到变量 Result 中,这里需要注意,数组里面的第一个元素是优先级最小或者最大的元素,出队操作就是返回这个元素。然后代码(2)获取队列尾部元素并存放到变量 x 中,且置空尾部节点,然后执行代码(3)将变量 x 插入到数组下标为 0 的位置,之后重新调整堆为最大或者最小堆,然后返回。这里重要的是,去掉堆的根节点后,如何使用剩下的节点重新调整一个最大或者最小堆。下面我们看下 siftDownComparable 的实现。

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

由于队列数组第 0 个元素为根,因此出队时要移除它。这时数组就不再是最小的堆了,所以需要调整堆。具体是从被移除的树根的左右子树中找一个最小的值来当树根,左右子树又会找自己左右子树里面那个最小值,这是一个递归过程,直到叶子节点结束递归。

假设目前队列内容如下图:

image.png

上图中树根的 leftChildVal = 4; rightChildVal = 6;由于4 < 6,所以c = 4。然后由于11 > 4,也就是key > c,所以使用元素 4 覆盖树根节点的值。


然后树根的左子树树根的左右孩子节点中的 leftChildVal = 8; rightChildVal = 10;由于8 < 10,所以c = 8。然后由于11 > 8,也就是 key > c,所以元素 8 作为树根左子树的根节点,现在树的形状如下图第三步所示。这时候判断是否k < half,结果为 false,所以退出循环。然后把x = 11的元素设置到数组下标为3的地方,这时候堆树如下图第四步所示,至此调整堆完毕。

image.png

put 操作

put 操作内部调用的是 offer 操作,由于是无界队列,所以不需要阻塞。

public void put(E e) {
    offer(e); // never need to block
}

take 操作

take 操作的作用是获取队列内部堆树的根节点元素,如果队列为空则阻塞。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

size 操作

获取队列元素个数。如下代码在返回 size 前加了锁,以保证在调用 size 方法时不会有其他线程进行入队和出队操作。另外,由于 size 变量没有被修饰为 volatie 的,所以这里加锁也保证了在多线程下 size 变量的内存可见性。

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return size;
    } finally {
        lock.unlock();
    }
}

总结

PriorityBlockingQueue 类似于 ArrayBlockingQueue,在内部使用一个独占锁来控制同时只有一个线程可以进行入队和出队操作。另外,PriorityBlockingQueue 只使用了一个 notEmpty 条件变量而没有使用 notFull,因为是无界队列,执行 put 操作时永远不会处于 await 状态,所以也不需要被唤醒。而 take 方法是阻塞方法,并且是可被中断的。当需要存放有优先级的元素时该队列比较有用。

相关文章
|
1月前
|
机器学习/深度学习 JSON Java
Java调用Python的5种实用方案:从简单到进阶的全场景解析
在机器学习与大数据融合背景下,Java与Python协同开发成为企业常见需求。本文通过真实案例解析5种主流调用方案,涵盖脚本调用到微服务架构,助力开发者根据业务场景选择最优方案,提升开发效率与系统性能。
329 0
|
1月前
|
Java
Java的CAS机制深度解析
CAS(Compare-And-Swap)是并发编程中的原子操作,用于实现多线程环境下的无锁数据同步。它通过比较内存值与预期值,决定是否更新值,从而避免锁的使用。CAS广泛应用于Java的原子类和并发包中,如AtomicInteger和ConcurrentHashMap,提升了并发性能。尽管CAS具有高性能、无死锁等优点,但也存在ABA问题、循环开销大及仅支持单变量原子操作等缺点。合理使用CAS,结合实际场景选择同步机制,能有效提升程序性能。
|
29天前
|
Java 开发者
Java并发编程:CountDownLatch实战解析
Java并发编程:CountDownLatch实战解析
351 100
|
2月前
|
存储 缓存 Java
Java数组全解析:一维、多维与内存模型
本文深入解析Java数组的内存布局与操作技巧,涵盖一维及多维数组的声明、初始化、内存模型,以及数组常见陷阱和性能优化。通过图文结合的方式帮助开发者彻底理解数组本质,并提供Arrays工具类的实用方法与面试高频问题解析,助你掌握数组核心知识,避免常见错误。
|
前端开发 Java C++
JUC系列之《CompletableFuture:Java异步编程的终极武器》
本文深入解析Java 8引入的CompletableFuture,对比传统Future的局限,详解其非阻塞回调、链式编排、多任务组合及异常处理等核心功能,结合实战示例展示异步编程的最佳实践,助你构建高效、响应式的Java应用。
|
12天前
|
设计模式 算法 安全
JUC系列之《深入理解AQS:Java并发锁的基石与灵魂 》
本文深入解析Java并发核心组件AQS(AbstractQueuedSynchronizer),从其设计动机、核心思想到源码实现,系统阐述了AQS如何通过state状态、CLH队列和模板方法模式构建通用同步框架,并结合独占与共享模式分析典型应用,最后通过自定义锁的实战案例,帮助读者掌握其原理与最佳实践。
|
12天前
|
缓存 安全 Java
JUC系列《深入浅出Java并发容器:CopyOnWriteArrayList全解析》
CopyOnWriteArrayList是Java中基于“写时复制”实现的线程安全List,读操作无锁、性能高,适合读多写少场景,如配置管理、事件监听器等,但频繁写入时因复制开销大需谨慎使用。
|
25天前
|
Java 开发者
Java 函数式编程全解析:静态方法引用、实例方法引用、特定类型方法引用与构造器引用实战教程
本文介绍Java 8函数式编程中的四种方法引用:静态、实例、特定类型及构造器引用,通过简洁示例演示其用法,帮助开发者提升代码可读性与简洁性。
|
1月前
|
安全 Java API
Java SE 与 Java EE 区别解析及应用场景对比
在Java编程世界中,Java SE(Java Standard Edition)和Java EE(Java Enterprise Edition)是两个重要的平台版本,它们各自有着独特的定位和应用场景。理解它们之间的差异,对于开发者选择合适的技术栈进行项目开发至关重要。
148 1
|
2月前
|
存储 缓存 算法
Java数据类型与运算符深度解析
本文深入解析Java中容易混淆的基础知识,包括八大基本数据类型(如int、Integer)、自动装箱与拆箱机制,以及运算符(如&与&&)的使用区别。通过代码示例剖析内存布局、取值范围及常见陷阱,帮助开发者写出更高效、健壮的代码,并附有面试高频问题解析,夯实基础。

推荐镜像

更多
  • DNS