入队
add() 和 offer()
add(E e)
和 offer(E e)
的目的相同,都是向优先队列中插入元素,只是 Queue 接口规定二者对插入失败时的处理不同,前者在插入失败时抛出异常,后则则会返回false。对于 PriorityQueue 这两个方法其实没什么差别,我们可以针对不同的场景进行使用。 下面我们以 offer 为例,一起看看它是如何实现的。
public boolean offer(E e) { // 如果为 null, 返回 NPE . 所以不能添加 null 元素 if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; // 获取锁 lock.lock(); int n, cap; Object[] array; // 队列满了,就进行拓容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) // 比较器为空,自然排序 siftUpComparable(n, e, array); else // 将元素加入堆中 siftUpUsingComparator(n, e, array, cmp); // 元素格式加 1 size = n + 1; // 唤醒读线程 notEmpty.signal(); } finally { // 解锁 lock.unlock(); } return true; }
tryGrow 拓容
按照上面的分析,如果当队列满了,写入队列的线程就需要调用 tryGrow 方法对队列进行拓容。拓容成功后在想队列添加新元素。下面我们一起来看看 tryGrow 方法的具体实现:
private void tryGrow(Object[] array, int oldCap) { // 释放锁 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; // CAS 获取锁 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { // 拓容 try { // 计算新的长度 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); // 如果新容量大于 MAX_ARRAY_SIZE 可能内存溢出 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; // 到达最大值,拓容失败 if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } // 并发情况下,被其他线程已经拓容,那么本次不在拓容 if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { // 释放锁 allocationSpinLock = 0; } } // 如果有写线程正在拓容量让出 cpu if (newArray == null) // back off if another thread is allocating Thread.yield(); // 解锁 lock.lock(); // 再次判断是否是首个拓容线程的写线程,如果是就将阻塞队列,指向新拓容得新数组得引用得新队列 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
添加元素到堆得方式 siftUpComparable
, siftUpUsingComparaor
区别只是自然排序还是比较器排序, 下面以 siftUpcomparable 方法
private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; } private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; }
出队
出队一般调用 take , 或者 poll , remove 方法,下面是 take 方法得实现。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { // 如果没有拿到 await while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } // 返回 return result; }
从上面可以看出,咱们出队列的核心方法是 dequeue()
private E dequeue() { int n = size - 1; // 为空 if (n < 0) return null; else { // 非空堆顶取出数据 Object[] array = queue; E result = (E) array[0]; E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; // 调整堆 if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
总结
阻塞队列 PriorityBlockingQueue 不阻塞写线程,当队列满的时候,写线程会尝试拓容会造成队列阻塞,拓展容量成功后再向队列中新增元素;而当队列元素为空时,会阻塞读线程,当然也有非阻塞方法的 poll , 该阻塞队列适用于读多写少的场景,写的线程多,堆导致内存消耗过大。性能影响,队列采用堆存储结构,因此每次从阻塞队列取出的元素总是最小元素(或者最大元素)。而且堆存储需要提供比较器或者元素实现比较器接口,否则程序会抛出 ClassCastException。