1 概念和数据结构
支持以下操作的队列:
- 在检索元素时等待队列变为非空
- 在存储元素时等待队列中的空间变为可用。
BlockingQueue方法有四种形式,具有不同的处理操作的方法,这些操作不能立即得到满足,但可能在将来的某个时候得到满足:
- 第一个抛出异常,
- 第二个返回一个特殊值(null或false,取决于操作),
- 第三个阻塞当前线程,直到操作成功,
- 第四个阻塞只有给定的最大时间限制,然后放弃。
这些方法总结如下表: BlockingQueue方法的总结
网络异常,图片无法展示
|
接下来是BlockingQueue的继承关系图:
网络异常,图片无法展示
|
BlockingQueue的七大实现类:
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
- DelayedWorkQueue
- ForwardingBlockingQueue
- SynchronousQueue
- DelayQueue
- LinkedBlockingQueue:由链表结构组成的有界(大小默认为Integer.MAX_VALUE)的阻塞队列。
- PriorityBlockingQueue
网络异常,图片无法展示|
阻塞队列的基本原型:
网络异常,图片无法展示
|
2 应用场景
- 消息队列/消息中间件
- 生产-消费者问题
- …
3 代码
3.1 常用阻塞队列
3.1.1 ArrayBlockingQueue
由数组结构组成的有界阻塞队列。
/** * ArrayBlockingQueueTest */ public static void ArrayBlockingQueueTest() { ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); try { //向队列中添加元素add()方法会抛出异常 blockingQueue.add("zs"); blockingQueue.add("ls"); blockingQueue.add("ww"); blockingQueue.add("zl"); } catch (Exception e) { //发生异常时超出部分会添加失败 System.err.println("err -> " + e.getMessage()); } //检查栈顶元素 System.out.println("栈顶元素 -> " + blockingQueue.element()); try { //从队头排出元素 System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); } catch (Exception e) { System.err.println("err -> " + e.getMessage()); } //其他API与Queue使用类似 } 复制代码
3.1.2 LinkedBlockingQueue
由链表结构组成的有界(大小默认为Integer.MAX_VALUE)的阻塞队列。
/** * LinkedBlockingQueueTest */ public static void LinkedBlockingQueueTest() { LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(); blockingQueue.add("zs"); System.out.println(blockingQueue.remove()); } 复制代码
3.1.3 SynchronousQueue
不存储元素的阻塞队列,即单个元素的队列,每个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
/** * SynchronousQueueTest */ public static void SynchronousQueueTest() throws InterruptedException { SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>(); //必须要使用两个线程,一个put一个take //以下情况会造成线程阻塞 //原因:均在主线程中 /** * synchronousQueue.put("zs"); * synchronousQueue.take(); */ //正常使用 new Thread(() -> { try { synchronousQueue.put("zs"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { System.out.println(synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } 复制代码
3.2 使用阻塞队列解决生产-消费者问题
/** * @desc: 使用阻塞队列完成生产-消费者模型 * @author: YanMingXin * @create: 2021/8/1-16:00 **/ public class Test02 { /** * 声明一个队列 */ SynchronousQueue queue = new SynchronousQueue<Integer>(); /** * 生产次数 */ public static volatile int productNum = 0; /** * 消费次数 */ public static volatile int consumeNum = 0; /** * 生产者 */ public void producer(Integer val) throws Exception { queue.put(val); System.out.println("生产 -> " + val); } /** * 消费者 */ public void consumer() throws Exception { System.out.println("消费 -> " + queue.take()); } /** * 测试 * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { CountDownLatch latch = new CountDownLatch(2); Test02 test = new Test02(); new Thread(() -> { for (int i = 0; i < 100; i++) { try { Thread.sleep(100); test.producer(i); productNum += 1; } catch (Exception e) { e.printStackTrace(); } } try { latch.countDown(); } catch (Exception e) { e.printStackTrace(); } }).start(); new Thread(() -> { for (int i = 0; i < 100; i++) { try { Thread.sleep(100); test.consumer(); consumeNum += 1; } catch (Exception e) { e.printStackTrace(); } } try { latch.countDown(); } catch (Exception e) { e.printStackTrace(); } }).start(); latch.await(); System.out.println("生产次数 = " + productNum); System.out.println("消费次数 = " + consumeNum); } } 复制代码
4 底层原理
在这里我们拿ArrayBlockingQueue举例:
网络异常,图片无法展示
|
ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列,可以按照 FIFO原则对元素进行排序。
- ArrayBlockingQueue是基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。 并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。
- ArrayBlockingQueue内部通过Object[]数组保存数据的,也就是说ArrayBlockingQueue本质上是通过数组实现的。ArrayBlockingQueue的大小,即数组的容量是在创建创建ArrayBlockingQueue时候指定的。
- ArrayBlockingQueue和ReentrantLock是组合关系,ArrayBlockingQueue中包含一个ReentrantLock对象。ReentrantLock是可重入的互斥锁。ArrayBlockingQueue就是根据ReentrantLock互斥锁实现"多线程对共享资源的访问"。ReentrantLock分为公平锁和非公平锁,关于具体使用公平锁还是非公平锁,在创建ArrayBlockingQueue时可以指定;而且,ArrayBlockingQueue默认会使用非公平锁。
- ArrayBlockingQueue和Condition是组合关系,ArrayBlockingQueue中包含两个Condition对象(notEmpty和notFull)。使用通知模式实现:所谓通知模式,当生产者往满的队列里面添加元素的时候,会阻塞生产者(调用Condition notFull.await()进行等待);当消费者消费了一个队列中的元素后,会通知(调用Condition notFull.signal()唤醒生产者)生产者当前队列可用。反之,当消费者消费的时候,发现队列是空的,则消费者会被阻塞(通过Condition的 notEmpty.await()进行等待),当生产者插入了队列中的一个元素后,则会调用notEmpty.signal()唤醒消费者继续消费。
/** * 构造方法 */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; //声明一个ReentrantLock lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /** * put方法 */ public void put(E e) throws InterruptedException { //检查不为空 checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); //插入元素 enqueue(e); } finally { lock.unlock(); } } /** * take方法 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); //提取元素当前的取位、前进和信号,仅在持有锁时调用。 return dequeue(); } finally { lock.unlock(); } }