Java JUC PriorityBlockingQueue解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 无界阻塞队列 PriorityBlockingQueue

无界阻塞队列 PriorityBlockingQueue


介绍

PriorityBlockingQueue 是一个带有优先级无界阻塞队列,每次出队返回的都是优先级最高或者最低的元素。在内部是使用平衡二叉树堆实现,所以遍历元素不保证有序

默认使用对象的 compareTo 方法进行比较,如果需要自定义比较规则可以自定义 comparators。

1654830069798.png

该类图可以看到,PriorityBlockingQueue 内部有一个数组 queue,用来存放队列元素;size 用来存放元素个数;allocationSpinLock 是个自旋锁,使用CAS操作来保证同时只有一个线程来进行扩容队列,状态只有 0 和 1,0表示当前没有进行扩容,1表示正在扩容。由于是优先级队列,所以有一个比较器 comparator 用来比较大小,另外还有 lock 独占锁,notEmpty 条件变量来实现 take 方法的阻塞,由于是无界队列所以没有 notFull 条件变量,所以 put 是非阻塞的

//二叉树最小堆的实现
private transient Object[] queue;
private transient int size;
private transient volatile int allocationSpinLock;
private transient Comparator<? super E> comparator;
private final ReentrantLock lock;
private final Condition notEmpty;

在构造函数中,默认队列容量为11,默认比较器为 null,也就是默认使用元素的 compareTo 方法来确定优先级,所以队列元素必须实现 Comparable 接口。

private static final int DEFAULT_INITIAL_CAPACITY = 11;
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

offer 操作

offer 操作的作用是在队列中插入一个元素,由于是无界队列,所以一直返回 true。

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    //1. 如果当前元素个数 >= 队列容量 则扩容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        //2. 默认比较器为null
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            //3. 自定义比较器
            siftUpUsingComparator(n, e, array, cmp);
        //4. 队列元素数量增加1,并唤醒notEmpty条件队列中的一个阻塞线程
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

如上代码并不复杂,我们主要看看如何进行扩容和在内部建堆。


我们先看扩容逻辑:

private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        //1. CAS成功则扩容
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                //oldCap<64则扩容执行oldCap+2,否则扩容50%,并且最大值为MAX_ARRAY_SIZE
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
       //2. 第一个线程CAS成功后,第二线程进入这段代码,然后第二个线程让出CPU,尽量让第一个线程获取到锁,但得不到保证
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
}

tryGrow 的作用就是扩容,但是为什么要在扩容前释放锁,然后使用 CAS 控制只有一个线程可以扩容成功?


其实不释放锁也是 ok 的,也就是在扩容期间一直持有该锁,但是扩容需要时间,这段时间内占用锁的话那么其他线程在这个时候就不能进行出队和入队操作,降低了并发性。所以为了提高性能,使用 CAS 来控制只有一个线程可以进行扩容,并且在扩容前释放锁,进而让其他线程可以进行入队和出队操作。


扩容线程扩容完毕后会重置自旋锁变量 allocationSpinLock 为 0,这里并没有使用 UNSAFE 方法的 CAS 进行设置是因为同时只可能有一个线程获取到该锁,并且 allocationSpinLock 被修饰为了 volatile 的。


我们接着看建堆算法:

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    // 队列元素个数 > 0 则判断插入位置,否则直接入队
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

熟悉二叉堆的话,该段代码并不复杂,我们看下图具体结构:

image.png

首先我们看parent = (k - 1) >>> 1,首先 k - 1 就是拿到当前真正的下标的位置,随后 >>> 1拿到父节点的位置,该图我们得知,k = 7,执行(k - 1) >>> 1之后得到的parent = 3,根据下标我们知道是元素 6。


PriorityQueue 是一个完全二叉树,且不允许出现 null 节点,其父节点都比叶子节点小,这个是堆排序中的最小堆。二叉树存入数组的方式很简单,就是从上到下,从左到右。完全二叉树可以和数组中的位置一一对应:


  • 左叶子节点 = 父节点下标 * 2 + 1
  • 右叶子节点 = 父节点下标 * 2 + 2
  • 父节点 = (叶子节点 - 1) / 2


