引言
本文着重介绍 Java 并发容器中 PriorityBlockingQueue 的实现方式。所有关于 Java 并发的文章均收录于<Java并发系列文章>。
PriorityBlockingQueue
PriorityBlockingQueue 是一个带排序功能的阻塞队列,因为它是一个队列,没必要保证整个队列的内部顺序,只需要保证出队时按照排序结果出即可,所以其内部使用了二分堆得形式实现,同时,PriorityBlockingQueue 也是线程安全的,内部通过一个锁来控制堆数据的维护。
PriorityBlockingQueue 的堆数据都保存在如下的 queue 数组中,堆的根节点是 queue[0]
, 就像如下注释所说的,我们可以根据一个节点的下标 n 快速计算出它的两个子节点的下标,即 queue[2*n+1]
和 queue[2*(n+1)]
。这是用数组来描述二分堆(树)的常见方法。
/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
*/
private transient Object[] queue;
因为维护二分堆时,我们不需要保证整个堆内所有元素有序,只需要保证父子节点之间有序即可,所以当我们要插入一个元素时,直接将其插入到堆尾,然后通过其与父节点的关系,进行适当地父子节点换位,就能保证堆的性质。
/**
* Inserts the specified element into this priority queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @param e the element to add
* @return {@code true} (as specified by {@link Queue#offer})
* @throws ClassCastException if the specified element cannot be compared
* with elements currently in the priority queue according to the
* priority queue's ordering
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
// 最新节点的下标(n) 等于 size,确保容量没问题
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 通过比较函数,进行父子节点换位
siftUpComparable(n, e, array);
else
// 通过比较函数,进行父子节点换位
siftUpUsingComparator(n, e, array, cmp);
// 修改 size 并唤醒等待中的出队操作
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
/**
* Inserts item x at position k, maintaining heap invariant by
* promoting x up the tree until it is greater than or equal to
* its parent, or is the root.
*
* To simplify and speed up coercions and comparisons. the
* Comparable and Comparator versions are separated into different
* methods that are otherwise identical. (Similarly for siftDown.)
* These methods are static, with heap state as arguments, to
* simplify use in light of possible comparator exceptions.
*
* @param k the position to fill
* @param x the item to insert
* @param array the heap array
*/
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
// 计算父节点下标
int parent = (k - 1) >>> 1;
Object e = array[parent];
// 和父节点比较,因为默认是升序(最小堆),所以当新添加的节点大于父节点时就停下来了
if (key.compareTo((T) e) >= 0)
break;
// 新添加的节点小于父节点,那么父子节点换位,然后重复上述过程
array[k] = e;
k = parent;
}
array[k] = key;
}
当从 PriorityBlockingQueue 中执行出队操作时,直接提取下标0元素,然后用 queue 中的最后一个元素,接替 0 号元素的位置,自上而下地修正堆中元素的位置关系,使其满足堆的性质。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
// 加锁并循环出队,如果没有数据就在 notEmpty condition上等待
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
/**
* Mechanics for poll(). Call only while holding lock.
*/
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
// 出队 0 号元素
E result = (E) array[0];
// 接替者是 queue 的末尾元素
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 接替者下移
siftDownComparable(0, x, array, n);
else
// 接替者下移
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
从下移节点的实现中,我们可以看到,它先会比较两个子节点,选取最小的节点最为下移的目标节点。然后通过比较器比较当前节点 x 和目标节点是否满足堆的性质,如果不满足则交换节点位置,并重复上述过程。
/**
* Inserts item x at position k, maintaining heap invariant by
* demoting x down the tree repeatedly until it is less than or
* equal to its children or is a leaf.
*
* @param k the position to fill
* @param x the item to insert
* @param array the heap array
* @param n heap size
*/
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
// 比较左右两个节点,以最小节点作为下移的目标节点
c = array[child = right];
if (key.compareTo((T) c) <= 0)
// 父节点最小,停止
break;
// 子节点小,交换子节点和父节点,并重复上述过程
array[k] = c;
k = child;
}
array[k] = key;
}
}
文章说明
更多有价值的文章均收录于贝贝猫的文章目录
版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。
参考内容
[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析%E7%9B%B8%E5%85%B3%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/)
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理