Java线程池ThreadPoolExcutor源码解读详解04-阻塞队列之PriorityBlockingQueue原理及扩容机制详解

本文涉及的产品
性能测试 PTS,5000VUM额度
应用实时监控服务-应用监控,每月50GB免费额度
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 1. **继承实现图关系**: - `PriorityBlockingQueue`实现了`BlockingQueue`接口,提供了线程安全的队列操作。 - 内部基于优先级堆(小顶堆或大顶堆)的数据结构实现,可以保证元素按照优先级顺序出队。2. **底层数据存储结构**: - 默认容量是11,存储数据的数组会在需要时动态扩容。 - 数组长度总是2的幂,以满足堆的性质。3. **构造器**: - 无参构造器创建一个默认容量的队列,元素需要实现`Comparable`接口。 - 指定容量构造器允许设置初始容量,但不指定排序规则。 - 可指定容量和比较


一、继承实现图关系

image.gif 1711818828442.png

二、底层数据存储结构

属性说明:

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
...
// 数组默认容量11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 存储数据的数组队列
private transient Object[] queue;
// 当前数组队列数据大小
private transient int size;
// 比较规则(排序规则)
private transient Comparator<? super E> comparator;
// 独占锁
private final ReentrantLock lock = new ReentrantLock();
// 队列为空是阻塞
private final Condition notEmpty = lock.newCondition();
// CAS自旋锁
private transient volatile int allocationSpinLock;
// 序列化优先队列
private PriorityQueue<E> q;
...
}

image.gif

构造器:

/**
 * 默认构造器,默认容量大小为11,无排序规则
 */
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
 * 指定容器大小,无排序规则
 */
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
/**
 * 指定容器大小 和 指定排序规则
 */
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.comparator = comparator;
    this.queue = new Object[Math.max(1, initialCapacity)];
}
...

image.gif

注:默认构造器容量大小为11,没有比较器,如果未指定比较器,插入队列的元素需要实现Comparable接口

三、特点及优缺点

3.1 特点

  • 支持优先级排序的无界阻塞队列,优先级高的先出队,先先级低的后出队
  • 存储数据结构为数组,以二叉堆形式存储于数组中,二叉堆叶子节点均小于父节点
  • 存取都为一把锁
  • 入队不阻塞,出队阻塞
  • 出队取队头元素
  • 默认容量是11,数组最大容量为Integer.MAX_VALUE - 8

3.2 优缺点

  • 根据排序规则排序(可提供排序规则,若不提供则队列元素必须实现 Comparable接口)
  • put不阻塞(若生产速度快于消费速度,会耗尽所有的可用堆内空间)
  • 优先级排序无界阻塞队列
  • 取数据锁为独占锁
  • 内部数组长度不够会进行扩容
  • 新增元素进行最小二叉推排序

四、源码详解

读取部分源码:

  • 添加任务方法
  • 获取和删除任务方法
  • 扩容

阅读明白这几个接口即可,其它都很简单。

4.1 put、offer

  • 数组无界,不阻塞添加
  • 不允许添加空对象
  • 父节点大于所有子节点
public void put(E e) {
    offer(e); // never need to block
}
/**
 * 向队列中添加元素 <br/>
 * 添加元素若为空,直接抛出异常 <br/>
 */
public boolean offer(E e) {
    // 添加的对象不能为空
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] es;
    while ((n = size) >= (cap = (es = queue).length)) {
        // 若数组已满,则进行扩容
        tryGrow(es, cap);
    } 
    try {
        final Comparator<? super E> cmp;
        if ((cmp = comparator) == null) {
            // 无配置排序规则时,走对象自带的排序逻辑
            siftUpComparable(n, e, es);
        }   
        else {
            // 走自定义排序规则
            siftUpUsingComparator(n, e, es, cmp);
        } 
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
/**
 * 创建队列时无指定排序规则,调添加对象的compareTo进行排序 <br/>
 * 所以,添加的元素必须实现Comparable接口 <br/>
 */
private static <T> void siftUpComparable(int k, T x, Object[] es) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        // 由新元素下标(移动下标),获取父节点的数组下标
        int parent = (k - 1) >>> 1;
        Object e = es[parent];
        // 若排序规则成立,则直接退出循环,key放到k的位置
        if (key.compareTo((T) e) >= 0)
            break;
        // 若不成立,父节点元素放到位置k
        es[k] = e;
        // 移动下标指向父节点下标,然后继续向上(父节点的父节点)比较,直到if成立,或到根节点
        k = parent;
    }
    es[k] = key;
}
/**
 * 创建队列时指定排序规则,则排序规则按自定义排序处理数组二叉堆数据 <br/>
 * 逻辑同siftUpComparable
 */