实际上就是将要插入的元素 x 和它的父节点元素 6 做对比,如果比父节点大就一直向上移动。


poll 操作

poll 操作的作用是获取队列内部堆树的根节点元素,如果队列为空,则返回 null。

public E poll() {
    final ReentrantLock lock = this.lock;
    //获取独占锁
    lock.lock();
    try {
        return dequeue();
    } finally {
        //释放独占锁
        lock.unlock();
    }
}

我们主要看一下 dequeue 方法。

private E dequeue() {
    int n = size - 1;
    //队列为空,返回null
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        //1.获取头部元素
        E result = (E) array[0];
        //2. 获取队尾元素,并赋值为null
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)//3.
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n; //4.
        return result;
    }
}

该方法如果队列为空则直接返回 null,否则执行代码(1)获取数组第一个元素作为返回值存放到变量 Result 中,这里需要注意,数组里面的第一个元素是优先级最小或者最大的元素,出队操作就是返回这个元素。然后代码(2)获取队列尾部元素并存放到变量 x 中,且置空尾部节点,然后执行代码(3)将变量 x 插入到数组下标为 0 的位置,之后重新调整堆为最大或者最小堆,然后返回。这里重要的是,去掉堆的根节点后,如何使用剩下的节点重新调整一个最大或者最小堆。下面我们看下 siftDownComparable 的实现。

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

由于队列数组第 0 个元素为根,因此出队时要移除它。这时数组就不再是最小的堆了,所以需要调整堆。具体是从被移除的树根的左右子树中找一个最小的值来当树根,左右子树又会找自己左右子树里面那个最小值,这是一个递归过程,直到叶子节点结束递归。

假设目前队列内容如下图:

image.png

上图中树根的 leftChildVal = 4; rightChildVal = 6;由于4 < 6,所以c = 4。然后由于11 > 4,也就是key > c,所以使用元素 4 覆盖树根节点的值。


然后树根的左子树树根的左右孩子节点中的 leftChildVal = 8; rightChildVal = 10;由于8 < 10,所以c = 8。然后由于11 > 8,也就是 key > c,所以元素 8 作为树根左子树的根节点,现在树的形状如下图第三步所示。这时候判断是否k < half,结果为 false,所以退出循环。然后把x = 11的元素设置到数组下标为3的地方,这时候堆树如下图第四步所示,至此调整堆完毕。

image.png

put 操作

put 操作内部调用的是 offer 操作,由于是无界队列,所以不需要阻塞。

public void put(E e) {
    offer(e); // never need to block
}

take 操作

take 操作的作用是获取队列内部堆树的根节点元素,如果队列为空则阻塞。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

size 操作

获取队列元素个数。如下代码在返回 size 前加了锁,以保证在调用 size 方法时不会有其他线程进行入队和出队操作。另外,由于 size 变量没有被修饰为 volatie 的,所以这里加锁也保证了在多线程下 size 变量的内存可见性。

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return size;
    } finally {
        lock.unlock();
    }
}

总结

PriorityBlockingQueue 类似于 ArrayBlockingQueue,在内部使用一个独占锁来控制同时只有一个线程可以进行入队和出队操作。另外,PriorityBlockingQueue 只使用了一个 notEmpty 条件变量而没有使用 notFull,因为是无界队列,执行 put 操作时永远不会处于 await 状态,所以也不需要被唤醒。而 take 方法是阻塞方法,并且是可被中断的。当需要存放有优先级的元素时该队列比较有用。

