BlockingQueue 是什么?
BlockingQueue 是一个 Queue , 它是一个线程安全的阻塞队列接口。
一种队列,它还支持在检索元素时等待队列变为非空,在存储元素时等待队列中的空间变为可用的操作。 BlockingQueue方法有四种形式,有不同的处理操作的方法,这些操作不能立即满足,但在将来的某个时候可能会满足:一种抛出异常,另一种返回特殊值(null或false,取决于操作),第三个线程无限期地阻塞当前线程,直到操作成功,第四个线程在放弃之前只阻塞给定的最大时间限制。下表总结了这些方法:
抛出异常 | 特殊值 | 阻塞 | 超时 | |
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() | 不可用 | 不可用 |
下图是它的一个继承和实现关系图:
常见的几种队列
- ArrayBlockingQueue 数组有界队列
- LinkedBlockingDeque 链表无界队列
- DelayQeque 基于时间的调度无界队列
- PriorityBlockingQueue 优先级堆支持的无界队列
使用场景
- 线程池中使用,下面是咱们线程池的构造方法如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { //... }
- Eureka 三级缓存
- Netty
- Nacos
- RokcetMQ
ArrayBlockingQueue
由数组支持的有界阻塞队列。该队列对元素进行FIFO排序(先进先出)。队列的头是在队列上停留时间最长的元素。队列的尾部是在队列上停留时间最短的元素。新元素插入到队列的尾部,队列检索操作获取队列头部的元素。
这是一个经典的“有界缓冲区”,其中一个固定大小的数组保存生产者插入的元素和消费者提取的元素。一旦创建,容量就无法更改。尝试将元素放入完整队列将导致操作阻塞;尝试从空队列中获取元素也会被阻塞。
此类支持一个可选的公平策略,用于排序等待的生产者线程和消费者线程。默认情况下,不保证此顺序。然而,公平性设置为true的队列以FIFO顺序授予线程访问权限。公平性通常会降低吞吐量,但会降低可变性并避免饥饿。
数据结构
它底层的数据结构是一个数组形式,构造方法如下:
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; // 初始化数组 lock = new ReentrantLock(fair); // 创建锁 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
入队和出队过程
入队和出队过程如下图所示(流程图为 put/take 方法),它的本质是一个设置一个全局的Lock , 它是一个 ReentrantLock 然后通过 Condition 进行边界状态的限制,就是进行条件通知。
使用场景
通常在线程池创建的时候,我一般会使用 LinkedBlockingDeque 作为一个缓冲队列。
LinkedBlockingDeque
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。 相比于其他阻塞队列,LinkedBlockingDeque多了addFirst、addLast、peekFirst、peekLast等方法,以first结尾的方法,表示插入、获取获移除双端队列的第一个元素。以last结尾的方法,表示插入、获取获移除双端队列的最后一个元素。 LinkedBlockingDeque是可选容量的,在初始化时可以设置容量防止其过度膨胀,如果不设置,默认容量大小为Integer.MAX_VALUE。
数据结构
数据结构如下,它是一个双端单向链表。
如何使用
下面我们简单的使用一下,测试代码如下所示:
public class LinkedBockingQueueTest { public static void main(String[] args) { BlockingDeque<String> blockingDeque = new LinkedBlockingDeque<>(1); // offer,poll 线程安全/阻塞 api blockingDeque.offer("添加第一个元素"); String item = blockingDeque.poll(); System.out.println("poll item:" + item); // offer,poll 线程安全/如果失败抛出异常 try { blockingDeque.put("添加第二个元素"); } catch (InterruptedException e) { e.printStackTrace(); } try { String take = blockingDeque.take(); System.out.println("take item:" + take); } catch (InterruptedException e) { e.printStackTrace(); } // add,remove 不是线程安全的 blockingDeque.add("添加第四个元素"); blockingDeque.add("添加第五个元素"); item = blockingDeque.remove(); System.out.println(item); } }
输出结果如下所示:
poll item:添加第一个元素 take item:添加第二个元素 Exception in thread "main" java.lang.IllegalStateException: Deque full at java.util.concurrent.LinkedBlockingDeque.addLast(LinkedBlockingDeque.java:335) at java.util.concurrent.LinkedBlockingDeque.add(LinkedBlockingDeque.java:633) at cn.zhengsh.queue.LinkedBockingQueueTest.main(LinkedBockingQueueTest.java:30)
使用场景
通常在线程池创建的时候,我一般会使用 LinkedBlockingDeque 作为一个缓冲队列。