Java线程池ThreadPoolExcutor源码解读详解04-阻塞队列之PriorityBlockingQueue原理及扩容机制详解

本文涉及的产品
应用实时监控服务-用户体验监控,每月100OCU免费额度
可观测监控 Prometheus 版,每月50GB免费额度
函数计算FC,每月15万CU 3个月
简介: 1. **继承实现图关系**: - `PriorityBlockingQueue`实现了`BlockingQueue`接口,提供了线程安全的队列操作。 - 内部基于优先级堆(小顶堆或大顶堆)的数据结构实现,可以保证元素按照优先级顺序出队。2. **底层数据存储结构**: - 默认容量是11,存储数据的数组会在需要时动态扩容。 - 数组长度总是2的幂,以满足堆的性质。3. **构造器**: - 无参构造器创建一个默认容量的队列,元素需要实现`Comparable`接口。 - 指定容量构造器允许设置初始容量,但不指定排序规则。 - 可指定容量和比较


一、继承实现图关系

image.gif 1711818828442.png

二、底层数据存储结构

属性说明:

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
...
// 数组默认容量11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 存储数据的数组队列
private transient Object[] queue;
// 当前数组队列数据大小
private transient int size;
// 比较规则(排序规则)
private transient Comparator<? super E> comparator;
// 独占锁
private final ReentrantLock lock = new ReentrantLock();
// 队列为空是阻塞
private final Condition notEmpty = lock.newCondition();
// CAS自旋锁
private transient volatile int allocationSpinLock;
// 序列化优先队列
private PriorityQueue<E> q;
...
}

image.gif

构造器:

/**
 * 默认构造器,默认容量大小为11,无排序规则
 */
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
 * 指定容器大小,无排序规则
 */
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
/**
 * 指定容器大小 和 指定排序规则
 */
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.comparator = comparator;
    this.queue = new Object[Math.max(1, initialCapacity)];
}
...

image.gif

注:默认构造器容量大小为11,没有比较器,如果未指定比较器,插入队列的元素需要实现Comparable接口

三、特点及优缺点

3.1 特点

  • 支持优先级排序的无界阻塞队列,优先级高的先出队,先先级低的后出队
  • 存储数据结构为数组,以二叉堆形式存储于数组中,二叉堆叶子节点均小于父节点
  • 存取都为一把锁
  • 入队不阻塞,出队阻塞
  • 出队取队头元素
  • 默认容量是11,数组最大容量为Integer.MAX_VALUE - 8

3.2 优缺点

  • 根据排序规则排序(可提供排序规则,若不提供则队列元素必须实现 Comparable接口)
  • put不阻塞(若生产速度快于消费速度,会耗尽所有的可用堆内空间)
  • 优先级排序无界阻塞队列
  • 取数据锁为独占锁
  • 内部数组长度不够会进行扩容
  • 新增元素进行最小二叉推排序

四、源码详解

读取部分源码:

  • 添加任务方法
  • 获取和删除任务方法
  • 扩容

阅读明白这几个接口即可,其它都很简单。

4.1 put、offer

  • 数组无界,不阻塞添加
  • 不允许添加空对象
  • 父节点大于所有子节点
public void put(E e) {
    offer(e); // never need to block
}
/**
 * 向队列中添加元素 <br/>
 * 添加元素若为空,直接抛出异常 <br/>
 */
public boolean offer(E e) {
    // 添加的对象不能为空
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] es;
    while ((n = size) >= (cap = (es = queue).length)) {
        // 若数组已满,则进行扩容
        tryGrow(es, cap);
    } 
    try {
        final Comparator<? super E> cmp;
        if ((cmp = comparator) == null) {
            // 无配置排序规则时,走对象自带的排序逻辑
            siftUpComparable(n, e, es);
        }   
        else {
            // 走自定义排序规则
            siftUpUsingComparator(n, e, es, cmp);
        } 
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
/**
 * 创建队列时无指定排序规则,调添加对象的compareTo进行排序 <br/>
 * 所以,添加的元素必须实现Comparable接口 <br/>
 */
private static <T> void siftUpComparable(int k, T x, Object[] es) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        // 由新元素下标(移动下标),获取父节点的数组下标
        int parent = (k - 1) >>> 1;
        Object e = es[parent];
        // 若排序规则成立,则直接退出循环,key放到k的位置
        if (key.compareTo((T) e) >= 0)
            break;
        // 若不成立,父节点元素放到位置k
        es[k] = e;
        // 移动下标指向父节点下标,然后继续向上(父节点的父节点)比较,直到if成立,或到根节点
        k = parent;
    }
    es[k] = key;
}
/**
 * 创建队列时指定排序规则,则排序规则按自定义排序处理数组二叉堆数据 <br/>
 * 逻辑同siftUpComparable
 */
