1. 阻塞队列是什么
阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.
阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:
- 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
- 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素.
阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.
2. 生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合(关联关系紧密)问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
这种生产者消费者模型有以下两种用途:
1) 削峰填谷
阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力.
比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求.
如果直接处理这些支付请求, 服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程). 这个时候就可以把这些请求都放到一个阻塞队列中,
然后再由消费者线程慢慢的来处理每个支付请求. 这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮.
另外一个更形象的例子:
三峡大坝:
汛期控制水量,防止水灾
旱期释放积攒的水,防止旱灾
这就类似于生产者消费者模型中的削峰填谷的作用
2) 解耦合
阻塞队列也能使生产者和消费者之间 解耦(减少两者之间的关联关系)
比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包. 擀饺子皮的人就是 “生产者”, 包饺子的人就是
“消费者”. 擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包),
包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的).
3.标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可.
- BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
- put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
- BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
public static void main(String[] args) throws InterruptedException { BlockingQueue<String> queue=new LinkedBlockingQueue<>(); //入队列,put具有阻塞功能 queue.put("hello"); //出队列 String elem=queue.take(); System.out.println(elem); elem=queue.take(); System.out.println(elem); } //因为队列中的元素被取出后队列为空,所以形成阻塞
使用jconsole工具观察线程运行状态,可以发现该线程此时处于WAITING状态,线程阻塞在了第11行。
生产者消费者模型
public static void main(String[] args) { BlockingQueue<Integer> queue=new LinkedBlockingQueue<>(); Thread consumer=new Thread() { @Override public void run() { while (true) { try { Integer value=queue.take(); System.out.println("消费元素:"+value); } catch (InterruptedException e) { e.printStackTrace(); } } } }; consumer.start(); Thread producer=new Thread() { @Override public void run() { for (int i = 0; i < 10000; i++) { System.out.println("生产了元素:"+i); try { queue.put(i); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; producer.start(); try { consumer.join(); producer.join(); } catch (InterruptedException e) { e.printStackTrace(); } }
4. 阻塞队列实现
public class Demo23 { static class BlockingQueue { //1000就相当于队列的最大容量,此处暂不考虑扩容问题 private int[] items=new int[1000]; private volatile int head=0; private volatile int tail=0; private volatile int size=0; private Object locker=new Object(); //put用来入队列 public void put(int item) throws InterruptedException { //因为队列中涉及修改操作,所以通过加锁来解决线程不安全问题(原子性)。 synchronized (locker) { //使用while就是为了让wait被唤醒之后,再次确认条件是否成立 while (size==items.length) { //队列已经满了,对于阻塞队列来说就要阻塞 locker.wait(); } items[tail]=item; tail++; //如果到达末尾,就回到起始位置 if(tail>=items.length) { tail=0; } size++; locker.notify(); } } public int take() throws InterruptedException { int ret=0; synchronized (locker) { while (size==0) { //对于阻塞队列来说,如果队列为空,在尝试获取元素,就要阻塞 locker.wait(); } ret=items[head]; head++; if(head>=items.length) { head=0; } size--; //此处的notify用来唤醒put中的wait locker.notify(); } return ret; } } }
put和take都可能会出现阻塞的情况(wait)
由于这两个代码中的阻塞条件是对立的,因此这两边的wait不会同时触发
put来唤醒take的阻塞,take来唤醒put的阻塞
下边我们用生产者消费者模型来检验我们自己实现的阻塞队列:
public static void main(String[] args) throws InterruptedException { BlockingQueue queue=new BlockingQueue(); //消费者模型 Thread consumer=new Thread() { @Override public void run() { while (true) { try { int elem= queue.take(); System.out.println("消费者元素:"+elem); } catch (InterruptedException e) { e.printStackTrace(); } } } }; consumer.start(); //生产者线程 Thread producer=new Thread() { @Override public void run() { for (int i = 0; i < 10000; i++) { System.out.println("生产元素:"+i); try { queue.put(i); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; producer.start(); consumer.join(); producer.join(); }
我们可以发现和使用标准库中的阻塞队列的运行结果相同,说明代码实现成功。