说一说 Java 并发队列原理剖析

简介: 我是小假 期待与你的下一次相遇 ~

PriorityBlockingQueue

PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高或最低的元素。内部使用二叉堆实现。

类图结构

PriorityBlockingQueue内部有一个数组queue,用来存放队列元素。allocationSpinLock是个自旋锁,通过CAS操作来保证同时只有一个线程可以扩容队列,状态为0或1。

由于这是一个优先队列,所以有一个comparator用来比较元素大小。

下面为构造函数:

  1. private static final int DEFAULT_INITIAL_CAPACITY = 11;
  2. public PriorityBlockingQueue() {
  3.    this(DEFAULT_INITIAL_CAPACITY, null);
  4. }
  5. public PriorityBlockingQueue(int initialCapacity) {
  6.    this(initialCapacity, null);
  7. }

可知默认队列容量为11,默认比较器为null,也就是使用元素的compareTo方法进行比较来确定元素的优先级,这意味着队列元素必须实现Comparable接口。

原理讲解

boolean offer()

  1. public boolean offer(E e) {
  2.    if (e == null)
  3.        throw new NullPointerException();
  4.    // 获取独占锁
  5.    final ReentrantLock lock = this.lock;
  6.    lock.lock();
  7.    int n, cap;
  8.    Object[] array;
  9.    // 扩容
  10.    while ((n = size) >= (cap = (array = queue).length))
  11.        tryGrow(array, cap);
  12.    try {
  13.        Comparator<? super E> cmp = comparator;
  14.        if (cmp == null)
  15.            // 通过对二叉堆的上浮操作保证最大或最小的元素总在根节点
  16.            siftUpComparable(n, e, array);
  17.        else
  18.            // 使用了自定义比较器
  19.            siftUpUsingComparator(n, e, array, cmp);
  20.        size = n + 1;
  21.        // 激活因调用take()方法被阻塞的线程
  22.        notEmpty.signal();
  23.    } finally {
  24.        // 释放锁
  25.        lock.unlock();
  26.    }
  27.    return true;
  28. }

流程比较简单,下面主要看扩容和建堆操作。

先看扩容。

  1. private void tryGrow(Object[] array, int oldCap) {
  2.    // 由前面的代码可知,调用tryGrow函数前先获取了独占锁,
  3.    // 由于扩容比较费时,此处先释放锁,
  4.    // 让其他线程可以继续操作(如果满足可操作的条件的话),
  5.    // 以提升并发性能
  6.    lock.unlock();
  7.    Object[] newArray = null;
  8.    // 通过allocationSpinLock保证同时最多只有一个线程进行扩容操作。
  9.    if (allocationSpinLock == 0 &&
  10.        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {
  11.        try {
  12.            // 当容量比较小时,一次只增加2容量
  13.            // 比较大时增加一倍
  14.            int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : (oldCap >> 1));
  15.            // 溢出检测
  16.            if (newCap - MAX_ARRAY_SIZE > 0) {
  17.                int minCap = oldCap + 1;
  18.                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
  19.                    throw new OutOfMemoryError();
  20.                newCap = MAX_ARRAY_SIZE;
  21.            }
  22.            if (newCap > oldCap && queue == array)
  23.                newArray = new Object[newCap];
  24.        } finally {
  25.            // 释放锁,没用CAS是因为同时最多有一个线程操作allocationSpinLock
  26.            allocationSpinLock = 0;
  27.        }
  28.    }
  29.    // 如果当前线程发现有其他线程正在对队列进行扩容,
  30.    // 则调用yield方法尝试让出CPU资源促使扩容操作尽快完成
  31.    if (newArray == null)
  32.        Thread.yield();
  33.    lock.lock();
  34.    if (newArray != null && queue == array) {
  35.        queue = newArray;
  36.        System.arraycopy(array, 0, newArray, 0, oldCap);
  37.    }
  38. }

