并发包下面的并发容器中,ConcurrentLinkedQueue 这种 Concurrent 类型和的集合才真正代表并发。
- Concurrent 类型基于 lock-free ,常见的多线程访问场景,一般可以提供高吞吐量。
- LinkedBlockingQueue 内部基于锁实现,提供了BlockingQueue等特性方法。
java.util.concurrent 包并发容器分类
- Concurrent
- CopyOnWrite
- Blocking
Concurrent 类型集合
- Concurrent 类型没有类似 CopyOnWrite 之类容器相对较重的修改开销。
- Concurrent 往往提供了较低的遍历一致性,可以理解所谓的弱一致性,例如,当迭代器遍历时,如果容器发生修改,迭代器仍然可以继续遍历。
- 弱一致性的另外一个体现是, size 等操作准确性有限的,未必是100%准确。
- 读取的性能具有一定的不确定性
Java 集合中有个线程不安全的队列 LinkedList 是个 Deque。
ConcurrentLinkedDeque 和 LinkedBlockingQueue 区别
能够理解 ConcurrentLinkedQueue 和 LinkedBlockingQueue 的主要区别。在常规队列操作基础上, Blocking意味着提供了特定的等待性操作。
阻塞队列 LinkedBlockingQueue
适用阻塞队列的好处,多线程操作共同的队列时不需要额外的同步。
- void put(E e): 在队尾插入元素,方法在队列满的时候会阻塞直到有队列成员被消费
- boolean offer(E e): 在队尾插入元素,方法在队列满的时候不会阻塞,直接返回 false
- E take(): 取出并删除队列中的首元素,如果队列为空,会阻塞,直到有队列成员被放进来
- E poll(): 取出并删除队列中的首元素,如果队列为空,则返回 null,不进行阻塞
- E peek(): 取出第一个元素但是不删除它,没有就返回 null
非阻塞队列 ConcurrentLinkedQueue
允许多线程访问一个共同集合,可以使用 ConcurrentLinedQueue Queue 中元素按 FIFO 原则进行排序。采用CAS 操作,来保证元素的一致性。
- boolean offer(E e): 在队尾插入元素,不进行阻塞
- E poll(): 取出并删除队列中的首元素,不进行阻塞
- E peek(): 取出第一个元素但是不删除它,不进行阻塞
ConcurrentLinkedDeque 和 LinkedBlockingDeque。Deque 的侧重点是支持对队列的头尾都信息插入和删除:
- 尾部插入时需要的addLast(e)、 offerLast(e)。
- 尾部删除所需要的removeLast()、 pollLast()。
阻塞队列 BlockingQueue 分类
- ArrayBlockingQueue
ArrayBlockingQueue是最典型的的有界队列,其内部以final的数组保存数据,数组的大小就决定了队列的边界,所以我们在创建ArrayBlockingQueue时,都要指定容量,如:
public ArrayBlockingQueue(int capacity, boolean fair)
- LinkedBlockingQueue
其行为和内部代码都是基于有界的逻辑实现的,只不过如果我们没有在创建队列时就指定容量,那么其容量限制就自动被设置为Integer.MAX_VALUE,成为了无界队列。
- SynchronousQueue
每个删除操作都要等待插入操作,反之每个插入操作也都要等待删除动作。这个队列的容量是0。
- PriorityBlockingQueue 是无边界的优先队列,虽然严格意义上来讲,其大小总归是要受系统资源影响。
- DelayedQueue 和 LinkedTransferQueue
都是无边界队列,就是 put 操作永远不会发生其他 BlockingQueue 阻塞的情况。
LinkedBlockingQueue 实现原理
/** Lock held by take, poll, etc */ private fnal ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private fnal Condition notEmpty = takeLock.newCondition(); /** Lock held by put, ofer, etc */ private fnal ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private fnal Condition notFull = putLock.newCondition();
LinkedBlockingQueue 改进了锁操作的力度,头,尾操作使用不同的锁,在通用的场景下,他的吞吐量相对要更好一些。
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
ArrayBlockingQueue take 实现原理
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
队列的使用场景
队列的典型使用场景是 生产者 -消费者 场景。
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class ConsumerProducer { public static final String EXIT_MSG = "Good bye!"; public static void main(String[] args) { // 使用较小的队列,以更好地在输出中展示其影响 BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); new Thread(producer).start(); new Thread(consumer).start(); } static class Producer implements Runnable { private BlockingQueue<String> queue; public Producer(BlockingQueue<String> q) { this.queue = q; } @Override public void run() { for (int i = 0; i < 20; i++) { try { Thread.sleep(5L); String msg = "Message" + i; System.out.println("Produced new item: " + msg); queue.put(msg); } catch (InterruptedException e) { e.printStackTrace(); } } try { System.out.println("Time to say good bye!"); queue.put(EXIT_MSG); } catch (InterruptedException e) { e.printStackTrace(); } } } static class Consumer implements Runnable { private BlockingQueue<String> queue; public Consumer(BlockingQueue<String> q) { this.queue = q; } @Override public void run() { try { String msg; while (!EXIT_MSG.equalsIgnoreCase((msg = queue.take()))) { System.out.println("Consumed item: " + msg); Thread.sleep(10L); } System.out.println("Got exit message, bye!"); } catch (InterruptedException e) { e.printStackTrace(); } } } }