10分钟搞定 Java 并发队列好吗?好的(下)

简介: 10分钟搞定 Java 并发队列好吗?好的(下)

SynchronousQueue


微信图片_20220511135009.png

这是一个不存储元素的阻塞队列,不存储元素还叫队列?


微信图片_20220511135036.gif


没错,SynchronousQueue 直译过来叫同步队列,如果在队列里面呆久了应该就算是“异步”了吧


所以使用它,每个put() 操作必须要等待一个 take() 操作,反之亦然,否则不能继续添加元素


实际中怎么用呢?假如你需要两个线程之间同步共享变量,如果不用 SynchronousQueue 你可能会选择用 CountDownLatch 来完成,就像这样:


ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);
Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    sharedState.set(producedElement);
    countDownLatch.countDown();
};
Runnable consumer = () -> {
    try {
        countDownLatch.await();
        Integer consumedElement = sharedState.get();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};


这点小事就用计数器来实现,显然很不合适,用 SynchronousQueue 改造一下,感觉瞬间就不一样了


ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    try {
        queue.put(producedElement);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};
Runnable consumer = () -> {
    try {
        Integer consumedElement = queue.take();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};


其实 Executors.newCachedThreadPool() 方法里面使用的就是 SynchronousQueue


public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}


看到前面 LinkedBlockingQueue 用在 newSingleThreadExecutornewFixedThreadPool 上,而 newCachedThreadPool 却用 SynchronousQueue,这是为什么呢?


因为单线程池和固定线程池中,线程数量是有限的,因此提交的任务需要在LinkedBlockingQueue队列中等待空余的线程;


而缓存线程池中,线程数量几乎无限(上限为Integer.MAX_VALUE),因此提交的任务只需要在SynchronousQueue 队列中同步移交给空余线程即可, 所以有时也会说 SynchronousQueue 的吞吐量要高于 LinkedBlockingQueueArrayBlockingQueue


LinkedTransferQueue


简单来说,TransferQueue提供了一个场所,生产者线程使用 transfer 方法传入一些对象并阻塞,直至这些对象被消费者线程全部取出。


你有没有觉得,刚刚介绍的 SynchronousQueue 是否很像一个容量为 0 的 TransferQueue


但 LinkedTransferQueue 相比其他阻塞队列多了三个方法


  • transfer(E e)如果当前有消费者正在等待消费元素,transfer 方法就可以直接将生产者传入的元素立刻 transfer (传输) 给消费者;如果没有消费者等待消费元素,那么 transfer 方法会把元素放到队列的 tail(尾部)节点,一直阻塞,直到该元素被消费者消费才返回


  • tryTransfer(E e)tryTransfer,很显然是一种尝试,如果没有消费者等待消费元素,则马上返回 false ,程序不会阻塞


  • tryTransfer(E e, long timeout, TimeUnit unit)带有超时限制,尝试将生产者传入的元素 transfer 给消费者,如果超时时间到,还没有消费者消费元素,则返回 false


你瞧,所有阻塞的方法都是一个套路:


  1. 阻塞方式


  1. 带有 try 的非阻塞方式


  1. 带有 try 和超时时间的非阻塞方式


看到这你也许感觉 LinkedTransferQueue 没啥特点,其实它和其他阻塞队列的差别还挺大的:


BlockingQueue 是如果队列满了,线程才会阻塞;但是 TransferQueue 是如果没有消费元素,则会阻塞 (transfer 方法)


这也就应了 Doug Lea 说的那句话:


LinkedTransferQueue is actually a superset of ConcurrentLinkedQueue, SynchronousQueue (in “fair” mode), and unbounded LinkedBlockingQueues. And it’s made better by allowing you to mix andmatch those features as well as take advantage of higher-performance implementation techniques.

简单翻译:

LinkedTransferQueueConcurrentLinkedQueue, SynchronousQueue (在公平模式下), 无界的LinkedBlockingQueues等的超集; 允许你混合使用阻塞队列的多种特性

所以,在合适的场景中,请尽量使用LinkedTransferQueue


上面都看的是单向队列 FIFO,接下来我们看看双向队列


LinkedBlockingDeque


LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,凡是后缀为 Deque 的都是双向队列意思,后缀的发音为deck——/dek/, 刚接触它时我以为是这个冰激凌的发音


微信图片_20220511135612.png


所谓双向队列值得就是可以从队列的两端插入和移除元素。所以:


双向队列因为多了一个操作队列的入口,在多线程同时入队是,也就会减少一半的竞争


队列有头,有尾,因此它又比其他阻塞队列多了几个特殊的方法


  • addFirst
  • addLast
  • xxxxFirst
  • xxxxLast
  • ... ...


微信图片_20220511135646.png


这么一看,双向阻塞队列确实很高效,


那双向阻塞队列应用在什么地方了呢?


不知道你是否听过 “工作窃取”模式,看似不太厚道的一种方法,实则是高效利用线程的好办法。下一篇文章,我们就来看看 ForkJoinPool 是如何应用 “工作窃取”模式的


总结


到这关于 Java 队列(其实主要介绍了阻塞队列)就快速的区分完了,将看似杂乱的方法做了分类整理,方便快速理解其用途,同时也说明了这些队列的实际用途。相信你带着更高的视角来阅读源码会更加轻松,最后也希望大家认真看两个队列的源码实现,在遇到队列的问题,脑海中的画面分分钟就可以搞定了




相关文章
|
3月前
|
Java API 调度
从阻塞到畅通:Java虚拟线程开启并发新纪元
从阻塞到畅通:Java虚拟线程开启并发新纪元
321 83
|
3月前
|
存储 Java 调度
Java虚拟线程:轻量级并发的革命性突破
Java虚拟线程:轻量级并发的革命性突破
297 83
|
6月前
|
消息中间件 算法 安全
JUC并发—1.Java集合包底层源码剖析
本文主要对JDK中的集合包源码进行了剖析。
|
5月前
|
机器学习/深度学习 消息中间件 存储
【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
203 0
|
6月前
|
前端开发 Java
java实现队列数据结构代码详解
本文详细解析了Java中队列数据结构的实现,包括队列的基本概念、应用场景及代码实现。队列是一种遵循“先进先出”原则的线性结构,支持在队尾插入和队头删除操作。文章介绍了顺序队列与链式队列,并重点分析了循环队列的实现方式以解决溢出问题。通过具体代码示例(如`enqueue`入队和`dequeue`出队),展示了队列的操作逻辑,帮助读者深入理解其工作机制。
178 1
|
4月前
|
Java 物联网 数据处理
Java Solon v3.2.0 史上最强性能优化版本发布 并发能力提升 700% 内存占用节省 50%
Java Solon v3.2.0 是一款性能卓越的后端开发框架,新版本并发性能提升700%,内存占用节省50%。本文将从核心特性(如事件驱动模型与内存优化)、技术方案示例(Web应用搭建与数据库集成)到实际应用案例(电商平台与物联网平台)全面解析其优势与使用方法。通过简单代码示例和真实场景展示,帮助开发者快速掌握并应用于项目中,大幅提升系统性能与资源利用率。
140 6
Java Solon v3.2.0 史上最强性能优化版本发布 并发能力提升 700% 内存占用节省 50%
|
3月前
|
SQL 缓存 安全
深度理解 Java 内存模型:从并发基石到实践应用
本文深入解析 Java 内存模型(JMM),涵盖其在并发编程中的核心作用与实践应用。内容包括 JMM 解决的可见性、原子性和有序性问题,线程与内存的交互机制,volatile、synchronized 和 happens-before 等关键机制的使用,以及在单例模式、线程通信等场景中的实战案例。同时,还介绍了常见并发 Bug 的排查与解决方案,帮助开发者写出高效、线程安全的 Java 程序。
191 0
|
5月前
|
缓存 安全 Java
【高薪程序员必看】万字长文拆解Java并发编程!(3-1):并发共享问题的解决与分析
活锁:多个线程相互影响对方退出同步代码块的条件而导致线程一直运行的情况。例如,线程1的退出条件是count=5,而线程2和线程3在其代码块中不断地是count进行自增自减的操作,导致线程1永远运行。内存一致性问题:由于JIT即时编译器对缓存的优化和指令重排等造成的内存可见性和有序性问题,可以通过synchronized,volatile,并发集合类等机制来解决。这里的线程安全是指,多个线程调用它们同一个实例的方法时,是线程安全的,但仅仅能保证当前调用的方法是线程安全的,不同方法之间是线程不安全的。
96 0
|
5月前
|
Java 程序员
【高薪程序员必看】万字长文拆解Java并发编程!(3-2):并发共享问题的解决与分析
wait方法和notify方法都是Object类的方法:让当前获取锁的线程进入waiting状态,并进入waitlist队列:让当前获取锁的线程进入waiting状态,并进入waitlist队列,等待n秒后自动唤醒:在waitlist队列中挑一个线程唤醒:唤醒所有在waitlist队列中的线程它们都是之间协作的手段,只有拥有对象锁的线程才能调用这些方法,否则会出现IllegalMonitorStateException异常park方法和unpark方法是LockSupport类中的方法。
102 0

热门文章

最新文章