Java线程池ThreadPoolExcutor源码解读详解04-阻塞队列之PriorityBlockingQueue原理及扩容机制详解

本文涉及的产品
云原生网关 MSE Higress,422元/月
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
应用实时监控服务-用户体验监控,每月100OCU免费额度
简介: 1. **继承实现图关系**: - `PriorityBlockingQueue`实现了`BlockingQueue`接口,提供了线程安全的队列操作。 - 内部基于优先级堆(小顶堆或大顶堆)的数据结构实现,可以保证元素按照优先级顺序出队。2. **底层数据存储结构**: - 默认容量是11,存储数据的数组会在需要时动态扩容。 - 数组长度总是2的幂,以满足堆的性质。3. **构造器**: - 无参构造器创建一个默认容量的队列,元素需要实现`Comparable`接口。 - 指定容量构造器允许设置初始容量,但不指定排序规则。 - 可指定容量和比较


一、继承实现图关系

image.gif 1711818828442.png

二、底层数据存储结构

属性说明:

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
...
// 数组默认容量11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 存储数据的数组队列
private transient Object[] queue;
// 当前数组队列数据大小
private transient int size;
// 比较规则(排序规则)
private transient Comparator<? super E> comparator;
// 独占锁
private final ReentrantLock lock = new ReentrantLock();
// 队列为空是阻塞
private final Condition notEmpty = lock.newCondition();
// CAS自旋锁
private transient volatile int allocationSpinLock;
// 序列化优先队列
private PriorityQueue<E> q;
...
}

image.gif

构造器:

/**
 * 默认构造器,默认容量大小为11,无排序规则
 */
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
 * 指定容器大小,无排序规则
 */
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
/**
 * 指定容器大小 和 指定排序规则
 */
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.comparator = comparator;
    this.queue = new Object[Math.max(1, initialCapacity)];
}
...

image.gif

注:默认构造器容量大小为11,没有比较器,如果未指定比较器,插入队列的元素需要实现Comparable接口

三、特点及优缺点

3.1 特点

  • 支持优先级排序的无界阻塞队列,优先级高的先出队,先先级低的后出队
  • 存储数据结构为数组,以二叉堆形式存储于数组中,二叉堆叶子节点均小于父节点
  • 存取都为一把锁
  • 入队不阻塞,出队阻塞
  • 出队取队头元素
  • 默认容量是11,数组最大容量为Integer.MAX_VALUE - 8

3.2 优缺点

  • 根据排序规则排序(可提供排序规则,若不提供则队列元素必须实现 Comparable接口)
  • put不阻塞(若生产速度快于消费速度,会耗尽所有的可用堆内空间)
  • 优先级排序无界阻塞队列
  • 取数据锁为独占锁
  • 内部数组长度不够会进行扩容
  • 新增元素进行最小二叉推排序

四、源码详解

读取部分源码:

  • 添加任务方法
  • 获取和删除任务方法
  • 扩容

阅读明白这几个接口即可,其它都很简单。

4.1 put、offer

  • 数组无界,不阻塞添加
  • 不允许添加空对象
  • 父节点大于所有子节点
public void put(E e) {
    offer(e); // never need to block
}
/**
 * 向队列中添加元素 <br/>
 * 添加元素若为空,直接抛出异常 <br/>
 */
public boolean offer(E e) {
    // 添加的对象不能为空
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] es;
    while ((n = size) >= (cap = (es = queue).length)) {
        // 若数组已满,则进行扩容
        tryGrow(es, cap);
    } 
    try {
        final Comparator<? super E> cmp;
        if ((cmp = comparator) == null) {
            // 无配置排序规则时,走对象自带的排序逻辑
            siftUpComparable(n, e, es);
        }   
        else {
            // 走自定义排序规则
            siftUpUsingComparator(n, e, es, cmp);
        } 
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
/**
 * 创建队列时无指定排序规则,调添加对象的compareTo进行排序 <br/>
 * 所以,添加的元素必须实现Comparable接口 <br/>
 */
private static <T> void siftUpComparable(int k, T x, Object[] es) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        // 由新元素下标(移动下标),获取父节点的数组下标
        int parent = (k - 1) >>> 1;
        Object e = es[parent];
        // 若排序规则成立,则直接退出循环,key放到k的位置
        if (key.compareTo((T) e) >= 0)
            break;
        // 若不成立,父节点元素放到位置k
        es[k] = e;
        // 移动下标指向父节点下标,然后继续向上(父节点的父节点)比较,直到if成立,或到根节点
        k = parent;
    }
    es[k] = key;
}
/**
 * 创建队列时指定排序规则,则排序规则按自定义排序处理数组二叉堆数据 <br/>
 * 逻辑同siftUpComparable
 */
private static <T> void siftUpUsingComparator(
        int k, T x, Object[] es, Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = es[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;
        es[k] = e;
        k = parent;
    }
    es[k] = x;
}

image.gif

4.2 take

  • 若数组队列为空,则阻塞。
  • 取出最后一个元素L,并最后一个下标的值赋值null
  • 1 二叉堆下标为0出队(树的根节点)
  • 2 大的子节点元素a放到树的根节点(假设排序规则为降序)
  • 3. 以元素a的原位置似为树,重复1,2步骤,直到找到最小树 s(有叶子节点)
  • 4. s的叶子节点大子元素b放到根节点,原树最后一个元素L放到叶子节点元素b的原位置

       元素的移动次数最多为树的高度

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lockInterruptibly();
    E result;
    try {
        // 若队列为空,则阻塞
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}
/**
 * 出队(假调排序规则是降序) <br/>
 * 取出最后一个元素a,并且最后一个标的值赋值为空 <br/>
 * 1. 取出第一个元素,则该位置空缺 <br/>
 * 2. 大的子元素放到第一个位置, 则这个位置空缺 <br/>
 * 3. 以空缺为父节点,重复1和2步骤,找到最小子树
 * 4. 大的叶子节点放到父节点,元素a放到大的叶子节点原来的位置
 */
