Pre
DelayQueue 由优先级支持的、基于时间的调度队列,内部使用非线程安全的优先队列(PriorityQueue)实现,而无界队列基于数组的扩容实现。
在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。
创建队列
BlockingQueue<String> blockingQueue = new DelayQueue();
入队的对象必须要实现 Delayed接口,而Delayed集成自 Comparable 接口。
Delayed 接口使对象成为延迟对象,它使存放在DelayQueue类中的对象具有了激活日期。该接口强制实现下列两个方法。
compareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法。让元素按激活日期排队
getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。
DelayQueue特征
- DelayQueue的泛型参数需要实现Delayed接口
Delayed接口继承了Comparable接口,DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间。
- DelayQueue不允许包含null元素。
Leader/Followers模式
有若干个线程(一般组成线程池)用来处理大量的事件
有一个线程作为领导者,等待事件的发生;其他的线程作为追随者,仅仅是睡眠
假如有事件需要处理,领导者会从追随者中指定一个新的领导者,自己去处理事件
唤醒的追随者作为新的领导者等待事件的发生
处理事件的线程处理完毕以后,就会成为追随者的一员,直到被唤醒成为领导者
假如需要处理的事件太多,而线程数量不够(能够动态创建线程处理另当别论),则有的事件可能会得不到处理。
所以线程会有三种身份中的一种:leader 和 follower,以及一个干活中的状态:processser。
基本原则就 永远最多只有一个 leader。
而所有 follower 都在等待成为 leader。
线程池启动时会自动产生一个 Leader 负责等待网络 IO 事件,当有一个事件产生时,Leader 线程首先通知一个 Follower 线程将被其提拔为新的 Leader ,然后自己就去干活了,去处理这个网络事件,处理完毕后加入 Follower 线程等待队列,等待下次成为 Leader。
这种方法可以增强 CPU高速缓存相似性,以及消除动态内存分配和线程间的数据交换。
DelayQueue源码分析
类继承关系
核心方法
成员变量
DelayQueue 通过组合一个PriorityQueue 来实现元素的存储以及优先级维护,通过ReentrantLock 来保证线程安全,通过Condition 来判断是否可以取数据,对于leader 后面再来分析它的作用
// 可重入锁 private final transient ReentrantLock lock = new ReentrantLock(); // 存储元素的优先级队列 private final PriorityQueue<E> q = new PriorityQueue<E>(); // 获取数据 等待线程标识 private Thread leader = null; // 条件控制,表示是否可以从队列中取数据 private final Condition available = lock.newCondition();
构造函数
DelayQueue 内部组合PriorityQueue,对元素的操作都是通过PriorityQueue 来实现的,DelayQueue 的构造方法很简单,对于PriorityQueue 都是使用的默认参数,不能通过DelayQueue 来指定PriorityQueue的初始大小,也不能使用指定的Comparator,元素本身就需要实现Comparable ,因此不需要指定的Comparator。
/** * 无参构造函数 */ public DelayQueue() {} /** * 通过集合初始化 */ public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
入队方法
虽然提供入队的接口方式很多,实际都是调用的offer 方法,通过PriorityQueue 来进行入队操作,入队超时方法并没有其超时功能。
add(E e),将指定的元素插入到此队列中,在成功时返回 true
put(E e),将指定的元素插入此队列中,队列达到最大值,则抛oom异常
offer(E e),将指定的元素插入到此队列中,在成功时返回 true
offer(E e, long timeout, TimeUnit unit),指定一个等待时间将元素放入队列中并没有意义
offer(E e)
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return {@code true} * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
将指定的元素插入到此队列中,在成功时返回 true,其他几个方法内部都调用了offer 方法,我们也可以直接调用offer 方法来完成入队操作。
peek并不一定是当前添加的元素,队头是当前添加元素,说明当前元素e的优先级最小也就即将过期的,这时候激活avaliable变量条件队列里面的一个线程,通知他们队列里面有元素了。
public boolean offer(E e) { final ReentrantLock lock = this.lock; // 获取锁 lock.lock(); try { //通过PriorityQueue 来将元素入队 q.offer(e); //peek 是获取的队头元素,唤醒阻塞在available 条件上的一个线程,表示可以从队列中取数据了 if (q.peek() == e) { leader = null; // 唤醒通知 available.signal(); } return true; } finally { // 解锁 lock.unlock(); } }
出队方法
poll(),获取并移除此队列的头,如果此队列为空,则返回 null
poll(long timeout, TimeUnit unit),获取并移除此队列的头部,在指定的等待时间前等待
take(),获取并移除此队列的头部,在元素变得可用之前一直等待
peek(),调用此方法,可以返回队头元素,但是元素并不出队
poll()
获取并移除此队列的头,如果此队列为空,则返回 null
public E poll() { final ReentrantLock lock = this.lock; // 获取同步锁 lock.lock(); try { // 获取队头元素 E first = q.peek(); // 如果对头为null 或者 延时还没有到,则返回 null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); // 否则元素出队 } finally { lock.unlock(); } }
poll(long timeout, TimeUnit unit)
获取并移除此队列的头部,在指定的等待时间前等待。
public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 超时等待时间 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; // 获取可中断锁 lock.lockInterruptibly(); try // 无限循环 for (;;) { // 获取队头元素 E first = q.peek // 队头为空,也就是队列为空 if (first == null) { // 达到超时指定时间,返回null if (nanos <= 0) return null; else // 如果还没有超时,那么在available条件上进行等待nanos时间 nanos = available.awaitNanos(nanos); } else { // 获取元素延迟时间 long delay = first.getDelay(NANOSECONDS); // 延时到期 if (delay <= 0) return q.poll(); // 返回出队元素 // 延时未到期,超时到期,返回null if (nanos <= 0) return null; first = null; // don't retain ref while waiting // 超时等待时间 < 延迟时间 或者 有其他线程再取数据 if (nanos < delay || leader != null) // 在available条件上进行等待nanos时间 nanos = available.awaitNanos(nanos); else { // 超时等待时间 > 延迟时间 // 并且没有其他线程在等待,那么当前元素成为leader,表示 leader线程最早 正在等待获取元素 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待 延迟时间 超时 long timeLeft = available.awaitNanos(delay); // 还需要继续等待 nanos nanos -= delay - timeLeft; } finally { // 清除 leader if (leader == thisThread) leader = null; } } } } } finally { // 唤醒阻塞在 available 的一个线程,表示可以取数据了 if (leader == null && q.peek() != null) available.signal // 释放锁 lock.unlock(); } }
梳理一下
如果队列为空,如果超时时间未到,则进行等待,否则返回null
队列不为空,取出队头元素,如果延迟时间到来,则返回元素,否则如果超时时间到返回null
超时时间未到,并且超时时间 < 延迟时间 或者 有线程正在获取元素,那么进行等待
超时时间 > 延迟时间,那么肯定可以取到元素,设置 leader为当前线程,等待延迟时间到期。
需要注意的时Condition 条件在阻塞时会释放锁,在被唤醒时会再次获取锁,获取成功才会返回。 当进行超时等待时,阻塞在Condition 上后会释放锁,一旦释放了锁,那么其它线程就有可能参与竞争,某一个线程就可能会成为leader(参与竞争的时间早,并且能在等待时间内能获取到队头元素那么就可能成为leader) leader是用来减少不必要的竞争,如果leader不为空说明已经有线程在取了,设置当前线程等待即可。
leader 就是一个信号,告诉其它线程:你们不要再去获取元素了,它们延迟时间还没到期,我都还没有取到数据呢,你们要取数据,等我取了再说
take()
获取并移除此队列的头部,在元素变得可用之前一直等待
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 获取可中断锁 lock.lockInterruptibly(); try { for (;;) { // 获取队头元素 E first = q.peek(); // 队头元素为空,则阻塞等待 if (first == null) available.await(); else { // 获取元素延迟时间 long delay = first.getDelay(NANOSECONDS); // 延时到期 if (delay <= 0) return q.poll(); // 返回出队元素 first = null; // don't retain ref while waiting // 如果有其它线程在等待获取元素,则当前线程不用去竞争,直接等待 if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待延迟时间到期 available.awaitNanos(delay); } finally { //清除 leader if (leader == thisThread) leader = null; } } } } } finally { //唤醒阻塞在available 的一个线程,表示可以取数据了 if (leader == null && q.peek() != null) available.signal(); // 释放锁 lock.unlock(); } }
该方法就是相当于在前面的超时等待中,把超时时间设置为无限大,那么这样只要队列中有元素,要是元素延迟时间要求,那么就可以取出元素,否则就直接等待元素延迟时间到期,再取出元素,最先参与等待的线程会成为leader。
peek
调用此方法,可以返回队头元素,但是元素并不出队。
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { //返回队列头部元素,元素不出队 return q.peek(); } finally { lock.unlock(); } }
小结
DelayQueue 内部通过组合PriorityQueue 来实现存储和维护元素顺序的;
DelayQueue 存储元素必须实现Delayed 接口,通过实现Delayed 接口,可以获取到元素延迟时间,以及可以比较元素大小(Delayed 继承Comparable);
DelayQueue 通过一个可重入锁来控制元素的入队出队行为;
DelayQueue 中leader 标识 用于减少线程的竞争,表示当前有其它线程正在获取队头元素;
PriorityQueue 只是负责存储数据以及维护元素的顺序,对于延迟时间取数据则是在DelayQueue 中进行判断控制的;
DelayQueue 没有实现序列化接口