相关文章
|
9天前
|
传感器 监控 Java
Java代码结构解析:类、方法、主函数(1分钟解剖室)
### Java代码结构简介 掌握Java代码结构如同拥有程序世界的建筑蓝图,类、方法和主函数构成“黄金三角”。类是独立的容器,承载成员变量和方法;方法实现特定功能,参数控制输入环境;主函数是程序入口。常见错误包括类名与文件名不匹配、忘记static修饰符和花括号未闭合。通过实战案例学习电商系统、游戏角色控制和物联网设备监控,理解类的作用、方法类型和主函数任务,避免典型错误,逐步提升编程能力。 **脑图速记法**:类如太空站,方法即舱段;main是发射台,static不能换;文件名对仗,括号要成双;参数是坐标,void不返航。
29 5
|
21天前
|
Java API 数据处理
深潜数据海洋:Java文件读写全面解析与实战指南
通过本文的详细解析与实战示例,您可以系统地掌握Java中各种文件读写操作,从基本的读写到高效的NIO操作,再到文件复制、移动和删除。希望这些内容能够帮助您在实际项目中处理文件数据,提高开发效率和代码质量。
26 4
|
1月前
|
XML JSON Java
Java中Log级别和解析
日志级别定义了日志信息的重要程度,从低到高依次为:TRACE(详细调试)、DEBUG(开发调试)、INFO(一般信息)、WARN(潜在问题)、ERROR(错误信息)和FATAL(严重错误)。开发人员可根据需要设置不同的日志级别,以控制日志输出量,避免影响性能或干扰问题排查。日志框架如Log4j 2由Logger、Appender和Layout组成,通过配置文件指定日志级别、输出目标和格式。
|
2月前
|
存储 Java 计算机视觉
Java二维数组的使用技巧与实例解析
本文详细介绍了Java中二维数组的使用方法
62 15
|
数据采集 Java 大数据
java高并发系列 - 第14天:JUC中的LockSupport工具类,必备技能
java高并发系列 - 第14天:JUC中的LockSupport工具类,必备技能这是java高并发系列第14篇文章。 本文主要内容: 讲解3种让线程等待和唤醒的方法,每种方法配合具体的示例介绍LockSupport主要用法对比3种方式,了解他们之间的区别LockSupport位于java.util.concurrent(简称juc)包中,算是juc中一个基础类,juc中很多地方都会使用LockSupport,非常重要,希望大家一定要掌握。
1259 0
|
15天前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
139 60
【Java并发】【线程池】带你从0-1入门线程池
|
4天前
|
存储 网络协议 安全
Java网络编程,多线程,IO流综合小项目一一ChatBoxes
**项目介绍**:本项目实现了一个基于TCP协议的C/S架构控制台聊天室,支持局域网内多客户端同时聊天。用户需注册并登录,用户名唯一,密码格式为字母开头加纯数字。登录后可实时聊天,服务端负责验证用户信息并转发消息。 **项目亮点**: - **C/S架构**:客户端与服务端通过TCP连接通信。 - **多线程**:采用多线程处理多个客户端的并发请求,确保实时交互。 - **IO流**:使用BufferedReader和BufferedWriter进行数据传输,确保高效稳定的通信。 - **线程安全**:通过同步代码块和锁机制保证共享数据的安全性。
53 23
|
11天前
|
Java 调度
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
当我们创建一个`ThreadPoolExecutor`的时候,你是否会好奇🤔,它到底发生了什么?比如:我传的拒绝策略、线程工厂是啥时候被使用的? 核心线程数是个啥?最大线程数和它又有什么关系?线程池,它是怎么调度,我们传入的线程?...不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界...
77 0
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
|
27天前
|
Java 程序员 开发者
Java社招面试题:一个线程运行时发生异常会怎样?
大家好,我是小米。今天分享一个经典的 Java 面试题:线程运行时发生异常,程序会怎样处理?此问题考察 Java 线程和异常处理机制的理解。线程发生异常,默认会导致线程终止,但可以通过 try-catch 捕获并处理,避免影响其他线程。未捕获的异常可通过 Thread.UncaughtExceptionHandler 处理。线程池中的异常会被自动处理,不影响任务执行。希望这篇文章能帮助你深入理解 Java 线程异常处理机制,为面试做好准备。如果你觉得有帮助,欢迎收藏、转发!
103 14
|
1月前
|
安全 Java 程序员
Java 面试必问!线程构造方法和静态块的执行线程到底是谁?
大家好,我是小米。今天聊聊Java多线程面试题:线程类的构造方法和静态块是由哪个线程调用的?构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节有助于掌握Java多线程机制。下期再见! 简介: 本文通过一个常见的Java多线程面试题,详细讲解了线程类的构造方法和静态块是由哪个线程调用的。构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节对掌握Java多线程编程至关重要。
57 13

热门文章

最新文章

推荐镜像

更多