java并发编程笔记--PriorityBlockingQueue实现

简介:     PriorityBlockingQueue可以理解为线程安全的PriorityQueue,其实现原理与PriorityQueue类似,在此基础上实现了BlockingQueue接口,能够作为阻塞队列使用,由于PriorityBlockingQueue是无界队列,因而使用put方法并不会阻塞,offer方法不会返回false。

    PriorityBlockingQueue可以理解为线程安全的PriorityQueue,其实现原理与PriorityQueue类似,在此基础上实现了BlockingQueue接口,能够作为阻塞队列使用,由于PriorityBlockingQueue是无界队列,因而使用put方法并不会阻塞,offer方法不会返回false。PriorityBlockingQueue也是基于最小二叉堆实现,对于堆数组中索引为k的节点,其父节点为(k-1)/2,其左右子节点分别为2k+1,2k+2。PriorityBlockingQueue使用ReentrantLock来控制所有公用操作的线程同步,使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行。

类图

image

PriorityBlockingQueue的继承层次如上图:

  • Collection:所有集合的基类,定义集合的基本操作API;
  • AbstractCollection:实现公用的集合操作;
  • Iterable:赋予集合迭代器的能力;
  • Queue:定义队列的基本操作API,比如:offer、poll等;
  • AbstractQueue:实现公用的队列操作;
  • BlockingQueue:定义阻塞队列的基本操作API,比如:put、take等;
  • PriorityBlockingQueue:阻塞队列的实现类,继承AbstractQueue类,并实现BlockingQueue接口;

实现

主要成员变量

/**
 * 存放最小二叉堆的数组
 */
private transient Object[] queue;

/**
 * 优先队列中包含元素个数
 */
private transient int size;

/**
 * 比较器,用于定制元素比较规则;
 */
private transient Comparator<? super E> comparator;

/**
 * 用于同步队列操作的锁
 */
private final ReentrantLock lock;

/**
 * 当队列为空时,用于阻塞出队操作
 */
private final Condition notEmpty;

/**
 * 自旋锁标识字段,通过CAS操作进行比较更新;
 * 用于动态扩容操作;值为1时,表示加锁;为0时,标识未加锁;
 */
private transient volatile int allocationSpinLock;

可以看到PriorityBlockingQueue的成员变量和PriorityQueue的成员变量相差不大,多了3个用于控制线程安全的属性:lock,notEmpty,allocationSpinLock,分别用于队列公有操作、出队阻塞、动态扩容。

初始化

PriorityBlockingQueue的初始化同PriorityQueue的类似,多了对lock、notEmpty的初始化。

public PriorityBlockingQueue() {
    // 默认容量为11
    this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

public PriorityBlockingQueue(Collection<? extends E> c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    boolean heapify = true; // true if not known to be in heap order
    boolean screen = true;  // true if must screen for nulls
    // 如果传入集合是有序集,则无须进行堆有序化
    if (c instanceof SortedSet<?>) {
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false;
    }
    // 如果传入集合是PriorityBlockingQueue类型,则不进行堆有序化
    else if (c instanceof PriorityBlockingQueue<?>) {
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false;
    }
    Object[] a = c.toArray();
    int n = a.length;
    // If c.toArray incorrectly doesn't return Object[], copy it.
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    if (screen && (n == 1 || this.comparator != null)) {
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    this.queue = a;
    this.size = n;
    // 执行堆有序化
    if (heapify)
        heapify();
}

上浮、下沉操作

PriorityBlockingQueue也是基于下沉、上浮操作来实现元素的入队和出队操作的,实现代码与PriorityQueue的实现完全相同,只是为了保证线程安全,上层方法调用时需要放在加锁的环境下执行。

  • siftUpComparable:上浮操作,通过默认的Comparable接口进行元素比较;队列中的元素必须实现Comparable接口;
  • siftUpUsingComparator:上浮操作,通过Comparator进行元素比较;
  • siftDownComparable:下沉操作,通过默认的Comparable接口进行元素比较;队列中的元素必须实现Comparable接口;
  • siftDownUsingComparator:下沉操作,通过Comparator进行元素比较;

入队操作

PriorityBlockingQueue的入队操作包括4个方法:

  • add:队列满时抛出异常;由于为无界队列,因而不会抛出异常;代码实现直接调用offer方法;
  • offer:队列满时返回false;由于为无界队列,因而不会返回false;
  • offer带超时参数:队列满时阻塞等待直至超时或者数组有空出位置;由于为无界队列,因而不会返回false、超时、阻塞;代码实现直接调用offer方法;
  • put:队列满时阻塞;由于为无界队列,因而不会阻塞;代码实现直接调用offer方法;
/**
 * 入队操作实现
 * 由于是无界队列,永远不会返回false;
 */
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    
    // 通过CAS操作动态扩容
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    
    // 将新增元素上浮到合适位置    
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        
        // 唤醒阻塞线程
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

/**
 * 由于是无界队列,永远不会被阻塞
 */
public void put(E e) {
    offer(e); // never need to block
}

/**
 * 由于是无界队列,因而不存在阻塞、超时和返回false的情况;
 */
public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e); // never need to block
}

public boolean add(E e) {
    return offer(e);
}

出队操作

