Java Review - 并发编程_DelayQueue原理&源码剖析

简介: Java Review - 并发编程_DelayQueue原理&源码剖析

195d03d17afc4a928bc581f313b01dfe.png

概述


DelayQueue并发队列是一个无界阻塞延迟队列,队列中的每个元素都有个过期时间,当从队列获取元素时,只有过期元素才会出队列。

队列头元素是最快要过期的元素。


类图结构


684b8944414e49519590ca8f1d221682.png

由该图可知


DelayQueue内部使用PriorityQueue存放数据,使用ReentrantLock实现线程同步。


另外,队列里面的元素要实现Delayed接口,由于每个元素都有一个过期时间,所以要实现获知当前元素还剩下多少时间就过期了的接口,由于内部使用优先级队列来实现,所以要实现元素之间相互比较的接口。

44bf7efb667743d2ad5ad25b79ee0360.png

/**
 * A mix-in style interface for marking objects that should be
 * acted upon after a given delay.
 *
 * <p>An implementation of this interface must define a
 * {@code compareTo} method that provides an ordering consistent with
 * its {@code getDelay} method.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface Delayed extends Comparable<Delayed> {
    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}


条件变量available与lock锁是对应的,其目的是为了实现线程间同步


a29b207c1a06483b9f5a2eeceb95b227.png

 private final transient ReentrantLock lock = new ReentrantLock();
  /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();


其中leader变量的使用基于Leader-Follower模式的变体,用于尽量减少不必要的线程等待。当一个线程调用队列的take方法变为leader线程后,它会调用条件变量available.awaitNanos(delay)等待delay时间,但是其他线程(follwer线程)则会调用available.await()进行无限等待

leader线程延迟时间过期后,会退出take方法,并通过调用available.signal()方法唤醒一个follwer线程,被唤醒的follwer线程被选举为新的leader线程。


每日一博 - DelayQueue阻塞队列源码解读

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader = null;

小Demo


import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/12/19 23:05
 * @mark: show me the code , change the world
 */