下面来看建堆算法

  1. private static <T> void siftUpComparable(int k, T x, Object[] array) {
  2.    Comparable<? super T> key = (Comparable<? super T>) x;
  3.    while (k > 0) {
  4.        // 获取父节点,设子节点索引为k,
  5.        // 则由二叉堆的性质可知,父节点的索引总为(k - 1) >>> 1
  6.        int parent = (k - 1) >>> 1;
  7.        // 获取父节点对应的值
  8.        Object e = array[parent];
  9.        // 只有子节点的值小于父节点的值时才上浮
  10.        if (key.compareTo((T) e) >= 0)
  11.            break;
  12.        array[k] = e;
  13.        k = parent;
  14.    }
  15.    array[k] = key;
  16. }

如果了解二叉堆的话,此处代码是十分容易理解的。关于二叉堆,可参看《数据结构之二叉堆》。

E poll()

  1. public E poll() {
  2.    final ReentrantLock lock = this.lock;
  3.    lock.lock();
  4.    try {
  5.        // 出队
  6.        return dequeue();
  7.    } finally {
  8.        lock.unlock();
  9.    }
  10. }
  11. private E dequeue() {
  12.    int n = size - 1;
  13.    if (n < 0)
  14.        return null;
  15.    else {
  16.        Object[] array = queue;
  17.        E result = (E) array[0];
  18.        // 获取尾节点,在实现对二叉堆的下沉操作时要用到
  19.        E x = (E) array[n];
  20.        array[n] = null;
  21.        Comparator<? super E> cmp = comparator;
  22.        if (cmp == null)
  23.            // 下沉操作,保证取走最小的节点(根节点)后,新的根节点仍时最小的,二叉堆的性质依然满足
  24.            siftDownComparable(0, x, array, n);
  25.        else
  26.            // 使用自定义比较器
  27.            siftDownUsingComparator(0, x, array, n, cmp);
  28.        size = n;
  29.        return result;
  30.    }
  31. }

poll方法通过调用dequeue方法使最大或最小的节点出队并将其返回。

下面来看二叉堆的下沉操作。

  1. private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
  2.    if (n > 0) {
  3.        Comparable<? super T> key = (Comparable<? super T>)x;
  4.        int half = n >>> 1;
  5.        while (k < half) {
  6.            // child为两个子节点(如果有的话)中较小的那个对应的索引
  7.            int child = (k << 1) + 1;
  8.            Object c = array[child];
  9.            int right = child + 1;
  10.            // 通过比较保证child对应的为较小值的索引
  11.            if (right < n &&
  12.                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
  13.                c = array[child = right];
  14.            if (key.compareTo((T) c) <= 0)
  15.                break;
  16.            // 下沉,将较小的子节点换到父节点位置
  17.            array[k] = c;
  18.            k = child;
  19.        }
  20.        array[k] = key;
  21.    }
  22. }

同上,对下沉操作有疑问的话可参考上述文章。

void put(E e)

调用了offer

  1. public void put(E e){
  2.    offer(e);
  3. }

E take()

take操作的作用是获取二叉堆的根节点元素,如果队列为空则阻塞。

  1. public E take() throws InterruptedException {
  2.    final ReentrantLock lock = this.lock;
  3.    // 阻塞可被中断
  4.    lock.lockInterruptibly();
  5.    E result;
  6.    try {
  7.        // 队列为空就将当前线程放入notEmpty条件队列
  8.        // 使用while循环判断是为了避免虚假唤醒
  9.        while ( (result = dequeue()) == null)
  10.            notEmpty.await();
  11.    } finally {
  12.        lock.unlock();
  13.    }
  14.    return result;
  15. }

DelayQueue

DelayQueue并发队列是一个无界阻塞延迟队列,队列中的每一个元素都有一个过期时间,当从队列中获取元素时只有过期元素才会出列。队列头元素是最快要过期的元素。

类图结构

DelayQueue内部使用PriorityQueue存放数据,使用ReentrantLock实现线程同步。队列里的元素要实现Delayed接口(Delayed接口继承了Comparable接口),用以得到过期时间并进行过期时间的比较。

  1. public interface Delayed extends Comparable<Delayed> {
  2.    long getDelay(TimeUnit unit);
  3. }

