java并发编程笔记--PriorityBlockingQueue实现

简介:     PriorityBlockingQueue可以理解为线程安全的PriorityQueue,其实现原理与PriorityQueue类似,在此基础上实现了BlockingQueue接口,能够作为阻塞队列使用,由于PriorityBlockingQueue是无界队列,因而使用put方法并不会阻塞,offer方法不会返回false。

    PriorityBlockingQueue可以理解为线程安全的PriorityQueue,其实现原理与PriorityQueue类似,在此基础上实现了BlockingQueue接口,能够作为阻塞队列使用,由于PriorityBlockingQueue是无界队列,因而使用put方法并不会阻塞,offer方法不会返回false。PriorityBlockingQueue也是基于最小二叉堆实现,对于堆数组中索引为k的节点,其父节点为(k-1)/2,其左右子节点分别为2k+1,2k+2。PriorityBlockingQueue使用ReentrantLock来控制所有公用操作的线程同步,使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行。

类图

image

PriorityBlockingQueue的继承层次如上图:

  • Collection:所有集合的基类,定义集合的基本操作API;
  • AbstractCollection:实现公用的集合操作;
  • Iterable:赋予集合迭代器的能力;
  • Queue:定义队列的基本操作API,比如:offer、poll等;
  • AbstractQueue:实现公用的队列操作;
  • BlockingQueue:定义阻塞队列的基本操作API,比如:put、take等;
  • PriorityBlockingQueue:阻塞队列的实现类,继承AbstractQueue类,并实现BlockingQueue接口;

实现

主要成员变量

/**
 * 存放最小二叉堆的数组
 */
private transient Object[] queue;

/**
 * 优先队列中包含元素个数
 */
private transient int size;

/**
 * 比较器,用于定制元素比较规则;
 */
private transient Comparator<? super E> comparator;

/**
 * 用于同步队列操作的锁
 */
private final ReentrantLock lock;

/**
 * 当队列为空时,用于阻塞出队操作
 */
private final Condition notEmpty;

/**
 * 自旋锁标识字段,通过CAS操作进行比较更新;
 * 用于动态扩容操作;值为1时,表示加锁;为0时,标识未加锁;
 */
private transient volatile int allocationSpinLock;

可以看到PriorityBlockingQueue的成员变量和PriorityQueue的成员变量相差不大,多了3个用于控制线程安全的属性:lock,notEmpty,allocationSpinLock,分别用于队列公有操作、出队阻塞、动态扩容。

初始化

PriorityBlockingQueue的初始化同PriorityQueue的类似,多了对lock、notEmpty的初始化。

public PriorityBlockingQueue() {
    // 默认容量为11
    this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

public PriorityBlockingQueue(Collection<? extends E> c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    boolean heapify = true; // true if not known to be in heap order
    boolean screen = true;  // true if must screen for nulls
    // 如果传入集合是有序集,则无须进行堆有序化
    if (c instanceof SortedSet<?>) {
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false;
    }
    // 如果传入集合是PriorityBlockingQueue类型,则不进行堆有序化
    else if (c instanceof PriorityBlockingQueue<?>) {
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false;
    }
    Object[] a = c.toArray();
    int n = a.length;
    // If c.toArray incorrectly doesn't return Object[], copy it.
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    if (screen && (n == 1 || this.comparator != null)) {
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    this.queue = a;
    this.size = n;
    // 执行堆有序化
    if (heapify)
        heapify();
}

上浮、下沉操作

PriorityBlockingQueue也是基于下沉、上浮操作来实现元素的入队和出队操作的,实现代码与PriorityQueue的实现完全相同,只是为了保证线程安全,上层方法调用时需要放在加锁的环境下执行。

  • siftUpComparable:上浮操作,通过默认的Comparable接口进行元素比较;队列中的元素必须实现Comparable接口;
  • siftUpUsingComparator:上浮操作,通过Comparator进行元素比较;
  • siftDownComparable:下沉操作,通过默认的Comparable接口进行元素比较;队列中的元素必须实现Comparable接口;
  • siftDownUsingComparator:下沉操作,通过Comparator进行元素比较;

入队操作

PriorityBlockingQueue的入队操作包括4个方法:

  • add:队列满时抛出异常;由于为无界队列,因而不会抛出异常;代码实现直接调用offer方法;
  • offer:队列满时返回false;由于为无界队列,因而不会返回false;
  • offer带超时参数:队列满时阻塞等待直至超时或者数组有空出位置;由于为无界队列,因而不会返回false、超时、阻塞;代码实现直接调用offer方法;
  • put:队列满时阻塞;由于为无界队列,因而不会阻塞;代码实现直接调用offer方法;
/**
 * 入队操作实现
 * 由于是无界队列,永远不会返回false;
 */
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    
    // 通过CAS操作动态扩容
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    
    // 将新增元素上浮到合适位置    
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        
        // 唤醒阻塞线程
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

/**
 * 由于是无界队列,永远不会被阻塞
 */
public void put(E e) {
    offer(e); // never need to block
}

/**
 * 由于是无界队列,因而不存在阻塞、超时和返回false的情况;
 */
public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e); // never need to block
}