private static <T> void siftUpUsingComparator(
        int k, T x, Object[] es, Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = es[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;
        es[k] = e;
        k = parent;
    }
    es[k] = x;
}

image.gif

4.2 take

  • 若数组队列为空,则阻塞。
  • 取出最后一个元素L,并最后一个下标的值赋值null
  • 1 二叉堆下标为0出队(树的根节点)
  • 2 大的子节点元素a放到树的根节点(假设排序规则为降序)
  • 3. 以元素a的原位置似为树,重复1,2步骤,直到找到最小树 s(有叶子节点)
  • 4. s的叶子节点大子元素b放到根节点,原树最后一个元素L放到叶子节点元素b的原位置

       元素的移动次数最多为树的高度

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lockInterruptibly();
    E result;
    try {
        // 若队列为空,则阻塞
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}
/**
 * 出队(假调排序规则是降序) <br/>
 * 取出最后一个元素a,并且最后一个标的值赋值为空 <br/>
 * 1. 取出第一个元素,则该位置空缺 <br/>
 * 2. 大的子元素放到第一个位置, 则这个位置空缺 <br/>
 * 3. 以空缺为父节点,重复1和2步骤,找到最小子树
 * 4. 大的叶子节点放到父节点,元素a放到大的叶子节点原来的位置
 */
private E dequeue() {
    // assert lock.isHeldByCurrentThread();
    final Object[] es;
    final E result;
    // 取出第一个元素
    if ((result = (E) ((es = queue)[0])) != null) {
        final int n;
        // 取出最后一个元素
        final E x = (E) es[(n = --size)];
        // 删除最后一个下标元素
        es[n] = null;
        if (n > 0) {
            final Comparator<? super E> cmp;
            // 大的子节点中
            if ((cmp = comparator) == null)
                siftDownComparable(0, x, es, n);
            else
                siftDownUsingComparator(0, x, es, n, cmp);
        }
    }
    return result;
}
private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
    // assert n > 0;
    Comparable<? super T> key = (Comparable<? super T>)x;
    int half = n >>> 1;           // loop while a non-leaf
    while (k < half) {
        // 计算出左子节点下标
        int child = (k << 1) + 1; // assume left child is least
        Object c = es[child];
        int right = child + 1;
        // 取出大的子节点
        if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
            c = es[child = right];
        if (key.compareTo((T) c) <= 0)
            break;
        // 大的子节点元素放到父节点的位置
        es[k] = c;
        // 以大的子节点元素为根,重复以上逻辑,继续找大的子节点放到其位置
        k = child;
    }
    // 原树的最后一个位置的元素放到最后一个移动的叶子节点的原来位置上
    es[k] = key;
}
/**
 * 逻辑同上 <br/>
 */
private static <T> void siftDownUsingComparator(
        int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
    // assert n > 0;
    int half = n >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        Object c = es[child];
        int right = child + 1;
        if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
            c = es[child = right];
        if (cmp.compare(x, (T) c) <= 0)
            break;
        es[k] = c;
        k = child;
    }
    es[k] = x;
}

image.gif

4.3 扩容原理

  • CAS自旋锁,避免并发扩容
  • 扩容时机:元素数量等于数组大小时进行扩容
  • 扩容大小:
  • 容量(len)小于64时 len = len + (len + 2),即新容量 = 原容量*2 + 2
  • 容量(len) >= 64 时 len = len + len / 2,即新容量 = 原容量 + 原容量/2
  • 创建新数组,原数组数据复制过来
public boolean offer(E e) {
    ...
    // 元素数量等于数组大小时进行扩容
    // 有可能别的线程扩容了,又添加元素满了,所以while判断就轮到本线程再扩容
    while ((n = size) >= (cap = (es = queue).length))
        tryGrow(es, cap);
    ...
}
/**
 * 扩容
 */