available是由lock生成的条件变量,用以实现线程间的同步。

leader是leader-follower模式的变体,用于减少不必要的线程等待。当一个线程调用队列的take方法变为leader线程后,它会调用条件变量available.waitNanos(delay)等待delay时间,但是其他线程(follower)则会调用available.await()进行无限等待。leader线程延迟时间过期后,会退出take方法,并通过调用available.signal()方法唤醒一个follower线程,被唤醒的线程会被选举为新的leader线程。

原理讲解

boolean offer(E e)

  1. public boolean offer(E e) {
  2.    final ReentrantLock lock = this.lock;
  3.    lock.lock();
  4.    try {
  5.        // 添加新元素
  6.        q.offer(e);
  7.        // 查看新添加的元素是否为最先过期的
  8.        if (q.peek() == e) {
  9.            leader = null;
  10.            available.signal();
  11.        }
  12.        return true;
  13.    } finally {
  14.        lock.unlock();
  15.    }
  16. }

上述代码首先获取独占锁,然后添加元素到优先级队列,由于q是优先级队列,所以添加元素后,调用q.peek()方法返回的并不一定是当前添加的元素。当如果q.peek() == e,说明当前元素是最先要过期的,那么重置leader线程为null并激活available条件队列里的一个线程,告诉它队列里面有元素了。

E take()

获取并移除队列里面过期的元素,如果队列里面没有过期元素则等待。

  1. public E take() throws InterruptedException {
  2.    final ReentrantLock lock = this.lock;
  3.    // 可中断
  4.    lock.lockInterruptibly();
  5.    try {
  6.        for (;;) {
  7.            E first = q.peek();
  8.            // 为空则等待
  9.            if (first == null)
  10.                available.await();
  11.            else {
  12.                long delay = first.getDelay(NANOSECONDS);
  13.                // 过期则成功获取
  14.                if (delay <= 0)
  15.                    return q.poll();
  16.                // 执行到此处,说明头元素未过期    
  17.                first = null; // don't retain ref while waiting
  18.                // follower无限等待,直到被唤醒
  19.                if (leader != null)
  20.                    available.await();
  21.                else {
  22.                    Thread thisThread = Thread.currentThread();
  23.                    leader = thisThread;
  24.                    try {
  25.                        // leader等待lelay时间,则头元素必定已经过期
  26.                        available.awaitNanos(delay);
  27.                    } finally {
  28.                        // 重置leader,给follower称为leader的机会
  29.                        if (leader == thisThread)
  30.                            leader = null;
  31.                    }
  32.                }
  33.            }
  34.        }
  35.    } finally {
  36.        if (leader == null && q.peek() != null)
  37.            // 唤醒一个follower线程
  38.            available.signal();
  39.        lock.unlock();
  40.    }
  41. }

一个线程调用take方法时,会首先查看头元素是否为空,为空则直接等待,否则判断是否过期。若头元素已经过期,则直接通过poll获取并移除,否则判断是否有leader线程。若有leader线程则一直等待,否则自己成为leader并等待头元素过期。

E poll()

获取并移除头过期元素,如果没有过期元素则返回null

  1. public E poll() {
  2.    final ReentrantLock lock = this.lock;
  3.    lock.lock();
  4.    try {
  5.        E first = q.peek();
  6.        // 若队列为空或没有元素过期则直接返回null
  7.        if (first == null || first.getDelay(NANOSECONDS) > 0)
  8.            return null;
  9.        else
  10.            return q.poll();
  11.    } finally {
  12.        lock.unlock();
  13.    }
  14. }

int size()

计算队列元素个数,包含过期的和未过期的。

  1. public int size() {
  2.    final ReentrantLock lock = this.lock;
  3.    lock.lock();
  4.    try {
  5.        return q.size();
  6.    } finally {
  7.        lock.unlock();
  8.    }
  9. }


