Java线程池ThreadPoolExcutor源码解读详解05-阻塞队列之DelayQueue原理及扩容机制详解

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
可观测可视化 Grafana 版,10个用户账号 1个月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: DelayQueue` 是 Java 中的一个线程安全的阻塞队列,它用于存储实现了 `Delayed` 接口的元素,这些元素都有一个延迟时间。当元素的延迟时间过去之后,它们才能被从队列中取出。以下是摘要:1. **核心特性**: - 基于 `PriorityQueue` 实现,元素按延迟时间排序,优先级高的先出队。 - 使用 `ReentrantLock` 和条件变量 `available` 控制并发。 - 只有延迟时间小于0的元素才能被取出。 - 不允许插入 `null` 元素。2. **构造器**: - 默认构造器创建无初始元素的队列。 - 可以


💪🏻 制定明确可量化的目标,坚持默默的做事。




一、继承实现关系图

image.gif 1711819050810.png

二、低层数据存储结构

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
...
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader;
private final Condition available = lock.newCondition();
...
}

image.gif

说明:

  • lock:添加和获取锁(同一把锁)
  • q:PriorityQueue存储数据(实现优化级队列)
  • leader:  标记正在延迟等待出队的线程
  • available: 出队阻塞对象

构造器:

public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

image.gif

说明:

  • 默认无参构造器
  • 第二个传入一个集合初始化队列元素构造器

三、特点及优缺点

3.1 特点

  • 支持优先级排序的无界阻塞延迟队列,优先级高的先出队,先先级低的后出队
  • 存储数据结构为数组,以二叉堆形式存储于数组中,二叉堆叶子节点均小于父节点
  • 只有一把锁(出队有阻塞)
  • 不可添加为null的元素
  • 出队取队头元素(只有队头元素的delay为0时才能取出元素)
  • 初始容量大小为11
  • 最大可扩容到Integer.MAX_VALUE

3.2 优缺点

  • 根据排序规则排序(元素必须实现 Delayed接口)
  • put不阻塞(若生产速度快于消费速度,会耗尽所有的可用堆内空间)
  • 优先级排序无界阻塞队列(无界是说Integer.MAX_VALUE非常大,一般不会达这个数字内存就已经撑不住)
  • 取数据锁为延迟阻塞出队
  • 内部数组长度不够会进行扩容
  • 新增元素进行最小二叉推排序

四、源码详解

读取重要源码:

  • 添加任务方法
  • 获取和删除任务方法
  • 扩容

阅读明白这几个接口即可,其它都很简单。

4.1 put、offer

public void put(E e) {
    offer(e);
}
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            // 刚插入就在队首,说明队列是空的,目前只有一个元素 
            //     则唤醒队列为空时的阻塞
            //     唤醒队列leader不为空时的阻塞
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
// p.offer(e)
public boolean offer(E e) {
    // 插入元素不允许为空
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    if (i >= queue.length)
        // 队列为空则扩容
        grow(i + 1);
    // 按优先级插入到队列中
    siftUp(i, e);
    // 队列元素个数+1
    size = i + 1;
    return true;
}
/**
 * 按优先级插入到队列中
 */
private void siftUp(int k, E x) {
    if (comparator != null)
        siftUpUsingComparator(k, x, queue, comparator);
    else
        siftUpComparable(k, x, queue);
}

image.gif

4.2 take

  • 加锁
  • 读取队首元素,若为空则阻塞,
  • 若不为空且过期时间为0,则取出队首元素),队列元素重排
  • 最后一个元素n 并且最后一个元素赋值为null
  • 队列按排序规则从子节点中取元素填充父节点
  • 子节点中取元素似为空的父节点,重复上一步骤,直到取出的子节点为叶子节点leaf
  • first = null 解决高并发情况下某一线程a延迟等待出队,而其它线程因leader!=null而available.await()等待,因线程a出取first而其它线程first一直不为空,导致JVM无法回收导致内存泄漏问题
  • 判断leader元素是否为空,不为空的话阻塞当前线程
  • leader元素为空,则设置leader为当前线程,阻塞delay时间,delay结束重复以上步骤
  • 若无取出线程,且队列不为空,则唤配所有其它awai()线程
  • 解锁
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return q.poll();
                // first = null 解决高并发情况下某一线程a延迟等待出队,而其它线程因leader!=null而available.await()等待,因线程a出取first而其它线程first一直不为空,导致JVM无法回收导致内存泄漏问题
                first = null;
                if (leader != null)
                    available.await();
                else {
                    // leader为空,则设置当前线程为取元素线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 延迟等待取出元素
                        available.awaitNanos(delay);
                    } finally {
                        // 延迟等待结束之后,还原现场
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            // 无正在取元素线程,且队列不为空,唤醒await的线程
            available.signal();
        // 释放锁
        lock.unlock();
    }
}
public E poll() {
    final Object[] es;
    final E result;
    if ((result = (E) ((es = queue)[0])) != null) {
        modCount++;
        final int n;
        final E x = (E) es[(n = --size)];
        es[n] = null;
        if (n > 0) {
            final Comparator<? super E> cmp;
            if ((cmp = comparator) == null)
                siftDownComparable(0, x, es, n);
            else
                siftDownUsingComparator(0, x, es, n, cmp);
        }
    }
    return result;
}

image.gif

4.3 扩容原理

  • 默认队列大小为11,扩容最大大小为Integer.MAX_VALUE
  • 扩容时机:元素个数 >= 队列的大小
  • 扩容规则:
  • 队列大小 < 64,则新队列大小 = (旧队列大小 + 1) * 2
  • 队列大小 >= 64,则新队列大小 = 旧队列大小 + 旧队列大小 / 2
  • 当旧容量大于Integer.MAX_VALUE - 8时,新容量值=旧容量 + 1(下面看源码说明)
  • 当超过Ineter.MAX_VALUE时,报OutOfMemoryError错误
  • 创建新容量数组队列,原数组队列数据复制到新数组队列中
private void grow(int minCapacity) {
    int oldCapacity = queue.length;
    // 当旧容量大小小于64,新容量 = 旧容量 + (旧容量 + 2)
    // 当旧容量大小大于63,新容量 = 旧容量 + (旧容量 / 2)
    // 当旧容量大于Integer.MAX_VALUE - 8时,走hugeLength(int oldLength, int minGrowth)逻辑
    // minCapacity = (i = size) + 1 (即oldCapacity+1) - oldCapacity = 1
    int newCapacity = ArraysSupport.newLength(oldCapacity,
            minCapacity - oldCapacity, /* minimum growth */
            oldCapacity < 64 ? oldCapacity + 2 : oldCapacity >> 1
            /* preferred growth */);
    queue = Arrays.copyOf(queue, newCapacity);
}
public static int newLength(int oldLength, int minGrowth, int prefGrowth) {
    // preconditions not checked because of inlining
    // assert oldLength >= 0
    // assert minGrowth > 0
    int prefLength = oldLength + Math.max(minGrowth, prefGrowth); // might overflow
    if (0 < prefLength && prefLength <= SOFT_MAX_ARRAY_LENGTH) {
        return prefLength;
    } else {
        // put code cold in a separate method
        return hugeLength(oldLength, minGrowth);
    }
}
private static int hugeLength(int oldLength, int minGrowth) {
    // 当oldLength > Integer.MAX_VALUE - 8时
    // minLength = oldLength + 1
    // 当minLength < Integer.MAX_VALUE,此时minLength > SOFT_MAX_ARRAY_LENGTH(Integer.MAX_VALUE - 8)返回 minLength
    // 否则抛OutOfMemoryError错误
    int minLength = oldLength + minGrowth;
    if (minLength < 0) { // overflow
        throw new OutOfMemoryError(
                "Required array length " + oldLength + " + " + minGrowth + " is too large");
    } else if (minLength <= SOFT_MAX_ARRAY_LENGTH) {
        return SOFT_MAX_ARRAY_LENGTH;
    } else {
        return minLength;
    }
}

image.gif

五、作用及使用场景

  • 下单:如唯品会,下单30分钟之内不支付自动取消
  • 试课课程:试课结束前15分钟通知老师
  • 空闲连接延迟自动关闭
  • 缓存:超过时间自动清除
  • 超时处理、业务办理排队叫号、插队和枪购活动等



相关文章
|
5天前
|
Java 应用服务中间件 开发者
Java面试题:解释Spring Boot的优势及其自动配置原理
Java面试题:解释Spring Boot的优势及其自动配置原理
28 0
|
5天前
|
缓存 监控 算法
Java面试题:描述Java垃圾回收的基本原理,以及如何通过代码优化来协助垃圾回收器的工作
Java面试题:描述Java垃圾回收的基本原理,以及如何通过代码优化来协助垃圾回收器的工作
28 8
|
1天前
|
监控 Java API
Java并发编程之线程池深度解析
【7月更文挑战第14天】在Java并发编程领域,线程池是提升性能、管理资源的关键工具。本文将深入探讨线程池的核心概念、内部工作原理以及如何有效使用线程池来处理并发任务,旨在为读者提供一套完整的线程池使用和优化策略。
|
3天前
|
存储 监控 Java
揭秘Java虚拟机:探索JVM的工作原理与性能优化
本文深入探讨了Java虚拟机(JVM)的核心机制,从类加载到垃圾回收,再到即时编译技术,揭示了这些复杂过程如何共同作用于Java程序的性能表现。通过分析现代JVM的内存管理策略和性能监控工具,文章提供了实用的调优建议,帮助开发者有效提升Java应用的性能。
18 3
|
4天前
|
缓存 安全 Java
Java中线程池如何管理?
【7月更文挑战第11天】Java中线程池如何管理?
9 2
|
5天前
|
算法 Java
Java面试题:解释垃圾回收中的标记-清除、复制、标记-压缩算法的工作原理
Java面试题:解释垃圾回收中的标记-清除、复制、标记-压缩算法的工作原理
13 1
|
5天前
|
监控 Java 调度
Java面试题:描述Java线程池的概念、用途及常见的线程池类型。介绍一下Java中的线程池有哪些优缺点
Java面试题:描述Java线程池的概念、用途及常见的线程池类型。介绍一下Java中的线程池有哪些优缺点
18 1
|
5天前
|
缓存 Java
Java面试题:描述Java中的线程池及其实现方式,详细说明其原理
Java面试题:描述Java中的线程池及其实现方式,详细说明其原理
8 0
|
5天前
|
设计模式 Java
Java面试题:描述观察者模式的工作原理及其在Java中的应用。
Java面试题:描述观察者模式的工作原理及其在Java中的应用。
10 0
|
5天前
|
SQL Java 关系型数据库
Java面试题:描述JDBC的工作原理,包括连接数据库、执行SQL语句等步骤。
Java面试题:描述JDBC的工作原理,包括连接数据库、执行SQL语句等步骤。
14 0

热门文章

最新文章