10分钟搞定 Java 并发队列好吗?好的(下)

简介: 10分钟搞定 Java 并发队列好吗?好的(下)

SynchronousQueue


微信图片_20220511135009.png

这是一个不存储元素的阻塞队列,不存储元素还叫队列?


微信图片_20220511135036.gif


没错,SynchronousQueue 直译过来叫同步队列,如果在队列里面呆久了应该就算是“异步”了吧


所以使用它,每个put() 操作必须要等待一个 take() 操作,反之亦然,否则不能继续添加元素


实际中怎么用呢?假如你需要两个线程之间同步共享变量,如果不用 SynchronousQueue 你可能会选择用 CountDownLatch 来完成,就像这样:


ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);
Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    sharedState.set(producedElement);
    countDownLatch.countDown();
};
Runnable consumer = () -> {
    try {
        countDownLatch.await();
        Integer consumedElement = sharedState.get();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};


这点小事就用计数器来实现,显然很不合适,用 SynchronousQueue 改造一下,感觉瞬间就不一样了


ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    try {
        queue.put(producedElement);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};
Runnable consumer = () -> {
    try {
        Integer consumedElement = queue.take();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};


其实 Executors.newCachedThreadPool() 方法里面使用的就是 SynchronousQueue


public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}


看到前面 LinkedBlockingQueue 用在 newSingleThreadExecutornewFixedThreadPool 上,而 newCachedThreadPool 却用 SynchronousQueue,这是为什么呢?


因为单线程池和固定线程池中,线程数量是有限的,因此提交的任务需要在LinkedBlockingQueue队列中等待空余的线程;


而缓存线程池中,线程数量几乎无限(上限为Integer.MAX_VALUE),因此提交的任务只需要在SynchronousQueue 队列中同步移交给空余线程即可, 所以有时也会说 SynchronousQueue 的吞吐量要高于 LinkedBlockingQueueArrayBlockingQueue


LinkedTransferQueue


简单来说,TransferQueue提供了一个场所,生产者线程使用 transfer 方法传入一些对象并阻塞,直至这些对象被消费者线程全部取出。


你有没有觉得,刚刚介绍的 SynchronousQueue 是否很像一个容量为 0 的 TransferQueue


但 LinkedTransferQueue 相比其他阻塞队列多了三个方法


  • transfer(E e)如果当前有消费者正在等待消费元素,transfer 方法就可以直接将生产者传入的元素立刻 transfer (传输) 给消费者;如果没有消费者等待消费元素,那么 transfer 方法会把元素放到队列的 tail(尾部)节点,一直阻塞,直到该元素被消费者消费才返回


  • tryTransfer(E e)tryTransfer,很显然是一种尝试,如果没有消费者等待消费元素,则马上返回 false ,程序不会阻塞


  • tryTransfer(E e, long timeout, TimeUnit unit)带有超时限制,尝试将生产者传入的元素 transfer 给消费者,如果超时时间到,还没有消费者消费元素,则返回 false


你瞧,所有阻塞的方法都是一个套路:


  1. 阻塞方式


  1. 带有 try 的非阻塞方式


  1. 带有 try 和超时时间的非阻塞方式


看到这你也许感觉 LinkedTransferQueue 没啥特点,其实它和其他阻塞队列的差别还挺大的:


BlockingQueue 是如果队列满了,线程才会阻塞;但是 TransferQueue 是如果没有消费元素,则会阻塞 (transfer 方法)


这也就应了 Doug Lea 说的那句话:


LinkedTransferQueue is actually a superset of ConcurrentLinkedQueue, SynchronousQueue (in “fair” mode), and unbounded LinkedBlockingQueues. And it’s made better by allowing you to mix andmatch those features as well as take advantage of higher-performance implementation techniques.

简单翻译:

LinkedTransferQueueConcurrentLinkedQueue, SynchronousQueue (在公平模式下), 无界的LinkedBlockingQueues等的超集; 允许你混合使用阻塞队列的多种特性

所以,在合适的场景中,请尽量使用LinkedTransferQueue


上面都看的是单向队列 FIFO,接下来我们看看双向队列


LinkedBlockingDeque


LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,凡是后缀为 Deque 的都是双向队列意思,后缀的发音为deck——/dek/, 刚接触它时我以为是这个冰激凌的发音


微信图片_20220511135612.png


所谓双向队列值得就是可以从队列的两端插入和移除元素。所以:


双向队列因为多了一个操作队列的入口,在多线程同时入队是,也就会减少一半的竞争


队列有头,有尾,因此它又比其他阻塞队列多了几个特殊的方法


  • addFirst
  • addLast
  • xxxxFirst
  • xxxxLast
  • ... ...


微信图片_20220511135646.png


这么一看,双向阻塞队列确实很高效,


那双向阻塞队列应用在什么地方了呢?


不知道你是否听过 “工作窃取”模式,看似不太厚道的一种方法,实则是高效利用线程的好办法。下一篇文章,我们就来看看 ForkJoinPool 是如何应用 “工作窃取”模式的


总结


到这关于 Java 队列(其实主要介绍了阻塞队列)就快速的区分完了,将看似杂乱的方法做了分类整理,方便快速理解其用途,同时也说明了这些队列的实际用途。相信你带着更高的视角来阅读源码会更加轻松,最后也希望大家认真看两个队列的源码实现,在遇到队列的问题,脑海中的画面分分钟就可以搞定了




相关文章
|
21天前
|
安全 Java 编译器
揭秘JAVA深渊:那些让你头大的最晦涩知识点,从泛型迷思到并发陷阱,你敢挑战吗?
【8月更文挑战第22天】Java中的难点常隐藏在其高级特性中,如泛型与类型擦除、并发编程中的内存可见性及指令重排,以及反射与动态代理等。这些特性虽强大却也晦涩,要求开发者深入理解JVM运作机制及计算机底层细节。例如,泛型在编译时检查类型以增强安全性,但在运行时因类型擦除而丢失类型信息,可能导致类型安全问题。并发编程中,内存可见性和指令重排对同步机制提出更高要求,不当处理会导致数据不一致。反射与动态代理虽提供运行时行为定制能力,但也增加了复杂度和性能开销。掌握这些知识需深厚的技术底蕴和实践经验。
43 2
|
1月前
|
安全 Java 调度
解锁Java并发编程高阶技能:深入剖析无锁CAS机制、揭秘魔法类Unsafe、精通原子包Atomic,打造高效并发应用
【8月更文挑战第4天】在Java并发编程中,无锁编程以高性能和低延迟应对高并发挑战。核心在于无锁CAS(Compare-And-Swap)机制,它基于硬件支持,确保原子性更新;Unsafe类提供底层内存操作,实现CAS;原子包java.util.concurrent.atomic封装了CAS操作,简化并发编程。通过`AtomicInteger`示例,展现了线程安全的自增操作,突显了这些技术在构建高效并发程序中的关键作用。
53 1
|
29天前
|
Java
java中的队列
这篇文章通过Java代码示例介绍了使用数组实现队列操作,包括队列的初始化、入队、出队、判断队列满和空以及遍历队列的方法。
java中的队列
|
20天前
|
存储 Java
Java 中 ConcurrentHashMap 的并发级别
【8月更文挑战第22天】
31 5
|
20天前
|
存储 算法 Java
Java 中的同步集合和并发集合
【8月更文挑战第22天】
20 5
|
19天前
|
缓存 Java 调度
【Java 并发秘籍】线程池大作战:揭秘 JDK 中的线程池家族!
【8月更文挑战第24天】Java的并发库提供多种线程池以应对不同的多线程编程需求。本文通过实例介绍了四种主要线程池:固定大小线程池、可缓存线程池、单一线程线程池及定时任务线程池。固定大小线程池通过预设线程数管理任务队列;可缓存线程池能根据需要动态调整线程数量;单一线程线程池确保任务顺序执行;定时任务线程池支持周期性或延时任务调度。了解并正确选用这些线程池有助于提高程序效率和资源利用率。
30 2
|
21天前
|
Java 开发者
【编程高手必备】Java多线程编程实战揭秘:解锁高效并发的秘密武器!
【8月更文挑战第22天】Java多线程编程是提升软件性能的关键技术,可通过继承`Thread`类或实现`Runnable`接口创建线程。为确保数据一致性,可采用`synchronized`关键字或`ReentrantLock`进行线程同步。此外,利用`wait()`和`notify()`方法实现线程间通信。预防死锁策略包括避免嵌套锁定、固定锁顺序及设置获取锁的超时。掌握这些技巧能有效增强程序的并发处理能力。
17 2
|
2月前
|
Java 开发者
Java中的多线程与并发控制
【7月更文挑战第31天】在Java的世界中,多线程是提升程序性能和响应能力的关键。本文将通过实际案例,深入探讨Java多线程的创建、同步机制以及并发包的使用,旨在帮助读者理解并掌握如何在Java中高效地实现多线程编程。
38 3
|
2月前
|
负载均衡 NoSQL Java
|
2月前
|
安全 Java 开发者
探索Java内存模型:可见性、有序性和并发
在Java的并发编程领域中,内存模型扮演了至关重要的角色。本文旨在深入探讨Java内存模型的核心概念,包括可见性、有序性和它们对并发实践的影响。我们将通过具体示例和底层原理分析,揭示这些概念如何协同工作以确保跨线程操作的正确性,并指导开发者编写高效且线程安全的代码。