一、继承实现图关系
二、底层数据存储结构
属性说明:
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { ... // 数组默认容量11 private static final int DEFAULT_INITIAL_CAPACITY = 11; // 存储数据的数组队列 private transient Object[] queue; // 当前数组队列数据大小 private transient int size; // 比较规则(排序规则) private transient Comparator<? super E> comparator; // 独占锁 private final ReentrantLock lock = new ReentrantLock(); // 队列为空是阻塞 private final Condition notEmpty = lock.newCondition(); // CAS自旋锁 private transient volatile int allocationSpinLock; // 序列化优先队列 private PriorityQueue<E> q; ... }
构造器:
/** * 默认构造器,默认容量大小为11,无排序规则 */ public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } /** * 指定容器大小,无排序规则 */ public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } /** * 指定容器大小 和 指定排序规则 */ public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.comparator = comparator; this.queue = new Object[Math.max(1, initialCapacity)]; } ...
注:默认构造器容量大小为11,没有比较器,如果未指定比较器,插入队列的元素需要实现Comparable接口
三、特点及优缺点
3.1 特点
- 支持优先级排序的无界阻塞队列,优先级高的先出队,先先级低的后出队
- 存储数据结构为数组,以二叉堆形式存储于数组中,二叉堆叶子节点均小于父节点
- 存取都为一把锁
- 入队不阻塞,出队阻塞
- 出队取队头元素
- 默认容量是11,数组最大容量为Integer.MAX_VALUE - 8
3.2 优缺点
- 根据排序规则排序(可提供排序规则,若不提供则队列元素必须实现 Comparable接口)
- put不阻塞(若生产速度快于消费速度,会耗尽所有的可用堆内空间)
- 优先级排序无界阻塞队列
- 取数据锁为独占锁
- 内部数组长度不够会进行扩容
- 新增元素进行最小二叉推排序
四、源码详解
读取部分源码:
- 添加任务方法
- 获取和删除任务方法
- 扩容
阅读明白这几个接口即可,其它都很简单。
4.1 put、offer
- 数组无界,不阻塞添加
- 不允许添加空对象
- 父节点大于所有子节点
public void put(E e) { offer(e); // never need to block } /** * 向队列中添加元素 <br/> * 添加元素若为空,直接抛出异常 <br/> */ public boolean offer(E e) { // 添加的对象不能为空 if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] es; while ((n = size) >= (cap = (es = queue).length)) { // 若数组已满,则进行扩容 tryGrow(es, cap); } try { final Comparator<? super E> cmp; if ((cmp = comparator) == null) { // 无配置排序规则时,走对象自带的排序逻辑 siftUpComparable(n, e, es); } else { // 走自定义排序规则 siftUpUsingComparator(n, e, es, cmp); } size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; } /** * 创建队列时无指定排序规则,调添加对象的compareTo进行排序 <br/> * 所以,添加的元素必须实现Comparable接口 <br/> */ private static <T> void siftUpComparable(int k, T x, Object[] es) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { // 由新元素下标(移动下标),获取父节点的数组下标 int parent = (k - 1) >>> 1; Object e = es[parent]; // 若排序规则成立,则直接退出循环,key放到k的位置 if (key.compareTo((T) e) >= 0) break; // 若不成立,父节点元素放到位置k es[k] = e; // 移动下标指向父节点下标,然后继续向上(父节点的父节点)比较,直到if成立,或到根节点 k = parent; } es[k] = key; } /** * 创建队列时指定排序规则,则排序规则按自定义排序处理数组二叉堆数据 <br/> * 逻辑同siftUpComparable */ private static <T> void siftUpUsingComparator( int k, T x, Object[] es, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = es[parent]; if (cmp.compare(x, (T) e) >= 0) break; es[k] = e; k = parent; } es[k] = x; }
4.2 take
- 若数组队列为空,则阻塞。
- 取出最后一个元素L,并最后一个下标的值赋值null
- 1 二叉堆下标为0出队(树的根节点)
- 2 大的子节点元素a放到树的根节点(假设排序规则为降序)
- 3. 以元素a的原位置似为树,重复1,2步骤,直到找到最小树 s(有叶子节点)
- 4. s的叶子节点大子元素b放到根节点,原树最后一个元素L放到叶子节点元素b的原位置
元素的移动次数最多为树的高度
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 获取锁 lock.lockInterruptibly(); E result; try { // 若队列为空,则阻塞 while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; } /** * 出队(假调排序规则是降序) <br/> * 取出最后一个元素a,并且最后一个标的值赋值为空 <br/> * 1. 取出第一个元素,则该位置空缺 <br/> * 2. 大的子元素放到第一个位置, 则这个位置空缺 <br/> * 3. 以空缺为父节点,重复1和2步骤,找到最小子树 * 4. 大的叶子节点放到父节点,元素a放到大的叶子节点原来的位置 */ private E dequeue() { // assert lock.isHeldByCurrentThread(); final Object[] es; final E result; // 取出第一个元素 if ((result = (E) ((es = queue)[0])) != null) { final int n; // 取出最后一个元素 final E x = (E) es[(n = --size)]; // 删除最后一个下标元素 es[n] = null; if (n > 0) { final Comparator<? super E> cmp; // 大的子节点中 if ((cmp = comparator) == null) siftDownComparable(0, x, es, n); else siftDownUsingComparator(0, x, es, n, cmp); } } return result; } private static <T> void siftDownComparable(int k, T x, Object[] es, int n) { // assert n > 0; Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { // 计算出左子节点下标 int child = (k << 1) + 1; // assume left child is least Object c = es[child]; int right = child + 1; // 取出大的子节点 if (right < n && ((Comparable<? super T>) c).compareTo((T) es[right]) > 0) c = es[child = right]; if (key.compareTo((T) c) <= 0) break; // 大的子节点元素放到父节点的位置 es[k] = c; // 以大的子节点元素为根,重复以上逻辑,继续找大的子节点放到其位置 k = child; } // 原树的最后一个位置的元素放到最后一个移动的叶子节点的原来位置上 es[k] = key; } /** * 逻辑同上 <br/> */ private static <T> void siftDownUsingComparator( int k, T x, Object[] es, int n, Comparator<? super T> cmp) { // assert n > 0; int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = es[child]; int right = child + 1; if (right < n && cmp.compare((T) c, (T) es[right]) > 0) c = es[child = right]; if (cmp.compare(x, (T) c) <= 0) break; es[k] = c; k = child; } es[k] = x; }
4.3 扩容原理
- CAS自旋锁,避免并发扩容
- 扩容时机:元素数量等于数组大小时进行扩容
- 扩容大小:
- 容量(len)小于64时 len = len + (len + 2),即新容量 = 原容量*2 + 2
- 容量(len) >= 64 时 len = len + len / 2,即新容量 = 原容量 + 原容量/2
- 创建新数组,原数组数据复制过来
public boolean offer(E e) { ... // 元素数量等于数组大小时进行扩容 // 有可能别的线程扩容了,又添加元素满了,所以while判断就轮到本线程再扩容 while ((n = size) >= (cap = (es = queue).length)) tryGrow(es, cap); ... } /** * 扩容 */ private void tryGrow(Object[] array, int oldCap) { // 必须释放然后重新获得主锁,因为只有一把锁,释放锁后可以读 lock.unlock(); Object[] newArray = null; // CAS自旋锁,避免并发扩容 if (allocationSpinLock == 0 && ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) { try { // 计算新容量大小,容量<64时 len = len + (len + 2) // 计算新容量大小,容量>=64时 len = len + (len / 2) int growth = (oldCap < 64) ? (oldCap + 2) // grow faster if small : (oldCap >> 1); int newCap = ArraysSupport.newLength(oldCap, 1, growth); if (queue == array) // 创建新的空数组 newArray = new Object[newCap]; } finally { // 还原自旋锁为0 allocationSpinLock = 0; } } // 如果另一个线程已分配,则退出 if (newArray == null) Thread.yield(); // 数组队列指向新数组,旧数据复制到新数组前加锁 lock.lock(); if (newArray != null && queue == array) { // 队列数组指向新数组 queue = newArray; // 复制旧数据到新数组中 System.arraycopy(array, 0, newArray, 0, oldCap); } }
五、作用和使用场景
- 业务办理排队叫号,VIP客户插队
- 枪购