public class DelayQueueTest {
    static class DelayedEle implements Delayed {
        private final long delayTime; //延迟时间
        private final long expire;  //到期时间
        private String data;   //数据
        public DelayedEle(long delay, String data) {
            delayTime = delay;
            this.data = data;
            expire = System.currentTimeMillis() + delay;
        }
        /**
         * 剩余时间=到期时间-当前时间
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        /**
         * 优先队列里面优先级规则
         */
        @Override
        public int compareTo(Delayed o) {
            return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }
        @Override
        public String toString() {
            final StringBuilder sb = new StringBuilder("DelayedElement{");
            sb.append("delay=").append(delayTime);
            sb.append(", expire=").append(expire);
            sb.append(", data='").append(data).append('\'');
            sb.append('}');
            return sb.toString();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        // 1 创建延时队列
        DelayQueue<DelayedEle> delayQueue = new DelayQueue<DelayedEle>();
        // 2 创建延时任务
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            DelayedEle ele = new DelayedEle(random.nextInt(500), "task-" + i);
            delayQueue.offer(ele);
        }
        System.out.println("开始操作,delayQueue队列大小为:" + delayQueue.size());
        // 3 依次取出任务并打印
        DelayedEle delayedEle = null;
        try {
            // 3.1 循环,如果想避免虚假唤醒,则不能把全部元素都打印出来
            for (; ; ) {
                // 3.2 获取过期的任务并打印
                while ((delayedEle = delayQueue.take()) != null) {
                    System.out.println(delayedEle.toString());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


首先创建延迟任务DelayedEle类,其中delayTime表示当前任务需要延迟多少ms时间过期,expire则是当前时间的ms值加上delayTime的值。


另外,实现了Delayed接口,实现了long getDelay(TimeUnit unit)方法用来获取当前元素还剩下多少时间过期,实现了int compareTo(Delayed o)方法用来决定优先级队列元素的比较规则。


在main函数内首先创建了一个延迟队列,然后使用随机数生成器生成了10个延迟任务,最后通过循环依次获取延迟任务,并打印。运行上面代码,一个可能的输出如下所示。


116ebe60795045f7a9bf01a53c4526ce.png


可见,出队的顺序和delay时间有关,而与创建任务的顺序无关。


核心方法&源码解读

offer操作


插入元素到队列,如果插入元素为null则抛出NullPointerException异常,否则由于是无界队列,所以一直返回true。插入元素要实现Delayed接口。

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock; // 1 
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {// 2 
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }


首先获取独占锁,然后添加元素到优先级队列,由于q是优先级队列,所以添加元素后,调用q.peek()方法返回的并不一定是当前添加的元素

如果代码(2)判断结果为true,则说明当前元素e是最先将过期的,那么重置leader线程为null,这时候激活avaliable变量条件队列里面的一个线程,告诉它队列里面有元素了。


take操作


获取并移除队列里面延迟时间过期的元素,如果队列里面没有过期元素则等待。

/**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
              // 1 获取但不移除队首元素 
                E first = q.peek();
                if (first == null) 
                    available.await(); // 2 
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0) // 3 
                        return q.poll();
                    first = null; // don't retain ref while waiting  
                    if (leader != null) // 4
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread; // 5
                        try {
                            available.awaitNanos(delay); // 6
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null) // 7  
                available.signal();
            lock.unlock();
        }
    }


首先获取独占锁lock。假设线程A第一次调用队列的take()方法时队列为空,则执行代码(1)后first==null,所以会执行代码(2)把当前线程放入available的条件队列里阻塞等待。


当有另外一个线程B执行offer(item)方法并且添加元素到队列时,假设此时没有其他线程执行入队操作,则线程B添加的元素是队首元素,那么执行q.peek()。


e这时候就会重置leader线程为null,并且激活条件变量的条件队列里面的一个线程。此时线程A就会被激活。


线程A被激活并循环后重新获取队首元素,这时候first就是线程B新增的元素,可知这时候first不为null,则调用first.getDelay(TimeUnit.NANOSECONDS)方法查看该元素还剩余多少时间就要过期,如果delay<=0则说明已经过期,那么直接出队返回。


否则查看leader是否为null,不为null则说明其他线程也在执行take,则把该线程放入条件队列。如果这时候leader为null,则选取当前线程A为leader线程,


然后执行代码(5)等待delay时间(这期间该线程会释放锁,所以其他线程可以offer添加元素,也可以take阻塞自己),剩余过期时间到后,线程A会重新竞争得到锁,然后重置leader线程为null,重新进入循环,这时候就会发现队头的元素已经过期了,则会直接返回队头元素。


在返回前会执行finally块里面的代码(7),代码(7)执行结果为true则说明当前线程从队列移除过期元素后,又有其他线程执行了入队操作,那么这时候调用条件变量的singal方法,激活条件队列里面的等待线程。


poll操作


获取并移除队头过期元素,如果没有过期元素则返回null。

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            // 如果队列为空,或者不为空但是对头元素没有过期,则返回null
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }


首先获取独占锁,然后获取队头元素,如果队头元素为null或者还没过期则返回null,否则返回队头元素。


size操作

计算队列元素个数,包含过期的和没有过期的。

 public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }


先获取独占锁,然后调用优先级队列的size方法。


小结

DelayQueue队列内部使用PriorityQueue存放数据,使用ReentrantLock实现线程同步。


57f8eff709684011b3fb27e47cd85165.png


另外队列里面的元素要实现Delayed接口,其中一个是获取当前元素到过期时间剩余时间的接口,在出队时判断元素是否过期了,一个是元素之间比较的接口,因为这是一个有优先级的队列。

相关文章
|
21天前
|
XML Java 编译器
Java注解的底层源码剖析与技术认识
Java注解(Annotation)是Java 5引入的一种新特性,它提供了一种在代码中添加元数据(Metadata)的方式。注解本身并不是代码的一部分,它们不会直接影响代码的执行,但可以在编译、类加载和运行时被读取和处理。注解为开发者提供了一种以非侵入性的方式为代码提供额外信息的手段,这些信息可以用于生成文档、编译时检查、运行时处理等。
59 7
|
22天前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
24 0
|
4天前
|
监控 Java API
探索Java NIO:究竟在哪些领域能大显身手?揭秘原理、应用场景与官方示例代码
Java NIO(New IO)自Java SE 1.4引入,提供比传统IO更高效、灵活的操作,支持非阻塞IO和选择器特性,适用于高并发、高吞吐量场景。NIO的核心概念包括通道(Channel)、缓冲区(Buffer)和选择器(Selector),能实现多路复用和异步操作。其应用场景涵盖网络通信、文件操作、进程间通信及数据库操作等。NIO的优势在于提高并发性和性能,简化编程;但学习成本较高,且与传统IO存在不兼容性。尽管如此,NIO在构建高性能框架如Netty、Mina和Jetty中仍广泛应用。
19 3
|
4天前
|
安全 算法 Java
Java CAS原理和应用场景大揭秘:你掌握了吗?
CAS(Compare and Swap)是一种乐观锁机制,通过硬件指令实现原子操作,确保多线程环境下对共享变量的安全访问。它避免了传统互斥锁的性能开销和线程阻塞问题。CAS操作包含三个步骤:获取期望值、比较当前值与期望值是否相等、若相等则更新为新值。CAS广泛应用于高并发场景,如数据库事务、分布式锁、无锁数据结构等,但需注意ABA问题。Java中常用`java.util.concurrent.atomic`包下的类支持CAS操作。
23 2
|
13天前
|
存储 JavaScript 前端开发
基于 SpringBoot 和 Vue 开发校园点餐订餐外卖跑腿Java源码
一个非常实用的校园外卖系统,基于 SpringBoot 和 Vue 的开发。这一系统源于黑马的外卖案例项目 经过站长的进一步改进和优化,提供了更丰富的功能和更高的可用性。 这个项目的架构设计非常有趣。虽然它采用了SpringBoot和Vue的组合,但并不是一个完全分离的项目。 前端视图通过JS的方式引入了Vue和Element UI,既能利用Vue的快速开发优势,
77 13
|
27天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
54 12
|
21天前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。
|
23天前
|
人工智能 移动开发 安全
家政上门系统用户端、阿姨端源码,java家政管理平台源码
家政上门系统基于互联网技术,整合大数据分析、AI算法和现代通信技术,提供便捷高效的家政服务。涵盖保洁、月嫂、烹饪等多元化服务,支持多终端访问,具备智能匹配、在线支付、订单管理等功能,确保服务透明、安全,适用于家庭生活的各种需求场景,推动家政市场规范化发展。
|
机器学习/深度学习 Java 程序员
Java Review(三十二、异常处理)
Java Review(三十二、异常处理)
140 0
Java Review(三十二、异常处理)
|
XML 存储 Java
Java Review(三十三、异常处理----补充:断言、日志、调试)
Java Review(三十三、异常处理----补充:断言、日志、调试)
180 0