思考:LinkedBlockingQueue与ArrayBlockingQueue有何区别
一、继承实现图关系
二、底层数据存储结构
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { ... static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } private final int capacity; private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition(); ... }
说明:
- Node: 链表数据结构
- capacity: 链表的容量上限值,如果没有配置则为 Integer.MAX_VALUE
- count: 链表的容量
- head: 指向链表的头
- last: 指向链表的尾
- takeLock: 取操作链表对象锁
- notEmpty: 链表非空阻塞和唤醒条件
- putLock: 插入操作链表对象锁
- notFull: 链表是否已满阻塞和唤醒条件
三、特点及优缺点
3.1 特点
- 线程安全阻塞队列
- 线程安全:插入有插入锁,取数据有取锁
- 有界:链表可以说是有界的,不设置默认为Integer.MAX_VALUE
- 阻塞:链表空时取阻塞,链表满时插入会阻塞
- 取操作和插入操作锁分离(注:Java8是一把锁,Java17是两把锁)
- 先进先出原则
- 从尾部插入,从头部取出
3.2 优缺点
- 插入锁和取锁分离,插入和取互不干涉的执行。
- remove操作要同时获取插入锁和取锁 两把锁,效率很低
- 插入和取出可同时进行
- 占内存空间大
- 无法按下标获取元素,比较从头开始遍历
- 插入和删除数据性能较好
四、源码详解
读取部分源码:
- 添加任务方法
- 获取和删除任务方法
阅读明白这几个接口即可,其它都很简单
4.1 添加任务
public void put(E e) throws InterruptedException { // 链表节点信息不能为空 if (e == null) throw new NullPointerException(); final int c; // 构建节点数据 final LinkedBlockingQueue.Node<E> node = new LinkedBlockingQueue.Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 尝试获取put锁,允许在尝试获取put锁时其它线程调用尝试获取put锁的线程的Thread.interrupt方法来中断线程,这时不用获取到put锁,直接抛出InterruptedException putLock.lockInterruptibly(); try { // 如果链表长度达最大值,则阻塞 while (count.get() == capacity) { notFull.await(); } // 新构建的节点添加到链表尾部 enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) // 若链表长度未达最大值,则唤醒其它由notFull阻塞的线程 notFull.signal(); } finally { // 释放put锁 putLock.unlock(); } if (c == 0) signalNotEmpty(); } /** * 添加到链表尾部 */ private void enqueue(LinkedBlockingQueue.Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } /** * c == 0 signalNotEmpty */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
说明:c == 0 说明新增之前是空,可能在put之前有 take 操作使notEmpty在await()状态中,put之后队列不为空则调一下notEmpty.signal()。
4.2 获取和删除任务
public E take() throws InterruptedException { final E x; final int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 获取takeLock锁 takeLock.lockInterruptibly(); try { // 链表为空,进入await状态 while (count.get() == 0) { notEmpty.await(); } // 有数据,取出队列头 x = dequeue(); // 获取当前值并减1 c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { // 释放takeLock锁 takeLock.unlock(); } // c == capacity,count减1之后则已notFull, if (c == capacity) signalNotFull(); return x; } /** * 取出队头 */ private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; LinkedBlockingQueue.Node<E> h = head; LinkedBlockingQueue.Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } /** * c == capacity, count减1之后为notFull,唤醒notFull.await状态的线程 */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
说明:c == capacity 说明take之前链表是满的,若take之前有其它线程(a)在put操作,则它(a)进入了notFull.await()状态。本线程此时 count已经-1链表有空间,所以调notFull.signal(),若存在其它线程(a)则唤醒它继续put。
五、作用&与ArrayBlockingQueue区别
- ArrayBlockingQueue是有界的必须指定大小,而LinkedBlockingQueue不需要指定大小,不指定大小默认为Integer.MAX_VALUE
- ArrayBlockingQueue是一把锁,LinkedBlockingQueue是两把锁添加速度快,并发性高的环境下LinkedBlockingQueue的效率更高
- ArrayBlockingQueue存储数据结构是数组,LinkedBlockingQueue存储数据结构是链表,正因为如此LinkedBlockingQueue消耗更多的内存资源
六、示例
public class LinkedBlockQueueTester { class Test2 implements Runnable { @Override public void run() { } } public static void main(String[] args) throws Exception { new LinkedBlockQueueTester().launch(); } private void launch() throws Exception { LinkedBlockingQueue queue = new LinkedBlockingQueue(2); for (int i = 0; i < 10; i++) { queue.add(new Test2()); //queue.put(new Test2()); } } }
执行结果如下:
Exception in thread "main" java.lang.IllegalStateException: Queue full at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98) at w.cx.lrn.data.test.LinkedBlockQueueTester.launch(LinkedBlockQueueTester.java:28) at w.cx.lrn.data.test.LinkedBlockQueueTester.main(LinkedBlockQueueTester.java:22)
add方法调offer方法,源码如下:
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; // 队列满时返回false if (count.get() == capacity) return false; final int c; final LinkedBlockingQueue.Node<E> node = new LinkedBlockingQueue.Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() == capacity) return false; enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
从offer文法源码中看出,当链表满时,返回false则add方法走else逻辑抛出IllegalStateException("Queue full")异常。
若是put方法,链表满时则进入等待状态,源码见4.1,直到有其它线程执行take后满足c == capacity 后唤醒本线程才可能成功添加任务。