Java PriorityBlockingQueue 实现

简介: 本文着重介绍 Java 并发容器中 PriorityBlockingQueue 的实现方式。

引言

本文着重介绍 Java 并发容器中 PriorityBlockingQueue 的实现方式。所有关于 Java 并发的文章均收录于<Java并发系列文章>

PriorityBlockingQueue

PriorityBlockingQueue 是一个带排序功能的阻塞队列,因为它是一个队列,没必要保证整个队列的内部顺序,只需要保证出队时按照排序结果出即可,所以其内部使用了二分堆得形式实现,同时,PriorityBlockingQueue 也是线程安全的,内部通过一个锁来控制堆数据的维护。

PriorityBlockingQueue 的堆数据都保存在如下的 queue 数组中,堆的根节点是 queue[0] , 就像如下注释所说的,我们可以根据一个节点的下标 n 快速计算出它的两个子节点的下标,即 queue[2*n+1]queue[2*(n+1)] 。这是用数组来描述二分堆(树)的常见方法。

/**
 * Priority queue represented as a balanced binary heap: the two
 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 * priority queue is ordered by comparator, or by the elements'
 * natural ordering, if comparator is null: For each node n in the
 * heap and each descendant d of n, n <= d.  The element with the
 * lowest value is in queue[0], assuming the queue is nonempty.
 */
private transient Object[] queue;

因为维护二分堆时,我们不需要保证整个堆内所有元素有序,只需要保证父子节点之间有序即可,所以当我们要插入一个元素时,直接将其插入到堆尾,然后通过其与父节点的关系,进行适当地父子节点换位,就能保证堆的性质。

/**
 * Inserts the specified element into this priority queue.
 * As the queue is unbounded, this method will never return {@code false}.
 *
 * @param e the element to add
 * @return {@code true} (as specified by {@link Queue#offer})
 * @throws ClassCastException if the specified element cannot be compared
 *         with elements currently in the priority queue according to the
 *         priority queue's ordering
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 最新节点的下标(n) 等于 size,确保容量没问题
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            // 通过比较函数,进行父子节点换位
            siftUpComparable(n, e, array);
        else
            // 通过比较函数,进行父子节点换位
            siftUpUsingComparator(n, e, array, cmp);
        // 修改 size 并唤醒等待中的出队操作
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

/**
 * Inserts item x at position k, maintaining heap invariant by
 * promoting x up the tree until it is greater than or equal to
 * its parent, or is the root.
 *
 * To simplify and speed up coercions and comparisons. the
 * Comparable and Comparator versions are separated into different
 * methods that are otherwise identical. (Similarly for siftDown.)
 * These methods are static, with heap state as arguments, to
 * simplify use in light of possible comparator exceptions.
 *
 * @param k the position to fill
 * @param x the item to insert
 * @param array the heap array
 */
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        // 计算父节点下标
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        // 和父节点比较,因为默认是升序(最小堆),所以当新添加的节点大于父节点时就停下来了
        if (key.compareTo((T) e) >= 0)
            break;
        // 新添加的节点小于父节点,那么父子节点换位,然后重复上述过程
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

当从 PriorityBlockingQueue 中执行出队操作时,直接提取下标0元素,然后用 queue 中的最后一个元素,接替 0 号元素的位置,自上而下地修正堆中元素的位置关系,使其满足堆的性质。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    // 加锁并循环出队,如果没有数据就在 notEmpty condition上等待
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

/**
 * Mechanics for poll().  Call only while holding lock.
 */
private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        // 出队 0 号元素
        E result = (E) array[0];
        // 接替者是 queue 的末尾元素
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            // 接替者下移
            siftDownComparable(0, x, array, n);
        else
            // 接替者下移
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

从下移节点的实现中,我们可以看到,它先会比较两个子节点,选取最小的节点最为下移的目标节点。然后通过比较器比较当前节点 x 和目标节点是否满足堆的性质,如果不满足则交换节点位置,并重复上述过程。

/**
 * Inserts item x at position k, maintaining heap invariant by
 * demoting x down the tree repeatedly until it is less than or
 * equal to its children or is a leaf.
 *
 * @param k the position to fill
 * @param x the item to insert
 * @param array the heap array
 * @param n heap size
 */
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                // 比较左右两个节点,以最小节点作为下移的目标节点
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                // 父节点最小,停止
                break;
            // 子节点小,交换子节点和父节点,并重复上述过程
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析%E7%9B%B8%E5%85%B3%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/)
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理

相关文章
|
17小时前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解04-阻塞队列之PriorityBlockingQueue原理及扩容机制详解
1. **继承实现图关系**: - `PriorityBlockingQueue`实现了`BlockingQueue`接口,提供了线程安全的队列操作。 - 内部基于优先级堆(小顶堆或大顶堆)的数据结构实现,可以保证元素按照优先级顺序出队。 2. **底层数据存储结构**: - 默认容量是11,存储数据的数组会在需要时动态扩容。 - 数组长度总是2的幂,以满足堆的性质。 3. **构造器**: - 无参构造器创建一个默认容量的队列,元素需要实现`Comparable`接口。 - 指定容量构造器允许设置初始容量,但不指定排序规则。 - 可指定容量和比较
235 2
|
17小时前
|
存储 安全 Java
Java并发基础:PriorityBlockingQueue全面解析!
PriorityBlockingQueue类能高效处理优先级任务,确保高优先级任务优先执行,它内部基于优先级堆实现,保证了元素的有序性,同时,作为BlockingQueue接口的实现,它提供了线程安全的队列操作,适用于多线程环境下的任务调度与资源管理,简洁而强大的API使得开发者能轻松应对复杂的并发场景。
Java并发基础:PriorityBlockingQueue全面解析!
|
Java
Java 实现汉字按照首字母分组排序
Java 实现汉字按照首字母分组排序
572 0
|
12月前
|
Java
Java Review - 并发编程_PriorityBlockingQueue原理&源码剖析
Java Review - 并发编程_PriorityBlockingQueue原理&源码剖析
75 0
|
分布式计算 Java Hadoop
Java实现单词计数MapReduce
本文分享实现单词计数MapReduce的方法
302 0
|
Java 数据安全/隐私保护
JAVA 实现上传图片添加水印(详细版)(上)
JAVA 实现上传图片添加水印(详细版)
967 0
JAVA 实现上传图片添加水印(详细版)(上)
|
存储 Java
Java实现图书管理系统
本篇文章是对目前Java专栏已有内容的一个总结练习,希望各位小主们在学习完面向对象的知识后,可以阅览本篇文章后,自己也动手实现一个这样的demo来加深总结应用已经学到知识并进行巩固。
376 0
Java实现图书管理系统
|
Java Windows Spring
java实现spring boot项目启动时,重启Windows进程
java实现spring boot项目启动时,重启Windows进程
476 0
|
数据可视化 Java
Java实现拼图小游戏(1)—— JFrame的认识及界面搭建
如果要在某一个界面里面添加功能的话,都在一个类中,会显得代码难以阅读,而且修改起来也会很困难,所以我们将游戏主界面、登录界面、以及注册界面都单独编成一个类,每一个类都继承JFrame父类,并且在类中创建方法来来实现页面
462 0
Java实现拼图小游戏(1)—— JFrame的认识及界面搭建
|
网络协议 Java
Java网络编程:UDP/TCP实现实时聊天、上传图片、下载资源等
ip地址的分类: 1、ipv4、ipv6 127.0.0.1:4个字节组成,0-255,42亿;30亿都在北美,亚洲就只有4亿 2011年就用尽了。
Java网络编程:UDP/TCP实现实时聊天、上传图片、下载资源等