Java PriorityBlockingQueue 实现

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 本文着重介绍 Java 并发容器中 PriorityBlockingQueue 的实现方式。

引言

本文着重介绍 Java 并发容器中 PriorityBlockingQueue 的实现方式。所有关于 Java 并发的文章均收录于<Java并发系列文章>

PriorityBlockingQueue

PriorityBlockingQueue 是一个带排序功能的阻塞队列,因为它是一个队列,没必要保证整个队列的内部顺序,只需要保证出队时按照排序结果出即可,所以其内部使用了二分堆得形式实现,同时,PriorityBlockingQueue 也是线程安全的,内部通过一个锁来控制堆数据的维护。

PriorityBlockingQueue 的堆数据都保存在如下的 queue 数组中,堆的根节点是 queue[0] , 就像如下注释所说的,我们可以根据一个节点的下标 n 快速计算出它的两个子节点的下标,即 queue[2*n+1]queue[2*(n+1)] 。这是用数组来描述二分堆(树)的常见方法。

/**
 * Priority queue represented as a balanced binary heap: the two
 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 * priority queue is ordered by comparator, or by the elements'
 * natural ordering, if comparator is null: For each node n in the
 * heap and each descendant d of n, n <= d.  The element with the
 * lowest value is in queue[0], assuming the queue is nonempty.
 */
private transient Object[] queue;

因为维护二分堆时,我们不需要保证整个堆内所有元素有序,只需要保证父子节点之间有序即可,所以当我们要插入一个元素时,直接将其插入到堆尾,然后通过其与父节点的关系,进行适当地父子节点换位,就能保证堆的性质。

/**
 * Inserts the specified element into this priority queue.
 * As the queue is unbounded, this method will never return {@code false}.
 *
 * @param e the element to add
 * @return {@code true} (as specified by {@link Queue#offer})
 * @throws ClassCastException if the specified element cannot be compared
 *         with elements currently in the priority queue according to the
 *         priority queue's ordering
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 最新节点的下标(n) 等于 size,确保容量没问题
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            // 通过比较函数,进行父子节点换位
            siftUpComparable(n, e, array);
        else
            // 通过比较函数,进行父子节点换位
            siftUpUsingComparator(n, e, array, cmp);
        // 修改 size 并唤醒等待中的出队操作
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

/**
 * Inserts item x at position k, maintaining heap invariant by
 * promoting x up the tree until it is greater than or equal to
 * its parent, or is the root.
 *
 * To simplify and speed up coercions and comparisons. the
 * Comparable and Comparator versions are separated into different
 * methods that are otherwise identical. (Similarly for siftDown.)
 * These methods are static, with heap state as arguments, to
 * simplify use in light of possible comparator exceptions.
 *
 * @param k the position to fill
 * @param x the item to insert
 * @param array the heap array
 */
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        // 计算父节点下标
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        // 和父节点比较,因为默认是升序(最小堆),所以当新添加的节点大于父节点时就停下来了
        if (key.compareTo((T) e) >= 0)
            break;
        // 新添加的节点小于父节点,那么父子节点换位,然后重复上述过程
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

当从 PriorityBlockingQueue 中执行出队操作时,直接提取下标0元素,然后用 queue 中的最后一个元素,接替 0 号元素的位置,自上而下地修正堆中元素的位置关系,使其满足堆的性质。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    // 加锁并循环出队,如果没有数据就在 notEmpty condition上等待
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

/**
 * Mechanics for poll().  Call only while holding lock.
 */
private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        // 出队 0 号元素
        E result = (E) array[0];
        // 接替者是 queue 的末尾元素
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            // 接替者下移
            siftDownComparable(0, x, array, n);
        else
            // 接替者下移
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

从下移节点的实现中,我们可以看到,它先会比较两个子节点,选取最小的节点最为下移的目标节点。然后通过比较器比较当前节点 x 和目标节点是否满足堆的性质,如果不满足则交换节点位置,并重复上述过程。

/**
 * Inserts item x at position k, maintaining heap invariant by
 * demoting x down the tree repeatedly until it is less than or
 * equal to its children or is a leaf.
 *
 * @param k the position to fill
 * @param x the item to insert
 * @param array the heap array
 * @param n heap size
 */
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                // 比较左右两个节点,以最小节点作为下移的目标节点
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                // 父节点最小,停止
                break;
            // 子节点小,交换子节点和父节点,并重复上述过程
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析%E7%9B%B8%E5%85%B3%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/)
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理

相关文章
|
6月前
|
设计模式 Java
Java面试题:什么是观察者模式以及如何在Java中实现?
Java面试题:什么是观察者模式以及如何在Java中实现?
44 0
|
6月前
|
Java
【Java】已解决java.util.ConcurrentModificationException异常
【Java】已解决java.util.ConcurrentModificationException异常
116 0
|
7月前
|
存储 缓存 Java
【Java】Java中的引用类型(全面解读)
【Java】Java中的引用类型(全面解读)
79 0
|
8月前
|
存储 缓存 Java
【JAVA】深入了解 Java 中的 DelayQueue
【JAVA】深入了解 Java 中的 DelayQueue
|
Java
java202303java学习笔记第三十九天自定义线程池详解1
java202303java学习笔记第三十九天自定义线程池详解1
63 0
【java常见的面试题】java常见的集合类有哪些
Java基础的面试题java常见的集合类有哪些
|
安全 Java 容器
Java 中的FutureTask
Java 中的FutureTask
java202303java学习笔记第三十一天ArrayList源码分析2
java202303java学习笔记第三十一天ArrayList源码分析2
57 0
java202303java学习笔记第二十三天-初识内部类1
java202303java学习笔记第二十三天-初识内部类1
69 0
java202303java学习笔记第三十一天hashset和linkhashset区别4
java202303java学习笔记第三十一天hashset和linkhashset区别4
67 0

热门文章

最新文章