public boolean add(E e) {
    return offer(e);
}

出队操作

PriorityBlockingQueue的出队操作包括4个方法,都是通过调用dequeue()方法实现:

  • dequeue:出队操作的具体实现,为保证线程安全,上层调用方法需要加锁;
  • take:队列为空时,阻塞直至有元素添加到队列中;
  • poll:队列为空时,直接返回null,不会阻塞;
  • poll带超时参数:队列为空时,阻塞直至超时或者有元素添加到队列中;
  • drainTo:批量从队首弹出元素到指定集合中,不会阻塞;
/**
 * 出队操作的具体实现,仅在有锁环境下调用;
 */
private E dequeue() {
    int n = size - 1;
    
    // 队列为空返回null;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];
        
        // 将队尾元素提至队首位置
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        
        // 下沉队首元素到合适位置
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

/**
 * 队列为空时,阻塞;
 */
public E take() throws InterruptedException {
    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        // 如果队列为空,则阻塞;
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

/**
 * 队列为空时,返回null;
 */
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

/**
 * 队列为空时,阻塞直至超时或者获取到元素;
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        // 如果队列为空,则阻塞直至超时或者获取到元素;
        while ( (result = dequeue()) == null && nanos > 0)
            nanos = notEmpty.awaitNanos(nanos);
    } finally {
        lock.unlock();
    }
    return result;
}

/**
 * 批量获取元素
 */
public int drainTo(Collection<? super E> c, int maxElements) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int n = Math.min(size, maxElements);
        
        // 循环遍历,不断弹出队首元素;
        for (int i = 0; i < n; i++) {
            c.add((E) queue[0]); // In this order, in case add() throws.
            dequeue();
        }
        return n;
    } finally {
        lock.unlock();
    }
}

public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

查找&删除元素

    PriorityBlockingQueue中查找元素的效率是偏低的,由于二叉堆并没有限制左右子节点的大小规则,因而需要变量整个数组进行查找,因而效率为$O(n)$。一些优先队列的实现会对此进行优化,给每个元素添加一个索引字段用于标记元素在堆数组中的位置,比如:ScheduledThreadPoolExecutor.DelayedWorkQueue通过ScheduledFutureTask中的heapIndex来标记任务在堆数组中的位置。

/**
 * 查找元素,效率O(n)
 */
private int indexOf(Object o) {
    if (o != null) {
        Object[] array = queue;
        int n = size;
        // 遍历数组查找,效率较低
        for (int i = 0; i < n; i++)
            if (o.equals(array[i]))
                return i;
    }
    return -1;
}

/**
 * 删除指定位置的元素
 */
private void removeAt(int i) {
    Object[] array = queue;
    int n = size - 1;
    
    // 如果索引位置在队尾,则删除队尾元素
    if (n == i) // removed last element
        array[i] = null;
    else {
        // 使用队尾元素替换删除元素的位置
        E moved = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        
        // 下沉元素到合适位置
        if (cmp == null)
            siftDownComparable(i, moved, array, n);
        else
            siftDownUsingComparator(i, moved, array, n, cmp);
        
        // 如果元素未下沉,则上浮到合适位置
        if (array[i] == moved) {
            if (cmp == null)
                siftUpComparable(i, moved, array);
            else
                siftUpUsingComparator(i, moved, array, cmp);
        }
    }
    size = n;
}

/**
 * 删除队列中的特定元素
 */
public boolean remove(Object o) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 查找元素
        int i = indexOf(o);
        if (i == -1)
            return false;
        
        // 删除元素
        removeAt(i);
        return true;
    } finally {
        lock.unlock();
    }
}

