Java线程池ThreadPoolExcutor源码解读详解04-阻塞队列之PriorityBlockingQueue原理及扩容机制详解

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
可观测可视化 Grafana 版,10个用户账号 1个月
简介: 1. **继承实现图关系**: - `PriorityBlockingQueue`实现了`BlockingQueue`接口,提供了线程安全的队列操作。 - 内部基于优先级堆(小顶堆或大顶堆)的数据结构实现,可以保证元素按照优先级顺序出队。2. **底层数据存储结构**: - 默认容量是11,存储数据的数组会在需要时动态扩容。 - 数组长度总是2的幂,以满足堆的性质。3. **构造器**: - 无参构造器创建一个默认容量的队列,元素需要实现`Comparable`接口。 - 指定容量构造器允许设置初始容量,但不指定排序规则。 - 可指定容量和比较


一、继承实现图关系

image.gif 1711818828442.png

二、底层数据存储结构

属性说明:

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
...
// 数组默认容量11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 存储数据的数组队列
private transient Object[] queue;
// 当前数组队列数据大小
private transient int size;
// 比较规则(排序规则)
private transient Comparator<? super E> comparator;
// 独占锁
private final ReentrantLock lock = new ReentrantLock();
// 队列为空是阻塞
private final Condition notEmpty = lock.newCondition();
// CAS自旋锁
private transient volatile int allocationSpinLock;
// 序列化优先队列
private PriorityQueue<E> q;
...
}

image.gif

构造器:

/**
 * 默认构造器,默认容量大小为11,无排序规则
 */
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
 * 指定容器大小,无排序规则
 */
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
/**
 * 指定容器大小 和 指定排序规则
 */
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.comparator = comparator;
    this.queue = new Object[Math.max(1, initialCapacity)];
}
...

image.gif

注:默认构造器容量大小为11,没有比较器,如果未指定比较器,插入队列的元素需要实现Comparable接口

三、特点及优缺点

3.1 特点

  • 支持优先级排序的无界阻塞队列,优先级高的先出队,先先级低的后出队
  • 存储数据结构为数组,以二叉堆形式存储于数组中,二叉堆叶子节点均小于父节点
  • 存取都为一把锁
  • 入队不阻塞,出队阻塞
  • 出队取队头元素
  • 默认容量是11,数组最大容量为Integer.MAX_VALUE - 8

3.2 优缺点

  • 根据排序规则排序(可提供排序规则,若不提供则队列元素必须实现 Comparable接口)
  • put不阻塞(若生产速度快于消费速度,会耗尽所有的可用堆内空间)
  • 优先级排序无界阻塞队列
  • 取数据锁为独占锁
  • 内部数组长度不够会进行扩容
  • 新增元素进行最小二叉推排序

四、源码详解

读取部分源码:

  • 添加任务方法
  • 获取和删除任务方法
  • 扩容

阅读明白这几个接口即可,其它都很简单。

4.1 put、offer

  • 数组无界,不阻塞添加
  • 不允许添加空对象
  • 父节点大于所有子节点
public void put(E e) {
    offer(e); // never need to block
}
/**
 * 向队列中添加元素 <br/>
 * 添加元素若为空,直接抛出异常 <br/>
 */
public boolean offer(E e) {
    // 添加的对象不能为空
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] es;
    while ((n = size) >= (cap = (es = queue).length)) {
        // 若数组已满,则进行扩容
        tryGrow(es, cap);
    } 
    try {
        final Comparator<? super E> cmp;
        if ((cmp = comparator) == null) {
            // 无配置排序规则时,走对象自带的排序逻辑
            siftUpComparable(n, e, es);
        }   
        else {
            // 走自定义排序规则
            siftUpUsingComparator(n, e, es, cmp);
        } 
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
/**
 * 创建队列时无指定排序规则,调添加对象的compareTo进行排序 <br/>
 * 所以,添加的元素必须实现Comparable接口 <br/>
 */
private static <T> void siftUpComparable(int k, T x, Object[] es) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        // 由新元素下标(移动下标),获取父节点的数组下标
        int parent = (k - 1) >>> 1;
        Object e = es[parent];
        // 若排序规则成立,则直接退出循环,key放到k的位置
        if (key.compareTo((T) e) >= 0)
            break;
        // 若不成立,父节点元素放到位置k
        es[k] = e;
        // 移动下标指向父节点下标,然后继续向上(父节点的父节点)比较,直到if成立,或到根节点
        k = parent;
    }
    es[k] = key;
}
/**
 * 创建队列时指定排序规则,则排序规则按自定义排序处理数组二叉堆数据 <br/>
 * 逻辑同siftUpComparable
 */
