SynchronousQueue
这是一个不存储元素的阻塞队列,不存储元素还叫队列?
没错,SynchronousQueue 直译过来叫同步队列,如果在队列里面呆久了应该就算是“异步”了吧
所以使用它,每个put() 操作必须要等待一个 take() 操作,反之亦然,否则不能继续添加元素
实际中怎么用呢?假如你需要两个线程之间同步共享变量,如果不用 SynchronousQueue 你可能会选择用 CountDownLatch 来完成,就像这样:
ExecutorService executor = Executors.newFixedThreadPool(2); AtomicInteger sharedState = new AtomicInteger(); CountDownLatch countDownLatch = new CountDownLatch(1); Runnable producer = () -> { Integer producedElement = ThreadLocalRandom .current() .nextInt(); sharedState.set(producedElement); countDownLatch.countDown(); }; Runnable consumer = () -> { try { countDownLatch.await(); Integer consumedElement = sharedState.get(); } catch (InterruptedException ex) { ex.printStackTrace(); } };
这点小事就用计数器来实现,显然很不合适,用 SynchronousQueue 改造一下,感觉瞬间就不一样了
ExecutorService executor = Executors.newFixedThreadPool(2); SynchronousQueue<Integer> queue = new SynchronousQueue<>(); Runnable producer = () -> { Integer producedElement = ThreadLocalRandom .current() .nextInt(); try { queue.put(producedElement); } catch (InterruptedException ex) { ex.printStackTrace(); } }; Runnable consumer = () -> { try { Integer consumedElement = queue.take(); } catch (InterruptedException ex) { ex.printStackTrace(); } };
其实 Executors.newCachedThreadPool() 方法里面使用的就是 SynchronousQueue
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
看到前面LinkedBlockingQueue
用在newSingleThreadExecutor
和newFixedThreadPool
上,而newCachedThreadPool
却用SynchronousQueue
,这是为什么呢?
因为单线程池和固定线程池中,线程数量是有限的,因此提交的任务需要在LinkedBlockingQueue
队列中等待空余的线程;
而缓存线程池中,线程数量几乎无限(上限为Integer.MAX_VALUE
),因此提交的任务只需要在SynchronousQueue
队列中同步移交给空余线程即可, 所以有时也会说 SynchronousQueue
的吞吐量要高于 LinkedBlockingQueue
和 ArrayBlockingQueue
LinkedTransferQueue
简单来说,TransferQueue提供了一个场所,生产者线程使用 transfer
方法传入一些对象并阻塞,直至这些对象被消费者线程全部取出。
你有没有觉得,刚刚介绍的 SynchronousQueue 是否很像一个容量为 0 的 TransferQueue。
但 LinkedTransferQueue 相比其他阻塞队列多了三个方法
- transfer(E e)如果当前有消费者正在等待消费元素,transfer 方法就可以直接将生产者传入的元素立刻 transfer (传输) 给消费者;如果没有消费者等待消费元素,那么 transfer 方法会把元素放到队列的 tail(尾部)节点,一直阻塞,直到该元素被消费者消费才返回
- tryTransfer(E e)tryTransfer,很显然是一种尝试,如果没有消费者等待消费元素,则马上返回 false ,程序不会阻塞
- tryTransfer(E e, long timeout, TimeUnit unit)带有超时限制,尝试将生产者传入的元素 transfer 给消费者,如果超时时间到,还没有消费者消费元素,则返回 false
你瞧,所有阻塞的方法都是一个套路:
- 阻塞方式
- 带有 try 的非阻塞方式
- 带有 try 和超时时间的非阻塞方式
看到这你也许感觉 LinkedTransferQueue 没啥特点,其实它和其他阻塞队列的差别还挺大的:
BlockingQueue 是如果队列满了,线程才会阻塞;但是 TransferQueue 是如果没有消费元素,则会阻塞 (transfer 方法)
这也就应了 Doug Lea 说的那句话:
LinkedTransferQueue
is actually a superset ofConcurrentLinkedQueue
,SynchronousQueue
(in “fair” mode), and unboundedLinkedBlockingQueues
. And it’s made better by allowing you to mix andmatch those features as well as take advantage of higher-performance implementation techniques.简单翻译:
LinkedTransferQueue
是ConcurrentLinkedQueue
,SynchronousQueue
(在公平模式下), 无界的LinkedBlockingQueues
等的超集; 允许你混合使用阻塞队列的多种特性所以,在合适的场景中,请尽量使用
LinkedTransferQueue
上面都看的是单向队列 FIFO,接下来我们看看双向队列
LinkedBlockingDeque
LinkedBlockingDeque
是一个由链表结构组成的双向阻塞队列,凡是后缀为 Deque 的都是双向队列意思,后缀的发音为deck——/dek/
, 刚接触它时我以为是这个冰激凌的发音
所谓双向队列值得就是可以从队列的两端插入和移除元素。所以:
双向队列因为多了一个操作队列的入口,在多线程同时入队是,也就会减少一半的竞争
队列有头,有尾,因此它又比其他阻塞队列多了几个特殊的方法
- addFirst
- addLast
- xxxxFirst
- xxxxLast
- ... ...
这么一看,双向阻塞队列确实很高效,
那双向阻塞队列应用在什么地方了呢?
不知道你是否听过 “工作窃取”模式,看似不太厚道的一种方法,实则是高效利用线程的好办法。下一篇文章,我们就来看看 ForkJoinPool 是如何应用 “工作窃取”模式的
总结
到这关于 Java 队列(其实主要介绍了阻塞队列)就快速的区分完了,将看似杂乱的方法做了分类整理,方便快速理解其用途,同时也说明了这些队列的实际用途。相信你带着更高的视角来阅读源码会更加轻松,最后也希望大家认真看两个队列的源码实现,在遇到队列的问题,脑海中的画面分分钟就可以搞定了