相关文章
|
5月前
|
存储 缓存 Java
我们来详细讲一讲 Java NIO 底层原理
我是小假 期待与你的下一次相遇 ~
203 2
|
4月前
|
监控 Java API
现代 Java IO 高性能实践从原理到落地的高效实现路径与实战指南
本文深入解析现代Java高性能IO实践,涵盖异步非阻塞IO、操作系统优化、大文件处理、响应式网络编程与数据库访问,结合Netty、Reactor等技术落地高并发应用,助力构建高效可扩展的IO系统。
142 0
|
6月前
|
存储 缓存 Java
【高薪程序员必看】万字长文拆解Java并发编程!(5):深入理解JMM:Java内存模型的三大特性与volatile底层原理
JMM,Java Memory Model,Java内存模型,定义了主内存,工作内存,确保Java在不同平台上的正确运行主内存Main Memory:所有线程共享的内存区域,所有的变量都存储在主存中工作内存Working Memory:每个线程拥有自己的工作内存,用于保存变量的副本.线程执行过程中先将主内存中的变量读到工作内存中,对变量进行操作之后再将变量写入主内存,jvm概念说明主内存所有线程共享的内存区域,存储原始变量(堆内存中的对象实例和静态变量)工作内存。
226 0
|
5月前
|
存储 算法 安全
Java中的对称加密算法的原理与实现
本文详细解析了Java中三种常用对称加密算法(AES、DES、3DES)的实现原理及应用。对称加密使用相同密钥进行加解密,适合数据安全传输与存储。AES作为现代标准,支持128/192/256位密钥,安全性高;DES采用56位密钥,现已不够安全;3DES通过三重加密增强安全性,但性能较低。文章提供了各算法的具体Java代码示例,便于快速上手实现加密解密操作,帮助用户根据需求选择合适的加密方案保护数据安全。
413 58
|
4月前
|
人工智能 安全 Java
Go与Java泛型原理简介
本文介绍了Go与Java泛型的实现原理。Go通过单态化为不同类型生成函数副本,提升运行效率;而Java则采用类型擦除,将泛型转为Object类型处理,保持兼容性但牺牲部分类型安全。两种机制各有优劣,适用于不同场景。
157 24
|
5月前
|
XML JSON Java
Java 反射:从原理到实战的全面解析与应用指南
本文深度解析Java反射机制,从原理到实战应用全覆盖。首先讲解反射的概念与核心原理,包括类加载过程和`Class`对象的作用;接着详细分析反射的核心API用法,如`Class`、`Constructor`、`Method`和`Field`的操作方法;最后通过动态代理和注解驱动配置解析等实战场景,帮助读者掌握反射技术的实际应用。内容翔实,适合希望深入理解Java反射机制的开发者。
538 13
|
4月前
|
存储 缓存 安全
深入讲解 Java 并发编程核心原理与应用案例
本教程全面讲解Java并发编程,涵盖并发基础、线程安全、同步机制、并发工具类、线程池及实际应用案例,助你掌握多线程开发核心技术,提升程序性能与响应能力。
209 0
|
5月前
|
安全 Java 编译器
JD-GUI,java反编译工具及原理: JavaDecompiler一个Java反编译器
Java Decompiler (JD-GUI) 是一款由 Pavel Kouznetsov 开发的图形化 Java 反编译工具,支持 Windows、Linux 和 Mac Os。它能将 `.class` 文件反编译为 Java 源代码,支持多文件标签浏览、高亮显示,并兼容 Java 5 及以上版本。JD-GUI 支持对整个 Jar 文件进行反编译,可跳转源码,适用于多种 JDK 和编译器。其原理基于将字节码转换为抽象语法树 (AST),再通过反编译生成代码。尽管程序可能带来安全风险,但可通过代码混淆降低可读性。最新版修复了多项识别错误并优化了内存管理。
2893 1
|
6月前
|
存储 安全 Java
深入探究Java中ThreadLocal的工作原理和用途
总结起来,ThreadLocal是Java多线程编程中一个非常有用的工具,通过为每个线程分配独立的变量副本,实现线程隔离,避免资
154 9