private static <T> void siftUpUsingComparator(
        int k, T x, Object[] es, Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = es[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;
        es[k] = e;
        k = parent;
    }
    es[k] = x;
}

image.gif

4.2 take

  • 若数组队列为空,则阻塞。
  • 取出最后一个元素L,并最后一个下标的值赋值null
  • 1 二叉堆下标为0出队(树的根节点)
  • 2 大的子节点元素a放到树的根节点(假设排序规则为降序)
  • 3. 以元素a的原位置似为树,重复1,2步骤,直到找到最小树 s(有叶子节点)
  • 4. s的叶子节点大子元素b放到根节点,原树最后一个元素L放到叶子节点元素b的原位置

       元素的移动次数最多为树的高度

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lockInterruptibly();
    E result;
    try {
        // 若队列为空,则阻塞
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}
/**
 * 出队(假调排序规则是降序) <br/>
 * 取出最后一个元素a,并且最后一个标的值赋值为空 <br/>
 * 1. 取出第一个元素,则该位置空缺 <br/>
 * 2. 大的子元素放到第一个位置, 则这个位置空缺 <br/>
 * 3. 以空缺为父节点,重复1和2步骤,找到最小子树
 * 4. 大的叶子节点放到父节点,元素a放到大的叶子节点原来的位置
 */
private E dequeue() {
    // assert lock.isHeldByCurrentThread();
    final Object[] es;
    final E result;
    // 取出第一个元素
    if ((result = (E) ((es = queue)[0])) != null) {
        final int n;
        // 取出最后一个元素
        final E x = (E) es[(n = --size)];
        // 删除最后一个下标元素
        es[n] = null;
        if (n > 0) {
            final Comparator<? super E> cmp;
            // 大的子节点中
            if ((cmp = comparator) == null)
                siftDownComparable(0, x, es, n);
            else
                siftDownUsingComparator(0, x, es, n, cmp);
        }
    }
    return result;
}
private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
    // assert n > 0;
    Comparable<? super T> key = (Comparable<? super T>)x;
    int half = n >>> 1;           // loop while a non-leaf
    while (k < half) {
        // 计算出左子节点下标
        int child = (k << 1) + 1; // assume left child is least
        Object c = es[child];
        int right = child + 1;
        // 取出大的子节点
        if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
            c = es[child = right];
        if (key.compareTo((T) c) <= 0)
            break;
        // 大的子节点元素放到父节点的位置
        es[k] = c;
        // 以大的子节点元素为根,重复以上逻辑,继续找大的子节点放到其位置
        k = child;
    }
    // 原树的最后一个位置的元素放到最后一个移动的叶子节点的原来位置上
    es[k] = key;
}
/**
 * 逻辑同上 <br/>
 */
private static <T> void siftDownUsingComparator(
        int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
    // assert n > 0;
    int half = n >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        Object c = es[child];
        int right = child + 1;
        if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
            c = es[child = right];
        if (cmp.compare(x, (T) c) <= 0)
            break;
        es[k] = c;
        k = child;
    }
    es[k] = x;
}

image.gif

4.3 扩容原理

  • CAS自旋锁,避免并发扩容
  • 扩容时机:元素数量等于数组大小时进行扩容
  • 扩容大小:
  • 容量(len)小于64时 len = len + (len + 2),即新容量 = 原容量*2 + 2
  • 容量(len) >= 64 时 len = len + len / 2,即新容量 = 原容量 + 原容量/2
  • 创建新数组,原数组数据复制过来
public boolean offer(E e) {
    ...
    // 元素数量等于数组大小时进行扩容
    // 有可能别的线程扩容了,又添加元素满了,所以while判断就轮到本线程再扩容
    while ((n = size) >= (cap = (es = queue).length))
        tryGrow(es, cap);
    ...
}
/**
 * 扩容
 */
