Java线程池ThreadPoolExcutor源码解读详解05-阻塞队列之DelayQueue原理及扩容机制详解

本文涉及的产品
应用实时监控服务-应用监控,每月50GB免费额度
容器镜像服务 ACR,镜像仓库100个 不限时长
可观测监控 Prometheus 版,每月50GB免费额度
简介: DelayQueue` 是 Java 中的一个线程安全的阻塞队列,它用于存储实现了 `Delayed` 接口的元素,这些元素都有一个延迟时间。当元素的延迟时间过去之后,它们才能被从队列中取出。以下是摘要:1. **核心特性**: - 基于 `PriorityQueue` 实现,元素按延迟时间排序,优先级高的先出队。 - 使用 `ReentrantLock` 和条件变量 `available` 控制并发。 - 只有延迟时间小于0的元素才能被取出。 - 不允许插入 `null` 元素。2. **构造器**: - 默认构造器创建无初始元素的队列。 - 可以


💪🏻 制定明确可量化的目标,坚持默默的做事。




一、继承实现关系图

image.gif 1711819050810.png

二、低层数据存储结构

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
...
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader;
private final Condition available = lock.newCondition();
...
}

image.gif

说明:

  • lock:添加和获取锁(同一把锁)
  • q:PriorityQueue存储数据(实现优化级队列)
  • leader:  标记正在延迟等待出队的线程
  • available: 出队阻塞对象

构造器:

public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

image.gif

说明:

  • 默认无参构造器
  • 第二个传入一个集合初始化队列元素构造器

三、特点及优缺点

3.1 特点

  • 支持优先级排序的无界阻塞延迟队列,优先级高的先出队,先先级低的后出队
  • 存储数据结构为数组,以二叉堆形式存储于数组中,二叉堆叶子节点均小于父节点
  • 只有一把锁(出队有阻塞)
  • 不可添加为null的元素
  • 出队取队头元素(只有队头元素的delay为0时才能取出元素)
  • 初始容量大小为11
  • 最大可扩容到Integer.MAX_VALUE

3.2 优缺点

  • 根据排序规则排序(元素必须实现 Delayed接口)
  • put不阻塞(若生产速度快于消费速度,会耗尽所有的可用堆内空间)
  • 优先级排序无界阻塞队列(无界是说Integer.MAX_VALUE非常大,一般不会达这个数字内存就已经撑不住)
  • 取数据锁为延迟阻塞出队
  • 内部数组长度不够会进行扩容
  • 新增元素进行最小二叉推排序

四、源码详解

读取重要源码:

  • 添加任务方法
  • 获取和删除任务方法
  • 扩容

阅读明白这几个接口即可,其它都很简单。

4.1 put、offer

public void put(E e) {
    offer(e);
}
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            // 刚插入就在队首,说明队列是空的,目前只有一个元素 
            //     则唤醒队列为空时的阻塞
            //     唤醒队列leader不为空时的阻塞
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
// p.offer(e)
public boolean offer(E e) {
    // 插入元素不允许为空
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    if (i >= queue.length)
        // 队列为空则扩容
        grow(i + 1);
    // 按优先级插入到队列中
    siftUp(i, e);
    // 队列元素个数+1
    size = i + 1;
    return true;
}
/**
 * 按优先级插入到队列中
 */
private void siftUp(int k, E x) {
    if (comparator != null)
        siftUpUsingComparator(k, x, queue, comparator);
    else
        siftUpComparable(k, x, queue);
}

image.gif

4.2 take

  • 加锁
  • 读取队首元素,若为空则阻塞,
  • 若不为空且过期时间为0,则取出队首元素),队列元素重排
  • 最后一个元素n 并且最后一个元素赋值为null
  • 队列按排序规则从子节点中取元素填充父节点
  • 子节点中取元素似为空的父节点,重复上一步骤,直到取出的子节点为叶子节点leaf
  • first = null 解决高并发情况下某一线程a延迟等待出队,而其它线程因leader!=null而available.await()等待,因线程a出取first而其它线程first一直不为空,导致JVM无法回收导致内存泄漏问题
  • 判断leader元素是否为空,不为空的话阻塞当前线程
  • leader元素为空,则设置leader为当前线程,阻塞delay时间,delay结束重复以上步骤
  • 若无取出线程,且队列不为空,则唤配所有其它awai()线程
  • 解锁
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return q.poll();
                // first = null 解决高并发情况下某一线程a延迟等待出队,而其它线程因leader!=null而available.await()等待,因线程a出取first而其它线程first一直不为空,导致JVM无法回收导致内存泄漏问题
                first = null;
                if (leader != null)
                    available.await();
                else {
                    // leader为空,则设置当前线程为取元素线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 延迟等待取出元素
                        available.awaitNanos(delay);
                    } finally {
                        // 延迟等待结束之后,还原现场
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            // 无正在取元素线程,且队列不为空,唤醒await的线程
            available.signal();
        // 释放锁
        lock.unlock();
    }
}
public E poll() {
    final Object[] es;
    final E result;
    if ((result = (E) ((es = queue)[0])) != null) {
        modCount++;
        final int n;
        final E x = (E) es[(n = --size)];
        es[n] = null;
        if (n > 0) {
            final Comparator<? super E> cmp;
            if ((cmp = comparator) == null)
                siftDownComparable(0, x, es, n);
            else
                siftDownUsingComparator(0, x, es, n, cmp);
        }
    }
    return result;
}

image.gif

4.3 扩容原理

  • 默认队列大小为11,扩容最大大小为Integer.MAX_VALUE
  • 扩容时机:元素个数 >= 队列的大小
  • 扩容规则:
  • 队列大小 < 64,则新队列大小 = (旧队列大小 + 1) * 2
  • 队列大小 >= 64,则新队列大小 = 旧队列大小 + 旧队列大小 / 2
  • 当旧容量大于Integer.MAX_VALUE - 8时,新容量值=旧容量 + 1(下面看源码说明)
  • 当超过Ineter.MAX_VALUE时,报OutOfMemoryError错误
  • 创建新容量数组队列,原数组队列数据复制到新数组队列中
private void grow(int minCapacity) {
    int oldCapacity = queue.length;
    // 当旧容量大小小于64,新容量 = 旧容量 + (旧容量 + 2)
    // 当旧容量大小大于63,新容量 = 旧容量 + (旧容量 / 2)
    // 当旧容量大于Integer.MAX_VALUE - 8时,走hugeLength(int oldLength, int minGrowth)逻辑
    // minCapacity = (i = size) + 1 (即oldCapacity+1) - oldCapacity = 1
    int newCapacity = ArraysSupport.newLength(oldCapacity,
            minCapacity - oldCapacity, /* minimum growth */
            oldCapacity < 64 ? oldCapacity + 2 : oldCapacity >> 1
            /* preferred growth */);
    queue = Arrays.copyOf(queue, newCapacity);
}
public static int newLength(int oldLength, int minGrowth, int prefGrowth) {
    // preconditions not checked because of inlining
    // assert oldLength >= 0
    // assert minGrowth > 0
    int prefLength = oldLength + Math.max(minGrowth, prefGrowth); // might overflow
    if (0 < prefLength && prefLength <= SOFT_MAX_ARRAY_LENGTH) {
        return prefLength;
    } else {
        // put code cold in a separate method
        return hugeLength(oldLength, minGrowth);
    }
}
private static int hugeLength(int oldLength, int minGrowth) {
    // 当oldLength > Integer.MAX_VALUE - 8时
    // minLength = oldLength + 1
    // 当minLength < Integer.MAX_VALUE,此时minLength > SOFT_MAX_ARRAY_LENGTH(Integer.MAX_VALUE - 8)返回 minLength
    // 否则抛OutOfMemoryError错误
    int minLength = oldLength + minGrowth;
    if (minLength < 0) { // overflow
        throw new OutOfMemoryError(
                "Required array length " + oldLength + " + " + minGrowth + " is too large");
    } else if (minLength <= SOFT_MAX_ARRAY_LENGTH) {
        return SOFT_MAX_ARRAY_LENGTH;
    } else {
        return minLength;
    }
}

image.gif

五、作用及使用场景

  • 下单:如唯品会,下单30分钟之内不支付自动取消
  • 试课课程:试课结束前15分钟通知老师
  • 空闲连接延迟自动关闭
  • 缓存:超过时间自动清除
  • 超时处理、业务办理排队叫号、插队和枪购活动等



相关文章
|
1月前
|
存储 Java 关系型数据库
高效连接之道:Java连接池原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。频繁创建和关闭连接会消耗大量资源,导致性能瓶颈。为此,Java连接池技术通过复用连接,实现高效、稳定的数据库连接管理。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接池的基本操作、配置和使用方法,以及在电商应用中的具体应用示例。
74 5
|
3天前
|
监控 Java API
探索Java NIO:究竟在哪些领域能大显身手?揭秘原理、应用场景与官方示例代码
Java NIO(New IO)自Java SE 1.4引入,提供比传统IO更高效、灵活的操作,支持非阻塞IO和选择器特性,适用于高并发、高吞吐量场景。NIO的核心概念包括通道(Channel)、缓冲区(Buffer)和选择器(Selector),能实现多路复用和异步操作。其应用场景涵盖网络通信、文件操作、进程间通信及数据库操作等。NIO的优势在于提高并发性和性能,简化编程;但学习成本较高,且与传统IO存在不兼容性。尽管如此,NIO在构建高性能框架如Netty、Mina和Jetty中仍广泛应用。
14 3
|
3天前
|
安全 算法 Java
Java CAS原理和应用场景大揭秘:你掌握了吗?
CAS(Compare and Swap)是一种乐观锁机制,通过硬件指令实现原子操作,确保多线程环境下对共享变量的安全访问。它避免了传统互斥锁的性能开销和线程阻塞问题。CAS操作包含三个步骤:获取期望值、比较当前值与期望值是否相等、若相等则更新为新值。CAS广泛应用于高并发场景,如数据库事务、分布式锁、无锁数据结构等,但需注意ABA问题。Java中常用`java.util.concurrent.atomic`包下的类支持CAS操作。
19 2
|
25天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
53 12
|
1月前
|
存储 算法 Java
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
本文详解自旋锁的概念、优缺点、使用场景及Java实现。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
|
1月前
|
Java
Java之CountDownLatch原理浅析
本文介绍了Java并发工具类`CountDownLatch`的使用方法、原理及其与`Thread.join()`的区别。`CountDownLatch`通过构造函数接收一个整数参数作为计数器,调用`countDown`方法减少计数,`await`方法会阻塞当前线程,直到计数为零。文章还详细解析了其内部机制,包括初始化、`countDown`和`await`方法的工作原理,并给出了一个游戏加载场景的示例代码。
Java之CountDownLatch原理浅析
|
1月前
|
Java 索引 容器
Java ArrayList扩容的原理
Java 的 `ArrayList` 是基于数组实现的动态集合。初始时,`ArrayList` 底层创建一个空数组 `elementData`,并设置 `size` 为 0。当首次添加元素时,会调用 `grow` 方法将数组扩容至默认容量 10。之后每次添加元素时,如果当前数组已满,则会再次调用 `grow` 方法进行扩容。扩容规则为:首次扩容至 10,后续扩容至原数组长度的 1.5 倍或根据实际需求扩容。例如,当需要一次性添加 100 个元素时,会直接扩容至 110 而不是 15。
Java ArrayList扩容的原理
|
1月前
|
存储 Java 关系型数据库
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接创建、分配、复用和释放等操作,并通过电商应用实例展示了如何选择合适的连接池库(如HikariCP)和配置参数,实现高效、稳定的数据库连接管理。
66 2
|
1月前
|
Java 数据格式 索引
使用 Java 字节码工具检查类文件完整性的原理是什么
Java字节码工具通过解析和分析类文件的字节码,检查其结构和内容是否符合Java虚拟机规范,确保类文件的完整性和合法性,防止恶意代码或损坏的类文件影响程序运行。
49 5
|
1月前
|
算法 Java 数据库连接
Java连接池技术,从基础概念出发,解析了连接池的工作原理及其重要性
本文详细介绍了Java连接池技术,从基础概念出发,解析了连接池的工作原理及其重要性。连接池通过复用数据库连接,显著提升了应用的性能和稳定性。文章还展示了使用HikariCP连接池的示例代码,帮助读者更好地理解和应用这一技术。
60 1