概述
DelayQueue并发队列是一个无界阻塞延迟队列,队列中的每个元素都有个过期时间,当从队列获取元素时,只有过期元素才会出队列。
队列头元素是最快要过期的元素。
类图结构
由该图可知
DelayQueue内部使用PriorityQueue存放数据,使用ReentrantLock实现线程同步。
另外,队列里面的元素要实现Delayed接口,由于每个元素都有一个过期时间,所以要实现获知当前元素还剩下多少时间就过期了的接口,由于内部使用优先级队列来实现,所以要实现元素之间相互比较的接口。
/** * A mix-in style interface for marking objects that should be * acted upon after a given delay. * * <p>An implementation of this interface must define a * {@code compareTo} method that provides an ordering consistent with * its {@code getDelay} method. * * @since 1.5 * @author Doug Lea */ public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
条件变量available与lock锁是对应的,其目的是为了实现线程间同步
private final transient ReentrantLock lock = new ReentrantLock(); /** * Condition signalled when a newer element becomes available * at the head of the queue or a new thread may need to * become leader. */ private final Condition available = lock.newCondition();
其中leader变量的使用基于Leader-Follower模式的变体,用于尽量减少不必要的线程等待。当一个线程调用队列的take方法变为leader线程后,它会调用条件变量available.awaitNanos(delay)等待delay时间,但是其他线程(follwer线程)则会调用available.await()进行无限等待
leader线程延迟时间过期后,会退出take方法,并通过调用available.signal()方法唤醒一个follwer线程,被唤醒的follwer线程被选举为新的leader线程。
每日一博 - DelayQueue阻塞队列源码解读
/** * Thread designated to wait for the element at the head of * the queue. This variant of the Leader-Follower pattern * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to * minimize unnecessary timed waiting. When a thread becomes * the leader, it waits only for the next delay to elapse, but * other threads await indefinitely. The leader thread must * signal some other thread before returning from take() or * poll(...), unless some other thread becomes leader in the * interim. Whenever the head of the queue is replaced with * an element with an earlier expiration time, the leader * field is invalidated by being reset to null, and some * waiting thread, but not necessarily the current leader, is * signalled. So waiting threads must be prepared to acquire * and lose leadership while waiting. */ private Thread leader = null;
小Demo
import java.util.Random; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/12/19 23:05 * @mark: show me the code , change the world */ public class DelayQueueTest { static class DelayedEle implements Delayed { private final long delayTime; //延迟时间 private final long expire; //到期时间 private String data; //数据 public DelayedEle(long delay, String data) { delayTime = delay; this.data = data; expire = System.currentTimeMillis() + delay; } /** * 剩余时间=到期时间-当前时间 */ @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } /** * 优先队列里面优先级规则 */ @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { final StringBuilder sb = new StringBuilder("DelayedElement{"); sb.append("delay=").append(delayTime); sb.append(", expire=").append(expire); sb.append(", data='").append(data).append('\''); sb.append('}'); return sb.toString(); } } public static void main(String[] args) throws InterruptedException { // 1 创建延时队列 DelayQueue<DelayedEle> delayQueue = new DelayQueue<DelayedEle>(); // 2 创建延时任务 Random random = new Random(); for (int i = 0; i < 10; i++) { DelayedEle ele = new DelayedEle(random.nextInt(500), "task-" + i); delayQueue.offer(ele); } System.out.println("开始操作,delayQueue队列大小为:" + delayQueue.size()); // 3 依次取出任务并打印 DelayedEle delayedEle = null; try { // 3.1 循环,如果想避免虚假唤醒,则不能把全部元素都打印出来 for (; ; ) { // 3.2 获取过期的任务并打印 while ((delayedEle = delayQueue.take()) != null) { System.out.println(delayedEle.toString()); } } } catch (InterruptedException e) { e.printStackTrace(); } } }
首先创建延迟任务DelayedEle类,其中delayTime表示当前任务需要延迟多少ms时间过期,expire则是当前时间的ms值加上delayTime的值。
另外,实现了Delayed接口,实现了long getDelay(TimeUnit unit)方法用来获取当前元素还剩下多少时间过期,实现了int compareTo(Delayed o)方法用来决定优先级队列元素的比较规则。
在main函数内首先创建了一个延迟队列,然后使用随机数生成器生成了10个延迟任务,最后通过循环依次获取延迟任务,并打印。运行上面代码,一个可能的输出如下所示。
可见,出队的顺序和delay时间有关,而与创建任务的顺序无关。
核心方法&源码解读
offer操作
插入元素到队列,如果插入元素为null则抛出NullPointerException异常,否则由于是无界队列,所以一直返回true。插入元素要实现Delayed接口。
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return {@code true} * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; // 1 lock.lock(); try { q.offer(e); if (q.peek() == e) {// 2 leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
首先获取独占锁,然后添加元素到优先级队列,由于q是优先级队列,所以添加元素后,调用q.peek()方法返回的并不一定是当前添加的元素
如果代码(2)判断结果为true,则说明当前元素e是最先将过期的,那么重置leader线程为null,这时候激活avaliable变量条件队列里面的一个线程,告诉它队列里面有元素了。
take操作
获取并移除队列里面延迟时间过期的元素,如果队列里面没有过期元素则等待。
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 1 获取但不移除队首元素 E first = q.peek(); if (first == null) available.await(); // 2 else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 3 return q.poll(); first = null; // don't retain ref while waiting if (leader != null) // 4 available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; // 5 try { available.awaitNanos(delay); // 6 } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) // 7 available.signal(); lock.unlock(); } }
首先获取独占锁lock。假设线程A第一次调用队列的take()方法时队列为空,则执行代码(1)后first==null,所以会执行代码(2)把当前线程放入available的条件队列里阻塞等待。
当有另外一个线程B执行offer(item)方法并且添加元素到队列时,假设此时没有其他线程执行入队操作,则线程B添加的元素是队首元素,那么执行q.peek()。
e这时候就会重置leader线程为null,并且激活条件变量的条件队列里面的一个线程。此时线程A就会被激活。
线程A被激活并循环后重新获取队首元素,这时候first就是线程B新增的元素,可知这时候first不为null,则调用first.getDelay(TimeUnit.NANOSECONDS)方法查看该元素还剩余多少时间就要过期,如果delay<=0则说明已经过期,那么直接出队返回。
否则查看leader是否为null,不为null则说明其他线程也在执行take,则把该线程放入条件队列。如果这时候leader为null,则选取当前线程A为leader线程,
然后执行代码(5)等待delay时间(这期间该线程会释放锁,所以其他线程可以offer添加元素,也可以take阻塞自己),剩余过期时间到后,线程A会重新竞争得到锁,然后重置leader线程为null,重新进入循环,这时候就会发现队头的元素已经过期了,则会直接返回队头元素。
在返回前会执行finally块里面的代码(7),代码(7)执行结果为true则说明当前线程从队列移除过期元素后,又有其他线程执行了入队操作,那么这时候调用条件变量的singal方法,激活条件队列里面的等待线程。
poll操作
获取并移除队头过期元素,如果没有过期元素则返回null。
/** * Retrieves and removes the head of this queue, or returns {@code null} * if this queue has no elements with an expired delay. * * @return the head of this queue, or {@code null} if this * queue has no elements with an expired delay */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); // 如果队列为空,或者不为空但是对头元素没有过期,则返回null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } }
首先获取独占锁,然后获取队头元素,如果队头元素为null或者还没过期则返回null,否则返回队头元素。
size操作
计算队列元素个数,包含过期的和没有过期的。
public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.size(); } finally { lock.unlock(); } }
先获取独占锁,然后调用优先级队列的size方法。
小结
DelayQueue队列内部使用PriorityQueue存放数据,使用ReentrantLock实现线程同步。
另外队列里面的元素要实现Delayed接口,其中一个是获取当前元素到过期时间剩余时间的接口,在出队时判断元素是否过期了,一个是元素之间比较的接口,因为这是一个有优先级的队列。