private void tryGrow(Object[] array, int oldCap) {
    // 必须释放然后重新获得主锁,因为只有一把锁,释放锁后可以读
    lock.unlock();
    Object[] newArray = null;
    // CAS自旋锁,避免并发扩容
    if (allocationSpinLock == 0 &&
            ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
        try {
            // 计算新容量大小,容量<64时 len = len + (len + 2)
            // 计算新容量大小,容量>=64时 len = len + (len / 2)
            int growth = (oldCap < 64)
                    ? (oldCap + 2) // grow faster if small
                    : (oldCap >> 1);
            int newCap = ArraysSupport.newLength(oldCap, 1, growth);
            if (queue == array)
                // 创建新的空数组
                newArray = new Object[newCap];
        } finally {
            // 还原自旋锁为0
            allocationSpinLock = 0;
        }
    }
    // 如果另一个线程已分配,则退出
    if (newArray == null)
        Thread.yield();
    // 数组队列指向新数组,旧数据复制到新数组前加锁
    lock.lock();
    if (newArray != null && queue == array) {
        // 队列数组指向新数组
        queue = newArray;
        // 复制旧数据到新数组中
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

image.gif

五、作用和使用场景

  • 业务办理排队叫号,VIP客户插队
  • 枪购
相关文章
|
21小时前
|
NoSQL 算法 Java
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
|
1天前
|
存储 运维 Java
java云his系统源码一站式诊所SaaS系统Java版云HIS系统 八大特点
HIS系统采用面向技术架构的分析与设计方法,应用多层次应用体系架构设计,运用基于构件技术的系统搭建模式与基于组件模式的系统内核结构。通过建立统一接口标准,实现数据交换和集成共享,通过统一身份认证和授权控制,实现业务集成、界面集成。
27 1
|
2天前
|
Java 关系型数据库 MySQL
java+B/S架构医院绩效考核管理系统源码 医院绩效管理系统4大特点
医院绩效考核管理系统,采用多维度综合绩效考核的形式,针对院内实际情况分别对工作量、KPI指标、科研、教学、管理等进行全面考核。医院可结合实际需求,对考核方案中各维度进行灵活配置,对各维度的权重、衡量标准、数据统计方式进行自定义维护。
10 0
|
2天前
|
消息中间件 安全 Ubuntu
【操作系统原理】—— 线程同步
【操作系统原理】—— 线程同步
10 1
|
2天前
|
Java 数据挖掘 BI
Java医院绩效考核系统源码B/S+avue+MySQL助力医院实现精细化管理
医院绩效考核系统目标是实现对科室、病区财务指标、客户指标、流程指标、成长指标的全面考核、分析,并与奖金分配、学科建设水平评价挂钩。
30 0
|
2天前
|
Java 调度
Java一分钟之线程池:ExecutorService与Future
【5月更文挑战第12天】Java并发编程中,`ExecutorService`和`Future`是关键组件,简化多线程并提供异步执行能力。`ExecutorService`是线程池接口,用于提交任务到线程池,如`ThreadPoolExecutor`和`ScheduledThreadPoolExecutor`。通过`submit()`提交任务并返回`Future`对象,可检查任务状态、获取结果或取消任务。注意处理`ExecutionException`和避免无限等待。实战示例展示了如何异步执行任务并获取结果。理解这些概念对提升并发性能至关重要。
17 5
|
2天前
|
数据采集 前端开发 Java
Java医院绩效考核系统源码maven+Visual Studio Code一体化人力资源saas平台系统源码
医院绩效解决方案包括医院绩效管理(BSC)、综合奖金核算(RBRVS),涵盖从绩效方案的咨询与定制、数据采集、绩效考核及反馈、绩效奖金核算到科到组、分配到员工个人全流程绩效管理;将医院、科室、医护人员利益绑定;全面激活人才活力;兼顾质量和效益、长期与短期利益;助力医院降本增效,持续改善、优化收入、成本结构。
15 0
|
2天前
|
Java 调度
Java并发编程:深入理解线程池
【5月更文挑战第11天】本文将深入探讨Java中的线程池,包括其基本概念、工作原理以及如何使用。我们将通过实例来解释线程池的优点,如提高性能和资源利用率,以及如何避免常见的并发问题。我们还将讨论Java中线程池的实现,包括Executor框架和ThreadPoolExecutor类,并展示如何创建和管理线程池。最后,我们将讨论线程池的一些高级特性,如任务调度、线程优先级和异常处理。
|
2天前
|
安全 Java 调度
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第12天】 在现代软件开发中,多线程编程是提升应用程序性能和响应能力的关键手段之一。特别是在Java语言中,由于其内置的跨平台线程支持,开发者可以轻松地创建和管理线程。然而,随之而来的并发问题也不容小觑。本文将探讨Java并发编程的核心概念,包括线程安全策略、锁机制以及性能优化技巧。通过实例分析与性能比较,我们旨在为读者提供一套既确保线程安全又兼顾性能的编程指导。
|
1天前
|
Java
阅读《代码整洁之道》总结(1),java多线程面试
阅读《代码整洁之道》总结(1),java多线程面试