每日一博 - DelayQueue阻塞队列源码解读

简介: 每日一博 - DelayQueue阻塞队列源码解读



Pre

每日一博 - 延时任务的多种实现方式解读


d0fdb2e70e1847b2b9749789048967d3.pngDelayQueue 由优先级支持的、基于时间的调度队列,内部使用非线程安全的优先队列(PriorityQueue)实现,而无界队列基于数组的扩容实现。

在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。


创建队列

-

BlockingQueue<String> blockingQueue = new DelayQueue();


入队的对象必须要实现 Delayed接口,而Delayed集成自 Comparable 接口。


Delayed 接口使对象成为延迟对象,它使存放在DelayQueue类中的对象具有了激活日期。该接口强制实现下列两个方法。


compareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法。让元素按激活日期排队

getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。



c20e6a05cdb2494b9596f0e8db528bcb.png



DelayQueue特征


  • DelayQueue的泛型参数需要实现Delayed接口

Delayed接口继承了Comparable接口,DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间。



  • DelayQueue不允许包含null元素。


Leader/Followers模式


有若干个线程(一般组成线程池)用来处理大量的事件


有一个线程作为领导者,等待事件的发生;其他的线程作为追随者,仅仅是睡眠


假如有事件需要处理,领导者会从追随者中指定一个新的领导者,自己去处理事件


唤醒的追随者作为新的领导者等待事件的发生


处理事件的线程处理完毕以后,就会成为追随者的一员,直到被唤醒成为领导者


假如需要处理的事件太多,而线程数量不够(能够动态创建线程处理另当别论),则有的事件可能会得不到处理。


所以线程会有三种身份中的一种:leader 和 follower,以及一个干活中的状态:processser。


基本原则就 永远最多只有一个 leader。


而所有 follower 都在等待成为 leader。


线程池启动时会自动产生一个 Leader 负责等待网络 IO 事件,当有一个事件产生时,Leader 线程首先通知一个 Follower 线程将被其提拔为新的 Leader ,然后自己就去干活了,去处理这个网络事件,处理完毕后加入 Follower 线程等待队列,等待下次成为 Leader。


这种方法可以增强 CPU高速缓存相似性,以及消除动态内存分配和线程间的数据交换。


DelayQueue源码分析

类继承关系


e119b7e104244e8da91f7e190aeaab83.png


核心方法


fea41b05a1134379b14a0e637d8d958e.png


成员变量


d36b655ba9e24b3999c2f71c9392d36b.png


DelayQueue 通过组合一个PriorityQueue 来实现元素的存储以及优先级维护,通过ReentrantLock 来保证线程安全,通过Condition 来判断是否可以取数据,对于leader 后面再来分析它的作用

// 可重入锁
private final transient ReentrantLock lock = new ReentrantLock();
// 存储元素的优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 获取数据 等待线程标识
private Thread leader = null;
// 条件控制,表示是否可以从队列中取数据
private final Condition available = lock.newCondition();

构造函数


306c9f26342c4ddbbb82bb25ad634b64.png


DelayQueue 内部组合PriorityQueue,对元素的操作都是通过PriorityQueue 来实现的,DelayQueue 的构造方法很简单,对于PriorityQueue 都是使用的默认参数,不能通过DelayQueue 来指定PriorityQueue的初始大小,也不能使用指定的Comparator,元素本身就需要实现Comparable ,因此不需要指定的Comparator。

/**
* 无参构造函数
*/
public DelayQueue() {}
/**
* 通过集合初始化
*/
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}


入队方法

8f0ed9808cab4d4598352017ca40a706.png


虽然提供入队的接口方式很多,实际都是调用的offer 方法,通过PriorityQueue 来进行入队操作,入队超时方法并没有其超时功能。


add(E e),将指定的元素插入到此队列中,在成功时返回 true


put(E e),将指定的元素插入此队列中,队列达到最大值,则抛oom异常


offer(E e),将指定的元素插入到此队列中,在成功时返回 true


