DelayQueue
DelayQeque 是一个无界阻塞队列,只有在延迟时间到达的时候,才能从队列中获取元素。可以设置队列元素的存活时间,移除时间,唯一 id 等元素。
源码分析
- 添加方法 offer
public boolean offer(E e) { // 获取锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 元素加入优先级队列 q.offer(e); // 获取优先级头元素,头元素等于当前元素 // 清空leader,并放开读限制 if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { // 释放锁 lock.unlock(); } }
- 出队方法 take, 如果为空当前线程阻塞
public E take() throws InterruptedException { // 获取锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 自旋 for (;;) { // 获取优先级队列头节点 E first = q.peek(); // 优先级队列为空 if (first == null) // 阻塞 available.await(); else { // 判断头元素剩余时间是否小于等于0 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 优先级队列出队 return q.poll(); // 到这,说明剩余时间大于0 // 头引用置空 first = null; // leader线程是否为空,不为空就等待 if (leader != null) available.await(); else { // 设置leader线程为当前线程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 休眠剩余秒 available.awaitNanos(delay); } finally { // 休眠结束,leader线程还是当前线程 // 置空leader if (leader == thisThread) leader = null; } } } } } finally { // leader线程为空,并且first不为空 // 唤醒阻塞的leader,让它再去试一次 if (leader == null && q.peek() != null) available.signal(); // 解锁 lock.unlock(); } }
使用场景
我一般很少直接使用它,但是在我们使用的框架中大量使用。
PriorityBlockingQueue
在这个数据结构,元素是按照顺序储存的。元素们必须实现带有 compareTo() 方法的 Comparable 接口。当你在结构中插入数据时,它会与数据元素对比直到找到它的位置。
源码分析
构造方法 PriorityBlockingQueue(Collection<? extends E> c)
分析,如下所示
/** * 从已有集合构造队列. * 如果已经集合是SortedSet或者PriorityBlockingQueue, 则保持原来的元素顺序 */ public PriorityBlockingQueue(Collection<? extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); boolean heapify = true; // true if not known to be in heap order boolean screen = true; // true if must screen for nulls if (c instanceof SortedSet<?>) { // 如果是有序集合 SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue<?>) { // 如果是优先级队列 PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] a = c.toArray(); int n = a.length; if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); if (screen && (n == 1 || this.comparator != null)) { // 校验是否存在null元素 for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; if (heapify) // 堆排序 heapify(); }
- 插入元素 offer 方法分析
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; // 加锁 lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) // 队列已满, 则进行扩容 tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) // 比较器为空, 则按照元素的自然顺序进行堆调整 siftUpComparable(n, e, array); else // 比较器非空, 则按照比较器进行堆调整 siftUpUsingComparator(n, e, array, cmp); size = n + 1; // 队列元素总数+1 notEmpty.signal(); // 唤醒一个可能正在等待的"出队线程" } finally { lock.unlock(); } return true; }
上面最关键的是siftUpComparable和siftUpUsingComparator方法,这两个方法内部几乎一样,只不过前者是一个根据元素的自然顺序比较,后者则根据外部比较器比较,我们重点看下siftUpComparable方法:
/** * 将元素x插入到array[k]的位置. * 然后按照元素的自然顺序进行堆调整——"上浮",以维持"堆"有序. * 最终的结果是一个"小顶堆". */ private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; // 相当于(k-1)除2, 就是求k结点的父结点索引parent Object e = array[parent]; if (key.compareTo((T) e) >= 0) // 如果插入的结点值大于父结点, 则退出 break; // 否则,交换父结点和当前结点的值 array[k] = e; k = parent; } array[k] = key; }
siftUpComparable方法的作用其实就是堆的“上浮调整”,可以把堆可以想象成一棵完全二叉树,每次插入元素都链接到二叉树的最右下方,然后将插入的元素与其父结点比较,如果父结点大,则交换元素,直到没有父结点比插入的结点大为止。这样就保证了堆顶(二叉树的根结点)一定是最小的元素。(注:以上仅针对“小顶堆”)
- 拓容 tryGrow方法
private void tryGrow(Object[] array, int oldCap) { lock.unlock(); // 扩容和入队/出队可以同时进行, 所以先释放全局锁 Object[] newArray = null; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { // allocationSpinLock置1表示正在扩容 try { // 计算新的数组大小 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // 溢出判断 int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; // 分配新数组 } finally { allocationSpinLock = 0; } } if (newArray == null) // 扩容失败(可能有其它线程正在扩容,导致allocationSpinLock竞争失败) Thread.yield(); lock.lock(); // 获取全局锁(因为要修改内部数组queue) if (newArray != null && queue == array) { queue = newArray; // 指向新的内部数组 System.arraycopy(array, 0, newArray, 0, oldCap); } }
- 出对 take() 方法分析
/** * 出队一个元素. * 如果队列为空, 则阻塞线程. */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 获取全局锁 E result; try { while ((result = dequeue()) == null) // 队列为空 notEmpty.await(); // 线程在noEmpty条件队列等待 } finally { lock.unlock(); } return result; } private E dequeue() { int n = size - 1; // n表示出队后的剩余元素个数 if (n < 0) // 队列为空, 则返回null return null; else { Object[] array = queue; E result = (E) array[0]; // array[0]是堆顶结点, 每次出队都删除堆顶结点 E x = (E) array[n]; // array[n]是堆的最后一个结点, 也就是二叉树的最右下结点 array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
使用场景
我一般很少直接使用它,但是在我们使用的框架中大量使用。