10分钟搞定 Java 并发队列好吗?好的(下)

简介: 10分钟搞定 Java 并发队列好吗?好的(下)

SynchronousQueue


微信图片_20220511135009.png

这是一个不存储元素的阻塞队列,不存储元素还叫队列?


微信图片_20220511135036.gif


没错,SynchronousQueue 直译过来叫同步队列,如果在队列里面呆久了应该就算是“异步”了吧


所以使用它,每个put() 操作必须要等待一个 take() 操作,反之亦然,否则不能继续添加元素


实际中怎么用呢?假如你需要两个线程之间同步共享变量,如果不用 SynchronousQueue 你可能会选择用 CountDownLatch 来完成,就像这样:


ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);
Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    sharedState.set(producedElement);
    countDownLatch.countDown();
};
Runnable consumer = () -> {
    try {
        countDownLatch.await();
        Integer consumedElement = sharedState.get();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};


这点小事就用计数器来实现,显然很不合适,用 SynchronousQueue 改造一下,感觉瞬间就不一样了


ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    try {
        queue.put(producedElement);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};
Runnable consumer = () -> {
    try {
        Integer consumedElement = queue.take();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};


其实 Executors.newCachedThreadPool() 方法里面使用的就是 SynchronousQueue


public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}


看到前面 LinkedBlockingQueue 用在 newSingleThreadExecutornewFixedThreadPool 上,而 newCachedThreadPool 却用 SynchronousQueue,这是为什么呢?


因为单线程池和固定线程池中,线程数量是有限的,因此提交的任务需要在LinkedBlockingQueue队列中等待空余的线程;


而缓存线程池中,线程数量几乎无限(上限为Integer.MAX_VALUE),因此提交的任务只需要在SynchronousQueue 队列中同步移交给空余线程即可, 所以有时也会说 SynchronousQueue 的吞吐量要高于 LinkedBlockingQueueArrayBlockingQueue


LinkedTransferQueue


简单来说,TransferQueue提供了一个场所,生产者线程使用 transfer 方法传入一些对象并阻塞,直至这些对象被消费者线程全部取出。


你有没有觉得,刚刚介绍的 SynchronousQueue 是否很像一个容量为 0 的 TransferQueue


但 LinkedTransferQueue 相比其他阻塞队列多了三个方法


  • transfer(E e)如果当前有消费者正在等待消费元素,transfer 方法就可以直接将生产者传入的元素立刻 transfer (传输) 给消费者;如果没有消费者等待消费元素,那么 transfer 方法会把元素放到队列的 tail(尾部)节点,一直阻塞,直到该元素被消费者消费才返回


  • tryTransfer(E e)tryTransfer,很显然是一种尝试,如果没有消费者等待消费元素,则马上返回 false ,程序不会阻塞


  • tryTransfer(E e, long timeout, TimeUnit unit)带有超时限制,尝试将生产者传入的元素 transfer 给消费者,如果超时时间到,还没有消费者消费元素,则返回 false


你瞧,所有阻塞的方法都是一个套路:


  1. 阻塞方式


  1. 带有 try 的非阻塞方式


  1. 带有 try 和超时时间的非阻塞方式


看到这你也许感觉 LinkedTransferQueue 没啥特点,其实它和其他阻塞队列的差别还挺大的:


BlockingQueue 是如果队列满了,线程才会阻塞;但是 TransferQueue 是如果没有消费元素,则会阻塞 (transfer 方法)


这也就应了 Doug Lea 说的那句话:


LinkedTransferQueue is actually a superset of ConcurrentLinkedQueue, SynchronousQueue (in “fair” mode), and unbounded LinkedBlockingQueues. And it’s made better by allowing you to mix andmatch those features as well as take advantage of higher-performance implementation techniques.

简单翻译:

LinkedTransferQueueConcurrentLinkedQueue, SynchronousQueue (在公平模式下), 无界的LinkedBlockingQueues等的超集; 允许你混合使用阻塞队列的多种特性

所以,在合适的场景中,请尽量使用LinkedTransferQueue


上面都看的是单向队列 FIFO,接下来我们看看双向队列


LinkedBlockingDeque


LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,凡是后缀为 Deque 的都是双向队列意思,后缀的发音为deck——/dek/, 刚接触它时我以为是这个冰激凌的发音


微信图片_20220511135612.png


所谓双向队列值得就是可以从队列的两端插入和移除元素。所以:


双向队列因为多了一个操作队列的入口,在多线程同时入队是,也就会减少一半的竞争


队列有头,有尾,因此它又比其他阻塞队列多了几个特殊的方法


  • addFirst
  • addLast
  • xxxxFirst
  • xxxxLast
  • ... ...


微信图片_20220511135646.png


这么一看,双向阻塞队列确实很高效,


那双向阻塞队列应用在什么地方了呢?


不知道你是否听过 “工作窃取”模式,看似不太厚道的一种方法,实则是高效利用线程的好办法。下一篇文章,我们就来看看 ForkJoinPool 是如何应用 “工作窃取”模式的


总结


到这关于 Java 队列(其实主要介绍了阻塞队列)就快速的区分完了,将看似杂乱的方法做了分类整理,方便快速理解其用途,同时也说明了这些队列的实际用途。相信你带着更高的视角来阅读源码会更加轻松,最后也希望大家认真看两个队列的源码实现,在遇到队列的问题,脑海中的画面分分钟就可以搞定了




相关文章
|
1月前
|
前端开发 Java
java中的Queue队列的用法
java中的Queue队列的用法
19 1
|
1月前
|
存储 安全 算法
解读 Java 并发队列 BlockingQueue
解读 Java 并发队列 BlockingQueue
20 0
|
1月前
|
存储 缓存 算法
Java并发基础:原子类之AtomicMarkableReference全面解析
AtomicMarkableReference类能够确保引用和布尔标记的原子性更新,有效避免了多线程环境下的竞态条件,其提供的方法可以轻松地实现基于条件的原子性操作,提高了程序的并发安全性和可靠性。
Java并发基础:原子类之AtomicMarkableReference全面解析
|
1天前
|
数据采集 存储 Java
高德地图爬虫实践:Java多线程并发处理策略
高德地图爬虫实践:Java多线程并发处理策略
|
2天前
|
Java API 调度
[Java并发基础]多进程编程
[Java并发基础]多进程编程
|
3天前
|
存储 Java C++
Java集合篇之深度解析Queue,单端队列、双端队列、优先级队列、阻塞队列
Java集合篇之深度解析Queue,单端队列、双端队列、优先级队列、阻塞队列
16 0
|
8天前
|
安全 Java
深入理解 Java 多线程和并发工具类
【4月更文挑战第19天】本文探讨了Java多线程和并发工具类在实现高性能应用程序中的关键作用。通过继承`Thread`或实现`Runnable`创建线程,利用`Executors`管理线程池,以及使用`Semaphore`、`CountDownLatch`和`CyclicBarrier`进行线程同步。保证线程安全、实现线程协作和性能调优(如设置线程池大小、避免不必要同步)是重要环节。理解并恰当运用这些工具能提升程序效率和可靠性。
|
10天前
|
Java 开发者
Java中多线程并发控制的实现与优化
【4月更文挑战第17天】 在现代软件开发中,多线程编程已成为提升应用性能和响应能力的关键手段。特别是在Java语言中,由于其平台无关性和强大的运行时环境,多线程技术的应用尤为广泛。本文将深入探讨Java多线程的并发控制机制,包括基本的同步方法、死锁问题以及高级并发工具如java.util.concurrent包的使用。通过分析多线程环境下的竞态条件、资源争夺和线程协调问题,我们提出了一系列实现和优化策略,旨在帮助开发者构建更加健壮、高效的多线程应用。
7 0
|
10天前
|
存储 缓存 安全
Java并发基础之互斥同步、非阻塞同步、指令重排与volatile
在Java中,多线程编程常常涉及到共享数据的访问,这时候就需要考虑线程安全问题。Java提供了多种机制来实现线程安全,其中包括互斥同步(Mutex Synchronization)、非阻塞同步(Non-blocking Synchronization)、以及volatile关键字等。 互斥同步(Mutex Synchronization) 互斥同步是一种基本的同步手段,它要求在任何时刻,只有一个线程可以执行某个方法或某个代码块,其他线程必须等待。Java中的synchronized关键字就是实现互斥同步的常用手段。当一个线程进入一个synchronized方法或代码块时,它需要先获得锁,如果
24 0
|
18天前
|
存储 缓存 安全
【企业级理解】高效并发之Java内存模型
【企业级理解】高效并发之Java内存模型