private void tryGrow(Object[] array, int oldCap) {
    // 必须释放然后重新获得主锁,因为只有一把锁,释放锁后可以读
    lock.unlock();
    Object[] newArray = null;
    // CAS自旋锁,避免并发扩容
    if (allocationSpinLock == 0 &&
            ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
        try {
            // 计算新容量大小,容量<64时 len = len + (len + 2)
            // 计算新容量大小,容量>=64时 len = len + (len / 2)
            int growth = (oldCap < 64)
                    ? (oldCap + 2) // grow faster if small
                    : (oldCap >> 1);
            int newCap = ArraysSupport.newLength(oldCap, 1, growth);
            if (queue == array)
                // 创建新的空数组
                newArray = new Object[newCap];
        } finally {
            // 还原自旋锁为0
            allocationSpinLock = 0;
        }
    }
    // 如果另一个线程已分配,则退出
    if (newArray == null)
        Thread.yield();
    // 数组队列指向新数组,旧数据复制到新数组前加锁
    lock.lock();
    if (newArray != null && queue == array) {
        // 队列数组指向新数组
        queue = newArray;
        // 复制旧数据到新数组中
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

image.gif

五、作用和使用场景

  • 业务办理排队叫号,VIP客户插队
  • 枪购
相关文章
|
3天前
|
监控 Java API
探索Java NIO:究竟在哪些领域能大显身手?揭秘原理、应用场景与官方示例代码
Java NIO(New IO)自Java SE 1.4引入,提供比传统IO更高效、灵活的操作,支持非阻塞IO和选择器特性,适用于高并发、高吞吐量场景。NIO的核心概念包括通道(Channel)、缓冲区(Buffer)和选择器(Selector),能实现多路复用和异步操作。其应用场景涵盖网络通信、文件操作、进程间通信及数据库操作等。NIO的优势在于提高并发性和性能,简化编程;但学习成本较高,且与传统IO存在不兼容性。尽管如此,NIO在构建高性能框架如Netty、Mina和Jetty中仍广泛应用。
14 3
|
3天前
|
安全 算法 Java
Java CAS原理和应用场景大揭秘:你掌握了吗?
CAS(Compare and Swap)是一种乐观锁机制,通过硬件指令实现原子操作,确保多线程环境下对共享变量的安全访问。它避免了传统互斥锁的性能开销和线程阻塞问题。CAS操作包含三个步骤:获取期望值、比较当前值与期望值是否相等、若相等则更新为新值。CAS广泛应用于高并发场景,如数据库事务、分布式锁、无锁数据结构等,但需注意ABA问题。Java中常用`java.util.concurrent.atomic`包下的类支持CAS操作。
19 2
|
25天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
53 12
|
1月前
|
存储 算法 Java
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
本文详解自旋锁的概念、优缺点、使用场景及Java实现。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
|
1月前
|
Java
Java之CountDownLatch原理浅析
本文介绍了Java并发工具类`CountDownLatch`的使用方法、原理及其与`Thread.join()`的区别。`CountDownLatch`通过构造函数接收一个整数参数作为计数器,调用`countDown`方法减少计数,`await`方法会阻塞当前线程,直到计数为零。文章还详细解析了其内部机制,包括初始化、`countDown`和`await`方法的工作原理,并给出了一个游戏加载场景的示例代码。
Java之CountDownLatch原理浅析
|
1月前
|
Java 索引 容器
Java ArrayList扩容的原理
Java 的 `ArrayList` 是基于数组实现的动态集合。初始时,`ArrayList` 底层创建一个空数组 `elementData`,并设置 `size` 为 0。当首次添加元素时,会调用 `grow` 方法将数组扩容至默认容量 10。之后每次添加元素时,如果当前数组已满,则会再次调用 `grow` 方法进行扩容。扩容规则为:首次扩容至 10,后续扩容至原数组长度的 1.5 倍或根据实际需求扩容。例如,当需要一次性添加 100 个元素时,会直接扩容至 110 而不是 15。
Java ArrayList扩容的原理
|
1月前
|
存储 Java 关系型数据库
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接创建、分配、复用和释放等操作,并通过电商应用实例展示了如何选择合适的连接池库(如HikariCP)和配置参数,实现高效、稳定的数据库连接管理。
66 2
|
安全 Java
Java并发编程笔记之CopyOnWriteArrayList源码分析
并发包中并发List只有CopyOnWriteArrayList这一个,CopyOnWriteArrayList是一个线程安全的ArrayList,对其进行修改操作和元素迭代操作都是在底层创建一个拷贝数组(快照)上进行的,也就是写时拷贝策略。
19556 0
|
Java 安全
Java并发编程笔记之读写锁 ReentrantReadWriteLock 源码分析
我们知道在解决线程安全问题上使用 ReentrantLock 就可以,但是 ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而实际情况下会有写少读多的场景,显然 ReentrantLock 满足不了需求,所以 ReentrantReadWriteLock 应运而生,ReentrantReadWriteLock 采用读写分离,多个线程可以同时获取读锁。
3140 0
|
Java
Java并发编程笔记之FutureTask源码分析
FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
4299 0