offer(E e, long timeout, TimeUnit unit),指定一个等待时间将元素放入队列中并没有意义


offer(E e)

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }


将指定的元素插入到此队列中,在成功时返回 true,其他几个方法内部都调用了offer 方法,我们也可以直接调用offer 方法来完成入队操作。


peek并不一定是当前添加的元素,队头是当前添加元素,说明当前元素e的优先级最小也就即将过期的,这时候激活avaliable变量条件队列里面的一个线程,通知他们队列里面有元素了。

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        //通过PriorityQueue 来将元素入队
        q.offer(e);
        //peek 是获取的队头元素,唤醒阻塞在available 条件上的一个线程,表示可以从队列中取数据了
        if (q.peek() == e) {
            leader = null;
            // 唤醒通知
            available.signal();
        }
        return true;
    } finally {
        // 解锁
        lock.unlock();
    }
}

出队方法

8d6103215b5b496ba40327d835bd5b00.png

poll(),获取并移除此队列的头,如果此队列为空,则返回 null

poll(long timeout, TimeUnit unit),获取并移除此队列的头部,在指定的等待时间前等待

take(),获取并移除此队列的头部,在元素变得可用之前一直等待

peek(),调用此方法,可以返回队头元素,但是元素并不出队


poll()

获取并移除此队列的头,如果此队列为空,则返回 null

public E poll() {
    final ReentrantLock lock = this.lock;
    // 获取同步锁
    lock.lock();
    try {
        // 获取队头元素
        E first = q.peek();
        // 如果对头为null 或者 延时还没有到,则返回 null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll(); // 否则元素出队
    } finally {
        lock.unlock();
    }
}

poll(long timeout, TimeUnit unit)


获取并移除此队列的头部,在指定的等待时间前等待。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 超时等待时间
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // 获取可中断锁
    lock.lockInterruptibly();
    try 
        // 无限循环
        for (;;) {
            // 获取队头元素
            E first = q.peek
            // 队头为空,也就是队列为空
            if (first == null) {
                // 达到超时指定时间,返回null
                if (nanos <= 0)
                    return null;
                else
                    // 如果还没有超时,那么在available条件上进行等待nanos时间
                    nanos = available.awaitNanos(nanos);
            } else {
                // 获取元素延迟时间
                long delay = first.getDelay(NANOSECONDS);
                // 延时到期
                if (delay <= 0)
                    return q.poll(); // 返回出队元素
                // 延时未到期,超时到期,返回null
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting
                // 超时等待时间 < 延迟时间 或者 有其他线程再取数据
                if (nanos < delay || leader != null)
                    // 在available条件上进行等待nanos时间
                    nanos = available.awaitNanos(nanos);
                else {
                    // 超时等待时间 > 延迟时间 
                    // 并且没有其他线程在等待,那么当前元素成为leader,表示 leader线程最早 正在等待获取元素
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待 延迟时间 超时
                        long timeLeft = available.awaitNanos(delay);
                        // 还需要继续等待 nanos
                        nanos -= delay - timeLeft;
                    } finally {
                        // 清除 leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 唤醒阻塞在 available 的一个线程,表示可以取数据了
        if (leader == null && q.peek() != null)
            available.signal
        // 释放锁
        lock.unlock();
    }
}


梳理一下


如果队列为空,如果超时时间未到,则进行等待,否则返回null

队列不为空,取出队头元素,如果延迟时间到来,则返回元素,否则如果超时时间到返回null

超时时间未到,并且超时时间 < 延迟时间 或者 有线程正在获取元素,那么进行等待

超时时间 > 延迟时间,那么肯定可以取到元素,设置 leader为当前线程,等待延迟时间到期。

需要注意的时Condition 条件在阻塞时会释放锁,在被唤醒时会再次获取锁,获取成功才会返回。 当进行超时等待时,阻塞在Condition 上后会释放锁,一旦释放了锁,那么其它线程就有可能参与竞争,某一个线程就可能会成为leader(参与竞争的时间早,并且能在等待时间内能获取到队头元素那么就可能成为leader) leader是用来减少不必要的竞争,如果leader不为空说明已经有线程在取了,设置当前线程等待即可。


leader 就是一个信号,告诉其它线程:你们不要再去获取元素了,它们延迟时间还没到期,我都还没有取到数据呢,你们要取数据,等我取了再说


226ae438a60847a39a4ca1692f277e60.png



take()

获取并移除此队列的头部,在元素变得可用之前一直等待

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 获取可中断锁
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 获取队头元素
            E first = q.peek();
            // 队头元素为空,则阻塞等待
            if (first == null)
                available.await();
            else {
                // 获取元素延迟时间
                long delay = first.getDelay(NANOSECONDS);
                // 延时到期
                if (delay <= 0)
                    return q.poll(); // 返回出队元素
                first = null; // don't retain ref while waiting
                // 如果有其它线程在等待获取元素,则当前线程不用去竞争,直接等待
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待延迟时间到期
                        available.awaitNanos(delay);
                    } finally {
                        //清除 leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        //唤醒阻塞在available 的一个线程,表示可以取数据了
        if (leader == null && q.peek() != null)
            available.signal();
        // 释放锁
        lock.unlock();
    }
}


