目录
💚1.什么是阻塞队列
💚2.生产者消费者模型
💚3标准库实现阻塞队列
💚4.自己实现一个阻塞队列
1.阻塞队列
我们之前在数据结构已经学了队列,什么是队列,我们来回忆一下,队列,是一种数据结构,先进先出.
阻塞队列也如此,先进先出,但是相比队列,它带有阻塞功能,当队列为满,要阻塞等待,当队列为空的时候,要阻塞等待.
因此,阻塞队列是线程安全的数据结构
当队列满的时候,会进入阻塞等待,直到有线程从队列取出元素
当队列空的时候,会进入阻塞等待,直到有线程在队列添加元素
阻塞队列的一个典型应用场景就是 " 生产者消费者模型 ". 这是一种非常典型的开发模型 .
需要我们重点掌握
2.生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题
举个简单的例子
包饺子的问题
一家四口人要包饺子
如果每个人擀一个皮,再包一个,这样就太低效了,如何包是最高效率的呢?
爸爸和大女儿作为生产者,(此处是生产饺子皮),包饺子皮,包好放到一个案板上,妈妈和小女儿一起包(此时是消费饺子皮)
此时这个案板就是相当于一个阻塞队列
当饺子皮放满了,妈妈和小女儿就会告诉爸爸和大女儿先别擀面皮了,那么生产者就可以阻塞等待一会,当案板皮没了,爸爸和大女儿告诉消费者阻塞等待一会
那么这样做提高了线程执行效率
下面来说一说阻塞队列的好处
1.让代码块之间解耦合
啥是耦合呢,就是代码块和代码块之间的关联性的强弱
举个例子,当自己的好朋友生病了,作为好友,我需要去探望,那么如果是不相关的ABCD,那么我可以直接不管,我和我好友的耦合性就很高,相反,我和ABCD耦合性很低
说到这里,顺便说一下啥是内聚
内聚就是功能一样的代码放在一起,再举个例子,衣服要分门别类的放,就是相同的一类的要放在一起
我们写代码,要遵守"高内聚,低耦合"
我们再来举一个计算机的例子
A服务器给B服务器发送请求,B服务器给A响应
服务器处理请求的时候,很耗费硬件资源,包括不限于(CPU,内存,硬盘,带宽)......
所以当某个硬件资源达到瓶颈,服务器就有挂的风险
假设请求量非常多,那么B回应不过来,B服务器很有可能就挂掉了,它俩耦合性就很高,那么A也就挂掉了
所以我们可以采用增加一个阻塞队列的方式
这样增加一个阻塞队列,降低了耦合性,B挂,不影响A
2.削峰填谷
1.削峰
还用这个例子,当A的请求很多的时候,会影响B的响应速率吗,答案是不会!
阻塞队列帮A承担了很多请求,让B保证平稳的速率响应请求,这就是削峰的意思
2.填谷:
当A需求猛增以后,迅速进入猛减期,此时会影响B响应速率吗,还是不会!
阻塞队列会自动调节,当 请求量突然骤减,阻塞队列会拿出之前积压的请求分配给B,这就是填谷
说完作用,差不多介绍完了,现在来看一看实现吧
3.标准库实现阻塞队列
采用BlockingQueue
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; //生产者消费者模型 public class ThreadDemo3 { public static void main(String[] args) { BlockingQueue<Integer> queue=new LinkedBlockingQueue(); //消费者 Thread t1=new Thread(()->{ while(true){ try { int value=queue.take(); System.out.println("消费"+value); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); //生产者 Thread t2=new Thread(()->{ int value=0; while(true){ try { queue.put(value); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("生产"+value); value++; try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t2.start(); } }
标准库实现比较简单,我们自己实现就有难度了
4.自己实现阻塞队列
需要3步
1.实现一个普通队列
2.加上线程安全(加锁)
3.增加阻塞等待功能
废话不多说,上代码
//自己实现阻塞队列 class MyBlockingQueue{ volatile private int[] items=new int[1000]; volatile private int head=0; volatile private int tail=0; volatile private int size=0; //入队列 synchronized public void put(int elem) throws InterruptedException { if(size==items.length){ //return; this.wait(); } items[tail]=elem; tail++; if(tail==items.length){ tail=0; } size++; this.notify(); } synchronized public Integer take() throws InterruptedException { if(size==0){ // return null; this.wait(); } int value=items[head]; head++; if(head==items.length){ head=0; } size--; this.notify(); return value; } } public class ThreadDemo2 { public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(); //消费者 Thread t1 = new Thread(()->{ while (true) { int value = 0; try { value = queue.take(); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("消费" + value); } }); t1.start(); Thread t2=new Thread(()->{ int value=0; while(true){ try { queue.put(value); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("生产"+value); value++; try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t2.start(); System.out.println("hello"); } }
解释:
当队列满就阻塞等待,取出元素再唤醒,继续执行增加操作(如蓝色线)
当队列空就阻塞等待,放进元素再唤醒,继续执行取出操作(如红色线)
在put和take方法上加上锁,保证原子性,线程安全
在这个操作中涉及到多次读以及修改操作,为了保证读取到的数据是正确的,也就会为了保证内存可见性,我们采用增加关键字volatile的操作
并且上述不可能同时wait,不可能有一个队列又空又满!!!
这里还有最后一个小问题
为啥要用while,不用if,?
看看wait的源码
wait可能会在线程执行过程中被提前唤醒,条件没成熟就醒了,这不符合代码逻辑,所以我们把if改为while,wait操作之前,发现条件不满足wait,等到wait被唤醒以后,再次判断wait唤醒是不是因为满足条件,不满足,就继续等待!!!
以上就是今天的所有内容了,我们下期再见啦!!!