Java线程池ThreadPoolExcutor源码解读详解05-阻塞队列之DelayQueue原理及扩容机制详解

本文涉及的产品
应用实时监控服务-可观测链路OpenTelemetry版,每月50GB免费额度
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: DelayQueue` 是 Java 中的一个线程安全的阻塞队列,它用于存储实现了 `Delayed` 接口的元素,这些元素都有一个延迟时间。当元素的延迟时间过去之后,它们才能被从队列中取出。以下是摘要:1. **核心特性**: - 基于 `PriorityQueue` 实现,元素按延迟时间排序,优先级高的先出队。 - 使用 `ReentrantLock` 和条件变量 `available` 控制并发。 - 只有延迟时间小于0的元素才能被取出。 - 不允许插入 `null` 元素。2. **构造器**: - 默认构造器创建无初始元素的队列。 - 可以


💪🏻 制定明确可量化的目标,坚持默默的做事。




一、继承实现关系图

image.gif 1711819050810.png

二、低层数据存储结构

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
...
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader;
private final Condition available = lock.newCondition();
...
}

image.gif

说明:

  • lock:添加和获取锁(同一把锁)
  • q:PriorityQueue存储数据(实现优化级队列)
  • leader:  标记正在延迟等待出队的线程
  • available: 出队阻塞对象

构造器:

public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

image.gif

说明:

  • 默认无参构造器
  • 第二个传入一个集合初始化队列元素构造器

三、特点及优缺点

3.1 特点

  • 支持优先级排序的无界阻塞延迟队列,优先级高的先出队,先先级低的后出队
  • 存储数据结构为数组,以二叉堆形式存储于数组中,二叉堆叶子节点均小于父节点
  • 只有一把锁(出队有阻塞)
  • 不可添加为null的元素
  • 出队取队头元素(只有队头元素的delay为0时才能取出元素)
  • 初始容量大小为11
  • 最大可扩容到Integer.MAX_VALUE

3.2 优缺点

  • 根据排序规则排序(元素必须实现 Delayed接口)
  • put不阻塞(若生产速度快于消费速度,会耗尽所有的可用堆内空间)
  • 优先级排序无界阻塞队列(无界是说Integer.MAX_VALUE非常大,一般不会达这个数字内存就已经撑不住)
  • 取数据锁为延迟阻塞出队
  • 内部数组长度不够会进行扩容
  • 新增元素进行最小二叉推排序

四、源码详解

读取重要源码:

  • 添加任务方法
  • 获取和删除任务方法
  • 扩容

阅读明白这几个接口即可,其它都很简单。

4.1 put、offer

public void put(E e) {
    offer(e);
}
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            // 刚插入就在队首,说明队列是空的,目前只有一个元素 
            //     则唤醒队列为空时的阻塞
            //     唤醒队列leader不为空时的阻塞
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
// p.offer(e)
public boolean offer(E e) {
    // 插入元素不允许为空
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    if (i >= queue.length)
        // 队列为空则扩容
        grow(i + 1);
    // 按优先级插入到队列中
    siftUp(i, e);
    // 队列元素个数+1
    size = i + 1;
    return true;
}
/**
 * 按优先级插入到队列中
 */
private void siftUp(int k, E x) {
    if (comparator != null)
        siftUpUsingComparator(k, x, queue, comparator);
    else
        siftUpComparable(k, x, queue);
}

image.gif

4.2 take

  • 加锁
  • 读取队首元素,若为空则阻塞,
  • 若不为空且过期时间为0,则取出队首元素),队列元素重排
  • 最后一个元素n 并且最后一个元素赋值为null
  • 队列按排序规则从子节点中取元素填充父节点
  • 子节点中取元素似为空的父节点,重复上一步骤,直到取出的子节点为叶子节点leaf
  • first = null 解决高并发情况下某一线程a延迟等待出队,而其它线程因leader!=null而available.await()等待,因线程a出取first而其它线程first一直不为空,导致JVM无法回收导致内存泄漏问题
  • 判断leader元素是否为空,不为空的话阻塞当前线程
  • leader元素为空,则设置leader为当前线程,阻塞delay时间,delay结束重复以上步骤
  • 若无取出线程,且队列不为空,则唤配所有其它awai()线程
  • 解锁
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return q.poll();
                // first = null 解决高并发情况下某一线程a延迟等待出队,而其它线程因leader!=null而available.await()等待,因线程a出取first而其它线程first一直不为空,导致JVM无法回收导致内存泄漏问题
                first = null;
                if (leader != null)
                    available.await();
                else {
                    // leader为空,则设置当前线程为取元素线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 延迟等待取出元素
                        available.awaitNanos(delay);
                    } finally {
                        // 延迟等待结束之后,还原现场
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            // 无正在取元素线程,且队列不为空,唤醒await的线程
            available.signal();
        // 释放锁
        lock.unlock();
    }
}
public E poll() {
    final Object[] es;
    final E result;
    if ((result = (E) ((es = queue)[0])) != null) {
        modCount++;
        final int n;
        final E x = (E) es[(n = --size)];
        es[n] = null;
        if (n > 0) {
            final Comparator<? super E> cmp;
            if ((cmp = comparator) == null)
                siftDownComparable(0, x, es, n);
            else
                siftDownUsingComparator(0, x, es, n, cmp);
        }
    }
    return result;
}

image.gif

4.3 扩容原理

  • 默认队列大小为11,扩容最大大小为Integer.MAX_VALUE
  • 扩容时机:元素个数 >= 队列的大小
  • 扩容规则:
  • 队列大小 < 64,则新队列大小 = (旧队列大小 + 1) * 2
  • 队列大小 >= 64,则新队列大小 = 旧队列大小 + 旧队列大小 / 2
  • 当旧容量大于Integer.MAX_VALUE - 8时,新容量值=旧容量 + 1(下面看源码说明)
  • 当超过Ineter.MAX_VALUE时,报OutOfMemoryError错误
  • 创建新容量数组队列,原数组队列数据复制到新数组队列中
private void grow(int minCapacity) {
    int oldCapacity = queue.length;
    // 当旧容量大小小于64,新容量 = 旧容量 + (旧容量 + 2)
    // 当旧容量大小大于63,新容量 = 旧容量 + (旧容量 / 2)
    // 当旧容量大于Integer.MAX_VALUE - 8时,走hugeLength(int oldLength, int minGrowth)逻辑
    // minCapacity = (i = size) + 1 (即oldCapacity+1) - oldCapacity = 1
    int newCapacity = ArraysSupport.newLength(oldCapacity,
            minCapacity - oldCapacity, /* minimum growth */
            oldCapacity < 64 ? oldCapacity + 2 : oldCapacity >> 1
            /* preferred growth */);
    queue = Arrays.copyOf(queue, newCapacity);
}
public static int newLength(int oldLength, int minGrowth, int prefGrowth) {
    // preconditions not checked because of inlining
    // assert oldLength >= 0
    // assert minGrowth > 0
    int prefLength = oldLength + Math.max(minGrowth, prefGrowth); // might overflow
    if (0 < prefLength && prefLength <= SOFT_MAX_ARRAY_LENGTH) {
        return prefLength;
    } else {
        // put code cold in a separate method
        return hugeLength(oldLength, minGrowth);
    }
}
private static int hugeLength(int oldLength, int minGrowth) {
    // 当oldLength > Integer.MAX_VALUE - 8时
    // minLength = oldLength + 1
    // 当minLength < Integer.MAX_VALUE,此时minLength > SOFT_MAX_ARRAY_LENGTH(Integer.MAX_VALUE - 8)返回 minLength
    // 否则抛OutOfMemoryError错误
    int minLength = oldLength + minGrowth;
    if (minLength < 0) { // overflow
        throw new OutOfMemoryError(
                "Required array length " + oldLength + " + " + minGrowth + " is too large");
    } else if (minLength <= SOFT_MAX_ARRAY_LENGTH) {
        return SOFT_MAX_ARRAY_LENGTH;
    } else {
        return minLength;
    }
}

image.gif

五、作用及使用场景

  • 下单:如唯品会,下单30分钟之内不支付自动取消
  • 试课课程:试课结束前15分钟通知老师
  • 空闲连接延迟自动关闭
  • 缓存:超过时间自动清除
  • 超时处理、业务办理排队叫号、插队和枪购活动等



相关文章
|
1月前
|
XML Java 编译器
Java注解的底层源码剖析与技术认识
Java注解(Annotation)是Java 5引入的一种新特性,它提供了一种在代码中添加元数据(Metadata)的方式。注解本身并不是代码的一部分,它们不会直接影响代码的执行,但可以在编译、类加载和运行时被读取和处理。注解为开发者提供了一种以非侵入性的方式为代码提供额外信息的手段,这些信息可以用于生成文档、编译时检查、运行时处理等。
71 7
|
6天前
|
Java Linux 调度
硬核揭秘:线程与进程的底层原理,面试高分必备!
嘿,大家好!我是小米,29岁的技术爱好者。今天来聊聊线程和进程的区别。进程是操作系统中运行的程序实例,有独立内存空间;线程是进程内的最小执行单元,共享内存。创建进程开销大但更安全,线程轻量高效但易引发数据竞争。面试时可强调:进程是资源分配单位,线程是CPU调度单位。根据不同场景选择合适的并发模型,如高并发用线程池。希望这篇文章能帮你更好地理解并回答面试中的相关问题,祝你早日拿下心仪的offer!
25 6
|
15天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
72 17
|
13天前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
|
28天前
|
监控 Java API
探索Java NIO:究竟在哪些领域能大显身手?揭秘原理、应用场景与官方示例代码
Java NIO(New IO)自Java SE 1.4引入,提供比传统IO更高效、灵活的操作,支持非阻塞IO和选择器特性,适用于高并发、高吞吐量场景。NIO的核心概念包括通道(Channel)、缓冲区(Buffer)和选择器(Selector),能实现多路复用和异步操作。其应用场景涵盖网络通信、文件操作、进程间通信及数据库操作等。NIO的优势在于提高并发性和性能,简化编程;但学习成本较高,且与传统IO存在不兼容性。尽管如此,NIO在构建高性能框架如Netty、Mina和Jetty中仍广泛应用。
41 3
|
1月前
|
存储 JavaScript 前端开发
基于 SpringBoot 和 Vue 开发校园点餐订餐外卖跑腿Java源码
一个非常实用的校园外卖系统,基于 SpringBoot 和 Vue 的开发。这一系统源于黑马的外卖案例项目 经过站长的进一步改进和优化,提供了更丰富的功能和更高的可用性。 这个项目的架构设计非常有趣。虽然它采用了SpringBoot和Vue的组合,但并不是一个完全分离的项目。 前端视图通过JS的方式引入了Vue和Element UI,既能利用Vue的快速开发优势,
130 13
|
28天前
|
安全 算法 Java
Java CAS原理和应用场景大揭秘:你掌握了吗?
CAS(Compare and Swap)是一种乐观锁机制,通过硬件指令实现原子操作,确保多线程环境下对共享变量的安全访问。它避免了传统互斥锁的性能开销和线程阻塞问题。CAS操作包含三个步骤:获取期望值、比较当前值与期望值是否相等、若相等则更新为新值。CAS广泛应用于高并发场景,如数据库事务、分布式锁、无锁数据结构等,但需注意ABA问题。Java中常用`java.util.concurrent.atomic`包下的类支持CAS操作。
64 2
|
1月前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
65 12
|
1月前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。