private static <T> void siftUpUsingComparator(
        int k, T x, Object[] es, Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = es[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;
        es[k] = e;
        k = parent;
    }
    es[k] = x;
}

image.gif

4.2 take

  • 若数组队列为空,则阻塞。
  • 取出最后一个元素L,并最后一个下标的值赋值null
  • 1 二叉堆下标为0出队(树的根节点)
  • 2 大的子节点元素a放到树的根节点(假设排序规则为降序)
  • 3. 以元素a的原位置似为树,重复1,2步骤,直到找到最小树 s(有叶子节点)
  • 4. s的叶子节点大子元素b放到根节点,原树最后一个元素L放到叶子节点元素b的原位置

       元素的移动次数最多为树的高度

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lockInterruptibly();
    E result;
    try {
        // 若队列为空,则阻塞
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}
/**
 * 出队(假调排序规则是降序) <br/>
 * 取出最后一个元素a,并且最后一个标的值赋值为空 <br/>
 * 1. 取出第一个元素,则该位置空缺 <br/>
 * 2. 大的子元素放到第一个位置, 则这个位置空缺 <br/>
 * 3. 以空缺为父节点,重复1和2步骤,找到最小子树
 * 4. 大的叶子节点放到父节点,元素a放到大的叶子节点原来的位置
 */
private E dequeue() {
    // assert lock.isHeldByCurrentThread();
    final Object[] es;
    final E result;
    // 取出第一个元素
    if ((result = (E) ((es = queue)[0])) != null) {
        final int n;
        // 取出最后一个元素
        final E x = (E) es[(n = --size)];
        // 删除最后一个下标元素
        es[n] = null;
        if (n > 0) {
            final Comparator<? super E> cmp;
            // 大的子节点中
            if ((cmp = comparator) == null)
                siftDownComparable(0, x, es, n);
            else
                siftDownUsingComparator(0, x, es, n, cmp);
        }
    }
    return result;
}
private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
    // assert n > 0;
    Comparable<? super T> key = (Comparable<? super T>)x;
    int half = n >>> 1;           // loop while a non-leaf
    while (k < half) {
        // 计算出左子节点下标
        int child = (k << 1) + 1; // assume left child is least
        Object c = es[child];
        int right = child + 1;
        // 取出大的子节点
        if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
            c = es[child = right];
        if (key.compareTo((T) c) <= 0)
            break;
        // 大的子节点元素放到父节点的位置
        es[k] = c;
        // 以大的子节点元素为根,重复以上逻辑,继续找大的子节点放到其位置
        k = child;
    }
    // 原树的最后一个位置的元素放到最后一个移动的叶子节点的原来位置上
    es[k] = key;
}
/**
 * 逻辑同上 <br/>
 */
private static <T> void siftDownUsingComparator(
        int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
    // assert n > 0;
    int half = n >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        Object c = es[child];
        int right = child + 1;
        if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
            c = es[child = right];
        if (cmp.compare(x, (T) c) <= 0)
            break;
        es[k] = c;
        k = child;
    }
    es[k] = x;
}

image.gif

4.3 扩容原理

  • CAS自旋锁,避免并发扩容
  • 扩容时机:元素数量等于数组大小时进行扩容
  • 扩容大小:
  • 容量(len)小于64时 len = len + (len + 2),即新容量 = 原容量*2 + 2
  • 容量(len) >= 64 时 len = len + len / 2,即新容量 = 原容量 + 原容量/2
  • 创建新数组,原数组数据复制过来
public boolean offer(E e) {
    ...
    // 元素数量等于数组大小时进行扩容
    // 有可能别的线程扩容了,又添加元素满了,所以while判断就轮到本线程再扩容
    while ((n = size) >= (cap = (es = queue).length))
        tryGrow(es, cap);
    ...
}
/**
 * 扩容
 */