该方法就是相当于在前面的超时等待中,把超时时间设置为无限大,那么这样只要队列中有元素,要是元素延迟时间要求,那么就可以取出元素,否则就直接等待元素延迟时间到期,再取出元素,最先参与等待的线程会成为leader。


peek

调用此方法,可以返回队头元素,但是元素并不出队。

public E peek() {
 final ReentrantLock lock = this.lock;
 lock.lock();
    try {
     //返回队列头部元素,元素不出队
        return q.peek();
    } finally {
        lock.unlock();
    }
}


小结


DelayQueue 内部通过组合PriorityQueue 来实现存储和维护元素顺序的;

DelayQueue 存储元素必须实现Delayed 接口,通过实现Delayed 接口,可以获取到元素延迟时间,以及可以比较元素大小(Delayed 继承Comparable);

DelayQueue 通过一个可重入锁来控制元素的入队出队行为;

DelayQueue 中leader 标识 用于减少线程的竞争,表示当前有其它线程正在获取队头元素;

PriorityQueue 只是负责存储数据以及维护元素的顺序,对于延迟时间取数据则是在DelayQueue 中进行判断控制的;

DelayQueue 没有实现序列化接口

相关文章
|
4月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解02-阻塞队列之ArrayBlockingQueue
`ArrayBlockingQueue` 是Java中一个基于数组的并发队列,具有线程安全的性质。以下是其关键信息的摘要: - **继承实现关系**:它扩展了`AbstractQueue`并实现了`BlockingQueue`接口,确保线程安全的入队和出队操作。 - **数据结构**:内部由固定大小的数组支撑,有`takeIndex`和`putIndex`跟踪元素的添加和移除位置,`count`记录队列中的元素数量。 - **特点**:队列长度在创建时必须指定且不可变,遵循先进先出(FIFO)原则,当队列满时,添加元素会阻塞,空时,移除元素会阻塞。
55 0
|
1月前
ArrayBlockingQueue原理解析
该文章主要讲述了ArrayBlockingQueue的实现原理。
|
4月前
|
存储 缓存 Java
Java线程池ThreadPoolExcutor源码解读详解06-阻塞队列之SynchronousQueue
SynchronousQueue 是 Java 中的一个特殊阻塞队列,它没有容量,实现线程间的直接对象交换。这个队列的特点和优缺点如下: 1. **无容量限制**:SynchronousQueue 不存储任何元素,每个 put 操作必须等待一个 take 操作,反之亦然。这意味着生产者和消费者必须严格同步。 2. **阻塞性质**:当一个线程试图插入元素时,如果没有线程正在等待获取,那么插入操作会阻塞;同样,尝试获取元素的线程如果没有元素可取,也会被阻塞。 3. **公平与非公平策略**:SynchronousQueue 支持公平和非公平的线程调度策略。公平模式下,等待时间最长的线程优先
81 5
|
4月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解08-阻塞队列之LinkedBlockingDeque
**摘要:** 本文分析了Java中的LinkedBlockingDeque,它是一个基于链表实现的双端阻塞队列,具有并发安全性。LinkedBlockingDeque可以作为有界队列使用,容量由构造函数指定,默认为Integer.MAX_VALUE。队列操作包括在头部和尾部的插入与删除,这些操作由锁和Condition来保证线程安全。例如,`linkFirst()`和`linkLast()`用于在队首和队尾插入元素,而`unlinkFirst()`和`unlinkLast()`则用于删除首尾元素。队列的插入和删除方法根据队列是否满或空,可能会阻塞或唤醒等待的线程,这些操作通过`notFul
294 5
|
4月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解07-阻塞队列之LinkedTransferQueue
`LinkedTransferQueue`是一个基于链表结构的无界并发队列,实现了`TransferQueue`接口,它使用预占模式来协调生产者和消费者的交互。队列中的元素分为数据节点(isData为true)和请求节点(isData为false)。在不同情况下,队列提供四种操作模式:NOW(立即返回,不阻塞),ASYNC(异步,不阻塞,但后续线程可能阻塞),SYNC(同步,阻塞直到匹配),TIMED(超时等待,可能返回)。 `xfer`方法是队列的核心,它处理元素的转移过程。方法内部通过循环和CAS(Compare And Swap)操作来确保线程安全,同时避免锁的使用以提高性能。当找到匹
281 5
|
4月前
|
存储 缓存 安全
Java线程池ThreadPoolExcutor源码解读详解05-阻塞队列之DelayQueue原理及扩容机制详解
DelayQueue` 是 Java 中的一个线程安全的阻塞队列,它用于存储实现了 `Delayed` 接口的元素,这些元素都有一个延迟时间。当元素的延迟时间过去之后,它们才能被从队列中取出。以下是摘要: 1. **核心特性**: - 基于 `PriorityQueue` 实现,元素按延迟时间排序,优先级高的先出队。 - 使用 `ReentrantLock` 和条件变量 `available` 控制并发。 - 只有延迟时间小于0的元素才能被取出。 - 不允许插入 `null` 元素。 2. **构造器**: - 默认构造器创建无初始元素的队列。 - 可以
82 5
|
4月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解03-阻塞队列之LinkedBlockingQueue
LinkedBlockingQueue 和 ArrayBlockingQueue 是 Java 中的两种阻塞队列实现,它们的主要区别在于: 1. **数据结构**:ArrayBlockingQueue 采用固定大小的数组实现,而 LinkedBlockingQueue 则使用链表实现。 2. **容量**:ArrayBlockingQueue 在创建时必须指定容量,而 LinkedBlockingQueue 可以在创建时不指定容量,默认容量为 Integer.MAX_VALUE。 总结起来,如果需要高效并发且内存不是主要考虑因素,LinkedBlockingQueue 通常是更好的选择;
210 1
|
4月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解04-阻塞队列之PriorityBlockingQueue原理及扩容机制详解
1. **继承实现图关系**: - `PriorityBlockingQueue`实现了`BlockingQueue`接口,提供了线程安全的队列操作。 - 内部基于优先级堆(小顶堆或大顶堆)的数据结构实现,可以保证元素按照优先级顺序出队。 2. **底层数据存储结构**: - 默认容量是11,存储数据的数组会在需要时动态扩容。 - 数组长度总是2的幂,以满足堆的性质。 3. **构造器**: - 无参构造器创建一个默认容量的队列,元素需要实现`Comparable`接口。 - 指定容量构造器允许设置初始容量,但不指定排序规则。 - 可指定容量和比较
249 2
|
4月前
并发编程之BlockingQueue(阻塞队列)的详细解析
并发编程之BlockingQueue(阻塞队列)的详细解析
22 0
|
10月前
|
消息中间件
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue使用场景总结
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue使用场景总结
47 0