优先级队列和二叉堆详解(下)

简介: PriorityBlockingQueue 是一个支持优先级的无界阻塞队列,直到系统资源耗尽。默认情况下元素采用自然顺序升序排列。

入队


add() 和 offer()


add(E e)offer(E e) 的目的相同,都是向优先队列中插入元素,只是 Queue 接口规定二者对插入失败时的处理不同,前者在插入失败时抛出异常,后则则会返回false。对于 PriorityQueue 这两个方法其实没什么差别,我们可以针对不同的场景进行使用。 下面我们以 offer 为例,一起看看它是如何实现的。


public boolean offer(E e) {
    // 如果为 null, 返回 NPE . 所以不能添加 null 元素
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    int n, cap;
    Object[] array;
    // 队列满了,就进行拓容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            // 比较器为空,自然排序
            siftUpComparable(n, e, array);
        else
            // 将元素加入堆中
            siftUpUsingComparator(n, e, array, cmp);
        // 元素格式加 1
        size = n + 1;
        // 唤醒读线程
        notEmpty.signal();
    } finally {
        // 解锁
        lock.unlock();
    }
    return true;
}


tryGrow  拓容


按照上面的分析,如果当队列满了,写入队列的线程就需要调用  tryGrow 方法对队列进行拓容。拓容成功后在想队列添加新元素。下面我们一起来看看 tryGrow 方法的具体实现:


private void tryGrow(Object[] array, int oldCap) {
    // 释放锁
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    // CAS 获取锁
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        // 拓容
        try {
            // 计算新的长度
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            // 如果新容量大于 MAX_ARRAY_SIZE 可能内存溢出
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                // 到达最大值,拓容失败
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            // 并发情况下,被其他线程已经拓容,那么本次不在拓容
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            // 释放锁
            allocationSpinLock = 0;
        }
    }
    // 如果有写线程正在拓容量让出 cpu
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    // 解锁
    lock.lock();
    // 再次判断是否是首个拓容线程的写线程,如果是就将阻塞队列,指向新拓容得新数组得引用得新队列
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}


添加元素到堆得方式 siftUpComparablesiftUpUsingComparaor 区别只是自然排序还是比较器排序, 下面以 siftUpcomparable 方法


private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                              Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = x;
}


出队


出队一般调用 take , 或者 poll , remove 方法,下面是 take 方法得实现。


public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        // 如果没有拿到 await 
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    // 返回
    return result;
}


从上面可以看出,咱们出队列的核心方法是 dequeue()


private E dequeue() {
    int n = size - 1;
    // 为空
    if (n < 0)
        return null;
    else {
        // 非空堆顶取出数据
        Object[] array = queue;
        E result = (E) array[0];
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        // 调整堆
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}


总结


阻塞队列 PriorityBlockingQueue 不阻塞写线程,当队列满的时候,写线程会尝试拓容会造成队列阻塞,拓展容量成功后再向队列中新增元素;而当队列元素为空时,会阻塞读线程,当然也有非阻塞方法的 poll , 该阻塞队列适用于读多写少的场景,写的线程多,堆导致内存消耗过大。性能影响,队列采用堆存储结构,因此每次从阻塞队列取出的元素总是最小元素(或者最大元素)。而且堆存储需要提供比较器或者元素实现比较器接口,否则程序会抛出 ClassCastException。


参考资料






相关文章
|
6月前
|
算法 数据处理 调度
【C++ 优先队列】了解 C++优先队列中操作符重载的实现
【C++ 优先队列】了解 C++优先队列中操作符重载的实现
89 0
|
6月前
堆(优先级队列 PriorityQueue)
堆(优先级队列 PriorityQueue)
41 0
|
6月前
|
存储 Java
优先级队列(堆)
优先级队列(堆)
47 3
|
6月前
|
设计模式 算法 调度
【C++】开始使用优先队列
优先队列的使用非常灵活,它适合于任何需要动态调整元素优先级和快速访问最高(或最低)优先级元素的场景。在使用时,需要注意其插入和删除操作的时间复杂度,以及如何根据实际需求选择合适的仿函数。
58 4
|
6月前
|
存储 安全 Java
优先级队列
优先级队列
|
6月前
|
存储 算法 安全
堆 和 优先级队列(超详细讲解,就怕你学不会)
堆 和 优先级队列(超详细讲解,就怕你学不会)
|
6月前
|
存储 安全
堆与优先级队列
堆与优先级队列
40 0
|
存储 算法
优先级队列(堆)&&  堆排序
优先级队列(堆)&&  堆排序
50 0
|
存储
优先级队列详解
优先级队列详解
114 0
|
存储 C++ 容器
深入了解C++优先队列
在计算机科学中,优先队列是一种抽象数据类型,它与队列相似,但是每个元素都有一个相关的优先级。C++中的优先队列是一个容器适配器(container adapter),它提供了一种在元素之间维护优先级的方法。
187 0