阻塞队列是一种特殊的队列,带有“阻塞特性”,也遵守队列 “先进先出” 的原则。阻塞队列是一种线程安全的数据结构,并且具有以下特性:
当队列满时,继续入队列就会阻塞,直到有其他线程从队列中取走元素。
当队列空时,继续出队列也会阻塞,直到有其他线程往队列中插入元素。
在多线程代码编程中,多个线程之间要进行数据交互,我们可以使用阻塞队列来简化代码的编写。
一、Java标准库:BlockingQueue接口
1、使用标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列。如果我们需要在一些程序中使用阻塞队列,可以直接使用标准库中的 BlockingQueue 。
与BlockingQueue相关的具体的继承和实现关系如下:
collection接口------>Queue接口------>BlockingQueue接口----->7个实现类
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列。它在构造的时候用户必须传入一个capacity。
2.LinkedBlockingQueue:由链表结构组成的有界阻塞队列,默认capacity为
Integer.MAX_VALUE。也可以自行指定capacity。
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
- SynchronousQueue:不存储元素的阻塞队列,它的容量为0.
- LinkedTransferQueue:由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列。
BlockingQueue 是一个接口,真正实现的类是 LinkedBlockingQueue(链表实现)和 ArrayBlockingQueue (顺序表实现)。
BlockingDeque<String> queue1 = new LinkedBlockingDeque<>(); BlockingDeque<String> queue2 = new ArrayBlockingQueue<>();
阻塞队列的核心方法主要有两个:阻塞队列~带有阻塞特性!!!
1.如果队列空,尝试出队列,就会阻塞等待.等待到队列不空为止2.
put() 用于阻塞式的入队列。put()方法会抛出 InterruptedException 异常,因为该方法可能会带来阻塞。而一旦阻塞了,就有可能被interrupt方法提前唤醒,此时就会抛出该异常(会带来阻塞的方法往往会抛出 InterruptedException 异常)。如果队列已满,尝试put(),就会阻塞等待,直到队列不满为止。
2.take() 用于阻塞式的出队列。如果队列空,尝试take(),也会阻塞等待,直到队列不空为止。
如下面代码所示,put 5 次后,执行take 6 次的操作。由于第6次出队列时队列已空,线程就会阻塞:
import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; public class Test { public static void main(String[] args) throws InterruptedException { BlockingDeque<String> queue = new LinkedBlockingDeque<>(); // 阻塞队列的和核心方法主要有两个 // 1. put() 入队列 queue.put("hello1"); queue.put("hello2"); queue.put("hello3"); queue.put("hello4"); queue.put("hello5"); // 2.take() 出队列 String ret = null; // 出队列 1 次 ret = queue.take(); System.out.println(ret); // 出队列 2 次 ret = queue.take(); System.out.println(ret); // 出队列 3 次 ret = queue.take(); System.out.println(ret); // 出队列 4 次 ret = queue.take(); System.out.println(ret); // 出队列 5 次 ret = queue.take(); System.out.println(ret); // 出队列 6 次 ret = queue.take(); System.out.println(ret); } }
BlockingQueue 也有 offer,poll, peek 等方法, 但是这些方法不带有阻塞特性。
2、生产者-消费者模型:多线程下阻塞队列应用
a.什么是生产者-消费者模型?
生产者-消费者模型这个概念很关键,是我们服务器开发中一种非常常见的写法。
可以用擀饺子皮和包饺子这两件事情来类比。假设过年有一桌子人围在一起准备饺子。每个人自己擀一个饺子皮,自己包一个;再擀一个,再自己包一个……这样的做法虽然能完成任务,但是并不高效:家里只有一根擀面杖,当一个人正在使用擀面杖的时候,剩下的几个人只能干等着,即使有多根擀面杖,但每个人都在不停地状态切换,也很费劲。
更常见的包法是一个人负责擀皮,另外几个人负责包(类似于流水线)。此时,就构成了生产者-消费者模型。
注意,生产者、消费者的角色是针对某个资源而言的,针对的资源不同,角色分配也就不同。对于饺子皮来说,负责擀皮的人就是生产饺子皮的,所以是生产者;负责包饺子的人是饺子皮的消耗饺子皮的,所以是消费者。
生产者和消费者之间交换数据,就需要用到一个交易场所。比如放饺子的盖帘:
这个交易场所就相当于是一个阻塞队列。如果擀饺子皮的人擀得很快,盖帘上一下子堆满了饺子皮,擀饺子皮的人就可以歇会儿;如果包饺子的包得很快,一下子把饺子皮都包完了,那包子饺子的就停下来等待擀饺子皮的人再擀点皮出来。
b.生产者-消费者模型是用来解决什么问题的?
生产者-消费者能解决很多问题,最主要的是两个方面:
第一,可以让上下游模块之间,更好地“解耦合”。
耦合指的是模块之间的关系是强还是弱,关联越强,耦合越高。而写代码追求的就是低耦合高内聚,避免代码牵一发动全身。(高内聚指的是将相关联的代码分门别类地规制起来。)
什么是耦合高的情况呢?考虑以下情景:有两台服务器A和B直接通信。
此时,如果要再加一个与A通信的C服务器,A的代码就必须作出很大调整。
而使用生产者-消费者模型,耦合就能够降低了。仍然有要进行通信的服务器A和服务器B,但此时它们不直接进行通信,A把它的请求发给一个阻塞队列服务器(也叫消息队列服务器。是把阻塞队列的功能单列出来,再扩充上更多的功能做成单独的服务器)。
A和B彼此不知道对方的存在,它们都只与阻塞队列服务器通信。此时B挂了,对A没有影响。如果再加一个C,此时直接让C从队列中获取请求即可,对于A的影响是非常小的。
A、B、C作为业务服务器,代码是不断在变化的;而阻塞队列服务器和业务无关,代码不太会变化,也就更稳定。因此,阻塞队列服务器挂的概率是远小于业务服务器的。
第二,使用生产者-消费者模型,可以削峰填谷。
仍然是两个服务器:
A与B直接通信的情况
如果此时A和B是直接调用的关系,A收到了请求峰值,B也同样会有这个峰值。假设A平时收到的请求是 10000个/s,而在某个时间点突然收到了 30000个/s,对于B来说,它的请求同样是 30000个/s。服务器在处理每个请求时,都要消耗一定的硬件资源,包括不限于CPU,内存,带宽等,如果某个硬件资源达到瓶颈,此时服务器就挂了。
如果B在设计时,没有考虑峰值处理的情况,B可能也就挂了,这就给系统的稳定性带来了极大的风险。但是,如果引入生产者-消费者模型,风险就大大降低了。
引入阻塞队列的情况
A不直接向B发送请求,而是向消息队列服务器发送请求。A收到的请求多了,队列中的元素也就多了,而此时的B仍然可以按照之前的速率来取元素。换句话说,队列帮B承担了压力。(队列没有业务,代码稳定,承担了压力也不容易挂。另外,A作为入口服务器,一般来说起到的效果就是调用一下其他服务武器,并把结果汇总,统一返回,业务比较简单,因此A服务器也不容易挂。而B服务器是有具体的业务的,B承担的工作量更大,单个请求吃的资源更多,也就更“娇贵”,更容易挂。)
假设流量高峰后还有个波谷,此时B仍然可以按照原有速率消费队列中之前积压的数据。
阻塞队列的削峰填谷作用与水库、湖泊对河流水量削峰填谷的作用是相似的。
这就像一条河流,当雨季时,天降暴雨(降水量达到峰值),河流上游的水量激增,下游的水量也会随之暴增。此时如果下游没有相应的排洪泄洪设施,河流下游就可能因为河水决堤而引发洪灾;另一方面,当旱季降水不足,上游水量供给减少,也会直接导致下游旱灾。而在中游建设一个水库,在雨季用于蓄洪,在旱季用于给下游开闸放水,此时就能一直保持河流下游水量适中且变化平缓了。如三峡大坝就是如此。
阻塞队列、三峡大坝等起到的效果与生产者-消费者模型起到的效果也是一致的。
二、使用 BlockingQueue 代码实现生产者-消费者模型
下面是使用BlockingQueue实现生产者-消费者模型模型的一个代码案例。注意,其中的生产者和消费者不一定只有一个,可以同时存在多个消费者和生产者。
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class Main3 { public static void main(String[] args) { // 创建一个阻塞队列 BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(); // 消费者 Thread t1 = new Thread(() -> { while(true) { try { int value = blockingQueue.take(); System.out.println("消费元素:" + value); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); // 生产者 Thread t2 = new Thread(() -> { int value = 0; while(true) { try { System.out.println("生产元素: " + value); blockingQueue.put(value); value++; Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t2.start(); } }
运行结果:每隔一秒(1000ms)生产者会生产出一个元素,put进阻塞队列blockingQueue内;而消费者消费元素没有时间间隔,一旦blockingQueue中有元素,它就会take,立即进行消费;没有元素的时候消费者将阻塞等待生产者生产出元素。
生产者-消费者模型是阻塞队列BlockingQueue的应用。如何使用BlockingQueue是简单的,但关键还要掌握如何通过自己的代码实现BlockingQueue。
Java多线程基础-9:代码案例之阻塞队列(二)+
https://developer.aliyun.com/article/1520533?spm=a2c6h.13148508.setting.14.75194f0e9S8GyQ