PriorityBlockingQueue 是什么?
PriorityBlockingQueue 是一个支持优先级的无界阻塞队列,直到系统资源耗尽。默认情况下元素采用自然顺序升序排列。也可以自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。但需要注意的是不能保证同优先级元素的顺序。 PriorityBlockingQueue 也是基于最小二叉堆实现,使用基于 CAS 实现的 **自旋锁 **来控制队列的动态扩容,保证了扩容操作不会阻塞 take 操作的执行。
二叉堆
一颗完全二叉树,它非常适合用数组进行存储,它具有如下的两个特点:
- 对于数组中的元素 a[i],其左子节点为 a[2i+1],右子节点为 a[2i + 2],其父节点为 a[(i-1)/2]。
- 其堆序性质为,每个节点的值都小于其左右子节点的值。二叉堆中最小的值就是根节点,但是删除根节点是比较麻烦的,因为需要调整树。
一个间的二叉堆如图所示(最小二叉堆):
回到我们本次的主题 PriorityBlockingQueue 队列,下面我们将从它的初始化过程,入队、出队、拓容等几个方面展开描述它的实现和原理。
初始化过程
成员变量
PriortyBlockingQueue 成员变量解析:
// 初始容量 11 private static final int DEFAULT_INITIAL_CAPACITY = 11; // 最大容量 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // 存储队列元素的数组 private transient Object[] queue; // 队列中元素的个数 private transient int size; // 比较器 private transient Comparator<? super E> comparator; // 用于所有公共操作的锁 private final ReentrantLock lock; // 不为空的条件 private final Condition notEmpty; // 用于分配的自旋锁,通过CAS获取,保证只有一个线程可以拓容。 private transient volatile int allocationSpinLock; // 仅用于序列化的普通优先级队列 // 保持与以前版本的兼容性 private PriorityQueue<E> q;
构造方法
该类有四个构造方法,主要是对队列的初始化,我们下面直接看 PriorityBlockingQueue(Collection<? **extends **E> c)
构造方法:
public PriorityBlockingQueue(Collection<? extends E> c) { // 初始化全局锁 this.lock = new ReentrantLock(); // 非空条件初始化 this.notEmpty = lock.newCondition(); // 如果不知道堆的顺序为 true boolean heapify = true; // true if not known to be in heap order // 如果元素不允许为空为 true boolean screen = true; // true if must screen for nulls if (c instanceof SortedSet<?>) { // 有序 set, 对 this.comparator 初始化 SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue<?>) { // 如果为 PriorityBlockingQueue 类型,初始化比较器,以及设置允许为空 PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } // 集合转换为数组 Object[] a = c.toArray(); // n 为数据的真实长度 int n = a.length; // 如果是 ArrayList if (c.getClass() != java.util.ArrayList.class) a = Arrays.copyOf(a, n, Object[].class); // 如果指定集合不属于属于SortedSet类型或者子类 或者 不属于PriorityBlockingQueue类型或者子类 // 并且 n为1 或者 比较器不为null // 那么需要检测null if (screen && (n == 1 || this.comparator != null)) { // 检查 null for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } // 数据赋值给 this.queue this.queue = a; // 队列长度为 n this.size = n; // 堆排序 if (heapify) heapify(); }
heapify 堆排序
heapify()
方法的整体逻辑就是一个堆排序的过程,排序对象是数组的0-(n/2-1)之间的元素。整个排序的核心逻辑就是父节点和左右节点三者进行比较,三者中最小的元素上浮。这个过程从(n/2-1)的尾部元素开始到顶部元素进行排序的,所以我们可以理解为先保证底部元素有序后再逐步往顶部走。
private void heapify() { // 数据 Object[] array = queue; // 数据长度 int n = size; // 前半段最大索引 int half = (n >>> 1) - 1; // 比较器 Comparator<? super E> cmp = comparator; if (cmp == null) { // 比较器为空 for (int i = half; i >= 0; i--) siftDownComparable(i, (E) array[i], array, n); } else { // 比较器不为空 for (int i = half; i >= 0; i--) siftDownUsingComparator(i, (E) array[i], array, n, cmp); } } private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { int child = (k << 1) + 1; // assume left child is least Object c = array[child]; int right = child + 1; if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } }