PriorityBlockingQueue可以理解为线程安全的PriorityQueue,其实现原理与PriorityQueue类似,在此基础上实现了BlockingQueue接口,能够作为阻塞队列使用,由于PriorityBlockingQueue是无界队列,因而使用put方法并不会阻塞,offer方法不会返回false。PriorityBlockingQueue也是基于最小二叉堆实现,对于堆数组中索引为k的节点,其父节点为(k-1)/2,其左右子节点分别为2k+1,2k+2。PriorityBlockingQueue使用ReentrantLock来控制所有公用操作的线程同步,使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行。
类图
PriorityBlockingQueue的继承层次如上图:
- Collection:所有集合的基类,定义集合的基本操作API;
- AbstractCollection:实现公用的集合操作;
- Iterable:赋予集合迭代器的能力;
- Queue:定义队列的基本操作API,比如:offer、poll等;
- AbstractQueue:实现公用的队列操作;
- BlockingQueue:定义阻塞队列的基本操作API,比如:put、take等;
- PriorityBlockingQueue:阻塞队列的实现类,继承AbstractQueue类,并实现BlockingQueue接口;
实现
主要成员变量
/**
* 存放最小二叉堆的数组
*/
private transient Object[] queue;
/**
* 优先队列中包含元素个数
*/
private transient int size;
/**
* 比较器,用于定制元素比较规则;
*/
private transient Comparator<? super E> comparator;
/**
* 用于同步队列操作的锁
*/
private final ReentrantLock lock;
/**
* 当队列为空时,用于阻塞出队操作
*/
private final Condition notEmpty;
/**
* 自旋锁标识字段,通过CAS操作进行比较更新;
* 用于动态扩容操作;值为1时,表示加锁;为0时,标识未加锁;
*/
private transient volatile int allocationSpinLock;
可以看到PriorityBlockingQueue的成员变量和PriorityQueue的成员变量相差不大,多了3个用于控制线程安全的属性:lock,notEmpty,allocationSpinLock,分别用于队列公有操作、出队阻塞、动态扩容。
初始化
PriorityBlockingQueue的初始化同PriorityQueue的类似,多了对lock、notEmpty的初始化。
public PriorityBlockingQueue() {
// 默认容量为11
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
boolean heapify = true; // true if not known to be in heap order
boolean screen = true; // true if must screen for nulls
// 如果传入集合是有序集,则无须进行堆有序化
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
// 如果传入集合是PriorityBlockingQueue类型,则不进行堆有序化
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
Object[] a = c.toArray();
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
// 执行堆有序化
if (heapify)
heapify();
}
上浮、下沉操作
PriorityBlockingQueue也是基于下沉、上浮操作来实现元素的入队和出队操作的,实现代码与PriorityQueue的实现完全相同,只是为了保证线程安全,上层方法调用时需要放在加锁的环境下执行。
- siftUpComparable:上浮操作,通过默认的Comparable接口进行元素比较;队列中的元素必须实现Comparable接口;
- siftUpUsingComparator:上浮操作,通过Comparator进行元素比较;
- siftDownComparable:下沉操作,通过默认的Comparable接口进行元素比较;队列中的元素必须实现Comparable接口;
- siftDownUsingComparator:下沉操作,通过Comparator进行元素比较;
入队操作
PriorityBlockingQueue的入队操作包括4个方法:
- add:队列满时抛出异常;由于为无界队列,因而不会抛出异常;代码实现直接调用offer方法;
- offer:队列满时返回false;由于为无界队列,因而不会返回false;
- offer带超时参数:队列满时阻塞等待直至超时或者数组有空出位置;由于为无界队列,因而不会返回false、超时、阻塞;代码实现直接调用offer方法;
- put:队列满时阻塞;由于为无界队列,因而不会阻塞;代码实现直接调用offer方法;
/**
* 入队操作实现
* 由于是无界队列,永远不会返回false;
*/
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
// 通过CAS操作动态扩容
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
// 将新增元素上浮到合适位置
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
// 唤醒阻塞线程
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
/**
* 由于是无界队列,永远不会被阻塞
*/
public void put(E e) {
offer(e); // never need to block
}
/**
* 由于是无界队列,因而不存在阻塞、超时和返回false的情况;
*/
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
}
public boolean add(E e) {
return offer(e);
}
出队操作
PriorityBlockingQueue的出队操作包括4个方法,都是通过调用dequeue()方法实现:
- dequeue:出队操作的具体实现,为保证线程安全,上层调用方法需要加锁;
- take:队列为空时,阻塞直至有元素添加到队列中;
- poll:队列为空时,直接返回null,不会阻塞;
- poll带超时参数:队列为空时,阻塞直至超时或者有元素添加到队列中;
- drainTo:批量从队首弹出元素到指定集合中,不会阻塞;
/**
* 出队操作的具体实现,仅在有锁环境下调用;
*/
private E dequeue() {
int n = size - 1;
// 队列为空返回null;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
// 将队尾元素提至队首位置
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
// 下沉队首元素到合适位置
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
/**
* 队列为空时,阻塞;
*/
public E take() throws InterruptedException {
// 加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
// 如果队列为空,则阻塞;
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
/**
* 队列为空时,返回null;
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
/**
* 队列为空时,阻塞直至超时或者获取到元素;
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
// 如果队列为空,则阻塞直至超时或者获取到元素;
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
/**
* 批量获取元素
*/
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = Math.min(size, maxElements);
// 循环遍历,不断弹出队首元素;
for (int i = 0; i < n; i++) {
c.add((E) queue[0]); // In this order, in case add() throws.
dequeue();
}
return n;
} finally {
lock.unlock();
}
}
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
查找&删除元素
PriorityBlockingQueue中查找元素的效率是偏低的,由于二叉堆并没有限制左右子节点的大小规则,因而需要变量整个数组进行查找,因而效率为$O(n)$。一些优先队列的实现会对此进行优化,给每个元素添加一个索引字段用于标记元素在堆数组中的位置,比如:ScheduledThreadPoolExecutor.DelayedWorkQueue通过ScheduledFutureTask中的heapIndex来标记任务在堆数组中的位置。
/**
* 查找元素,效率O(n)
*/
private int indexOf(Object o) {
if (o != null) {
Object[] array = queue;
int n = size;
// 遍历数组查找,效率较低
for (int i = 0; i < n; i++)
if (o.equals(array[i]))
return i;
}
return -1;
}
/**
* 删除指定位置的元素
*/
private void removeAt(int i) {
Object[] array = queue;
int n = size - 1;
// 如果索引位置在队尾,则删除队尾元素
if (n == i) // removed last element
array[i] = null;
else {
// 使用队尾元素替换删除元素的位置
E moved = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
// 下沉元素到合适位置
if (cmp == null)
siftDownComparable(i, moved, array, n);
else
siftDownUsingComparator(i, moved, array, n, cmp);
// 如果元素未下沉,则上浮到合适位置
if (array[i] == moved) {
if (cmp == null)
siftUpComparable(i, moved, array);
else
siftUpUsingComparator(i, moved, array, cmp);
}
}
size = n;
}
/**
* 删除队列中的特定元素
*/
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 查找元素
int i = indexOf(o);
if (i == -1)
return false;
// 删除元素
removeAt(i);
return true;
} finally {
lock.unlock();
}
}
动态扩容
在插入元素时,如果堆数组长度不足,则需要新建一个更长的数组,拷贝现有元素到新数组中,从而实现扩容的目的。为保证动态扩容不阻塞队列元素的出队,PriorityBlockingQueue通过CAS操作实现的自旋锁来控制扩容操作:使用allocationSpinLock来标记是否加锁,值为1时,表示加锁,值为0时,表示未加锁;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
// 存放allocationSpinLock属性的偏移量,方便后面CAS更新
private static final long allocationSpinLockOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = PriorityBlockingQueue.class;
allocationSpinLockOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("allocationSpinLock"));
} catch (Exception e) {
throw new Error(e);
}
}
private void tryGrow(Object[] array, int oldCap) {
// 扩容时,释放锁,防止阻塞出队操作
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 加锁,变更allocationSpinLock的值为1;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 容量小于64,则每次容量+2,否则增加一倍
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// 如果没有其它线程扩容,则创建新数组;
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
// 解锁,因为只有一个线程到此,因而不需要CAS操作;
allocationSpinLock = 0;
}
}
// 如果此时有其它线程已经加锁,则让出执行权,等待其它线程执行完成
if (newArray == null) // back off if another thread is allocating
Thread.yield();
// 加锁,替换现有数组的引用,拷贝数组元素;
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
总结
PriorityBlockingQueue可以理解为public操作都加锁的PriorityQueue,通过排他锁保证了操作的线程安全。PriorityBlockingQueue扩容时,因为增加堆数组的长度并不影响队列中元素的出队操作,因而使用自旋CAS操作实现的锁来控制扩容操作,仅在数组引用替换和拷贝元素时才加锁,从而减少了扩容对出队操作的影响。自旋锁的实现思路以及应用场景值得我们学习借鉴。