Java线程池ThreadPoolExcutor源码解读详解05-阻塞队列之DelayQueue原理及扩容机制详解

本文涉及的产品
可观测监控 Prometheus 版,每月50GB免费额度
云原生网关 MSE Higress,422元/月
应用实时监控服务-应用监控,每月50GB免费额度
简介: DelayQueue` 是 Java 中的一个线程安全的阻塞队列,它用于存储实现了 `Delayed` 接口的元素,这些元素都有一个延迟时间。当元素的延迟时间过去之后,它们才能被从队列中取出。以下是摘要:1. **核心特性**: - 基于 `PriorityQueue` 实现,元素按延迟时间排序,优先级高的先出队。 - 使用 `ReentrantLock` 和条件变量 `available` 控制并发。 - 只有延迟时间小于0的元素才能被取出。 - 不允许插入 `null` 元素。2. **构造器**: - 默认构造器创建无初始元素的队列。 - 可以


💪🏻 制定明确可量化的目标,坚持默默的做事。




一、继承实现关系图

image.gif 1711819050810.png

二、低层数据存储结构

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
...
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader;
private final Condition available = lock.newCondition();
...
}

image.gif

说明:

  • lock:添加和获取锁(同一把锁)
  • q:PriorityQueue存储数据(实现优化级队列)
  • leader:  标记正在延迟等待出队的线程
  • available: 出队阻塞对象

构造器:

public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

image.gif

说明:

  • 默认无参构造器
  • 第二个传入一个集合初始化队列元素构造器

三、特点及优缺点

3.1 特点

  • 支持优先级排序的无界阻塞延迟队列,优先级高的先出队,先先级低的后出队
  • 存储数据结构为数组,以二叉堆形式存储于数组中,二叉堆叶子节点均小于父节点
  • 只有一把锁(出队有阻塞)
  • 不可添加为null的元素
  • 出队取队头元素(只有队头元素的delay为0时才能取出元素)
  • 初始容量大小为11
  • 最大可扩容到Integer.MAX_VALUE

3.2 优缺点

  • 根据排序规则排序(元素必须实现 Delayed接口)
  • put不阻塞(若生产速度快于消费速度,会耗尽所有的可用堆内空间)
  • 优先级排序无界阻塞队列(无界是说Integer.MAX_VALUE非常大,一般不会达这个数字内存就已经撑不住)
  • 取数据锁为延迟阻塞出队
  • 内部数组长度不够会进行扩容
  • 新增元素进行最小二叉推排序

四、源码详解

读取重要源码:

  • 添加任务方法
  • 获取和删除任务方法
  • 扩容

阅读明白这几个接口即可,其它都很简单。

4.1 put、offer

public void put(E e) {
    offer(e);
}
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            // 刚插入就在队首,说明队列是空的,目前只有一个元素 
            //     则唤醒队列为空时的阻塞
            //     唤醒队列leader不为空时的阻塞
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
// p.offer(e)
public boolean offer(E e) {
    // 插入元素不允许为空
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    if (i >= queue.length)
        // 队列为空则扩容
        grow(i + 1);
    // 按优先级插入到队列中
    siftUp(i, e);
    // 队列元素个数+1
    size = i + 1;
    return true;
}
/**
 * 按优先级插入到队列中
 */
private void siftUp(int k, E x) {
    if (comparator != null)
        siftUpUsingComparator(k, x, queue, comparator);
    else
        siftUpComparable(k, x, queue);
}

image.gif

4.2 take

  • 加锁
  • 读取队首元素,若为空则阻塞,
  • 若不为空且过期时间为0,则取出队首元素),队列元素重排
  • 最后一个元素n 并且最后一个元素赋值为null
  • 队列按排序规则从子节点中取元素填充父节点
  • 子节点中取元素似为空的父节点,重复上一步骤,直到取出的子节点为叶子节点leaf
  • first = null 解决高并发情况下某一线程a延迟等待出队,而其它线程因leader!=null而available.await()等待,因线程a出取first而其它线程first一直不为空,导致JVM无法回收导致内存泄漏问题
  • 判断leader元素是否为空,不为空的话阻塞当前线程
  • leader元素为空,则设置leader为当前线程,阻塞delay时间,delay结束重复以上步骤
  • 若无取出线程,且队列不为空,则唤配所有其它awai()线程
  • 解锁
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return q.poll();
                // first = null 解决高并发情况下某一线程a延迟等待出队,而其它线程因leader!=null而available.await()等待,因线程a出取first而其它线程first一直不为空,导致JVM无法回收导致内存泄漏问题
                first = null;
                if (leader != null)
                    available.await();
                else {
                    // leader为空,则设置当前线程为取元素线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 延迟等待取出元素
                        available.awaitNanos(delay);
                    } finally {
                        // 延迟等待结束之后,还原现场
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            // 无正在取元素线程,且队列不为空,唤醒await的线程
            available.signal();
        // 释放锁
        lock.unlock();
    }
}
public E poll() {
    final Object[] es;
    final E result;
    if ((result = (E) ((es = queue)[0])) != null) {
        modCount++;
        final int n;
        final E x = (E) es[(n = --size)];
        es[n] = null;
        if (n > 0) {
            final Comparator<? super E> cmp;
            if ((cmp = comparator) == null)
                siftDownComparable(0, x, es, n);
            else
                siftDownUsingComparator(0, x, es, n, cmp);
        }
    }
    return result;
}

image.gif

4.3 扩容原理

  • 默认队列大小为11,扩容最大大小为Integer.MAX_VALUE
  • 扩容时机:元素个数 >= 队列的大小
  • 扩容规则:
  • 队列大小 < 64,则新队列大小 = (旧队列大小 + 1) * 2
  • 队列大小 >= 64,则新队列大小 = 旧队列大小 + 旧队列大小 / 2
  • 当旧容量大于Integer.MAX_VALUE - 8时,新容量值=旧容量 + 1(下面看源码说明)
  • 当超过Ineter.MAX_VALUE时,报OutOfMemoryError错误
  • 创建新容量数组队列,原数组队列数据复制到新数组队列中
private void grow(int minCapacity) {
    int oldCapacity = queue.length;
    // 当旧容量大小小于64,新容量 = 旧容量 + (旧容量 + 2)
    // 当旧容量大小大于63,新容量 = 旧容量 + (旧容量 / 2)
    // 当旧容量大于Integer.MAX_VALUE - 8时,走hugeLength(int oldLength, int minGrowth)逻辑
    // minCapacity = (i = size) + 1 (即oldCapacity+1) - oldCapacity = 1
    int newCapacity = ArraysSupport.newLength(oldCapacity,
            minCapacity - oldCapacity, /* minimum growth */
            oldCapacity < 64 ? oldCapacity + 2 : oldCapacity >> 1
            /* preferred growth */);
    queue = Arrays.copyOf(queue, newCapacity);
}
public static int newLength(int oldLength, int minGrowth, int prefGrowth) {
    // preconditions not checked because of inlining
    // assert oldLength >= 0
    // assert minGrowth > 0
    int prefLength = oldLength + Math.max(minGrowth, prefGrowth); // might overflow
    if (0 < prefLength && prefLength <= SOFT_MAX_ARRAY_LENGTH) {
        return prefLength;
    } else {
        // put code cold in a separate method
        return hugeLength(oldLength, minGrowth);
    }
}
private static int hugeLength(int oldLength, int minGrowth) {
    // 当oldLength > Integer.MAX_VALUE - 8时
    // minLength = oldLength + 1
    // 当minLength < Integer.MAX_VALUE,此时minLength > SOFT_MAX_ARRAY_LENGTH(Integer.MAX_VALUE - 8)返回 minLength
    // 否则抛OutOfMemoryError错误
    int minLength = oldLength + minGrowth;
    if (minLength < 0) { // overflow
        throw new OutOfMemoryError(
                "Required array length " + oldLength + " + " + minGrowth + " is too large");
    } else if (minLength <= SOFT_MAX_ARRAY_LENGTH) {
        return SOFT_MAX_ARRAY_LENGTH;
    } else {
        return minLength;
    }
}

image.gif

五、作用及使用场景

  • 下单:如唯品会,下单30分钟之内不支付自动取消
  • 试课课程:试课结束前15分钟通知老师
  • 空闲连接延迟自动关闭
  • 缓存:超过时间自动清除
  • 超时处理、业务办理排队叫号、插队和枪购活动等



相关文章
|
28天前
|
XML Java 编译器
Java注解的底层源码剖析与技术认识
Java注解(Annotation)是Java 5引入的一种新特性,它提供了一种在代码中添加元数据(Metadata)的方式。注解本身并不是代码的一部分,它们不会直接影响代码的执行,但可以在编译、类加载和运行时被读取和处理。注解为开发者提供了一种以非侵入性的方式为代码提供额外信息的手段,这些信息可以用于生成文档、编译时检查、运行时处理等。
62 7
|
2月前
|
数据采集 人工智能 Java
Java产科专科电子病历系统源码
产科专科电子病历系统,全结构化设计,实现产科专科电子病历与院内HIS、LIS、PACS信息系统、区域妇幼信息平台的三级互联互通,系统由门诊系统、住院系统、数据统计模块三部分组成,它管理了孕妇从怀孕开始到生产结束42天一系列医院保健服务信息。
35 4
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
87 2
|
20天前
|
存储 JavaScript 前端开发
基于 SpringBoot 和 Vue 开发校园点餐订餐外卖跑腿Java源码
一个非常实用的校园外卖系统,基于 SpringBoot 和 Vue 的开发。这一系统源于黑马的外卖案例项目 经过站长的进一步改进和优化,提供了更丰富的功能和更高的可用性。 这个项目的架构设计非常有趣。虽然它采用了SpringBoot和Vue的组合,但并不是一个完全分离的项目。 前端视图通过JS的方式引入了Vue和Element UI,既能利用Vue的快速开发优势,
102 13
|
1月前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
57 12
|
28天前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。
|
29天前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
2月前
|
存储 缓存 监控
Java中的线程池深度解析####
本文深入探讨了Java并发编程中的核心组件——线程池,从其基本概念、工作原理、核心参数解析到应用场景与最佳实践,全方位剖析了线程池在提升应用性能、资源管理和任务调度方面的重要作用。通过实例演示和性能对比,揭示合理配置线程池对于构建高效Java应用的关键意义。 ####
|
30天前
|
人工智能 移动开发 安全
家政上门系统用户端、阿姨端源码,java家政管理平台源码
家政上门系统基于互联网技术,整合大数据分析、AI算法和现代通信技术,提供便捷高效的家政服务。涵盖保洁、月嫂、烹饪等多元化服务,支持多终端访问,具备智能匹配、在线支付、订单管理等功能,确保服务透明、安全,适用于家庭生活的各种需求场景,推动家政市场规范化发展。