动态扩容

    在插入元素时,如果堆数组长度不足,则需要新建一个更长的数组,拷贝现有元素到新数组中,从而实现扩容的目的。为保证动态扩容不阻塞队列元素的出队,PriorityBlockingQueue通过CAS操作实现的自旋锁来控制扩容操作:使用allocationSpinLock来标记是否加锁,值为1时,表示加锁,值为0时,表示未加锁;


// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;

// 存放allocationSpinLock属性的偏移量,方便后面CAS更新
private static final long allocationSpinLockOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = PriorityBlockingQueue.class;
        allocationSpinLockOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("allocationSpinLock"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

private void tryGrow(Object[] array, int oldCap) {
    // 扩容时,释放锁,防止阻塞出队操作
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    
    // 加锁,变更allocationSpinLock的值为1;
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
                                
        try {
            // 容量小于64,则每次容量+2,否则增加一倍
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
                                  
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
           
            // 如果没有其它线程扩容,则创建新数组;
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            // 解锁,因为只有一个线程到此,因而不需要CAS操作;
            allocationSpinLock = 0;
        }
    }
    
    // 如果此时有其它线程已经加锁,则让出执行权,等待其它线程执行完成
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
        
    // 加锁,替换现有数组的引用,拷贝数组元素;
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

总结

    PriorityBlockingQueue可以理解为public操作都加锁的PriorityQueue,通过排他锁保证了操作的线程安全。PriorityBlockingQueue扩容时,因为增加堆数组的长度并不影响队列中元素的出队操作,因而使用自旋CAS操作实现的锁来控制扩容操作,仅在数组引用替换和拷贝元素时才加锁,从而减少了扩容对出队操作的影响。自旋锁的实现思路以及应用场景值得我们学习借鉴。

目录
相关文章
|
15天前
|
SQL Java 数据库
2025 年 Java 从零基础小白到编程高手的详细学习路线攻略
2025年Java学习路线涵盖基础语法、面向对象、数据库、JavaWeb、Spring全家桶、分布式、云原生与高并发技术,结合实战项目与源码分析,助力零基础学员系统掌握Java开发技能,从入门到精通,全面提升竞争力,顺利进阶编程高手。
207 1
|
3月前
|
Java API 微服务
为什么虚拟线程将改变Java并发编程?
为什么虚拟线程将改变Java并发编程?
264 83
|
2月前
|
安全 Java 数据库连接
2025 年最新 Java 学习路线图含实操指南助你高效入门 Java 编程掌握核心技能
2025年最新Java学习路线图,涵盖基础环境搭建、核心特性(如密封类、虚拟线程)、模块化开发、响应式编程、主流框架(Spring Boot 3、Spring Security 6)、数据库操作(JPA + Hibernate 6)及微服务实战,助你掌握企业级开发技能。
276 3
|
16天前
|
Java 开发者
Java并发编程:CountDownLatch实战解析
Java并发编程:CountDownLatch实战解析
306 100
|
1月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
86 16
|
26天前
|
NoSQL Java 关系型数据库
超全 Java 学习路线,帮你系统掌握编程的超详细 Java 学习路线
本文为超全Java学习路线,涵盖基础语法、面向对象编程、数据结构与算法、多线程、JVM原理、主流框架(如Spring Boot)、数据库(MySQL、Redis)及项目实战等内容,助力从零基础到企业级开发高手的进阶之路。
131 1
|
2月前
|
安全 算法 Java
Java泛型编程:类型安全与擦除机制
Java泛型详解:从基础语法到类型擦除机制,深入解析通配符与PECS原则,探讨运行时类型获取技巧及最佳实践,助你掌握泛型精髓,写出更安全、灵活的代码。
|
2月前
|
安全 Java Shell
Java模块化编程(JPMS)简介与实践
本文全面解析Java 9模块化系统(JPMS),帮助开发者解决JAR地狱、类路径冲突等常见问题,提升代码的封装性、性能与可维护性。内容涵盖模块化核心概念、module-info语法、模块声明、实战迁移、多模块项目构建、高级特性及最佳实践,同时提供常见问题和面试高频题解析,助你掌握Java模块化编程精髓,打造更健壮的应用。
|
2月前
|
Java
Java编程:理解while循环的使用
总结而言, 使用 while 迴圈可以有效解决需要多次重复操作直至特定條件被触发才停止執行任务场景下问题; 它简单、灵活、易于实现各种逻辑控制需求但同时也要注意防止因邏各错误导致無限迁璇発生及及時處理可能発生异常以确保程序稳定运作。
206 0

热门文章

最新文章