Java线程池ThreadPoolExcutor源码解读详解05-阻塞队列之DelayQueue原理及扩容机制详解

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
函数计算FC,每月免费额度15元,12个月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: DelayQueue` 是 Java 中的一个线程安全的阻塞队列,它用于存储实现了 `Delayed` 接口的元素,这些元素都有一个延迟时间。当元素的延迟时间过去之后,它们才能被从队列中取出。以下是摘要:1. **核心特性**: - 基于 `PriorityQueue` 实现,元素按延迟时间排序,优先级高的先出队。 - 使用 `ReentrantLock` 和条件变量 `available` 控制并发。 - 只有延迟时间小于0的元素才能被取出。 - 不允许插入 `null` 元素。2. **构造器**: - 默认构造器创建无初始元素的队列。 - 可以


💪🏻 制定明确可量化的目标,坚持默默的做事。




一、继承实现关系图

image.gif 1711819050810.png

二、低层数据存储结构

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
...
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader;
private final Condition available = lock.newCondition();
...
}

image.gif

说明:

  • lock:添加和获取锁(同一把锁)
  • q:PriorityQueue存储数据(实现优化级队列)
  • leader:  标记正在延迟等待出队的线程
  • available: 出队阻塞对象

构造器:

public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

image.gif

说明:

  • 默认无参构造器
  • 第二个传入一个集合初始化队列元素构造器

三、特点及优缺点

3.1 特点

  • 支持优先级排序的无界阻塞延迟队列,优先级高的先出队,先先级低的后出队
  • 存储数据结构为数组,以二叉堆形式存储于数组中,二叉堆叶子节点均小于父节点
  • 只有一把锁(出队有阻塞)
  • 不可添加为null的元素
  • 出队取队头元素(只有队头元素的delay为0时才能取出元素)
  • 初始容量大小为11
  • 最大可扩容到Integer.MAX_VALUE

3.2 优缺点

  • 根据排序规则排序(元素必须实现 Delayed接口)
  • put不阻塞(若生产速度快于消费速度,会耗尽所有的可用堆内空间)
  • 优先级排序无界阻塞队列(无界是说Integer.MAX_VALUE非常大,一般不会达这个数字内存就已经撑不住)
  • 取数据锁为延迟阻塞出队
  • 内部数组长度不够会进行扩容
  • 新增元素进行最小二叉推排序

四、源码详解

读取重要源码:

  • 添加任务方法
  • 获取和删除任务方法
  • 扩容

阅读明白这几个接口即可,其它都很简单。

4.1 put、offer

public void put(E e) {
    offer(e);
}
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            // 刚插入就在队首,说明队列是空的,目前只有一个元素 
            //     则唤醒队列为空时的阻塞
            //     唤醒队列leader不为空时的阻塞
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
// p.offer(e)
public boolean offer(E e) {
    // 插入元素不允许为空
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    if (i >= queue.length)
        // 队列为空则扩容
        grow(i + 1);
    // 按优先级插入到队列中
    siftUp(i, e);
    // 队列元素个数+1
    size = i + 1;
    return true;
}
/**
 * 按优先级插入到队列中
 */
private void siftUp(int k, E x) {
    if (comparator != null)
        siftUpUsingComparator(k, x, queue, comparator);
    else
        siftUpComparable(k, x, queue);
}

image.gif

4.2 take

  • 加锁
  • 读取队首元素,若为空则阻塞,
  • 若不为空且过期时间为0,则取出队首元素),队列元素重排
  • 最后一个元素n 并且最后一个元素赋值为null
  • 队列按排序规则从子节点中取元素填充父节点
  • 子节点中取元素似为空的父节点,重复上一步骤,直到取出的子节点为叶子节点leaf
  • first = null 解决高并发情况下某一线程a延迟等待出队,而其它线程因leader!=null而available.await()等待,因线程a出取first而其它线程first一直不为空,导致JVM无法回收导致内存泄漏问题
  • 判断leader元素是否为空,不为空的话阻塞当前线程
  • leader元素为空,则设置leader为当前线程,阻塞delay时间,delay结束重复以上步骤
  • 若无取出线程,且队列不为空,则唤配所有其它awai()线程
  • 解锁
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return q.poll();
                // first = null 解决高并发情况下某一线程a延迟等待出队,而其它线程因leader!=null而available.await()等待,因线程a出取first而其它线程first一直不为空,导致JVM无法回收导致内存泄漏问题
                first = null;
                if (leader != null)
                    available.await();
                else {
                    // leader为空,则设置当前线程为取元素线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 延迟等待取出元素
                        available.awaitNanos(delay);
                    } finally {
                        // 延迟等待结束之后,还原现场
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            // 无正在取元素线程,且队列不为空,唤醒await的线程
            available.signal();
        // 释放锁
        lock.unlock();
    }
}
public E poll() {
    final Object[] es;
    final E result;
    if ((result = (E) ((es = queue)[0])) != null) {
        modCount++;
        final int n;
        final E x = (E) es[(n = --size)];
        es[n] = null;
        if (n > 0) {
            final Comparator<? super E> cmp;
            if ((cmp = comparator) == null)
                siftDownComparable(0, x, es, n);
            else
                siftDownUsingComparator(0, x, es, n, cmp);
        }
    }
    return result;
}

image.gif

4.3 扩容原理

  • 默认队列大小为11,扩容最大大小为Integer.MAX_VALUE
  • 扩容时机:元素个数 >= 队列的大小
  • 扩容规则:
  • 队列大小 < 64,则新队列大小 = (旧队列大小 + 1) * 2
  • 队列大小 >= 64,则新队列大小 = 旧队列大小 + 旧队列大小 / 2
  • 当旧容量大于Integer.MAX_VALUE - 8时,新容量值=旧容量 + 1(下面看源码说明)
  • 当超过Ineter.MAX_VALUE时,报OutOfMemoryError错误
  • 创建新容量数组队列,原数组队列数据复制到新数组队列中
private void grow(int minCapacity) {
    int oldCapacity = queue.length;
    // 当旧容量大小小于64,新容量 = 旧容量 + (旧容量 + 2)
    // 当旧容量大小大于63,新容量 = 旧容量 + (旧容量 / 2)
    // 当旧容量大于Integer.MAX_VALUE - 8时,走hugeLength(int oldLength, int minGrowth)逻辑
    // minCapacity = (i = size) + 1 (即oldCapacity+1) - oldCapacity = 1
    int newCapacity = ArraysSupport.newLength(oldCapacity,
            minCapacity - oldCapacity, /* minimum growth */
            oldCapacity < 64 ? oldCapacity + 2 : oldCapacity >> 1
            /* preferred growth */);
    queue = Arrays.copyOf(queue, newCapacity);
}
public static int newLength(int oldLength, int minGrowth, int prefGrowth) {
    // preconditions not checked because of inlining
    // assert oldLength >= 0
    // assert minGrowth > 0
    int prefLength = oldLength + Math.max(minGrowth, prefGrowth); // might overflow
    if (0 < prefLength && prefLength <= SOFT_MAX_ARRAY_LENGTH) {
        return prefLength;
    } else {
        // put code cold in a separate method
        return hugeLength(oldLength, minGrowth);
    }
}
private static int hugeLength(int oldLength, int minGrowth) {
    // 当oldLength > Integer.MAX_VALUE - 8时
    // minLength = oldLength + 1
    // 当minLength < Integer.MAX_VALUE,此时minLength > SOFT_MAX_ARRAY_LENGTH(Integer.MAX_VALUE - 8)返回 minLength
    // 否则抛OutOfMemoryError错误
    int minLength = oldLength + minGrowth;
    if (minLength < 0) { // overflow
        throw new OutOfMemoryError(
                "Required array length " + oldLength + " + " + minGrowth + " is too large");
    } else if (minLength <= SOFT_MAX_ARRAY_LENGTH) {
        return SOFT_MAX_ARRAY_LENGTH;
    } else {
        return minLength;
    }
}

image.gif

五、作用及使用场景

  • 下单:如唯品会,下单30分钟之内不支付自动取消
  • 试课课程:试课结束前15分钟通知老师
  • 空闲连接延迟自动关闭
  • 缓存:超过时间自动清除
  • 超时处理、业务办理排队叫号、插队和枪购活动等



相关文章
|
7天前
|
存储 Java 调度
深入浅出Java线程池原理
本文深入分析了Java线程池的原理和实现,帮助读者更好地理解Java并发编程中线程池的创建、工作流程和性能优化。
|
8天前
|
存储 数据可视化 Java
【Java】Java swing 民宿管理系统 GUI(源码+可视化界面)【独一无二】
【Java】Java swing 民宿管理系统 GUI(源码+可视化界面)【独一无二】
|
6天前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
5天前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
6天前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
3天前
|
存储 SQL 关系型数据库
深入MySQL锁机制:原理、死锁解决及Java防范技巧
深入MySQL锁机制:原理、死锁解决及Java防范技巧
|
3天前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践
|
8天前
|
存储 Java
【Java】Java学生成绩管理系统(源码+论文)【独一无二】
【Java】Java学生成绩管理系统(源码+论文)【独一无二】
|
8天前
|
SQL Java 数据库连接
【Java】Java Swing 图书管借阅管理系统(源码+论文)【独一无二】
【Java】Java Swing 图书管借阅管理系统(源码+论文)【独一无二】
|
3月前
|
Java 调度
Java并发编程:深入理解线程池的原理与实践
【4月更文挑战第6天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将从线程池的基本原理入手,逐步解析其工作过程,以及如何在实际开发中合理使用线程池以提高程序性能。同时,我们还将关注线程池的一些高级特性,如自定义线程工厂、拒绝策略等,以帮助读者更好地掌握线程池的使用技巧。