private void tryGrow(Object[] array, int oldCap) {
    // 必须释放然后重新获得主锁,因为只有一把锁,释放锁后可以读
    lock.unlock();
    Object[] newArray = null;
    // CAS自旋锁,避免并发扩容
    if (allocationSpinLock == 0 &&
            ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
        try {
            // 计算新容量大小,容量<64时 len = len + (len + 2)
            // 计算新容量大小,容量>=64时 len = len + (len / 2)
            int growth = (oldCap < 64)
                    ? (oldCap + 2) // grow faster if small
                    : (oldCap >> 1);
            int newCap = ArraysSupport.newLength(oldCap, 1, growth);
            if (queue == array)
                // 创建新的空数组
                newArray = new Object[newCap];
        } finally {
            // 还原自旋锁为0
            allocationSpinLock = 0;
        }
    }
    // 如果另一个线程已分配,则退出
    if (newArray == null)
        Thread.yield();
    // 数组队列指向新数组,旧数据复制到新数组前加锁
    lock.lock();
    if (newArray != null && queue == array) {
        // 队列数组指向新数组
        queue = newArray;
        // 复制旧数据到新数组中
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

image.gif

五、作用和使用场景

  • 业务办理排队叫号,VIP客户插队
  • 枪购
相关文章
|
9天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
19天前
|
存储 Java 关系型数据库
高效连接之道:Java连接池原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。频繁创建和关闭连接会消耗大量资源,导致性能瓶颈。为此,Java连接池技术通过复用连接,实现高效、稳定的数据库连接管理。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接池的基本操作、配置和使用方法,以及在电商应用中的具体应用示例。
38 5
|
8天前
|
存储 算法 Java
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
本文详解自旋锁的概念、优缺点、使用场景及Java实现。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
|
9天前
|
Java
Java之CountDownLatch原理浅析
本文介绍了Java并发工具类`CountDownLatch`的使用方法、原理及其与`Thread.join()`的区别。`CountDownLatch`通过构造函数接收一个整数参数作为计数器,调用`countDown`方法减少计数,`await`方法会阻塞当前线程,直到计数为零。文章还详细解析了其内部机制,包括初始化、`countDown`和`await`方法的工作原理,并给出了一个游戏加载场景的示例代码。
Java之CountDownLatch原理浅析
|
11天前
|
Java 索引 容器
Java ArrayList扩容的原理
Java 的 `ArrayList` 是基于数组实现的动态集合。初始时,`ArrayList` 底层创建一个空数组 `elementData`,并设置 `size` 为 0。当首次添加元素时,会调用 `grow` 方法将数组扩容至默认容量 10。之后每次添加元素时,如果当前数组已满,则会再次调用 `grow` 方法进行扩容。扩容规则为:首次扩容至 10,后续扩容至原数组长度的 1.5 倍或根据实际需求扩容。例如,当需要一次性添加 100 个元素时,会直接扩容至 110 而不是 15。
Java ArrayList扩容的原理
|
17天前
|
存储 Java 关系型数据库
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接创建、分配、复用和释放等操作,并通过电商应用实例展示了如何选择合适的连接池库(如HikariCP)和配置参数,实现高效、稳定的数据库连接管理。
34 2
|
17天前
|
算法 Java 数据库连接
Java连接池技术,从基础概念出发,解析了连接池的工作原理及其重要性
本文详细介绍了Java连接池技术,从基础概念出发,解析了连接池的工作原理及其重要性。连接池通过复用数据库连接,显著提升了应用的性能和稳定性。文章还展示了使用HikariCP连接池的示例代码,帮助读者更好地理解和应用这一技术。
31 1
|
6月前
|
Java 调度
Java并发编程:深入理解线程池的原理与实践
【4月更文挑战第6天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将从线程池的基本原理入手,逐步解析其工作过程,以及如何在实际开发中合理使用线程池以提高程序性能。同时,我们还将关注线程池的一些高级特性,如自定义线程工厂、拒绝策略等,以帮助读者更好地掌握线程池的使用技巧。
|
3月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
2月前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。