10分钟搞定 Java 并发队列好吗?好的(上)

简介: 10分钟搞定 Java 并发队列好吗?好的(上)

| 好看请赞,养成习惯


  • 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
  • If you can NOT explain it simply, you do NOT understand it well enough


现陆续将Demo代码和技术文章整理在一起 Github实践精选 ,方便大家阅读查看,本文同样收录在此,觉得不错,还请Star


前言


如果按照用途与特性进行粗略的划分,JUC 包中包含的工具大体可以分为 6 类:


  1. 执行者与线程池


  1. 并发队列


  1. 同步工具


  1. 并发集合



  1. 原子变量


并发系列中,主要讲解了 执行者与线程池同步工具 , 在分析源码时,或多或少的提及到了「队列」,队列在 JUC 中也是多种多样存在,所以本文就以「远看」视角,帮助大家快速了解与区分这些看似「杂乱」的队列


并发队列


Java 并发队列按照实现方式来进行划分可以分为 2 种:


  1. 阻塞队列


  1. 非阻塞队列


如果你已经看完并发系列锁的实现,你已经能够知道他们实现的区别:


前者就是基于锁实现的,后者则是基于 CAS 非阻塞算法实现的


常见的队列有下面这几种:


微信图片_20220511133944.png


瞬间懵逼?看到这个没有人性的图想直接走人? 客观先别急,一会就柳暗花明了

当下你也许有个问题:


为什么会有这么多种队列的存在


锁有应对各种情形的锁,队列也自然有应对各种情形的队列了, 是不是也有点单一职责原则的意思呢?


所以我们要了解这些队列到底是怎么设计的?以及用在了哪些地方?


先来看下图


微信图片_20220511134034.png


如果你在 IDE 中打开以上非阻塞队列和阻塞队列,查看其实现方法,你就会发现,阻塞队列非阻塞队列额外支持两种操作


  1. 阻塞的插入当队列满时,队列会阻塞插入元素的线程,直到队列不满


  1. 阻塞的移除当队列为空时,获取元素的线程会阻塞,直到队列变为非空


综合说明入队/出队操作,看似杂乱的方法,用一个表格就能概括了


微信图片_20220511134110.png


抛出异常


  • 当队列满时,此时如果再向队列中插入元素,会抛出 IllegalStateException (这很好理解)


  • 当队列空时,此时如果再从队列中获取元素,会抛出 NoSuchElementException (这也很好理解)


返回特殊值


  • 当向队列插入元素时,会返回元素是否插入成功,成功则返回 true


  • 当从队列移除元素时,如果没有则返回 null


一直阻塞


  • 当队列满时,如果生产者线程向队列 put 元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出


  • 当队列为空时,如果消费者线程 从队列里面 take 元素,队列会阻塞消费者线程,直到队列不为空


关于阻塞,我们其实早在 并发编程之等待通知机制 就已经充分说明过了,你还记得下面这张图吗?原理其实是一样一样滴


微信图片_20220511134154.png

超时退出


和锁一样,因为有阻塞,为了灵活使用,就一定支持超时退出,阻塞时间达到超时时间,就会直接返回


至于为啥插入和移除这么多种单词表示形式,我也不知道,为了方便记忆,只需要记住阻塞的方法形式即可:


单词 puttake 字母 t 首位相连,一个放,一个拿


到这里你应该对 Java 并发队列有了个初步的认识了,原来看似杂乱的方法貌似也有了规律。接下来就到了疯狂串知识点的时刻了,借助前序章节的知识,分分钟就理解全部队列了


微信图片_20220511134224.gif


ArrayBlockingQueue


之前也说过,JDK中的命名还是很讲究滴,一看这名字,底层就是数组实现了,是否有界,那就看在构造的时候是否需要指定 capacity 值了


填鸭式的说明也容易忘,这些都是哪看到的呢?在所有队列的 Java docs 的第一段,一句话就概括了该队列的主要特性,所以强烈建议大家自己在看源码时,简单瞄一眼 docs 开头,心中就有多半个数了


微信图片_20220511134250.png


在讲 Java AQS队列同步器以及ReentrantLock的应用 时我们介绍了公平锁与非公平锁的概念,ArrayBlockingQueue 也有同样的概念,看它的构造方法,就有 ReentrantLock 来辅助实现


public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}


默认情况下,依旧是不保证线程公平访问队列(公平与否是指阻塞的线程能否按照阻塞的先后顺序访问队列,先阻塞线访问,后阻塞后访问)


到这我也要临时问一个说过多次的面试送分题了:


为什么默认采用非公平锁的方式?它较公平锁方式有什么好处,又可能带来哪些问题?


知道了以上内容,结合上面表格中的方法,ArrayBlockingQueue 就可以轻松过关了


微信图片_20220511134331.png


和数组相对的自然是链表了


LinkedBlockingQueue


微信图片_20220511134355.png


LinkedBlockingQueue 也算是一个有界阻塞队列 ,从下面的构造函数中你也可以看出,该队列的默认和最大长度为 Integer.MAX_VALUE ,这也就 docs 说 optionally-bounded 的原因了


public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
  if (capacity <= 0) throw new IllegalArgumentException();
  this.capacity = capacity;
  last = head = new Node<E>(null);
}


正如 Java 集合一样,链表形式的队列,其存取效率要比数组形式的队列高。但是在一些并发程序中,数组形式的队列由于具有一定的可预测性,因此可以在某些场景中获得更高的效率


看到 LinkedBlockingQueue 是不是也有些熟悉呢? 为什么要使用线程池? 就已经和它多次照面了


创建单个线程池


public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}


创建固定个数线程池


public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}


面试送分题又来了


使用 Executors 创建线程池很简单,为什么大厂严格要求禁用这种创建方式呢?