PriorityBlockingQueue的出队操作包括4个方法,都是通过调用dequeue()方法实现:

  • dequeue:出队操作的具体实现,为保证线程安全,上层调用方法需要加锁;
  • take:队列为空时,阻塞直至有元素添加到队列中;
  • poll:队列为空时,直接返回null,不会阻塞;
  • poll带超时参数:队列为空时,阻塞直至超时或者有元素添加到队列中;
  • drainTo:批量从队首弹出元素到指定集合中,不会阻塞;
/**
 * 出队操作的具体实现,仅在有锁环境下调用;
 */
private E dequeue() {
    int n = size - 1;
    
    // 队列为空返回null;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];
        
        // 将队尾元素提至队首位置
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        
        // 下沉队首元素到合适位置
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

/**
 * 队列为空时,阻塞;
 */
public E take() throws InterruptedException {
    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        // 如果队列为空,则阻塞;
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

/**
 * 队列为空时,返回null;
 */
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

/**
 * 队列为空时,阻塞直至超时或者获取到元素;
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        // 如果队列为空,则阻塞直至超时或者获取到元素;
        while ( (result = dequeue()) == null && nanos > 0)
            nanos = notEmpty.awaitNanos(nanos);
    } finally {
        lock.unlock();
    }
    return result;
}

/**
 * 批量获取元素
 */
public int drainTo(Collection<? super E> c, int maxElements) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int n = Math.min(size, maxElements);
        
        // 循环遍历,不断弹出队首元素;
        for (int i = 0; i < n; i++) {
            c.add((E) queue[0]); // In this order, in case add() throws.
            dequeue();
        }
        return n;
    } finally {
        lock.unlock();
    }
}

public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

查找&删除元素

    PriorityBlockingQueue中查找元素的效率是偏低的,由于二叉堆并没有限制左右子节点的大小规则,因而需要变量整个数组进行查找,因而效率为$O(n)$。一些优先队列的实现会对此进行优化,给每个元素添加一个索引字段用于标记元素在堆数组中的位置,比如:ScheduledThreadPoolExecutor.DelayedWorkQueue通过ScheduledFutureTask中的heapIndex来标记任务在堆数组中的位置。

/**
 * 查找元素,效率O(n)
 */
private int indexOf(Object o) {
    if (o != null) {
        Object[] array = queue;
        int n = size;
        // 遍历数组查找,效率较低
        for (int i = 0; i < n; i++)
            if (o.equals(array[i]))
                return i;
    }
    return -1;
}

/**
 * 删除指定位置的元素
 */
private void removeAt(int i) {
    Object[] array = queue;
    int n = size - 1;
    
    // 如果索引位置在队尾,则删除队尾元素
    if (n == i) // removed last element
        array[i] = null;
    else {
        // 使用队尾元素替换删除元素的位置
        E moved = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        
        // 下沉元素到合适位置
        if (cmp == null)
            siftDownComparable(i, moved, array, n);
        else
            siftDownUsingComparator(i, moved, array, n, cmp);
        
        // 如果元素未下沉,则上浮到合适位置
        if (array[i] == moved) {
            if (cmp == null)
                siftUpComparable(i, moved, array);
            else
                siftUpUsingComparator(i, moved, array, cmp);
        }
    }
    size = n;
}

/**
 * 删除队列中的特定元素
 */
public boolean remove(Object o) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 查找元素
        int i = indexOf(o);
        if (i == -1)
            return false;
        
        // 删除元素
        removeAt(i);
        return true;
    } finally {
        lock.unlock();
    }
}

动态扩容

    在插入元素时,如果堆数组长度不足,则需要新建一个更长的数组,拷贝现有元素到新数组中,从而实现扩容的目的。为保证动态扩容不阻塞队列元素的出队,PriorityBlockingQueue通过CAS操作实现的自旋锁来控制扩容操作:使用allocationSpinLock来标记是否加锁,值为1时,表示加锁,值为0时,表示未加锁;


// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;

// 存放allocationSpinLock属性的偏移量,方便后面CAS更新
private static final long allocationSpinLockOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = PriorityBlockingQueue.class;
        allocationSpinLockOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("allocationSpinLock"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

private void tryGrow(Object[] array, int oldCap) {
    // 扩容时,释放锁,防止阻塞出队操作
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    
    // 加锁,变更allocationSpinLock的值为1;
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
                                
        try {
            // 容量小于64,则每次容量+2,否则增加一倍
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
                                  
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
           
            // 如果没有其它线程扩容,则创建新数组;
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            // 解锁,因为只有一个线程到此,因而不需要CAS操作;
            allocationSpinLock = 0;
        }
    }
    
    // 如果此时有其它线程已经加锁,则让出执行权,等待其它线程执行完成
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
        
    // 加锁,替换现有数组的引用,拷贝数组元素;
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

总结

    PriorityBlockingQueue可以理解为public操作都加锁的PriorityQueue,通过排他锁保证了操作的线程安全。PriorityBlockingQueue扩容时,因为增加堆数组的长度并不影响队列中元素的出队操作,因而使用自旋CAS操作实现的锁来控制扩容操作,仅在数组引用替换和拷贝元素时才加锁,从而减少了扩容对出队操作的影响。自旋锁的实现思路以及应用场景值得我们学习借鉴。

目录
相关文章
|
1月前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
32 0
|
1月前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
15天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
19天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
54 12
|
15天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
98 2
|
2月前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
2月前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####
|
1月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
1月前
|
安全 Java 编译器
Kotlin教程笔记(27) -Kotlin 与 Java 共存(二)
Kotlin教程笔记(27) -Kotlin 与 Java 共存(二)
|
1月前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
51 3