private E dequeue() {
    // assert lock.isHeldByCurrentThread();
    final Object[] es;
    final E result;
    // 取出第一个元素
    if ((result = (E) ((es = queue)[0])) != null) {
        final int n;
        // 取出最后一个元素
        final E x = (E) es[(n = --size)];
        // 删除最后一个下标元素
        es[n] = null;
        if (n > 0) {
            final Comparator<? super E> cmp;
            // 大的子节点中
            if ((cmp = comparator) == null)
                siftDownComparable(0, x, es, n);
            else
                siftDownUsingComparator(0, x, es, n, cmp);
        }
    }
    return result;
}
private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
    // assert n > 0;
    Comparable<? super T> key = (Comparable<? super T>)x;
    int half = n >>> 1;           // loop while a non-leaf
    while (k < half) {
        // 计算出左子节点下标
        int child = (k << 1) + 1; // assume left child is least
        Object c = es[child];
        int right = child + 1;
        // 取出大的子节点
        if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
            c = es[child = right];
        if (key.compareTo((T) c) <= 0)
            break;
        // 大的子节点元素放到父节点的位置
        es[k] = c;
        // 以大的子节点元素为根,重复以上逻辑,继续找大的子节点放到其位置
        k = child;
    }
    // 原树的最后一个位置的元素放到最后一个移动的叶子节点的原来位置上
    es[k] = key;
}
/**
 * 逻辑同上 <br/>
 */
private static <T> void siftDownUsingComparator(
        int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
    // assert n > 0;
    int half = n >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        Object c = es[child];
        int right = child + 1;
        if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
            c = es[child = right];
        if (cmp.compare(x, (T) c) <= 0)
            break;
        es[k] = c;
        k = child;
    }
    es[k] = x;
}

image.gif

4.3 扩容原理

  • CAS自旋锁,避免并发扩容
  • 扩容时机:元素数量等于数组大小时进行扩容
  • 扩容大小:
  • 容量(len)小于64时 len = len + (len + 2),即新容量 = 原容量*2 + 2
  • 容量(len) >= 64 时 len = len + len / 2,即新容量 = 原容量 + 原容量/2
  • 创建新数组,原数组数据复制过来
public boolean offer(E e) {
    ...
    // 元素数量等于数组大小时进行扩容
    // 有可能别的线程扩容了,又添加元素满了,所以while判断就轮到本线程再扩容
    while ((n = size) >= (cap = (es = queue).length))
        tryGrow(es, cap);
    ...
}
/**
 * 扩容
 */
private void tryGrow(Object[] array, int oldCap) {
    // 必须释放然后重新获得主锁,因为只有一把锁,释放锁后可以读
    lock.unlock();
    Object[] newArray = null;
    // CAS自旋锁,避免并发扩容
    if (allocationSpinLock == 0 &&
            ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
        try {
            // 计算新容量大小,容量<64时 len = len + (len + 2)
            // 计算新容量大小,容量>=64时 len = len + (len / 2)
            int growth = (oldCap < 64)
                    ? (oldCap + 2) // grow faster if small
                    : (oldCap >> 1);
            int newCap = ArraysSupport.newLength(oldCap, 1, growth);
            if (queue == array)
                // 创建新的空数组
                newArray = new Object[newCap];
        } finally {
            // 还原自旋锁为0
            allocationSpinLock = 0;
        }
    }
    // 如果另一个线程已分配,则退出
    if (newArray == null)
        Thread.yield();
    // 数组队列指向新数组,旧数据复制到新数组前加锁
    lock.lock();
    if (newArray != null && queue == array) {
        // 队列数组指向新数组
        queue = newArray;
        // 复制旧数据到新数组中
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

image.gif

五、作用和使用场景

  • 业务办理排队叫号,VIP客户插队
  • 枪购
相关文章
|
9天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
21天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
100 38
|
21天前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
58 4
|
21天前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
89 2
|
23天前
|
缓存 监控 Java
java中线程池的使用
java中线程池的使用
|
2月前
|
数据采集 负载均衡 安全
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
本文提供了多个多线程编程问题的解决方案,包括设计有限阻塞队列、多线程网页爬虫、红绿灯路口等,每个问题都给出了至少一种实现方法,涵盖了互斥锁、条件变量、信号量等线程同步机制的使用。
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
|
2月前
|
Java 调度 开发者
Java并发编程:深入理解线程池
在Java的世界中,线程池是提升应用性能、实现高效并发处理的关键工具。本文将深入浅出地介绍线程池的核心概念、工作原理以及如何在实际应用中有效利用线程池来优化资源管理和任务调度。通过本文的学习,读者能够掌握线程池的基本使用技巧,并理解其背后的设计哲学。
|
1月前
|
消息中间件 NoSQL 关系型数据库
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
23 0
|
2月前
|
Java 数据中心 微服务
Java高级知识:线程池隔离与信号量隔离的实战应用
在Java并发编程中,线程池隔离与信号量隔离是两种常用的资源隔离技术,它们在提高系统稳定性、防止系统过载方面发挥着重要作用。
44 0
|
2月前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。