PriorityBlockingQueue


PriorityBlockingQueue 是一个支持优先级的无界的阻塞队列,默认情况下采用自然顺序升序排列,当然也有非默认情况自定义优先级,需要排序,那自然要用到 Comparator 来定义排序规则了


微信图片_20220511134520.png


可以定义优先级,自然也就有相应的限制,以及使用的注意事项


  • 按照上图说明,队列中不允许存在 null 值,也不允许存在不能排序的元素


  • 对于排序值相同的元素,其序列是不保证的,但你可以继续自定义其他可以区分出来优先级的值,如果你有严格的优先级区分,建议有更完善的比较规则,就像 Java docs 这样


 class FIFOEntry<E extends Comparable<? super E>>
     implements Comparable<FIFOEntry<E>> {
   static final AtomicLong seq = new AtomicLong(0);
   final long seqNum;
   final E entry;
   public FIFOEntry(E entry) {
     seqNum = seq.getAndIncrement();
     this.entry = entry;
   }
   public E getEntry() { return entry; }
   public int compareTo(FIFOEntry<E> other) {
     int res = entry.compareTo(other.entry);
     if (res == 0 && other.entry != this.entry)
       res = (seqNum < other.seqNum ? -1 : 1);
     return res;
   }
 }
  • 队列容量是没有上限的,但是如果插入的元素超过负载,有可能会引起OutOfMemory异常(这是肯定的),这也是为什么我们通常所说,队列无界,心中有界


  • PriorityBlockingQueue 也有 put 方法,这是一个阻塞的方法,因为它是无界的,自然不会阻塞,所以就有了下面比较聪明的做法


public void put(E e) {
    offer(e); // never need to block  请自行对照上面表格
}


可以给定初始容量,这个容量会按照一定的算法自动扩充


// Default array capacity.
private static final int DEFAULT_INITIAL_CAPACITY = 11;
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}


  • 这里默认的容量是 11,由于也是基于数组,那面试送分题又来了


你通常是怎样定义容器/集合初始容量的?有哪些依据?


DelayQueue


DelayQueue 是一个支持延时获取元素的无界阻塞队列


  • 是否延时肯定是和某个时间(通常和当前时间) 进行比较


  • 比较过后还要进行排序,所以也是存在一定的优先级


看到这也许觉得这有点和 PriorityBlockingQueue 很像,没错,DelayQueue 的内部也是使用 PriorityQueue


微信图片_20220511134709.png


上图绿色框线也告诉你,DelayQueue 队列的元素必须要实现 Depayed 接口:


微信图片_20220511134729.png


所以从上图可以看出使用 DelayQueue 非常简单,只需要两步:


实现 getDelay() 方法,返回元素要延时多长时间


public long getDelay(TimeUnit unit) {
      // 最好采用纳秒形式,这样更精确
    return unit.convert(time - now(), NANOSECONDS);
}


实现 compareTo() 方法,比较元素顺序


public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}


上面的代码哪来的呢?如果你打开 ScheduledThreadPoolExecutor 里的 ScheduledFutureTask,你就看到了 (ScheduledThreadPoolExecutor 内部就是应用 DelayQueue)


所以综合来说,下面两种情况非常适合使用 DelayQueue


  • 缓存系统的设计:用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,如果能从 DelayQueue 中获取元素,说明缓存有效期到了


  • 定时任务调度:用 DelayQueue 保存当天会执行的任务以及时间,如果能从 DelayQueue 中获取元素,任务就可以开始执行了。比如 TimerQueue 就是这样实现的
目录
打赏
0
0
0
0
1
分享
相关文章
JAVA线程池有哪些队列? 以及它们的适用场景案例
不同的线程池队列有着各自的特点和适用场景,在实际使用线程池时,需要根据具体的业务需求、系统资源状况以及对任务执行顺序、响应时间等方面的要求,合理选择相应的队列来构建线程池,以实现高效的任务处理。
99 12
揭秘JAVA深渊:那些让你头大的最晦涩知识点,从泛型迷思到并发陷阱,你敢挑战吗?
【8月更文挑战第22天】Java中的难点常隐藏在其高级特性中,如泛型与类型擦除、并发编程中的内存可见性及指令重排,以及反射与动态代理等。这些特性虽强大却也晦涩,要求开发者深入理解JVM运作机制及计算机底层细节。例如,泛型在编译时检查类型以增强安全性,但在运行时因类型擦除而丢失类型信息,可能导致类型安全问题。并发编程中,内存可见性和指令重排对同步机制提出更高要求,不当处理会导致数据不一致。反射与动态代理虽提供运行时行为定制能力,但也增加了复杂度和性能开销。掌握这些知识需深厚的技术底蕴和实践经验。
116 2
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面
本文介绍了如何构建高效稳定的Java数据库连接池,涵盖连接池配置、并发控制和异常处理等方面。通过合理配置初始连接数、最大连接数和空闲连接超时时间,确保系统性能和稳定性。文章还探讨了同步阻塞、异步回调和信号量等并发控制策略,并提供了异常处理的最佳实践。最后,给出了一个简单的连接池示例代码,并推荐使用成熟的连接池框架(如HikariCP、C3P0)以简化开发。
77 2
|
3月前
|
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
51 1
|
3月前
|
【用Java学习数据结构系列】探索栈和队列的无尽秘密
【用Java学习数据结构系列】探索栈和队列的无尽秘密
45 2
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
|
3月前
|
【用Java学习数据结构系列】用堆实现优先级队列
【用Java学习数据结构系列】用堆实现优先级队列
48 0
java中的队列
这篇文章通过Java代码示例介绍了使用数组实现队列操作,包括队列的初始化、入队、出队、判断队列满和空以及遍历队列的方法。
java中的队列
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等